__init__.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """Kombu transport using the Django database as a message store."""
  2. from __future__ import absolute_import
  3. from anyjson import loads, dumps
  4. from django.conf import settings
  5. from django.core import exceptions as errors
  6. from kombu.five import Empty
  7. from kombu.transport import virtual
  8. from kombu.utils import cached_property, symbol_by_name
  9. from kombu.utils.encoding import bytes_to_str
  10. try:
  11. from django.apps import AppConfig
  12. except ImportError: # pragma: no cover
  13. pass
  14. else:
  15. class KombuAppConfig(AppConfig):
  16. name = 'kombu.transport.django'
  17. label = name.replace('.', '_')
  18. verbose_name = 'Message queue'
  19. default_app_config = 'kombu.transport.django.KombuAppConfig'
  20. VERSION = (1, 0, 0)
  21. __version__ = '.'.join(map(str, VERSION))
  22. POLLING_INTERVAL = getattr(settings, 'KOMBU_POLLING_INTERVAL',
  23. getattr(settings, 'DJKOMBU_POLLING_INTERVAL', 5.0))
  24. class Channel(virtual.Channel):
  25. queue_model = 'kombu.transport.django.models:Queue'
  26. def _new_queue(self, queue, **kwargs):
  27. self.Queue.objects.get_or_create(name=queue)
  28. def _put(self, queue, message, **kwargs):
  29. self.Queue.objects.publish(queue, dumps(message))
  30. def basic_consume(self, queue, *args, **kwargs):
  31. qinfo = self.state.bindings[queue]
  32. exchange = qinfo[0]
  33. if self.typeof(exchange).type == 'fanout':
  34. return
  35. super(Channel, self).basic_consume(queue, *args, **kwargs)
  36. def _get(self, queue):
  37. m = self.Queue.objects.fetch(queue)
  38. if m:
  39. return loads(bytes_to_str(m))
  40. raise Empty()
  41. def _size(self, queue):
  42. return self.Queue.objects.size(queue)
  43. def _purge(self, queue):
  44. return self.Queue.objects.purge(queue)
  45. def refresh_connection(self):
  46. from django import db
  47. db.close_connection()
  48. @cached_property
  49. def Queue(self):
  50. return symbol_by_name(self.queue_model)
  51. class Transport(virtual.Transport):
  52. Channel = Channel
  53. default_port = 0
  54. polling_interval = POLLING_INTERVAL
  55. channel_errors = (
  56. virtual.Transport.channel_errors + (
  57. errors.ObjectDoesNotExist, errors.MultipleObjectsReturned)
  58. )
  59. driver_type = 'sql'
  60. driver_name = 'django'
  61. def driver_version(self):
  62. import django
  63. return '.'.join(map(str, django.VERSION))