test_threads.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from celery.utils.threads import (
  4. _LocalStack,
  5. _FastLocalStack,
  6. LocalManager,
  7. Local,
  8. bgThread,
  9. )
  10. from celery.tests.case import Case, override_stdouts
  11. class test_bgThread(Case):
  12. def test_crash(self):
  13. class T(bgThread):
  14. def body(self):
  15. raise KeyError()
  16. with patch('os._exit') as _exit:
  17. with override_stdouts():
  18. _exit.side_effect = ValueError()
  19. t = T()
  20. with self.assertRaises(ValueError):
  21. t.run()
  22. _exit.assert_called_with(1)
  23. def test_interface(self):
  24. x = bgThread()
  25. with self.assertRaises(NotImplementedError):
  26. x.body()
  27. class test_Local(Case):
  28. def test_iter(self):
  29. x = Local()
  30. x.foo = 'bar'
  31. ident = x.__ident_func__()
  32. self.assertIn((ident, {'foo': 'bar'}), list(iter(x)))
  33. delattr(x, 'foo')
  34. self.assertNotIn((ident, {'foo': 'bar'}), list(iter(x)))
  35. with self.assertRaises(AttributeError):
  36. delattr(x, 'foo')
  37. self.assertIsNotNone(x(lambda: 'foo'))
  38. class test_LocalStack(Case):
  39. def test_stack(self):
  40. x = _LocalStack()
  41. self.assertIsNone(x.pop())
  42. x.__release_local__()
  43. ident = x.__ident_func__
  44. x.__ident_func__ = ident
  45. with self.assertRaises(RuntimeError):
  46. x()[0]
  47. x.push(['foo'])
  48. self.assertEqual(x()[0], 'foo')
  49. x.pop()
  50. with self.assertRaises(RuntimeError):
  51. x()[0]
  52. class test_FastLocalStack(Case):
  53. def test_stack(self):
  54. x = _FastLocalStack()
  55. x.push(['foo'])
  56. x.push(['bar'])
  57. self.assertEqual(x.top, ['bar'])
  58. self.assertEqual(len(x), 2)
  59. x.pop()
  60. self.assertEqual(x.top, ['foo'])
  61. x.pop()
  62. self.assertIsNone(x.top)
  63. class test_LocalManager(Case):
  64. def test_init(self):
  65. x = LocalManager()
  66. self.assertListEqual(x.locals, [])
  67. self.assertTrue(x.ident_func)
  68. ident = lambda: 1
  69. loc = Local()
  70. x = LocalManager([loc], ident_func=ident)
  71. self.assertListEqual(x.locals, [loc])
  72. x = LocalManager(loc, ident_func=ident)
  73. self.assertListEqual(x.locals, [loc])
  74. self.assertIs(x.ident_func, ident)
  75. self.assertIs(x.locals[0].__ident_func__, ident)
  76. self.assertEqual(x.get_ident(), 1)
  77. with patch('celery.utils.threads.release_local') as release:
  78. x.cleanup()
  79. release.assert_called_with(loc)
  80. self.assertTrue(repr(x))