1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 __revision__ = "src/engine/SCons/Job.py 2725 2008/03/31 12:52:02 knight"
33
34 import SCons.compat
35
36
37
38
39
40
41
42
43
44
# Default stack size, in kilobytes, for the worker threads of a parallel
# job.  It is used only when SCons.Job.stack_size has not been set.
default_stack_size = 256
46
47
49 """An instance of this class initializes N jobs, and provides
50 methods for starting, stopping, and waiting on all N jobs.
51 """
52
54 """
55 create 'num' jobs using the given taskmaster.
56
57 If 'num' is 1 or less, then a serial job will be used,
58 otherwise a parallel job with 'num' worker threads will
59 be used.
60
61 The 'num_jobs' attribute will be set to the actual number of jobs
62 allocated. If more than one job is requested but the Parallel
63 class can't do it, it gets reset to 1. Wrapping interfaces that
64 care should check the value of 'num_jobs' after initialization.
65 """
66
67 self.job = None
68 if num > 1:
69 try:
70 stack_size = SCons.Job.stack_size
71 except AttributeError:
72 stack_size = default_stack_size
73
74 try:
75 self.job = Parallel(taskmaster, num, stack_size)
76 self.num_jobs = num
77 except NameError:
78 pass
79 if self.job is None:
80 self.job = Serial(taskmaster)
81 self.num_jobs = 1
82
84 """run the job"""
85 try:
86 self.job.start()
87 except KeyboardInterrupt:
88
89
90
91
92 import signal
93 signal.signal(signal.SIGINT, signal.SIG_IGN)
94 raise
95
98
class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks.
        Each task's executed() method is called when it ran successfully,
        or its exception_set() and failed() methods are called when
        prepare() or execute() raised an exception."""
        self.taskmaster = taskmaster

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks.

        A failing task does not end this loop by itself: the task is
        notified through exception_set() and failed(), and the loop keeps
        asking the taskmaster for work until next_task() returns None.
        KeyboardInterrupt is never swallowed; it propagates to the caller."""
        while True:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                task.execute()
            except KeyboardInterrupt:
                raise
            except:
                # Deliberately broad: any failure, whatever its type, has to
                # be recorded on the task rather than abort the whole loop.
                task.exception_set()
                task.failed()
            else:
                task.executed()

            # Runs for every task, whether it succeeded or failed.
            task.postprocess()
144
147
148
149
150
151 try:
152 import Queue
153 import threading
154 except ImportError:
155 pass
156 else:
class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully. """

    def __init__(self, requestQueue, resultsQueue):
        """Bind the worker to its two queues and start the thread.

        requestQueue supplies the tasks to execute; a None entry tells the
        worker to exit.  resultsQueue receives one (task, ok) tuple for
        every task that was taken from requestQueue."""
        threading.Thread.__init__(self)
        # Daemon thread: a worker stuck in a task must not keep the
        # interpreter alive at exit.
        self.setDaemon(True)
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        self.start()

    def run(self):
        """Execute queued tasks until the None sentinel is received."""
        while True:
            task = self.requestQueue.get()

            # Compare against the sentinel by identity.  A plain truth test
            # would also stop the worker on a task object that happens to
            # evaluate as false, and that task's result would never be
            # posted.
            if task is None:
                break

            try:
                task.execute()
            except KeyboardInterrupt:
                # Reported as a failed task, without exception_set().
                ok = False
            except:
                # Deliberately broad: every failure must be turned into a
                # result tuple, otherwise the consumer waits forever.
                task.exception_set()
                ok = False
            else:
                ok = True

            self.resultsQueue.put((task, ok))
191
193 """This class is responsible for spawning and managing worker threads."""
194
def __init__(self, num, stack_size):
    """Create the request and reply queues, and 'num' worker threads.

    One must specify the stack size of the worker threads. The
    stack size is specified in kilobytes.

    If the stack size cannot be applied, a StackSizeWarning is issued
    and the workers are created with whatever stack size is currently
    in effect.
    """
    import sys

    self.requestQueue = Queue.Queue(0)
    self.resultsQueue = Queue.Queue(0)

    # Stays None unless the stack size was actually changed, so that the
    # restore step below knows whether there is anything to undo.
    prev_size = None
    try:
        prev_size = threading.stack_size(stack_size * 1024)
    except AttributeError:
        # threading.stack_size() is not available.  Only complain when
        # SCons.Job.stack_size has been set; otherwise the value is just
        # the built-in default.  The exception is fetched through
        # sys.exc_info() because that spelling parses on every Python
        # version.
        if hasattr(SCons.Job, 'stack_size'):
            msg = "Setting stack size is unsupported by this version of Python:\n " + \
                str(sys.exc_info()[1])
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
    except ValueError:
        msg = "Setting stack size failed:\n " + \
            str(sys.exc_info()[1])
        SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

    # Each Worker starts its own thread from its constructor.
    self.workers = []
    for _ in range(num):
        worker = Worker(self.requestQueue, self.resultsQueue)
        self.workers.append(worker)

    # Put the previous stack size back so that threads created later are
    # not affected by the pool's setting.
    if prev_size is not None:
        threading.stack_size(prev_size)
228
def put(self, obj):
    """Hand 'obj' to the worker threads by appending it to the request queue."""
    pending = self.requestQueue
    pending.put(obj)
232
234 """Remove and return a result tuple from the results queue."""
235 return self.resultsQueue.get(block)
236
239
241 """
242 Shuts down the thread pool, giving each worker thread a
243 chance to shut down gracefully.
244 """
245
246
247
248
249 for _ in self.workers:
250 self.requestQueue.put(None)
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 for worker in self.workers:
266 worker.join(1.0)
267 self.workers = []
268
class Parallel:
    """This class is used to execute tasks in parallel, and is somewhat
    less efficient than Serial, but is appropriate for parallel builds.

    This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size):
        """Create a new parallel job given a taskmaster.

        The taskmaster's next_task() method should return the next
        task that needs to be executed, or None if there are no more
        tasks. Each task's executed() method is called when it ran
        successfully, or its failed() method is called when prepare()
        or execute() raised an exception.

        'num' is the number of worker threads and also the maximum
        number of tasks in flight; 'stack_size' is passed on to the
        thread pool.

        Note: calls to taskmaster are serialized, but calls to
        execute() on distinct tasks are not serialized, because
        that is the whole point of parallel jobs: they can execute
        multiple tasks simultaneously. """

        self.taskmaster = taskmaster
        self.tp = ThreadPool(num, stack_size)

        self.maxjobs = num

    def start(self):
        """Start the job. This will begin pulling tasks from the
        taskmaster and executing them, and return when there are no
        more tasks.

        A failing task does not end this loop by itself: it is reported
        through failed() and the loop continues until the taskmaster
        returns None and every outstanding result has been collected."""

        # Number of tasks handed to the pool whose result has not been
        # collected yet.
        jobs = 0

        while True:
            # Fill the pool: dispatch tasks until every slot is taken or
            # the taskmaster has nothing more to offer.
            while jobs < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    task.prepare()
                except KeyboardInterrupt:
                    raise
                except:
                    # The task never reaches a worker, but it is still
                    # counted so that its failure is picked up by the
                    # result loop below like any other.
                    task.exception_set()
                    self.tp.preparation_failed(task)
                    jobs += 1
                    continue

                self.tp.put(task)
                jobs += 1

            # Nothing left to dispatch and nothing outstanding: done.
            if task is None and not jobs:
                break

            # Block for at least one result, then drain whatever else has
            # already completed before dispatching again.
            while True:
                task, ok = self.tp.get()

                jobs -= 1
                if ok:
                    task.executed()
                else:
                    task.failed()

                task.postprocess()

                if self.tp.resultsQueue.empty():
                    break
346
349