session.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.database.session
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. SQLAlchemy sessions.
  6. """
  7. from __future__ import absolute_import
  8. from collections import defaultdict
  9. from multiprocessing.util import register_after_fork
  10. from sqlalchemy import create_engine
  11. from sqlalchemy.orm import sessionmaker
  12. from sqlalchemy.ext.declarative import declarative_base
  13. ResultModelBase = declarative_base()
  14. _SETUP = defaultdict(lambda: False)
  15. _ENGINES = {}
  16. _SESSIONS = {}
  17. __all__ = ['ResultSession', 'get_engine', 'create_session']
  18. class _after_fork(object):
  19. registered = False
  20. def __call__(self):
  21. self.registered = False # child must reregister
  22. for engine in list(_ENGINES.values()):
  23. engine.dispose()
  24. _ENGINES.clear()
  25. _SESSIONS.clear()
  26. after_fork = _after_fork()
  27. def get_engine(dburi, **kwargs):
  28. try:
  29. return _ENGINES[dburi]
  30. except KeyError:
  31. engine = _ENGINES[dburi] = create_engine(dburi, **kwargs)
  32. after_fork.registered = True
  33. register_after_fork(after_fork, after_fork)
  34. return engine
  35. def create_session(dburi, short_lived_sessions=False, **kwargs):
  36. engine = get_engine(dburi, **kwargs)
  37. if short_lived_sessions or dburi not in _SESSIONS:
  38. _SESSIONS[dburi] = sessionmaker(bind=engine)
  39. return engine, _SESSIONS[dburi]
  40. def setup_results(engine):
  41. if not _SETUP['results']:
  42. ResultModelBase.metadata.create_all(engine)
  43. _SETUP['results'] = True
  44. def ResultSession(dburi, **kwargs):
  45. engine, session = create_session(dburi, **kwargs)
  46. setup_results(engine)
  47. return session()