selectors34.py 22 KB


  1. """Selectors module.
  2. This module allows high-level and efficient I/O multiplexing, built upon the
  3. `select` module primitives.
  4. The following code adapted from trollius.selectors.
  5. """
  6. from abc import ABCMeta, abstractmethod
  7. from collections import namedtuple, Mapping
  8. import math
  9. import select
  10. import sys
  11. import six
  12. # compatibility code
  13. PY33 = (sys.version_info >= (3, 3))
  14. def _wrap_error(exc, mapping, key):
  15. if key not in mapping:
  16. return
  17. new_err_cls = mapping[key]
  18. new_err = new_err_cls(*exc.args)
  19. # raise a new exception with the original traceback
  20. if hasattr(exc, '__traceback__'):
  21. traceback = exc.__traceback__
  22. else:
  23. traceback = sys.exc_info()[2]
  24. six.reraise(new_err_cls, new_err, traceback)
  25. if PY33:
  26. import builtins
  27. BlockingIOError = builtins.BlockingIOError
  28. BrokenPipeError = builtins.BrokenPipeError
  29. ChildProcessError = builtins.ChildProcessError
  30. ConnectionRefusedError = builtins.ConnectionRefusedError
  31. ConnectionResetError = builtins.ConnectionResetError
  32. InterruptedError = builtins.InterruptedError
  33. ConnectionAbortedError = builtins.ConnectionAbortedError
  34. PermissionError = builtins.PermissionError
  35. FileNotFoundError = builtins.FileNotFoundError
  36. ProcessLookupError = builtins.ProcessLookupError
  37. def wrap_error(func, *args, **kw):
  38. return func(*args, **kw)
  39. else:
  40. import errno
  41. import select
  42. import socket
  43. class BlockingIOError(OSError):
  44. pass
  45. class BrokenPipeError(OSError):
  46. pass
  47. class ChildProcessError(OSError):
  48. pass
  49. class ConnectionRefusedError(OSError):
  50. pass
  51. class InterruptedError(OSError):
  52. pass
  53. class ConnectionResetError(OSError):
  54. pass
  55. class ConnectionAbortedError(OSError):
  56. pass
  57. class PermissionError(OSError):
  58. pass
  59. class FileNotFoundError(OSError):
  60. pass
  61. class ProcessLookupError(OSError):
  62. pass
  63. _MAP_ERRNO = {
  64. errno.EACCES: PermissionError,
  65. errno.EAGAIN: BlockingIOError,
  66. errno.EALREADY: BlockingIOError,
  67. errno.ECHILD: ChildProcessError,
  68. errno.ECONNABORTED: ConnectionAbortedError,
  69. errno.ECONNREFUSED: ConnectionRefusedError,
  70. errno.ECONNRESET: ConnectionResetError,
  71. errno.EINPROGRESS: BlockingIOError,
  72. errno.EINTR: InterruptedError,
  73. errno.ENOENT: FileNotFoundError,
  74. errno.EPERM: PermissionError,
  75. errno.EPIPE: BrokenPipeError,
  76. errno.ESHUTDOWN: BrokenPipeError,
  77. errno.EWOULDBLOCK: BlockingIOError,
  78. errno.ESRCH: ProcessLookupError,
  79. }
  80. def wrap_error(func, *args, **kw):
  81. """
  82. Wrap socket.error, IOError, OSError, select.error to raise new specialized
  83. exceptions of Python 3.3 like InterruptedError (PEP 3151).
  84. """
  85. try:
  86. return func(*args, **kw)
  87. except (socket.error, IOError, OSError) as exc:
  88. if hasattr(exc, 'winerror'):
  89. _wrap_error(exc, _MAP_ERRNO, exc.winerror)
  90. # _MAP_ERRNO does not contain all Windows errors.
  91. # For some errors like "file not found", exc.errno should
  92. # be used (ex: ENOENT).
  93. _wrap_error(exc, _MAP_ERRNO, exc.errno)
  94. raise
  95. except select.error as exc:
  96. if exc.args:
  97. _wrap_error(exc, _MAP_ERRNO, exc.args[0])
  98. raise
  99. # generic events, that must be mapped to implementation-specific ones
  100. EVENT_READ = (1 << 0)
  101. EVENT_WRITE = (1 << 1)
  102. def _fileobj_to_fd(fileobj):
  103. """Return a file descriptor from a file object.
  104. Parameters:
  105. fileobj -- file object or file descriptor
  106. Returns:
  107. corresponding file descriptor
  108. Raises:
  109. ValueError if the object is invalid
  110. """
  111. if isinstance(fileobj, six.integer_types):
  112. fd = fileobj
  113. else:
  114. try:
  115. fd = int(fileobj.fileno())
  116. except (AttributeError, TypeError, ValueError):
  117. raise ValueError("Invalid file object: "
  118. "{0!r}".format(fileobj))
  119. if fd < 0:
  120. raise ValueError("Invalid file descriptor: {0}".format(fd))
  121. return fd
  122. SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
  123. """Object used to associate a file object to its backing file descriptor,
  124. selected event mask and attached data."""
  125. class _SelectorMapping(Mapping):
  126. """Mapping of file objects to selector keys."""
  127. def __init__(self, selector):
  128. self._selector = selector
  129. def __len__(self):
  130. return len(self._selector._fd_to_key)
  131. def __getitem__(self, fileobj):
  132. try:
  133. fd = self._selector._fileobj_lookup(fileobj)
  134. return self._selector._fd_to_key[fd]
  135. except KeyError:
  136. raise KeyError("{0!r} is not registered".format(fileobj))
  137. def __iter__(self):
  138. return iter(self._selector._fd_to_key)
  139. class BaseSelector(six.with_metaclass(ABCMeta)):
  140. """Selector abstract base class.
  141. A selector supports registering file objects to be monitored for specific
  142. I/O events.
  143. A file object is a file descriptor or any object with a `fileno()` method.
  144. An arbitrary object can be attached to the file object, which can be used
  145. for example to store context information, a callback, etc.
  146. A selector can use various implementations (select(), poll(), epoll()...)
  147. depending on the platform. The default `Selector` class uses the most
  148. efficient implementation on the current platform.
  149. """
  150. @abstractmethod
  151. def register(self, fileobj, events, data=None):
  152. """Register a file object.
  153. Parameters:
  154. fileobj -- file object or file descriptor
  155. events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
  156. data -- attached data
  157. Returns:
  158. SelectorKey instance
  159. Raises:
  160. ValueError if events is invalid
  161. KeyError if fileobj is already registered
  162. OSError if fileobj is closed or otherwise is unacceptable to
  163. the underlying system call (if a system call is made)
  164. Note:
  165. OSError may or may not be raised
  166. """
  167. raise NotImplementedError
  168. @abstractmethod
  169. def unregister(self, fileobj):
  170. """Unregister a file object.
  171. Parameters:
  172. fileobj -- file object or file descriptor
  173. Returns:
  174. SelectorKey instance
  175. Raises:
  176. KeyError if fileobj is not registered
  177. Note:
  178. If fileobj is registered but has since been closed this does
  179. *not* raise OSError (even if the wrapped syscall does)
  180. """
  181. raise NotImplementedError
  182. def modify(self, fileobj, events, data=None):
  183. """Change a registered file object monitored events or attached data.
  184. Parameters:
  185. fileobj -- file object or file descriptor
  186. events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
  187. data -- attached data
  188. Returns:
  189. SelectorKey instance
  190. Raises:
  191. Anything that unregister() or register() raises
  192. """
  193. self.unregister(fileobj)
  194. return self.register(fileobj, events, data)
  195. @abstractmethod
  196. def select(self, timeout=None):
  197. """Perform the actual selection, until some monitored file objects are
  198. ready or a timeout expires.
  199. Parameters:
  200. timeout -- if timeout > 0, this specifies the maximum wait time, in
  201. seconds
  202. if timeout <= 0, the select() call won't block, and will
  203. report the currently ready file objects
  204. if timeout is None, select() will block until a monitored
  205. file object becomes ready
  206. Returns:
  207. list of (key, events) for ready file objects
  208. `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
  209. """
  210. raise NotImplementedError
  211. def close(self):
  212. """Close the selector.
  213. This must be called to make sure that any underlying resource is freed.
  214. """
  215. pass
  216. def get_key(self, fileobj):
  217. """Return the key associated to a registered file object.
  218. Returns:
  219. SelectorKey for this file object
  220. """
  221. mapping = self.get_map()
  222. if mapping is None:
  223. raise RuntimeError('Selector is closed')
  224. try:
  225. return mapping[fileobj]
  226. except KeyError:
  227. raise KeyError("{0!r} is not registered".format(fileobj))
  228. @abstractmethod
  229. def get_map(self):
  230. """Return a mapping of file objects to selector keys."""
  231. raise NotImplementedError
  232. def __enter__(self):
  233. return self
  234. def __exit__(self, *args):
  235. self.close()
  236. class _BaseSelectorImpl(BaseSelector):
  237. """Base selector implementation."""
  238. def __init__(self):
  239. # this maps file descriptors to keys
  240. self._fd_to_key = {}
  241. # read-only mapping returned by get_map()
  242. self._map = _SelectorMapping(self)
  243. def _fileobj_lookup(self, fileobj):
  244. """Return a file descriptor from a file object.
  245. This wraps _fileobj_to_fd() to do an exhaustive search in case
  246. the object is invalid but we still have it in our map. This
  247. is used by unregister() so we can unregister an object that
  248. was previously registered even if it is closed. It is also
  249. used by _SelectorMapping.
  250. """
  251. try:
  252. return _fileobj_to_fd(fileobj)
  253. except ValueError:
  254. # Do an exhaustive search.
  255. for key in self._fd_to_key.values():
  256. if key.fileobj is fileobj:
  257. return key.fd
  258. # Raise ValueError after all.
  259. raise
  260. def register(self, fileobj, events, data=None):
  261. if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
  262. raise ValueError("Invalid events: {0!r}".format(events))
  263. key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  264. if key.fd in self._fd_to_key:
  265. raise KeyError("{0!r} (FD {1}) is already registered"
  266. .format(fileobj, key.fd))
  267. self._fd_to_key[key.fd] = key
  268. return key
  269. def unregister(self, fileobj):
  270. try:
  271. key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
  272. except KeyError:
  273. raise KeyError("{0!r} is not registered".format(fileobj))
  274. return key
  275. def modify(self, fileobj, events, data=None):
  276. # TODO: Subclasses can probably optimize this even further.
  277. try:
  278. key = self._fd_to_key[self._fileobj_lookup(fileobj)]
  279. except KeyError:
  280. raise KeyError("{0!r} is not registered".format(fileobj))
  281. if events != key.events:
  282. self.unregister(fileobj)
  283. key = self.register(fileobj, events, data)
  284. elif data != key.data:
  285. # Use a shortcut to update the data.
  286. key = key._replace(data=data)
  287. self._fd_to_key[key.fd] = key
  288. return key
  289. def close(self):
  290. self._fd_to_key.clear()
  291. self._map = None
  292. def get_map(self):
  293. return self._map
  294. def _key_from_fd(self, fd):
  295. """Return the key associated to a given file descriptor.
  296. Parameters:
  297. fd -- file descriptor
  298. Returns:
  299. corresponding key, or None if not found
  300. """
  301. try:
  302. return self._fd_to_key[fd]
  303. except KeyError:
  304. return None
  305. class SelectSelector(_BaseSelectorImpl):
  306. """Select-based selector."""
  307. def __init__(self):
  308. super(SelectSelector, self).__init__()
  309. self._readers = set()
  310. self._writers = set()
  311. def register(self, fileobj, events, data=None):
  312. key = super(SelectSelector, self).register(fileobj, events, data)
  313. if events & EVENT_READ:
  314. self._readers.add(key.fd)
  315. if events & EVENT_WRITE:
  316. self._writers.add(key.fd)
  317. return key
  318. def unregister(self, fileobj):
  319. key = super(SelectSelector, self).unregister(fileobj)
  320. self._readers.discard(key.fd)
  321. self._writers.discard(key.fd)
  322. return key
  323. if sys.platform == 'win32':
  324. def _select(self, r, w, _, timeout=None):
  325. r, w, x = select.select(r, w, w, timeout)
  326. return r, w + x, []
  327. else:
  328. _select = select.select
  329. def select(self, timeout=None):
  330. timeout = None if timeout is None else max(timeout, 0)
  331. ready = []
  332. try:
  333. r, w, _ = wrap_error(self._select,
  334. self._readers, self._writers, [], timeout)
  335. except InterruptedError:
  336. return ready
  337. r = set(r)
  338. w = set(w)
  339. for fd in r | w:
  340. events = 0
  341. if fd in r:
  342. events |= EVENT_READ
  343. if fd in w:
  344. events |= EVENT_WRITE
  345. key = self._key_from_fd(fd)
  346. if key:
  347. ready.append((key, events & key.events))
  348. return ready
  349. if hasattr(select, 'poll'):
  350. class PollSelector(_BaseSelectorImpl):
  351. """Poll-based selector."""
  352. def __init__(self):
  353. super(PollSelector, self).__init__()
  354. self._poll = select.poll()
  355. def register(self, fileobj, events, data=None):
  356. key = super(PollSelector, self).register(fileobj, events, data)
  357. poll_events = 0
  358. if events & EVENT_READ:
  359. poll_events |= select.POLLIN
  360. if events & EVENT_WRITE:
  361. poll_events |= select.POLLOUT
  362. self._poll.register(key.fd, poll_events)
  363. return key
  364. def unregister(self, fileobj):
  365. key = super(PollSelector, self).unregister(fileobj)
  366. self._poll.unregister(key.fd)
  367. return key
  368. def select(self, timeout=None):
  369. if timeout is None:
  370. timeout = None
  371. elif timeout <= 0:
  372. timeout = 0
  373. else:
  374. # poll() has a resolution of 1 millisecond, round away from
  375. # zero to wait *at least* timeout seconds.
  376. timeout = int(math.ceil(timeout * 1e3))
  377. ready = []
  378. try:
  379. fd_event_list = wrap_error(self._poll.poll, timeout)
  380. except InterruptedError:
  381. return ready
  382. for fd, event in fd_event_list:
  383. events = 0
  384. if event & ~select.POLLIN:
  385. events |= EVENT_WRITE
  386. if event & ~select.POLLOUT:
  387. events |= EVENT_READ
  388. key = self._key_from_fd(fd)
  389. if key:
  390. ready.append((key, events & key.events))
  391. return ready
  392. if hasattr(select, 'epoll'):
  393. class EpollSelector(_BaseSelectorImpl):
  394. """Epoll-based selector."""
  395. def __init__(self):
  396. super(EpollSelector, self).__init__()
  397. self._epoll = select.epoll()
  398. def fileno(self):
  399. return self._epoll.fileno()
  400. def register(self, fileobj, events, data=None):
  401. key = super(EpollSelector, self).register(fileobj, events, data)
  402. epoll_events = 0
  403. if events & EVENT_READ:
  404. epoll_events |= select.EPOLLIN
  405. if events & EVENT_WRITE:
  406. epoll_events |= select.EPOLLOUT
  407. self._epoll.register(key.fd, epoll_events)
  408. return key
  409. def unregister(self, fileobj):
  410. key = super(EpollSelector, self).unregister(fileobj)
  411. try:
  412. self._epoll.unregister(key.fd)
  413. except IOError:
  414. # This can happen if the FD was closed since it
  415. # was registered.
  416. pass
  417. return key
  418. def select(self, timeout=None):
  419. if timeout is None:
  420. timeout = -1
  421. elif timeout <= 0:
  422. timeout = 0
  423. else:
  424. # epoll_wait() has a resolution of 1 millisecond, round away
  425. # from zero to wait *at least* timeout seconds.
  426. timeout = math.ceil(timeout * 1e3) * 1e-3
  427. # epoll_wait() expects `maxevents` to be greater than zero;
  428. # we want to make sure that `select()` can be called when no
  429. # FD is registered.
  430. max_ev = max(len(self._fd_to_key), 1)
  431. ready = []
  432. try:
  433. fd_event_list = wrap_error(self._epoll.poll, timeout, max_ev)
  434. except InterruptedError:
  435. return ready
  436. for fd, event in fd_event_list:
  437. events = 0
  438. if event & ~select.EPOLLIN:
  439. events |= EVENT_WRITE
  440. if event & ~select.EPOLLOUT:
  441. events |= EVENT_READ
  442. key = self._key_from_fd(fd)
  443. if key:
  444. ready.append((key, events & key.events))
  445. return ready
  446. def close(self):
  447. self._epoll.close()
  448. super(EpollSelector, self).close()
  449. if hasattr(select, 'devpoll'):
  450. class DevpollSelector(_BaseSelectorImpl):
  451. """Solaris /dev/poll selector."""
  452. def __init__(self):
  453. super(DevpollSelector, self).__init__()
  454. self._devpoll = select.devpoll()
  455. def fileno(self):
  456. return self._devpoll.fileno()
  457. def register(self, fileobj, events, data=None):
  458. key = super(DevpollSelector, self).register(fileobj, events, data)
  459. poll_events = 0
  460. if events & EVENT_READ:
  461. poll_events |= select.POLLIN
  462. if events & EVENT_WRITE:
  463. poll_events |= select.POLLOUT
  464. self._devpoll.register(key.fd, poll_events)
  465. return key
  466. def unregister(self, fileobj):
  467. key = super(DevpollSelector, self).unregister(fileobj)
  468. self._devpoll.unregister(key.fd)
  469. return key
  470. def select(self, timeout=None):
  471. if timeout is None:
  472. timeout = None
  473. elif timeout <= 0:
  474. timeout = 0
  475. else:
  476. # devpoll() has a resolution of 1 millisecond, round away from
  477. # zero to wait *at least* timeout seconds.
  478. timeout = math.ceil(timeout * 1e3)
  479. ready = []
  480. try:
  481. fd_event_list = self._devpoll.poll(timeout)
  482. except InterruptedError:
  483. return ready
  484. for fd, event in fd_event_list:
  485. events = 0
  486. if event & ~select.POLLIN:
  487. events |= EVENT_WRITE
  488. if event & ~select.POLLOUT:
  489. events |= EVENT_READ
  490. key = self._key_from_fd(fd)
  491. if key:
  492. ready.append((key, events & key.events))
  493. return ready
  494. def close(self):
  495. self._devpoll.close()
  496. super(DevpollSelector, self).close()
  497. if hasattr(select, 'kqueue'):
  498. class KqueueSelector(_BaseSelectorImpl):
  499. """Kqueue-based selector."""
  500. def __init__(self):
  501. super(KqueueSelector, self).__init__()
  502. self._kqueue = select.kqueue()
  503. def fileno(self):
  504. return self._kqueue.fileno()
  505. def register(self, fileobj, events, data=None):
  506. key = super(KqueueSelector, self).register(fileobj, events, data)
  507. if events & EVENT_READ:
  508. kev = select.kevent(key.fd, select.KQ_FILTER_READ,
  509. select.KQ_EV_ADD)
  510. self._kqueue.control([kev], 0, 0)
  511. if events & EVENT_WRITE:
  512. kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
  513. select.KQ_EV_ADD)
  514. self._kqueue.control([kev], 0, 0)
  515. return key
  516. def unregister(self, fileobj):
  517. key = super(KqueueSelector, self).unregister(fileobj)
  518. if key.events & EVENT_READ:
  519. kev = select.kevent(key.fd, select.KQ_FILTER_READ,
  520. select.KQ_EV_DELETE)
  521. try:
  522. self._kqueue.control([kev], 0, 0)
  523. except OSError:
  524. # This can happen if the FD was closed since it
  525. # was registered.
  526. pass
  527. if key.events & EVENT_WRITE:
  528. kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
  529. select.KQ_EV_DELETE)
  530. try:
  531. self._kqueue.control([kev], 0, 0)
  532. except OSError:
  533. # See comment above.
  534. pass
  535. return key
  536. def select(self, timeout=None):
  537. timeout = None if timeout is None else max(timeout, 0)
  538. max_ev = len(self._fd_to_key)
  539. ready = []
  540. try:
  541. kev_list = wrap_error(self._kqueue.control,
  542. None, max_ev, timeout)
  543. except InterruptedError:
  544. return ready
  545. for kev in kev_list:
  546. fd = kev.ident
  547. flag = kev.filter
  548. events = 0
  549. if flag == select.KQ_FILTER_READ:
  550. events |= EVENT_READ
  551. if flag == select.KQ_FILTER_WRITE:
  552. events |= EVENT_WRITE
  553. key = self._key_from_fd(fd)
  554. if key:
  555. ready.append((key, events & key.events))
  556. return ready
  557. def close(self):
  558. self._kqueue.close()
  559. super(KqueueSelector, self).close()
  560. # Choose the best implementation, roughly:
  561. # epoll|kqueue|devpoll > poll > select.
  562. # select() also can't accept a FD > FD_SETSIZE (usually around 1024)
  563. if 'KqueueSelector' in globals():
  564. DefaultSelector = KqueueSelector
  565. elif 'EpollSelector' in globals():
  566. DefaultSelector = EpollSelector
  567. elif 'DevpollSelector' in globals():
  568. DefaultSelector = DevpollSelector
  569. elif 'PollSelector' in globals():
  570. DefaultSelector = PollSelector
  571. else:
  572. DefaultSelector = SelectSelector