- """
- Sliding-window-based job/task queue class (& example of use.)
- May use ``multiprocessing.Process`` or ``threading.Thread`` objects as queue
- items, though within Fabric itself only ``Process`` objects are used/supported.
- """
from __future__ import with_statement

import time
import Queue

from multiprocessing import Process

from fabric.network import ssh
from fabric.context_managers import settings


class JobQueue(object):
    """
    The goal of this class is to make a queue of processes to run, and to
    run up to a set number of them at any given time.

    So if the "bubble" is 5, it starts with 5 running and moves that bubble
    of running procs along the queue, looking something like this:

        Start
        ...........................
        [~~~~~]....................
        ___[~~~~~].................
        _________[~~~~~]...........
        __________________[~~~~~]..
        ____________________[~~~~~]
        ___________________________
                                End
    """
    def __init__(self, max_running, comms_queue):
        """
        Set up the class with reasonable defaults.
        """
        self._queued = []
        self._running = []
        self._completed = []
        self._num_of_jobs = 0
        self._max = max_running
        self._comms_queue = comms_queue
        self._finished = False
        self._closed = False
        self._debug = False

    def _all_alive(self):
        """
        Simply states if all procs are alive or not. Needed to determine when
        to stop looping, and pop dead procs off and add live ones.
        """
        if self._running:
            return all([x.is_alive() for x in self._running])
        else:
            return False

    def __len__(self):
        """
        Just going to use number of jobs as the JobQueue length.
        """
        return self._num_of_jobs

    def close(self):
        """
        A sanity check, so that there is no need to worry about new jobs
        being added in the last throes of the job_queue's run.
        """
        if self._debug:
            print("job queue closed.")
        self._closed = True

    def append(self, process):
        """
        Add the Process() to the queue, so that later it can be checked up on.
        That is, if the JobQueue is still open.

        If the queue is closed, this will just silently do nothing.

        To get data back out of this process, give ``process`` access to the
        same ``multiprocessing.Queue`` object that was handed to ``__init__``
        as ``comms_queue``, and have it put dicts of the form expected by
        ``_fill_results`` (see the sketch after this class). ``JobQueue.run``
        will then include the queue's contents in its return value.
        """
        if not self._closed:
            self._queued.append(process)
            self._num_of_jobs += 1
            if self._debug:
                print("job queue appended %s." % process.name)

    def run(self):
        """
        This is the workhorse. It will take the initial jobs from _queued,
        start them, add them to _running, and then go into the main running
        loop.

        This loop will check for done procs, and if found, move them out of
        _running into _completed. It also checks for a _running list with
        open spots, which it will then fill as discovered.

        To end the loop, there have to be no running procs, and no more procs
        to be run in the queue.

        This function returns a dict mapping each job's name to a sub-dict
        holding its exit code and any results pulled off the comms queue.
        """
        def _advance_the_queue():
            """
            Helper function to do the job of popping a new proc off the
            queue, starting it, then adding it to the running queue. This
            will eventually deplete _queued, which is a condition of stopping
            the running while loop.

            It also sets the env.host_string from the job.name, so that
            Fabric knows that this is the host to be making connections on.
            """
            job = self._queued.pop()
            if self._debug:
                print("Popping '%s' off the queue and starting it" % job.name)
            with settings(clean_revert=True, host_string=job.name,
                          host=job.name):
                job.start()
            self._running.append(job)

        # Prep return value so we can start filling it during main loop
        results = {}
        for job in self._queued:
            results[job.name] = dict.fromkeys(('exit_code', 'results'))

        if not self._closed:
            raise Exception("Need to close() before starting.")

        if self._debug:
            print("Job queue starting.")

        # Also guard on self._queued, so a queue holding fewer jobs than
        # self._max doesn't pop from an empty list.
        while len(self._running) < self._max and self._queued:
            _advance_the_queue()

        # Main loop!
        while not self._finished:
            while len(self._running) < self._max and self._queued:
                _advance_the_queue()

            if not self._all_alive():
                # Iterate over a copy, since removing finished jobs from
                # self._running mid-iteration would otherwise skip elements.
                for job in self._running[:]:
                    if not job.is_alive():
                        if self._debug:
                            print("Job queue found finished proc: %s." %
                                  job.name)
                        self._running.remove(job)
                        self._completed.append(job)

                if self._debug:
                    print("Job queue has %d running." % len(self._running))

                if not (self._queued or self._running):
                    if self._debug:
                        print("Job queue finished.")
                    for job in self._completed:
                        job.join()
                    self._finished = True

            # Each loop pass, try pulling results off the queue to keep its
            # size down. At this point, we don't actually care if any results
            # have arrived yet; they will be picked up after the main loop.
            self._fill_results(results)

            time.sleep(ssh.io_sleep)

        # Consume anything left in the results queue. Note that there is no
        # need to block here, as the main loop ensures that all workers will
        # already have finished.
        self._fill_results(results)

        # Attach exit codes now that we're all done & have joined all jobs.
        # Threads have no exitcode attribute, so this only applies to
        # Process-based jobs.
        for job in self._completed:
            if isinstance(job, Process):
                results[job.name]['exit_code'] = job.exitcode

        return results

    def _fill_results(self, results):
        """
        Attempt to pull data off self._comms_queue and add to 'results' dict.
        If no data is available (i.e. the queue is empty), bail immediately.
        """
        while True:
            try:
                datum = self._comms_queue.get_nowait()
                results[datum['name']]['results'] = datum['result']
            except Queue.Empty:
                break
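

# A minimal sketch (not part of the original module) of the producer side of
# the comms-queue contract assumed by ``append`` and ``_fill_results``: each
# worker puts a dict with 'name' and 'result' keys onto the same queue that
# was handed to ``JobQueue.__init__``. The ``_report_double`` worker and the
# "worker-N" names are hypothetical, for illustration only.
def _example_with_results():
    from multiprocessing import Queue as MPQueue

    comms = MPQueue()
    jobs = JobQueue(2, comms)

    def _report_double(name, number, q):
        # Whatever the worker computes goes back keyed by its job's name.
        q.put({'name': name, 'result': number * 2})

    for x in range(4):
        name = "worker-%d" % x
        jobs.append(Process(
            name=name,
            target=_report_double,
            args=(name, x, comms),
        ))
    jobs.close()
    # E.g. results['worker-1'] == {'exit_code': 0, 'results': 2}
    return jobs.run()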


#### Sample

def try_using(parallel_type):
    """
    This will run the queue through its paces, and show a simple way of using
    the job queue.
    """
    def print_number(number):
        """
        Simple function to give a simple task to execute.
        """
        print(number)

    if parallel_type == "multiprocessing":
        from multiprocessing import Process as Bucket
    elif parallel_type == "threading":
        from threading import Thread as Bucket
    else:
        # Guard against Bucket being left unbound below.
        raise ValueError("Unknown parallel_type: %r" % parallel_type)

    # Make a job_queue with a bubble of len 5, and have it print verbosely
    queue = Queue.Queue()
    jobs = JobQueue(5, queue)
    jobs._debug = True

    # Add 20 procs onto the stack
    for x in range(20):
        jobs.append(Bucket(
            target=print_number,
            args=[x],
            kwargs={},
        ))

    # Close up the queue and then start its execution
    jobs.close()
    jobs.run()


if __name__ == '__main__':
    try_using("multiprocessing")
    try_using("threading")