# threadpool.py
  1. # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions
  6. # are met:
  7. # 1. Redistributions of source code must retain the above copyright
  8. # notice, this list of conditions and the following disclaimer.
  9. # 2. Redistributions in binary form must reproduce the above copyright
  10. # notice, this list of conditions and the following disclaimer in the
  11. # documentation and/or other materials provided with the distribution.
  12. #
  13. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  14. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  15. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  16. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  17. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  18. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  19. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  20. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  21. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  22. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  23. # SUCH DAMAGE.
  24. #
  25. # $Id$
# Module metadata.  '$Revision$' (like the '$Id$' comment above) looks like
# an unexpanded version-control keyword -- presumably substituted by the
# VCS on checkout; confirm before relying on its value.
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
  28. import sys
  29. import thread
  30. import threading
  31. class ThreadPool(object):
  32. """
  33. Thread pool that maintains the number of idle threads between
  34. minSpare and maxSpare inclusive. By default, there is no limit on
  35. the number of threads that can be started, but this can be controlled
  36. by maxThreads.
  37. """
  38. def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint):
  39. self._minSpare = minSpare
  40. self._maxSpare = maxSpare
  41. self._maxThreads = max(minSpare, maxThreads)
  42. self._lock = threading.Condition()
  43. self._workQueue = []
  44. self._idleCount = self._workerCount = maxSpare
  45. # Start the minimum number of worker threads.
  46. for i in range(maxSpare):
  47. thread.start_new_thread(self._worker, ())
  48. def addJob(self, job, allowQueuing=True):
  49. """
  50. Adds a job to the work queue. The job object should have a run()
  51. method. If allowQueuing is True (the default), the job will be
  52. added to the work queue regardless if there are any idle threads
  53. ready. (The only way for there to be no idle threads is if maxThreads
  54. is some reasonable, finite limit.)
  55. Otherwise, if allowQueuing is False, and there are no more idle
  56. threads, the job will not be queued.
  57. Returns True if the job was queued, False otherwise.
  58. """
  59. self._lock.acquire()
  60. try:
  61. # Maintain minimum number of spares.
  62. while self._idleCount < self._minSpare and \
  63. self._workerCount < self._maxThreads:
  64. self._workerCount += 1
  65. self._idleCount += 1
  66. thread.start_new_thread(self._worker, ())
  67. # Hand off the job.
  68. if self._idleCount or allowQueuing:
  69. self._workQueue.append(job)
  70. self._lock.notify()
  71. return True
  72. else:
  73. return False
  74. finally:
  75. self._lock.release()
  76. def _worker(self):
  77. """
  78. Worker thread routine. Waits for a job, executes it, repeat.
  79. """
  80. self._lock.acquire()
  81. while True:
  82. while not self._workQueue:
  83. self._lock.wait()
  84. # We have a job to do...
  85. job = self._workQueue.pop(0)
  86. assert self._idleCount > 0
  87. self._idleCount -= 1
  88. self._lock.release()
  89. try:
  90. job.run()
  91. except:
  92. # FIXME: This should really be reported somewhere.
  93. # But we can't simply report it to stderr because of fcgi
  94. pass
  95. self._lock.acquire()
  96. if self._idleCount == self._maxSpare:
  97. break # NB: lock still held
  98. self._idleCount += 1
  99. assert self._idleCount <= self._maxSpare
  100. # Die off...
  101. assert self._workerCount > self._maxSpare
  102. self._workerCount -= 1
  103. self._lock.release()