test_registry.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. from __future__ import absolute_import
  2. from celery.app.registry import _unpickle_task, _unpickle_task_v2
  3. from celery.tests.case import AppCase, depends_on_current_app
  4. def returns():
  5. return 1
  6. class test_unpickle_task(AppCase):
  7. @depends_on_current_app
  8. def test_unpickle_v1(self):
  9. self.app.tasks['txfoo'] = 'bar'
  10. self.assertEqual(_unpickle_task('txfoo'), 'bar')
  11. @depends_on_current_app
  12. def test_unpickle_v2(self):
  13. self.app.tasks['txfoo1'] = 'bar1'
  14. self.assertEqual(_unpickle_task_v2('txfoo1'), 'bar1')
  15. self.assertEqual(_unpickle_task_v2('txfoo1', module='celery'), 'bar1')
  16. class test_TaskRegistry(AppCase):
  17. def setup(self):
  18. self.mytask = self.app.task(name='A', shared=False)(returns)
  19. self.myperiodic = self.app.task(
  20. name='B', shared=False, type='periodic',
  21. )(returns)
  22. def test_NotRegistered_str(self):
  23. self.assertTrue(repr(self.app.tasks.NotRegistered('tasks.add')))
  24. def assertRegisterUnregisterCls(self, r, task):
  25. r.unregister(task)
  26. with self.assertRaises(r.NotRegistered):
  27. r.unregister(task)
  28. r.register(task)
  29. self.assertIn(task.name, r)
  30. def assertRegisterUnregisterFunc(self, r, task, task_name):
  31. with self.assertRaises(r.NotRegistered):
  32. r.unregister(task_name)
  33. r.register(task, task_name)
  34. self.assertIn(task_name, r)
  35. def test_task_registry(self):
  36. r = self.app._tasks
  37. self.assertIsInstance(r, dict, 'TaskRegistry is mapping')
  38. self.assertRegisterUnregisterCls(r, self.mytask)
  39. self.assertRegisterUnregisterCls(r, self.myperiodic)
  40. r.register(self.myperiodic)
  41. r.unregister(self.myperiodic.name)
  42. self.assertNotIn(self.myperiodic, r)
  43. r.register(self.myperiodic)
  44. tasks = dict(r)
  45. self.assertIs(tasks.get(self.mytask.name), self.mytask)
  46. self.assertIs(tasks.get(self.myperiodic.name), self.myperiodic)
  47. self.assertIs(r[self.mytask.name], self.mytask)
  48. self.assertIs(r[self.myperiodic.name], self.myperiodic)
  49. r.unregister(self.mytask)
  50. self.assertNotIn(self.mytask.name, r)
  51. r.unregister(self.myperiodic)
  52. self.assertNotIn(self.myperiodic.name, r)
  53. self.assertTrue(self.mytask.run())
  54. self.assertTrue(self.myperiodic.run())
  55. def test_compat(self):
  56. self.assertTrue(self.app.tasks.regular())
  57. self.assertTrue(self.app.tasks.periodic())