test_threads.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from __future__ import absolute_import
  2. from mock import Mock
  3. from celery.concurrency.threads import NullDict, TaskPool, apply_target
  4. from celery.tests.case import AppCase, Case, mask_modules, mock_module
  5. class test_NullDict(Case):
  6. def test_setitem(self):
  7. x = NullDict()
  8. x['foo'] = 1
  9. with self.assertRaises(KeyError):
  10. x['foo']
  11. class test_TaskPool(AppCase):
  12. def test_without_threadpool(self):
  13. with mask_modules('threadpool'):
  14. with self.assertRaises(ImportError):
  15. TaskPool()
  16. def test_with_threadpool(self):
  17. with mock_module('threadpool'):
  18. x = TaskPool()
  19. self.assertTrue(x.ThreadPool)
  20. self.assertTrue(x.WorkRequest)
  21. def test_on_start(self):
  22. with mock_module('threadpool'):
  23. x = TaskPool()
  24. x.on_start()
  25. self.assertTrue(x._pool)
  26. self.assertIsInstance(x._pool.workRequests, NullDict)
  27. def test_on_stop(self):
  28. with mock_module('threadpool'):
  29. x = TaskPool()
  30. x.on_start()
  31. x.on_stop()
  32. x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True)
  33. def test_on_apply(self):
  34. with mock_module('threadpool'):
  35. x = TaskPool()
  36. x.on_start()
  37. callback = Mock()
  38. accept_callback = Mock()
  39. target = Mock()
  40. req = x.on_apply(target, args=(1, 2), kwargs={'a': 10},
  41. callback=callback,
  42. accept_callback=accept_callback)
  43. x.WorkRequest.assert_called_with(
  44. apply_target,
  45. (target, (1, 2), {'a': 10}, callback, accept_callback),
  46. )
  47. x._pool.putRequest.assert_called_with(req)
  48. x._pool._results_queue.queue.clear.assert_called_with()