1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- from __future__ import absolute_import
- from mock import Mock
- from celery.concurrency.threads import NullDict, TaskPool, apply_target
- from celery.tests.case import AppCase, Case, mask_modules, mock_module
- class test_NullDict(Case):
- def test_setitem(self):
- x = NullDict()
- x['foo'] = 1
- with self.assertRaises(KeyError):
- x['foo']
- class test_TaskPool(AppCase):
- def test_without_threadpool(self):
- with mask_modules('threadpool'):
- with self.assertRaises(ImportError):
- TaskPool()
- def test_with_threadpool(self):
- with mock_module('threadpool'):
- x = TaskPool()
- self.assertTrue(x.ThreadPool)
- self.assertTrue(x.WorkRequest)
- def test_on_start(self):
- with mock_module('threadpool'):
- x = TaskPool()
- x.on_start()
- self.assertTrue(x._pool)
- self.assertIsInstance(x._pool.workRequests, NullDict)
- def test_on_stop(self):
- with mock_module('threadpool'):
- x = TaskPool()
- x.on_start()
- x.on_stop()
- x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True)
- def test_on_apply(self):
- with mock_module('threadpool'):
- x = TaskPool()
- x.on_start()
- callback = Mock()
- accept_callback = Mock()
- target = Mock()
- req = x.on_apply(target, args=(1, 2), kwargs={'a': 10},
- callback=callback,
- accept_callback=accept_callback)
- x.WorkRequest.assert_called_with(
- apply_target,
- (target, (1, 2), {'a': 10}, callback, accept_callback),
- )
- x._pool.putRequest.assert_called_with(req)
- x._pool._results_queue.queue.clear.assert_called_with()
|