squeues.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. """
  2. Scheduler queues
  3. """
  4. import marshal
  5. from six.moves import cPickle as pickle
  6. from queuelib import queue
  7. def _serializable_queue(queue_class, serialize, deserialize):
  8. class SerializableQueue(queue_class):
  9. def push(self, obj):
  10. s = serialize(obj)
  11. super(SerializableQueue, self).push(s)
  12. def pop(self):
  13. s = super(SerializableQueue, self).pop()
  14. if s:
  15. return deserialize(s)
  16. return SerializableQueue
  17. def _pickle_serialize(obj):
  18. try:
  19. return pickle.dumps(obj, protocol=2)
  20. # Python <= 3.4 raises pickle.PicklingError here while
  21. # 3.5 <= Python < 3.6 raises AttributeError and
  22. # Python >= 3.6 raises TypeError
  23. except (pickle.PicklingError, AttributeError, TypeError) as e:
  24. raise ValueError(str(e))
  25. PickleFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue,
  26. _pickle_serialize, pickle.loads)
  27. PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue,
  28. _pickle_serialize, pickle.loads)
  29. MarshalFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue,
  30. marshal.dumps, marshal.loads)
  31. MarshalLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue,
  32. marshal.dumps, marshal.loads)
  33. FifoMemoryQueue = queue.FifoMemoryQueue
  34. LifoMemoryQueue = queue.LifoMemoryQueue