Package SCons :: Module Job
[hide private]
[frames | no frames]

Source Code for Module SCons.Job

  1  """SCons.Job 
  2   
  3  This module defines the Serial and Parallel classes that execute tasks to 
  4  complete a build. The Jobs class provides a higher level interface to start, 
  5  stop, and wait on jobs. 
  6   
  7  """ 
  8   
  9  # 
 10  # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 The SCons Foundation 
 11  # 
 12  # Permission is hereby granted, free of charge, to any person obtaining 
 13  # a copy of this software and associated documentation files (the 
 14  # "Software"), to deal in the Software without restriction, including 
 15  # without limitation the rights to use, copy, modify, merge, publish, 
 16  # distribute, sublicense, and/or sell copies of the Software, and to 
 17  # permit persons to whom the Software is furnished to do so, subject to 
 18  # the following conditions: 
 19  # 
 20  # The above copyright notice and this permission notice shall be included 
 21  # in all copies or substantial portions of the Software. 
 22  # 
 23  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY 
 24  # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE 
 25  # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 26  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 
 27  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 
 28  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 
 29  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
 30  # 
 31   
 32  __revision__ = "src/engine/SCons/Job.py 2725 2008/03/31 12:52:02 knight" 
 33   
 34  import SCons.compat 
 35   
 36   
 37  # The default stack size (in kilobytes) of the threads used to execute 
 38  # jobs in parallel. 
 39  # 
 40  # We use a stack size of 256 kilobytes. The default on some platforms 
 41  # is too large and prevents us from creating enough threads to fully 
 42  # parallelize the build. For example, the default stack size on Linux 
 43  # is 8 MBytes. 
 44   
 45  default_stack_size = 256 
 46   
 47   
class Jobs:
    """Front end that owns one job runner and drives it.

    The runner is a Parallel instance when more than one job was
    requested and could be created, otherwise a Serial instance.
    run() and cleanup() simply forward to that runner.
    """

    def __init__(self, num, taskmaster):
        """Choose and construct the job runner for 'num' jobs.

        With 'num' of 1 or less a Serial runner is used.  With a larger
        'num' a Parallel runner with 'num' worker threads is attempted;
        if the Parallel name is not available the Serial runner is used
        instead.

        The 'num_jobs' attribute records how many jobs were really
        allocated: 'num' for a Parallel runner, 1 for a Serial one.
        Callers that care should inspect it after construction.
        """
        self.job = None

        if num > 1:
            # An explicitly assigned SCons.Job.stack_size takes
            # precedence over the module-level default.
            try:
                worker_stack_kb = SCons.Job.stack_size
            except AttributeError:
                worker_stack_kb = default_stack_size

            # A NameError here is treated as "Parallel is not
            # available"; we then fall through to the serial runner.
            try:
                self.job = Parallel(taskmaster, num, worker_stack_kb)
            except NameError:
                pass
            else:
                self.num_jobs = num

        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self):
        """Run the job to completion.

        A KeyboardInterrupt raised by the runner is re-raised, but only
        after SIGINT has been set to be ignored by this Python process.
        """
        try:
            self.job.start()
        except KeyboardInterrupt:
            # Ignore any further Ctrl-C so that the shutdown that
            # follows is not interrupted a second time.  Only Python
            # itself is shielded; child processes may still receive
            # the interrupt.
            import signal
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            raise

    def cleanup(self):
        """Forward the cleanup request to the underlying runner."""
        self.job.cleanup()
98
class Serial:
    """Runs tasks one after another in the calling thread.

    Cheaper than Parallel, but only suitable for non-parallel builds.
    Only one instance should exist at a time, and the class is not
    thread safe.
    """

    def __init__(self, taskmaster):
        """Remember the taskmaster that will supply the tasks.

        The taskmaster's next_task() is expected to hand back the next
        task to run, or None once nothing is left.  Each task has its
        executed() method called after a successful run, or its
        failed() method called when preparing or executing it raised.
        """
        self.taskmaster = taskmaster

    def start(self):
        """Pull tasks from the taskmaster and run them until it is empty.

        Every task is prepared, executed, reported as executed or
        failed, and finally post-processed.  KeyboardInterrupt is
        never swallowed; any other exception is recorded on the task.
        """
        task = self.taskmaster.next_task()

        while task is not None:
            try:
                task.prepare()
                task.execute()
            except KeyboardInterrupt:
                raise
            except:
                task.exception_set()
                # It is up to the failed() callback to decide whether
                # the rest of the build should be stopped.
                task.failed()
            else:
                task.executed()

            task.postprocess()
            task = self.taskmaster.next_task()

    def cleanup(self):
        """Nothing to release for a serial job."""
        pass
147 148 # Trap import failure so that everything in the Job module but the 149 # Parallel class (and its dependent classes) will work if the interpreter 150 # doesn't support threads. 151 try: 152 import Queue 153 import threading 154 except ImportError: 155 pass 156 else:
class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully. """

    def __init__(self, requestQueue, resultsQueue):
        """Create the worker as a daemon thread and start it immediately.

        requestQueue -- queue the worker reads tasks (or the None
                        shutdown sentinel) from.
        resultsQueue -- queue the worker posts (task, ok) tuples to.
        """
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        self.start()

    def run(self):
        """Execute queued tasks until the None sentinel is received."""
        while 1:
            task = self.requestQueue.get()

            # The "None" value is used as a sentinel by
            # ThreadPool.cleanup().  This indicates that there
            # are no more tasks, so we should quit.
            #
            # Compare by identity rather than truthiness: a genuine
            # task object that happens to evaluate false must still be
            # executed, otherwise this worker would exit silently and
            # no result would ever be posted for that task.
            if task is None:
                break

            try:
                task.execute()
            except KeyboardInterrupt:
                # be explicit here for test/interrupts.py
                ok = False
            except:
                task.exception_set()
                ok = False
            else:
                ok = True

            self.resultsQueue.put((task, ok))
191
class ThreadPool:
    """This class is responsible for spawning and managing worker threads."""

    def __init__(self, num, stack_size):
        """Create the request and reply queues, and 'num' worker threads.

        One must specify the stack size of the worker threads. The
        stack size is specified in kilobytes.

        If the stack size cannot be set, a StackSizeWarning is issued
        through SCons.Warnings and the workers are created with
        whatever stack size is currently in effect.  A successfully
        changed stack size is put back to its previous value once the
        workers exist, even if creating a worker raised.
        """
        # Imported locally: needed only to fetch the active exception
        # in a way that parses on every Python version (the
        # "except X, e" spelling is a syntax error on Python 3).
        import sys

        self.requestQueue = Queue.Queue(0)
        self.resultsQueue = Queue.Queue(0)

        # None means "stack size was not changed, nothing to restore".
        # threading.stack_size() may legitimately return 0, so the
        # restore check below must test for None, not truthiness.
        prev_size = None
        try:
            prev_size = threading.stack_size(stack_size*1024)
        except AttributeError:
            e = sys.exc_info()[1]
            # Only print a warning if the stack size has been
            # explicitly set.
            if hasattr(SCons.Job, 'stack_size'):
                msg = "Setting stack size is unsupported by this version of Python:\n " + \
                    e.args[0]
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError:
            e = sys.exc_info()[1]
            # str(e) rather than e.message: the 'message' attribute is
            # deprecated and absent on newer interpreters.
            msg = "Setting stack size failed:\n " + \
                str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        try:
            # Create worker threads
            self.workers = []
            for _ in range(num):
                worker = Worker(self.requestQueue, self.resultsQueue)
                self.workers.append(worker)
        finally:
            # Threads created after this point go back to the stack
            # size that was in effect before the pool was built.
            if prev_size is not None:
                threading.stack_size(prev_size)

    def put(self, obj):
        """Put task into request queue."""
        self.requestQueue.put(obj)

    def get(self, block = True):
        """Remove and return a result tuple from the results queue."""
        return self.resultsQueue.get(block)

    def preparation_failed(self, obj):
        """Post a failure result for 'obj' without running it on a worker."""
        self.resultsQueue.put((obj, False))

    def cleanup(self):
        """
        Shuts down the thread pool, giving each worker thread a
        chance to shut down gracefully.
        """
        # For each worker thread, put a sentinel "None" value
        # on the requestQueue (indicating that there's no work
        # to be done) so that each worker thread will get one and
        # terminate gracefully.
        for _ in self.workers:
            self.requestQueue.put(None)

        # Wait for all of the workers to terminate.
        #
        # If we don't do this, later Python versions (2.4, 2.5) often
        # seem to raise exceptions during shutdown.  This happens
        # in requestQueue.get(), as an assertion failure that
        # requestQueue.not_full is notified while not acquired,
        # seemingly because the main thread has shut down (or is
        # in the process of doing so) while the workers are still
        # trying to pull sentinels off the requestQueue.
        #
        # Normally these terminations should happen fairly quickly,
        # but we'll stick a one-second timeout on here just in case
        # someone gets hung.
        for worker in self.workers:
            worker.join(1.0)
        self.workers = []
268
class Parallel:
    """Runs tasks concurrently through a pool of worker threads.

    Somewhat less efficient than Serial, but the right choice for
    parallel builds.  This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size):
        """Build a parallel job around 'taskmaster' with 'num' workers.

        The taskmaster's next_task() is expected to hand back the next
        task to run, or None once nothing is left.  Each task has its
        executed() method called after a successful run, or its
        failed() method called when preparing or executing it raised.

        'stack_size' is passed straight through to the ThreadPool.

        Note: the taskmaster is only ever called from the thread that
        runs start(), whereas execute() on different tasks runs on the
        worker threads at the same time -- which is the whole point of
        a parallel job.
        """
        self.taskmaster = taskmaster
        self.tp = ThreadPool(num, stack_size)

        self.maxjobs = num

    def start(self):
        """Feed tasks to the pool and reap results until none are left.

        Returns once the taskmaster has no more tasks and every
        dispatched task has been reported as executed or failed and
        has been post-processed.
        """
        in_flight = 0

        while True:
            # Top the pool up: hand out tasks until either the job
            # limit is reached or the taskmaster has nothing ready.
            while in_flight < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    task.prepare()
                except KeyboardInterrupt:
                    raise
                except:
                    # Preparation blew up: record the exception and
                    # queue a failure result so the task is reaped
                    # below like any other; its failed() callback
                    # decides whether the build should stop.
                    task.exception_set()
                    self.tp.preparation_failed(task)
                else:
                    # Prepared fine, give it to a worker.
                    self.tp.put(task)

                in_flight += 1

            # Nothing new to start and nothing outstanding: done.
            if not (task or in_flight):
                break

            # Reap at least one result (blocking if necessary), then
            # keep reaping for as long as more results are already
            # waiting, before going back for the next batch.
            drained = False
            while not drained:
                task, ok = self.tp.get()

                in_flight -= 1
                if not ok:
                    task.failed()
                else:
                    task.executed()

                task.postprocess()

                drained = self.tp.resultsQueue.empty()

    def cleanup(self):
        """Shut down the underlying thread pool."""
        self.tp.cleanup()
349