test_components.py 937 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from __future__ import absolute_import
  2. # some of these are tested in test_worker, so I've only written tests
  3. # here to complete coverage. Should move everything to this module at some
  4. # point [-ask]
  5. from mock import Mock
  6. from celery.worker.components import (
  7. Queues,
  8. Pool,
  9. )
  10. from celery.tests.case import AppCase
  class test_Queues(AppCase):
      """Coverage tests for the worker ``Queues`` component."""

      def test_create_when_eventloop(self):
          # Stand-in worker with the event loop, pool putlocks and a
          # semaphore-using pool class all switched on at once.
          w = Mock()
          w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True

          q = Queues(w)
          q.create(w)

          # With all three flags set, create() is expected to point the
          # worker's task handler at its semaphore-based variant.
          self.assertIs(w.process_task, w._process_task_sem)
  class test_Pool(AppCase):
      """Coverage tests for the worker ``Pool`` component."""

      def test_close_terminate(self):
          w = Mock()
          comp = Pool(w)

          # The pool is attached after the component is constructed, so
          # close()/terminate() must look it up on the worker when called.
          pool = w.pool = Mock()
          comp.close(w)
          pool.close.assert_called_with()
          comp.terminate(w)
          pool.terminate.assert_called_with()

          # With no pool set, both calls must complete without raising;
          # there is nothing to assert beyond the absence of an exception.
          w.pool = None
          comp.close(w)
          comp.terminate(w)