signals.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. __all__ = ('pre_init', 'post_init', 'pre_save', 'pre_save_post_validation',
  2. 'post_save', 'pre_delete', 'post_delete')
  3. signals_available = False
  4. try:
  5. from blinker import Namespace
  6. signals_available = True
  7. except ImportError:
  8. class Namespace(object):
  9. def signal(self, name, doc=None):
  10. return _FakeSignal(name, doc)
  11. class _FakeSignal(object):
  12. """If blinker is unavailable, create a fake class with the same
  13. interface that allows sending of signals but will fail with an
  14. error on anything else. Instead of doing anything on send, it
  15. will just ignore the arguments and do nothing instead.
  16. """
  17. def __init__(self, name, doc=None):
  18. self.name = name
  19. self.__doc__ = doc
  20. def _fail(self, *args, **kwargs):
  21. raise RuntimeError('signalling support is unavailable '
  22. 'because the blinker library is '
  23. 'not installed.')
  24. send = lambda *a, **kw: None # noqa
  25. connect = disconnect = has_receivers_for = receivers_for = \
  26. temporarily_connected_to = _fail
  27. del _fail
  28. # the namespace for code signals. If you are not mongoengine code, do
  29. # not put signals in here. Create your own namespace instead.
  30. _signals = Namespace()
  31. pre_init = _signals.signal('pre_init')
  32. post_init = _signals.signal('post_init')
  33. pre_save = _signals.signal('pre_save')
  34. pre_save_post_validation = _signals.signal('pre_save_post_validation')
  35. post_save = _signals.signal('post_save')
  36. pre_delete = _signals.signal('pre_delete')
  37. post_delete = _signals.signal('post_delete')
  38. pre_bulk_insert = _signals.signal('pre_bulk_insert')
  39. post_bulk_insert = _signals.signal('post_bulk_insert')