12345678910111213141516171819202122232425262728293031323334353637383940 |
- from __future__ import absolute_import
- # some of these are tested in test_worker, so I've only written tests
- # here to complete coverage. Should move everyting to this module at some
- # point [-ask]
- from mock import Mock
- from celery.worker.components import (
- Queues,
- Pool,
- )
- from celery.tests.case import AppCase
- class test_Queues(AppCase):
- def test_create_when_eventloop(self):
- w = Mock()
- w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
- q = Queues(w)
- q.create(w)
- self.assertIs(w.process_task, w._process_task_sem)
- class test_Pool(AppCase):
- def test_close_terminate(self):
- w = Mock()
- comp = Pool(w)
- pool = w.pool = Mock()
- comp.close(w)
- pool.close.assert_called_with()
- comp.terminate(w)
- pool.terminate.assert_called_with()
- w.pool = None
- comp.close(w)
- comp.terminate(w)
|