WvStreams
wvsubprocqueue.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * A way to enqueue a series of WvSubProc objects. See wvsubprocqueue.h.
6  */
7 #include "wvsubprocqueue.h"
8 #include <unistd.h>
9 #include <assert.h>
10 
11 
12 WvSubProcQueue::WvSubProcQueue(unsigned _maxrunning)
13 {
14  maxrunning = _maxrunning;
15 }
16 
17 
18 WvSubProcQueue::~WvSubProcQueue()
19 {
20 }
21 
22 
23 void WvSubProcQueue::add(void *cookie, WvSubProc *proc)
24 {
25  assert(proc);
26  assert(!proc->running);
27  if (cookie)
28  {
29  // search for other enqueued objects with this cookie
30  EntList::Iter i(waitq);
31  for (i.rewind(); i.next(); )
32  {
33  if (i->cookie == cookie)
34  {
35  // already enqueued; mark it as "redo" unless it's already
36  // the last one. That way we guarantee it'll still run
37  // in the future from now, and it'll come later than anything
38  // else in the queue, but it won't pointlessly run twice at
39  // the end.
40  Ent *e = i.ptr();
41  if (i.next())
42  e->redo = true;
43  delete proc;
44  return;
45  }
46  }
47  }
48 
49  waitq.append(new Ent(cookie, proc), true);
50 }
51 
52 
53 void WvSubProcQueue::add(void *cookie,
54  const char *cmd, const char * const *argv)
55 {
56  WvSubProc *p = new WvSubProc;
57  p->preparev(cmd, argv);
58  add(cookie, p);
59 }
60 
61 
62 bool WvSubProcQueue::cookie_running()
63 {
64  EntList::Iter i(runq);
65  for (i.rewind(); i.next(); )
66  if (i->cookie)
67  return true;
68  return false;
69 }
70 
71 
73 {
74  int started = 0;
75 
76  //fprintf(stderr, "go: %d waiting, %d running\n",
77  // waitq.count(), runq.count());
78 
79  // first we need to clean up any finished processes
80  {
81  EntList::Iter i(runq);
82  for (i.rewind(); i.next(); )
83  {
84  Ent *e = i.ptr();
85 
86  e->proc->wait(0, true);
87  if (!e->proc->running)
88  {
89  if (e->redo)
90  {
91  // someone re-enqueued this task while it was
92  // waiting/running
93  e->redo = false;
94  i.xunlink(false);
95  waitq.append(e, true);
96  }
97  else
98  i.xunlink();
99  }
100  }
101  }
102 
103  while (!waitq.isempty() && runq.count() < maxrunning)
104  {
105  EntList::Iter i(waitq);
106  for (i.rewind(); i.next(); )
107  {
108  // elements with cookies are "sync points" in the queue;
109  // they guarantee that everything before that point has
110  // finished running before they run, and don't let anything
111  // after them run until they've finished.
112  if (i->cookie && !runq.isempty())
113  goto out;
114  if (cookie_running())
115  goto out;
116 
117  // jump it into the running queue, but be careful not to
118  // delete the object when removing!
119  Ent *e = i.ptr();
120  i.xunlink(false);
121  runq.append(e, true);
122  e->proc->start_again();
123  started++;
124  break;
125  }
126  }
127 
128 out:
129  assert(runq.count() <= maxrunning);
130  return started;
131 }
132 
133 
134 unsigned WvSubProcQueue::running() const
135 {
136  return runq.count();
137 }
138 
139 
141 {
142  return runq.count() + waitq.count();
143 }
144 
145 
147 {
148  return runq.isempty() && waitq.isempty();
149 }
150 
151 
153 {
154  while (!isempty())
155  {
156  go();
157  if (!isempty())
158  usleep(100*1000);
159  }
160 }
unsigned running() const
Return the number of currently running processes.
unsigned remaining() const
Return the number of unfinished (ie. running or waiting) processes.
int go()
Clean up after any running processes in the queue, and start running additional processes if any are ...
void add(void *cookie, WvSubProc *proc)
Enqueue a process.
WvSubProcQueue(unsigned _maxrunning)
Create a WvSubProcQueue.
bool isempty() const
True if there are no unfinished (ie. running or waiting) processes.
void finish()
Wait synchronously for all processes in the entire queue to finish.