test_messaging.py 357 B

12345678910111213
  1. from __future__ import absolute_import
  2. from celery import messaging
  3. from celery.tests.case import AppCase, depends_on_current_app
  4. @depends_on_current_app
  5. class test_compat_messaging_module(AppCase):
  6. def test_get_consume_set(self):
  7. conn = messaging.establish_connection()
  8. messaging.get_consumer_set(conn).close()
  9. conn.close()