123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- from __future__ import absolute_import
- from mock import patch
- from celery.utils.threads import (
- _LocalStack,
- _FastLocalStack,
- LocalManager,
- Local,
- bgThread,
- )
- from celery.tests.case import Case, override_stdouts
class test_bgThread(Case):
    """Tests for the ``bgThread`` base class."""

    def test_crash(self):
        # A subclass whose body() always fails, used to drive run() into
        # its crash path.
        class T(bgThread):

            def body(self):
                raise KeyError()

        with patch('os._exit') as fake_exit:
            with override_stdouts():
                # Make the patched os._exit raise, so the call surfaces
                # here as a ValueError that can be asserted on.
                fake_exit.side_effect = ValueError()
                crashing = T()
                with self.assertRaises(ValueError):
                    crashing.run()
                fake_exit.assert_called_with(1)

    def test_interface(self):
        # Calling body() on a plain bgThread must raise
        # NotImplementedError.
        thread = bgThread()
        with self.assertRaises(NotImplementedError):
            thread.body()
class test_Local(Case):
    """Tests for ``Local``."""

    def test_iter(self):
        local = Local()
        local.foo = 'bar'
        ident = local.__ident_func__()
        entry = (ident, {'foo': 'bar'})

        # Iterating the local is expected to yield (ident, attrs) pairs.
        self.assertIn(entry, list(iter(local)))

        # Once the attribute is removed the pair must no longer show up.
        del local.foo
        self.assertNotIn(entry, list(iter(local)))

        # Removing an attribute that is already gone raises.
        with self.assertRaises(AttributeError):
            del local.foo

        # Calling the local with a callable must return something
        # non-None.
        self.assertIsNotNone(local(lambda: 'foo'))
class test_LocalStack(Case):
    """Tests for ``_LocalStack``."""

    def test_stack(self):
        stack = _LocalStack()

        # Popping an empty stack returns None.
        self.assertIsNone(stack.pop())
        stack.__release_local__()

        # Exercise both the getter and the setter of __ident_func__.
        ident_func = stack.__ident_func__
        stack.__ident_func__ = ident_func

        # With nothing pushed, indexing the called stack raises.
        with self.assertRaises(RuntimeError):
            stack()[0]

        stack.push(['foo'])
        self.assertEqual(stack()[0], 'foo')

        # After popping the only item the stack raises again.
        stack.pop()
        with self.assertRaises(RuntimeError):
            stack()[0]
class test_FastLocalStack(Case):
    """Tests for ``_FastLocalStack``."""

    def test_stack(self):
        stack = _FastLocalStack()
        stack.push(['foo'])
        stack.push(['bar'])

        # The most recently pushed item is on top.
        self.assertEqual(stack.top, ['bar'])
        self.assertEqual(len(stack), 2)

        # Popping exposes the previous item ...
        stack.pop()
        self.assertEqual(stack.top, ['foo'])

        # ... and an emptied stack reports None as its top.
        stack.pop()
        self.assertIsNone(stack.top)
class test_LocalManager(Case):
    """Tests for ``LocalManager``."""

    def test_init(self):
        # Defaults: no locals and a truthy ident function.
        manager = LocalManager()
        self.assertListEqual(manager.locals, [])
        self.assertTrue(manager.ident_func)

        def fixed_ident():
            return 1

        local = Local()

        # Locals passed as a list are stored as given.
        manager = LocalManager([local], ident_func=fixed_ident)
        self.assertListEqual(manager.locals, [local])

        # A single local (not in a list) ends up as a one-item list.
        manager = LocalManager(local, ident_func=fixed_ident)
        self.assertListEqual(manager.locals, [local])

        # The ident function is kept on the manager and on the managed
        # local.
        self.assertIs(manager.ident_func, fixed_ident)
        self.assertIs(manager.locals[0].__ident_func__, fixed_ident)
        self.assertEqual(manager.get_ident(), 1)

        # cleanup() must call release_local with the managed local.
        with patch('celery.utils.threads.release_local') as release:
            manager.cleanup()
            release.assert_called_with(local)

        self.assertTrue(repr(manager))
|