123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- """Garbage collection thread for representing zmq refcount of Python objects
- used in zero-copy sends.
- """
- # Copyright (C) PyZMQ Developers
- # Distributed under the terms of the Modified BSD License.
- import atexit
- import struct
- from os import getpid
- from collections import namedtuple
- from threading import Thread, Event, Lock
- import warnings
- import zmq
- gcref = namedtuple('gcref', ['obj', 'event'])
- class GarbageCollectorThread(Thread):
- """Thread in which garbage collection actually happens."""
- def __init__(self, gc):
- super(GarbageCollectorThread, self).__init__()
- self.gc = gc
- self.daemon = True
- self.pid = getpid()
- self.ready = Event()
-
- def run(self):
- # detect fork at beginning of the thread
- if getpid is None or getpid() != self.pid:
- self.ready.set()
- return
- try:
- s = self.gc.context.socket(zmq.PULL)
- s.linger = 0
- s.bind(self.gc.url)
- finally:
- self.ready.set()
- while True:
- # detect fork
- if getpid is None or getpid() != self.pid:
- return
- msg = s.recv()
- if msg == b'DIE':
- break
- fmt = 'L' if len(msg) == 4 else 'Q'
- key = struct.unpack(fmt, msg)[0]
- tup = self.gc.refs.pop(key, None)
- if tup and tup.event:
- tup.event.set()
- del tup
- s.close()
- class GarbageCollector(object):
- """PyZMQ Garbage Collector
-
- Used for representing the reference held by libzmq during zero-copy sends.
- This object holds a dictionary, keyed by Python id,
- of the Python objects whose memory are currently in use by zeromq.
-
- When zeromq is done with the memory, it sends a message on an inproc PUSH socket
- containing the packed size_t (32 or 64-bit unsigned int),
- which is the key in the dict.
- When the PULL socket in the gc thread receives that message,
- the reference is popped from the dict,
- and any tracker events that should be signaled fire.
- """
-
- refs = None
- _context = None
- _lock = None
- url = "inproc://pyzmq.gc.01"
-
- def __init__(self, context=None):
- super(GarbageCollector, self).__init__()
- self.refs = {}
- self.pid = None
- self.thread = None
- self._context = context
- self._lock = Lock()
- self._stay_down = False
- self._push = None
- self._push_mutex = None
- atexit.register(self._atexit)
-
- @property
- def context(self):
- if self._context is None:
- if Thread.__module__.startswith('gevent'):
- # gevent has monkey-patched Thread, use green Context
- from zmq import green
- self._context = green.Context()
- else:
- self._context = zmq.Context()
- return self._context
-
- @context.setter
- def context(self, ctx):
- if self.is_alive():
- if self.refs:
- warnings.warn("Replacing gc context while gc is running", RuntimeWarning)
- self.stop()
- self._context = ctx
-
- def _atexit(self):
- """atexit callback
-
- sets _stay_down flag so that gc doesn't try to start up again in other atexit handlers
- """
- self._stay_down = True
- self.stop()
-
- def stop(self):
- """stop the garbage-collection thread"""
- if not self.is_alive():
- return
- self._stop()
-
- def _stop(self):
- push = self.context.socket(zmq.PUSH)
- push.connect(self.url)
- push.send(b'DIE')
- push.close()
- if self._push:
- self._push.close()
- self._push = None
- self._push_mutex = None
- self.thread.join()
- self.context.term()
- self.refs.clear()
- self.context = None
- @property
- def _push_socket(self):
- """The PUSH socket for use in the zmq message destructor callback.
- """
- if not self.is_alive() or self._push is None:
- self._push = self.context.socket(zmq.PUSH)
- self._push.connect(self.url)
- return self._push
-
- def start(self):
- """Start a new garbage collection thread.
-
- Creates a new zmq Context used for garbage collection.
- Under most circumstances, this will only be called once per process.
- """
- if self.thread is not None and self.pid != getpid():
- # It's re-starting, must free earlier thread's context
- # since a fork probably broke it
- self._stop()
- self.pid = getpid()
- self.refs = {}
- self.thread = GarbageCollectorThread(self)
- self.thread.start()
- self.thread.ready.wait()
-
- def is_alive(self):
- """Is the garbage collection thread currently running?
-
- Includes checks for process shutdown or fork.
- """
- if (getpid is None or
- getpid() != self.pid or
- self.thread is None or
- not self.thread.is_alive()
- ):
- return False
- return True
-
- def store(self, obj, event=None):
- """store an object and (optionally) event for zero-copy"""
- if not self.is_alive():
- if self._stay_down:
- return 0
- # safely start the gc thread
- # use lock and double check,
- # so we don't start multiple threads
- with self._lock:
- if not self.is_alive():
- self.start()
- tup = gcref(obj, event)
- theid = id(tup)
- self.refs[theid] = tup
- return theid
-
- def __del__(self):
- if not self.is_alive():
- return
- try:
- self.stop()
- except Exception as e:
- raise (e)
- gc = GarbageCollector()
|