threadpool.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. """
  2. Thread pool job processor with variable number of worker threads (between max/min amount).
  3. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  4. """
  5. from __future__ import with_statement
  6. import time
  7. import logging
  8. import Pyro4.threadutil
  9. import Pyro4.util
  10. __all__ = ["PoolError", "NoFreeWorkersError", "Pool"]
  11. log = logging.getLogger("Pyro4.threadpool")
  12. class PoolError(Exception):
  13. pass
  14. class NoFreeWorkersError(PoolError):
  15. pass
  16. class Worker(Pyro4.threadutil.Thread):
  17. def __init__(self, pool):
  18. super(Worker, self).__init__()
  19. self.daemon = True
  20. self.name = "Pyro-Worker-%d " % id(self)
  21. self.job_available = Pyro4.threadutil.Event()
  22. self.job = None
  23. self.pool = pool
  24. def process(self, job):
  25. self.job = job
  26. self.job_available.set()
  27. def run(self):
  28. while True:
  29. self.job_available.wait()
  30. self.job_available.clear()
  31. if self.job is None:
  32. break
  33. try:
  34. self.job()
  35. except Exception:
  36. log.exception("unhandled exception from job in worker thread %s: %s", self.name)
  37. self.job = None
  38. self.pool.notify_done(self)
  39. self.pool = None
  40. class Pool(object):
  41. """
  42. A job processing pool that is using a pool of worker threads.
  43. The amount of worker threads in the pool is configurable and scales between min/max size.
  44. """
  45. def __init__(self):
  46. if Pyro4.config.THREADPOOL_SIZE < 1 or Pyro4.config.THREADPOOL_SIZE_MIN < 1:
  47. raise ValueError("threadpool sizes must be greater than zero")
  48. if Pyro4.config.THREADPOOL_SIZE_MIN > Pyro4.config.THREADPOOL_SIZE:
  49. raise ValueError("minimum threadpool size must be less than or equal to max size")
  50. self.idle = set()
  51. self.busy = set()
  52. self.closed = False
  53. for _ in range(Pyro4.config.THREADPOOL_SIZE_MIN):
  54. worker = Worker(self)
  55. self.idle.add(worker)
  56. worker.start()
  57. log.debug("worker pool created with initial size %d", self.num_workers())
  58. self.count_lock = Pyro4.threadutil.Lock()
  59. def __enter__(self):
  60. return self
  61. def __exit__(self, exc_type, exc_val, exc_tb):
  62. self.close()
  63. def close(self):
  64. if not self.closed:
  65. log.debug("closing down")
  66. for w in self.busy:
  67. w.process(None)
  68. for w in self.idle:
  69. w.process(None)
  70. self.closed = True
  71. time.sleep(0.1)
  72. idle, self.idle = self.idle, set()
  73. busy, self.busy = self.busy, set()
  74. # check if the threads that are joined are not the current thread,
  75. # otherwise Python 2.x crashes with "cannot join current thread".
  76. current_thread = Pyro4.threadutil.current_thread()
  77. while idle:
  78. p = idle.pop()
  79. if p is not current_thread:
  80. p.join(timeout=0.1)
  81. while busy:
  82. p = busy.pop()
  83. if p is not current_thread:
  84. p.join(timeout=0.1)
  85. def __repr__(self):
  86. return "<%s.%s at 0x%x; %d busy workers; %d idle workers>" % \
  87. (self.__class__.__module__, self.__class__.__name__, id(self), len(self.busy), len(self.idle))
  88. def num_workers(self):
  89. return len(self.busy) + len(self.idle)
  90. def process(self, job):
  91. if self.closed:
  92. raise PoolError("job queue is closed")
  93. if self.idle:
  94. worker = self.idle.pop()
  95. elif self.num_workers() < Pyro4.config.THREADPOOL_SIZE:
  96. worker = Worker(self)
  97. worker.start()
  98. else:
  99. raise NoFreeWorkersError("no free workers available, increase thread pool size")
  100. self.busy.add(worker)
  101. worker.process(job)
  102. log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))
  103. def notify_done(self, worker):
  104. if worker in self.busy:
  105. self.busy.remove(worker)
  106. if self.closed:
  107. worker.process(None)
  108. return
  109. if len(self.idle) >= Pyro4.config.THREADPOOL_SIZE_MIN:
  110. worker.process(None)
  111. else:
  112. self.idle.add(worker)
  113. log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))