From c12806450806909177058eb8e7f85dcbd24cbf1c Mon Sep 17 00:00:00 2001
From: Pixel <pixel@nobis-crew.org>
Date: Thu, 12 Nov 2009 18:22:55 -0800
Subject: BigClean; Task pass - getting it more threadsafe

---
 include/Task.h    | 42 +++++++++++++++++-------------
 include/TaskMan.h | 10 +++++++-
 lib/Action.cc     |  3 ++-
 lib/Task.cc       | 76 -------------------------------------------------------
 lib/TaskMan.cc    | 65 +++++++++++++++++++++++++++++++++++++++--------
 5 files changed, 89 insertions(+), 107 deletions(-)

diff --git a/include/Task.h b/include/Task.h
index d976124..32cbf85 100644
--- a/include/Task.h
+++ b/include/Task.h
@@ -29,6 +29,7 @@
 #include <vector>
 #include <Exceptions.h>
 #include <Handle.h>
+#include <TaskMan.h>
 
 #undef E_HANDLE
 #undef Yield
@@ -41,42 +42,47 @@ class Task : public Base {
         EVT_PROCESS,
         EVT_TIMEOUT,
         EVT_TASK,
+        EVT_IDLE,
     };
     enum {
         TASK_ON_HOLD = 0,
         TASK_DONE,
         TASK_BURST,
+        TASK_IDLE,
+        TASK_IDLE_REST,
     };
     enum {
         W4_STICKY = 1,
         W4_READING = 2,
         W4_WRITING = 4,
     };
-      Task();
+      Task() : current(0), state(TASK_ON_HOLD), stopped(false), suspended(false), yielded(false), wbta(0), wta(0), BurstHandle(0) { TaskMan::AddTask(this); }
       virtual ~Task();
-    virtual String GetName();
+    virtual String GetName() { return "Unknow Task"; }
     int Run();
     int DryRun();
-    int GetState();
+    int GetState() { return state; }
     void Suspend(int = -1) throw (GeneralException);
-    void WaitFor(Task *);
-    void WaitFor(Handle *, int = 0);
-    void WaitFor(pid_t);
-    void WaitFor(const timeval &, int = 0);
-    void Yield();
-    bool Yielded();
-    void Unyield();
-    Task * WaitedBy();
-    void SetBurst();
-    void Stop();
-    void Restart();
-    bool IsStopped();
-    void RemoveFromWatches();
-    void RemoveTimeout();
+    void WaitFor(Task * t) { t->wbta = this; wta = t; }
+    void WaitFor(Handle * h, int flags = 0) { h->SetNonBlock(); TaskMan::WaitFor(h, this, flags); }
+    void WaitFor(pid_t p) { TaskMan::WaitFor(p, this); }
+    void WaitFor(const timeval & t, int flags = 0) { TaskMan::WaitFor(t, this, flags); }
+    void Yield() { yielded = true; Suspend(TASK_ON_HOLD); }
+    bool Yielded() { return yielded; }
+    void Unyield() { yielded = false; SetBurst(); }
+    Task * WaitedBy() { return wbta; }
+    void SetBurst() { state = TASK_BURST; }
+    void SetIdle() { state = TASK_IDLE; }
+    void IdleRest() { state = TASK_IDLE_REST; }
+    void Stop() { stopped = true; }
+    void Restart() { stopped = false; }
+    bool IsStopped() { return stopped; }
+    void RemoveFromWatches() { wbta = 0; }
+    void RemoveTimeout() { TaskMan::RemoveTimeout(this); }
     Handle * BurstHandle;
 
   protected:
-    virtual int Do() throw (GeneralException);
+    virtual int Do() throw (GeneralException) { return TASK_ON_HOLD; }
     int current;
     
   private:
diff --git a/include/TaskMan.h b/include/TaskMan.h
index 59363f8..2bc7495 100644
--- a/include/TaskMan.h
+++ b/include/TaskMan.h
@@ -25,8 +25,11 @@ typedef int sigset_t;
 #endif
 
 #include <signal.h>
-#include <Task.h>
+#include <Atomic.h>
 #include <vector>
+#include <Handle.h>
+
+class Task;
 
 class TaskMan : public Base {
   public:
@@ -80,6 +83,8 @@ class TaskMan : public Base {
 	Task * T;
     };
     typedef std::vector<Task *> TaskList_t;
+    
+    static void ProcessNewTasks();
 
     static TaskList_t TaskList;
     static TaskList_t Zombies;
@@ -96,6 +101,9 @@ class TaskMan : public Base {
     static sigset_t sigchildset;
     static int got_sigchild;
     static bool CheckDead(Task *);
+    static Atomic::Queue<Task> new_tasks;
 };
 
+#include <Task.h>
+
 #endif
diff --git a/lib/Action.cc b/lib/Action.cc
index b15ad52..33b0b36 100644
--- a/lib/Action.cc
+++ b/lib/Action.cc
@@ -23,6 +23,7 @@
 #include "BString.h"
 #include "Action.h"
 #include "HttpServ.h"
+#include "Atomic.h"
 
 Action * Action::start = 0;
 
@@ -35,7 +36,7 @@ static String genurl(const String & u) {
     } else {
 	// Si l'url pass�e en param�tre est vide, on g�n�re une URL
 	// sous la forme TmpXXXX o� XXXX est une valeur qui s'incr�mente.
-	return String("Tmp") + (counter++);
+	return String("Tmp") + Atomic::Prefetch::Increment(&counter);
     }
 }
 
diff --git a/lib/Task.cc b/lib/Task.cc
index 9dc28ed..51be838 100644
--- a/lib/Task.cc
+++ b/lib/Task.cc
@@ -29,10 +29,6 @@
 #include "BString.h"
 #include "gettext.h"
 
-Task::Task() : current(0), state(TASK_ON_HOLD), stopped(false), suspended(false), yielded(false), wbta(0), wta(0), BurstHandle(0) {
-    TaskMan::AddTask(this);
-}
-
 Task::~Task() {
     TaskMan::RemoveFromWatches(this);
     TaskMan::RemoveTimeout(this);
@@ -42,10 +38,6 @@ Task::~Task() {
         wta->wbta = 0;
 }
 
-int Task::Do() throw (GeneralException) {
-    return TASK_ON_HOLD;
-}
-
 int Task::Run() {
     if (TaskMan::Event() == Task::EVT_TASK)
         wta = 0;
@@ -80,14 +72,6 @@ int Task::DryRun() {
     return state;
 }
 
-int Task::GetState() {
-    return state;
-}
-
-String Task::GetName() {
-    return _("Unknow Task");
-}
-
 void Task::Suspend(int newstate) throw (GeneralException) {
     if (newstate != -1) {
 	state = newstate;
@@ -95,63 +79,3 @@ void Task::Suspend(int newstate) throw (GeneralException) {
     suspended = true;
     throw TaskSwitch();
 }
-
-void Task::WaitFor(Handle * h, int flags) {
-    h->SetNonBlock();
-    TaskMan::WaitFor(h, this, flags);
-}
-
-void Task::WaitFor(Task * t) {
-    t->wbta = this;
-    wta = t;
-}
-
-void Task::WaitFor(pid_t p) {
-    TaskMan::WaitFor(p, this);
-}
-
-void Task::WaitFor(const timeval & t, int flags) {
-    TaskMan::WaitFor(t, this, flags);
-}
-
-void Task::Yield() {
-    yielded = true;
-    Suspend(TASK_ON_HOLD);
-}
-
-bool Task::Yielded() {
-    return yielded;
-}
-
-void Task::Unyield() {
-    yielded = false;
-    SetBurst();
-}
-
-void Task::SetBurst() {
-    state = TASK_BURST;
-}
-
-void Task::Stop() {
-    stopped = true;
-}
-
-void Task::Restart() {
-    stopped = false;
-}
-
-bool Task::IsStopped() {
-    return stopped;
-}
-
-Task * Task::WaitedBy() {
-    return wbta;
-}
-
-void Task::RemoveFromWatches() {
-    wbta = 0;
-}
-
-void Task::RemoveTimeout() {
-    TaskMan::RemoveTimeout(this);
-}
diff --git a/lib/TaskMan.cc b/lib/TaskMan.cc
index 83fe5aa..fd630d8 100644
--- a/lib/TaskMan.cc
+++ b/lib/TaskMan.cc
@@ -51,6 +51,8 @@ Handle * TaskMan::ehandle;
 
 sigset_t TaskMan::sigchildset;
 
+Atomic::Queue<Task> TaskMan::new_tasks;
+
 #ifndef _WIN32
 void taskman_sigchild(int sig) {
     TaskMan::SigChild();
@@ -382,9 +384,16 @@ void TaskMan::AddTask(Task * t) {
 	Init();
     }
     
-    if (t) {
-	TaskList.push_back(t);
-	number++;
+    if (t)
+        new_tasks.enqueue(t);
+}
+
+void TaskMan::ProcessNewTasks() {
+    Task * t;
+    
+    while(new_tasks.unqueue(&t)) {
+        TaskList.push_back(t);
+        number++;
     }
 }
 
@@ -524,13 +533,14 @@ void TaskMan::MainLoop() throw (GeneralException) {
     struct pollfd * ufsd;
     unsigned int nfds;
     
-    int no_burst;
+    bool no_burst, no_idle;
     
     if (!inited) {
 	Init();
     }
-
-    while (1) {
+    
+    while (true) {
+        ProcessNewTasks();
 	if (number == 0) {
 	    throw GeneralException(_("TaskMan: No more task to manage."));
 	}
@@ -545,15 +555,17 @@ void TaskMan::MainLoop() throw (GeneralException) {
 	if (!TaskList.empty()) {
 	    for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) {
 		Task * t = *p;
+		if (t->GetState() == Task::TASK_IDLE_REST)
+		    t->SetIdle();
 //	 	   cerr << "-=- TaskMan: task " << t->GetName() << endl;
 	    }
 	}
 	
 //	cerr << "-=- TaskMan: processing burning tasks.\n";
 	
-	no_burst = 0;
+	no_burst = false;
 	while (!no_burst) {
-	    no_burst = 1;
+	    no_burst = true;
             /* First, we will check for any burning task and run 'em */
             event = Task::EVT_BURST;
 	    if (!TaskList.empty()) {
@@ -578,7 +590,38 @@ void TaskMan::MainLoop() throw (GeneralException) {
                             t->BurstHandle = 0;
                         }
 		        /* if the task added some new tasks, we have to rerun the loop */
-			no_burst = 0;
+			no_burst = false;
+		        break;
+		    }
+		
+                    // Additionnally, if the task died, let's put it in the zombies list.
+                    // This check is done on the whole TaskList at each loop.
+                    if (CheckDead(t))
+                        p = TaskList.begin();
+		}
+	    }
+	}
+
+	no_idle = false;
+	while (!no_idle) {
+	    no_idle = true;
+            /* First, we will check for any idle task and run 'em */
+            event = Task::EVT_IDLE;
+	    if (!TaskList.empty()) {
+		for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) {
+		    Task * t = *p;
+		
+		    if (t->IsStopped()) {
+			continue;
+		    }
+		
+                    if (t->GetState() == Task::TASK_IDLE) {
+//		    	cerr << "-=- TaskMan: running idle task " << t->GetName() << endl;
+			t->Run();
+		        /* if the task added some new tasks, we have to rerun the loop */
+			no_idle = false;
+			/* and toggle the task so it's not running through the loop again. */
+			t->IdleRest();
 		        break;
 		    }
 		
@@ -647,7 +690,7 @@ void TaskMan::MainLoop() throw (GeneralException) {
 
 	event = Task::EVT_HANDLE;
 	nfds = w4ha.size();
-	no_burst = 1;
+	no_burst = true;
 
         if (nfds != 0) {
 	    int r;
@@ -672,7 +715,7 @@ void TaskMan::MainLoop() throw (GeneralException) {
                         // considering it already processed for this handle loop.
                         // It'll be run immediately next cycle.
 		        p->T->SetBurst();
-		        no_burst = 0;
+		        no_burst = false;
 		        p->dirty = true;
 		        q->fd = 0;
 		        q->events = 0;
-- 
cgit v1.2.3