job_queue.py

  1. """
  2. Sliding-window-based job/task queue class (& example of use.)
  3. May use ``multiprocessing.Process`` or ``threading.Thread`` objects as queue
  4. items, though within Fabric itself only ``Process`` objects are used/supported.
  5. """
  6. from __future__ import with_statement
  7. import time
  8. import Queue
  9. from multiprocessing import Process
  10. from fabric.network import ssh
  11. from fabric.context_managers import settings
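
# ``import Queue`` (capital Q) is the Python 2 stdlib module; the Python 3
# equivalent would be ``import queue``.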


class JobQueue(object):
    """
    The goal of this class is to make a queue of processes to run, and go
    through them running X number at any given time.

    So if the bubble is 5, start with 5 running and move the bubble of
    running procs along the queue, looking something like this:

        Start
        ...........................
        [~~~~~]....................
        ___[~~~~~].................
        _________[~~~~~]...........
        __________________[~~~~~]..
        ____________________[~~~~~]
        ___________________________
        End
    """

    def __init__(self, max_running, comms_queue):
        """
        Set up the class with reasonable defaults.
        """
        self._queued = []
        self._running = []
        self._completed = []
        self._num_of_jobs = 0
        self._max = max_running
        self._comms_queue = comms_queue
        self._finished = False
        self._closed = False
        self._debug = False

    def _all_alive(self):
        """
        Simply states whether all procs are alive or not. Needed to determine
        when to stop looping, and pop dead procs off and add live ones.
        """
        if self._running:
            return all([x.is_alive() for x in self._running])
        else:
            return False

    def __len__(self):
        """
        Just going to use number of jobs as the JobQueue length.
        """
        return self._num_of_jobs

    def close(self):
        """
        A sanity check, so that the need to care about new jobs being added in
        the last throes of the job_queue's run is negated.
        """
        if self._debug:
            print("job queue closed.")
        self._closed = True

    def append(self, process):
        """
        Add the Process() to the queue, so that later it can be checked up on.
        That is, if the JobQueue is still open.

        If the queue is closed, this will just silently do nothing.

        To get data back out of this process, give ``process`` access to a
        ``multiprocessing.Queue`` object (the same ``comms_queue`` handed to
        ``JobQueue.__init__``); ``JobQueue.run`` will then include that
        queue's contents in its return value.
        """
        if not self._closed:
            self._queued.append(process)
            self._num_of_jobs += 1
            if self._debug:
                print("job queue appended %s." % process.name)

    def run(self):
        """
        This is the workhorse. It will take the initial jobs from _queued,
        start them, add them to _running, and then go into the main running
        loop.

        This loop will check for done procs, and if found, move them out of
        _running into _completed. It also checks for a _running list with
        open spots, which it will then fill as discovered.

        To end the loop, there have to be no running procs, and no more procs
        to be run in the queue.

        This function returns a dict keyed on each job's name, holding that
        job's exit code and any results it reported.
        """
        def _advance_the_queue():
            """
            Helper function to do the job of popping a new proc off the
            queue, starting it, then adding it to the running list. This will
            eventually deplete _queued, which is a condition of stopping the
            running while loop.

            It also sets the env.host_string from the job.name, so that
            Fabric knows that this is the host to be making connections on.
            """
            job = self._queued.pop()
            if self._debug:
                print("Popping '%s' off the queue and starting it" % job.name)
            with settings(clean_revert=True, host_string=job.name,
                          host=job.name):
                job.start()
            self._running.append(job)

        # Prep return value so we can start filling it during main loop
        results = {}
        for job in self._queued:
            results[job.name] = dict.fromkeys(('exit_code', 'results'))

        if not self._closed:
            raise Exception("Need to close() before starting.")

        if self._debug:
            print("Job queue starting.")

        # Guard against popping an empty list when fewer jobs than
        # ``max_running`` were queued.
        while len(self._running) < self._max and self._queued:
            _advance_the_queue()

        # Main loop!
        while not self._finished:
            while len(self._running) < self._max and self._queued:
                _advance_the_queue()

            if not self._all_alive():
                # Walk the running list in reverse so popping by index stays
                # valid even when several procs finish in the same pass.
                for i in reversed(range(len(self._running))):
                    job = self._running[i]
                    if not job.is_alive():
                        if self._debug:
                            print("Job queue found finished proc: %s." %
                                  job.name)
                        done = self._running.pop(i)
                        self._completed.append(done)

                if self._debug:
                    print("Job queue has %d running." % len(self._running))

            if not (self._queued or self._running):
                if self._debug:
                    print("Job queue finished.")
                for job in self._completed:
                    job.join()
                self._finished = True

            # Each loop pass, try pulling results off the queue to keep its
            # size down. At this point, we don't actually care if any results
            # have arrived yet; they will be picked up after the main loop.
            self._fill_results(results)

            time.sleep(ssh.io_sleep)

        # Consume anything left in the results queue. Note that there is no
        # need to block here, as the main loop ensures that all workers will
        # already have finished.
        self._fill_results(results)

        # Attach exit codes now that we're all done & have joined all jobs
        for job in self._completed:
            if isinstance(job, Process):
                results[job.name]['exit_code'] = job.exitcode

        return results

    def _fill_results(self, results):
        """
        Attempt to pull data off self._comms_queue and add to 'results' dict.
        If no data is available (i.e. the queue is empty), bail immediately.
        """
        while True:
            try:
                datum = self._comms_queue.get_nowait()
                results[datum['name']]['results'] = datum['result']
            except Queue.Empty:
                break


#### Sample

def try_using(parallel_type):
    """
    This will run the queue through its paces, and show a simple way of using
    the job queue.
    """
    def print_number(number):
        """
        Simple function to give a simple task to execute.
        """
        print(number)

    if parallel_type == "multiprocessing":
        from multiprocessing import Process as Bucket
    elif parallel_type == "threading":
        from threading import Thread as Bucket

    # Make a job_queue with a bubble of len 5, and have it print verbosely
    queue = Queue.Queue()
    jobs = JobQueue(5, queue)
    jobs._debug = True

    # Add 20 procs onto the stack
    for x in range(20):
        jobs.append(Bucket(
            target=print_number,
            args=[x],
            kwargs={},
        ))

    # Close up the queue and then start its execution
    jobs.close()
    jobs.run()


if __name__ == '__main__':
    try_using("multiprocessing")
    try_using("threading")