task_queue.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. import sys
  4. import logging
  5. logger = logging.getLogger(__name__)
  6. try:
  7. import Queue as queue
  8. except ImportError:
  9. import queue
  10. import traceback
  11. class TaskQueue(object):
  12. def __init__(self, producer, consumers):
  13. self.__producer = producer
  14. self.__consumers = consumers
  15. self.__threads = []
  16. # must be an infinite queue, otherwise producer may be blocked after all consumers being dead.
  17. self.__queue = queue.Queue()
  18. self.__lock = threading.Lock()
  19. self.__exc_info = None
  20. self.__exc_stack = ''
  21. def run(self):
  22. self.__add_and_run(threading.Thread(target=self.__producer_func))
  23. for c in self.__consumers:
  24. self.__add_and_run(threading.Thread(target=self.__consumer_func, args=(c,)))
  25. # give KeyboardInterrupt chances to happen by joining with timeouts.
  26. while self.__any_active():
  27. for t in self.__threads:
  28. t.join(1)
  29. if self.__exc_info:
  30. logger.error('An exception was thrown by producer or consumer, backtrace: {0}'.format(self.__exc_stack))
  31. raise self.__exc_info[1]
  32. def put(self, data):
  33. assert data is not None
  34. self.__queue.put(data)
  35. def get(self):
  36. return self.__queue.get()
  37. def ok(self):
  38. with self.__lock:
  39. return self.__exc_info is None
  40. def __add_and_run(self, thread):
  41. thread.daemon = True
  42. thread.start()
  43. self.__threads.append(thread)
  44. def __any_active(self):
  45. return any(t.is_alive() for t in self.__threads)
  46. def __producer_func(self):
  47. try:
  48. self.__producer(self)
  49. except:
  50. self.__on_exception(sys.exc_info())
  51. self.__put_end()
  52. else:
  53. self.__put_end()
  54. def __consumer_func(self, consumer):
  55. try:
  56. consumer(self)
  57. except:
  58. self.__on_exception(sys.exc_info())
  59. def __put_end(self):
  60. for i in range(len(self.__consumers)):
  61. self.__queue.put(None)
  62. def __on_exception(self, exc_info):
  63. with self.__lock:
  64. if self.__exc_info is None:
  65. self.__exc_info = exc_info
  66. self.__exc_stack = traceback.format_exc()