tcp.py 19 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. TCP support for IOCP reactor
  5. """
  6. import socket, operator, errno, struct
  7. from zope.interface import implementer, classImplements
  8. from twisted.internet import interfaces, error, address, main, defer
  9. from twisted.internet.protocol import Protocol
  10. from twisted.internet.abstract import _LogOwner, isIPv6Address
  11. from twisted.internet.tcp import _SocketCloser, Connector as TCPConnector
  12. from twisted.internet.tcp import _AbortingMixin, _BaseBaseClient, _BaseTCPClient
  13. from twisted.python import log, failure, reflect
  14. from twisted.python.compat import _PY3, nativeString
  15. from twisted.internet.iocpreactor import iocpsupport as _iocp, abstract
  16. from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
  17. from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
  18. from twisted.internet.iocpreactor.const import SO_UPDATE_CONNECT_CONTEXT
  19. from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
  20. from twisted.internet.iocpreactor.const import ERROR_CONNECTION_REFUSED
  21. from twisted.internet.iocpreactor.const import ERROR_NETWORK_UNREACHABLE
  22. try:
  23. from twisted.internet._newtls import startTLS as _startTLS
  24. except ImportError:
  25. _startTLS = None
  26. # ConnectEx returns these. XXX: find out what it does for timeout
  27. connectExErrors = {
  28. ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED,
  29. ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH,
  30. }
  31. @implementer(IReadWriteHandle, interfaces.ITCPTransport,
  32. interfaces.ISystemHandle)
  33. class Connection(abstract.FileHandle, _SocketCloser, _AbortingMixin):
  34. """
  35. @ivar TLS: C{False} to indicate the connection is in normal TCP mode,
  36. C{True} to indicate that TLS has been started and that operations must
  37. be routed through the L{TLSMemoryBIOProtocol} instance.
  38. """
  39. TLS = False
  40. def __init__(self, sock, proto, reactor=None):
  41. abstract.FileHandle.__init__(self, reactor)
  42. self.socket = sock
  43. self.getFileHandle = sock.fileno
  44. self.protocol = proto
  45. def getHandle(self):
  46. return self.socket
  47. def dataReceived(self, rbuffer):
  48. """
  49. @param rbuffer: Data received.
  50. @type rbuffer: L{bytes} or L{bytearray}
  51. """
  52. if isinstance(rbuffer, bytes):
  53. pass
  54. elif isinstance(rbuffer, bytearray):
  55. # XXX: some day, we'll have protocols that can handle raw buffers
  56. rbuffer = bytes(rbuffer)
  57. else:
  58. raise TypeError("data must be bytes or bytearray, not " +
  59. type(rbuffer))
  60. self.protocol.dataReceived(rbuffer)
  61. def readFromHandle(self, bufflist, evt):
  62. return _iocp.recv(self.getFileHandle(), bufflist, evt)
  63. def writeToHandle(self, buff, evt):
  64. """
  65. Send C{buff} to current file handle using C{_iocp.send}. The buffer
  66. sent is limited to a size of C{self.SEND_LIMIT}.
  67. """
  68. writeView = memoryview(buff)
  69. return _iocp.send(self.getFileHandle(),
  70. writeView[0:self.SEND_LIMIT].tobytes(), evt)
  71. def _closeWriteConnection(self):
  72. try:
  73. self.socket.shutdown(1)
  74. except socket.error:
  75. pass
  76. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  77. if p:
  78. try:
  79. p.writeConnectionLost()
  80. except:
  81. f = failure.Failure()
  82. log.err()
  83. self.connectionLost(f)
  84. def readConnectionLost(self, reason):
  85. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  86. if p:
  87. try:
  88. p.readConnectionLost()
  89. except:
  90. log.err()
  91. self.connectionLost(failure.Failure())
  92. else:
  93. self.connectionLost(reason)
  94. def connectionLost(self, reason):
  95. if self.disconnected:
  96. return
  97. abstract.FileHandle.connectionLost(self, reason)
  98. isClean = (reason is None or
  99. not reason.check(error.ConnectionAborted))
  100. self._closeSocket(isClean)
  101. protocol = self.protocol
  102. del self.protocol
  103. del self.socket
  104. del self.getFileHandle
  105. protocol.connectionLost(reason)
  106. def logPrefix(self):
  107. """
  108. Return the prefix to log with when I own the logging thread.
  109. """
  110. return self.logstr
  111. def getTcpNoDelay(self):
  112. return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP,
  113. socket.TCP_NODELAY))
  114. def setTcpNoDelay(self, enabled):
  115. self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
  116. def getTcpKeepAlive(self):
  117. return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
  118. socket.SO_KEEPALIVE))
  119. def setTcpKeepAlive(self, enabled):
  120. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
  121. if _startTLS is not None:
  122. def startTLS(self, contextFactory, normal=True):
  123. """
  124. @see: L{ITLSTransport.startTLS}
  125. """
  126. _startTLS(self, contextFactory, normal, abstract.FileHandle)
  127. def write(self, data):
  128. """
  129. Write some data, either directly to the underlying handle or, if TLS
  130. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  131. send.
  132. @see: L{twisted.internet.interfaces.ITransport.write}
  133. """
  134. if self.disconnected:
  135. return
  136. if self.TLS:
  137. self.protocol.write(data)
  138. else:
  139. abstract.FileHandle.write(self, data)
  140. def writeSequence(self, iovec):
  141. """
  142. Write some data, either directly to the underlying handle or, if TLS
  143. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  144. send.
  145. @see: L{twisted.internet.interfaces.ITransport.writeSequence}
  146. """
  147. if self.disconnected:
  148. return
  149. if self.TLS:
  150. self.protocol.writeSequence(iovec)
  151. else:
  152. abstract.FileHandle.writeSequence(self, iovec)
  153. def loseConnection(self, reason=None):
  154. """
  155. Close the underlying handle or, if TLS has been started, first shut it
  156. down.
  157. @see: L{twisted.internet.interfaces.ITransport.loseConnection}
  158. """
  159. if self.TLS:
  160. if self.connected and not self.disconnecting:
  161. self.protocol.loseConnection()
  162. else:
  163. abstract.FileHandle.loseConnection(self, reason)
  164. def registerProducer(self, producer, streaming):
  165. """
  166. Register a producer.
  167. If TLS is enabled, the TLS connection handles this.
  168. """
  169. if self.TLS:
  170. # Registering a producer before we're connected shouldn't be a
  171. # problem. If we end up with a write(), that's already handled in
  172. # the write() code above, and there are no other potential
  173. # side-effects.
  174. self.protocol.registerProducer(producer, streaming)
  175. else:
  176. abstract.FileHandle.registerProducer(self, producer, streaming)
  177. def unregisterProducer(self):
  178. """
  179. Unregister a producer.
  180. If TLS is enabled, the TLS connection handles this.
  181. """
  182. if self.TLS:
  183. self.protocol.unregisterProducer()
  184. else:
  185. abstract.FileHandle.unregisterProducer(self)
  186. if _startTLS is not None:
  187. classImplements(Connection, interfaces.ITLSTransport)
  188. class Client(_BaseBaseClient, _BaseTCPClient, Connection):
  189. """
  190. @ivar _tlsClientDefault: Always C{True}, indicating that this is a client
  191. connection, and by default when TLS is negotiated this class will act as
  192. a TLS client.
  193. """
  194. addressFamily = socket.AF_INET
  195. socketType = socket.SOCK_STREAM
  196. _tlsClientDefault = True
  197. _commonConnection = Connection
  198. def __init__(self, host, port, bindAddress, connector, reactor):
  199. # ConnectEx documentation says socket _has_ to be bound
  200. if bindAddress is None:
  201. bindAddress = ('', 0)
  202. self.reactor = reactor # createInternetSocket needs this
  203. _BaseTCPClient.__init__(self, host, port, bindAddress, connector,
  204. reactor)
  205. def createInternetSocket(self):
  206. """
  207. Create a socket registered with the IOCP reactor.
  208. @see: L{_BaseTCPClient}
  209. """
  210. return self.reactor.createSocket(self.addressFamily, self.socketType)
  211. def _collectSocketDetails(self):
  212. """
  213. Clean up potentially circular references to the socket and to its
  214. C{getFileHandle} method.
  215. @see: L{_BaseBaseClient}
  216. """
  217. del self.socket, self.getFileHandle
  218. def _stopReadingAndWriting(self):
  219. """
  220. Remove the active handle from the reactor.
  221. @see: L{_BaseBaseClient}
  222. """
  223. self.reactor.removeActiveHandle(self)
  224. def cbConnect(self, rc, data, evt):
  225. if rc:
  226. rc = connectExErrors.get(rc, rc)
  227. self.failIfNotConnected(error.getConnectError((rc,
  228. errno.errorcode.get(rc, 'Unknown error'))))
  229. else:
  230. self.socket.setsockopt(
  231. socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
  232. struct.pack('P', self.socket.fileno()))
  233. self.protocol = self.connector.buildProtocol(self.getPeer())
  234. self.connected = True
  235. logPrefix = self._getLogPrefix(self.protocol)
  236. self.logstr = logPrefix + ",client"
  237. if self.protocol is None:
  238. # Factory.buildProtocol is allowed to return None. In that
  239. # case, make up a protocol to satisfy the rest of the
  240. # implementation; connectionLost is going to be called on
  241. # something, for example. This is easier than adding special
  242. # case support for a None protocol throughout the rest of the
  243. # transport implementation.
  244. self.protocol = Protocol()
  245. # But dispose of the connection quickly.
  246. self.loseConnection()
  247. else:
  248. self.protocol.makeConnection(self)
  249. self.startReading()
  250. def doConnect(self):
  251. if not hasattr(self, "connector"):
  252. # this happens if we connector.stopConnecting in
  253. # factory.startedConnecting
  254. return
  255. assert _iocp.have_connectex
  256. self.reactor.addActiveHandle(self)
  257. evt = _iocp.Event(self.cbConnect, self)
  258. rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt)
  259. if rc and rc != ERROR_IO_PENDING:
  260. self.cbConnect(rc, 0, evt)
  261. class Server(Connection):
  262. """
  263. Serverside socket-stream connection class.
  264. I am a serverside network connection transport; a socket which came from an
  265. accept() on a server.
  266. @ivar _tlsClientDefault: Always C{False}, indicating that this is a server
  267. connection, and by default when TLS is negotiated this class will act as
  268. a TLS server.
  269. """
  270. _tlsClientDefault = False
  271. def __init__(self, sock, protocol, clientAddr, serverAddr, sessionno, reactor):
  272. """
  273. Server(sock, protocol, client, server, sessionno)
  274. Initialize me with a socket, a protocol, a descriptor for my peer (a
  275. tuple of host, port describing the other end of the connection), an
  276. instance of Port, and a session number.
  277. """
  278. Connection.__init__(self, sock, protocol, reactor)
  279. self.serverAddr = serverAddr
  280. self.clientAddr = clientAddr
  281. self.sessionno = sessionno
  282. logPrefix = self._getLogPrefix(self.protocol)
  283. self.logstr = "%s,%s,%s" % (logPrefix, sessionno, self.clientAddr.host)
  284. self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
  285. self.sessionno, self.serverAddr.port)
  286. self.connected = True
  287. self.startReading()
  288. def __repr__(self):
  289. """
  290. A string representation of this connection.
  291. """
  292. return self.repstr
  293. def getHost(self):
  294. """
  295. Returns an IPv4Address.
  296. This indicates the server's address.
  297. """
  298. return self.serverAddr
  299. def getPeer(self):
  300. """
  301. Returns an IPv4Address.
  302. This indicates the client's address.
  303. """
  304. return self.clientAddr
  305. class Connector(TCPConnector):
  306. def _makeTransport(self):
  307. return Client(self.host, self.port, self.bindAddress, self,
  308. self.reactor)
  309. @implementer(interfaces.IListeningPort)
  310. class Port(_SocketCloser, _LogOwner):
  311. connected = False
  312. disconnected = False
  313. disconnecting = False
  314. addressFamily = socket.AF_INET
  315. socketType = socket.SOCK_STREAM
  316. _addressType = address.IPv4Address
  317. sessionno = 0
  318. # Actual port number being listened on, only set to a non-None
  319. # value when we are actually listening.
  320. _realPortNumber = None
  321. # A string describing the connections which will be created by this port.
  322. # Normally this is C{"TCP"}, since this is a TCP port, but when the TLS
  323. # implementation re-uses this class it overrides the value with C{"TLS"}.
  324. # Only used for logging.
  325. _type = 'TCP'
  326. def __init__(self, port, factory, backlog=50, interface='', reactor=None):
  327. self.port = port
  328. self.factory = factory
  329. self.backlog = backlog
  330. self.interface = interface
  331. self.reactor = reactor
  332. if isIPv6Address(interface):
  333. self.addressFamily = socket.AF_INET6
  334. self._addressType = address.IPv6Address
  335. def __repr__(self):
  336. if self._realPortNumber is not None:
  337. return "<%s of %s on %s>" % (self.__class__,
  338. self.factory.__class__,
  339. self._realPortNumber)
  340. else:
  341. return "<%s of %s (not listening)>" % (self.__class__,
  342. self.factory.__class__)
  343. def startListening(self):
  344. try:
  345. skt = self.reactor.createSocket(self.addressFamily,
  346. self.socketType)
  347. # TODO: resolve self.interface if necessary
  348. if self.addressFamily == socket.AF_INET6:
  349. addr = socket.getaddrinfo(self.interface, self.port)[0][4]
  350. else:
  351. addr = (self.interface, self.port)
  352. skt.bind(addr)
  353. except socket.error as le:
  354. raise error.CannotListenError(self.interface, self.port, le)
  355. self.addrLen = _iocp.maxAddrLen(skt.fileno())
  356. # Make sure that if we listened on port 0, we update that to
  357. # reflect what the OS actually assigned us.
  358. self._realPortNumber = skt.getsockname()[1]
  359. log.msg("%s starting on %s" % (self._getLogPrefix(self.factory),
  360. self._realPortNumber))
  361. self.factory.doStart()
  362. skt.listen(self.backlog)
  363. self.connected = True
  364. self.disconnected = False
  365. self.reactor.addActiveHandle(self)
  366. self.socket = skt
  367. self.getFileHandle = self.socket.fileno
  368. self.doAccept()
  369. def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
  370. """
  371. Stop accepting connections on this port.
  372. This will shut down my socket and call self.connectionLost().
  373. It returns a deferred which will fire successfully when the
  374. port is actually closed.
  375. """
  376. self.disconnecting = True
  377. if self.connected:
  378. self.deferred = defer.Deferred()
  379. self.reactor.callLater(0, self.connectionLost, connDone)
  380. return self.deferred
  381. stopListening = loseConnection
  382. def _logConnectionLostMsg(self):
  383. """
  384. Log message for closing port
  385. """
  386. log.msg('(%s Port %s Closed)' % (self._type, self._realPortNumber))
  387. def connectionLost(self, reason):
  388. """
  389. Cleans up the socket.
  390. """
  391. self._logConnectionLostMsg()
  392. self._realPortNumber = None
  393. d = None
  394. if hasattr(self, "deferred"):
  395. d = self.deferred
  396. del self.deferred
  397. self.disconnected = True
  398. self.reactor.removeActiveHandle(self)
  399. self.connected = False
  400. self._closeSocket(True)
  401. del self.socket
  402. del self.getFileHandle
  403. try:
  404. self.factory.doStop()
  405. except:
  406. self.disconnecting = False
  407. if d is not None:
  408. d.errback(failure.Failure())
  409. else:
  410. raise
  411. else:
  412. self.disconnecting = False
  413. if d is not None:
  414. d.callback(None)
  415. def logPrefix(self):
  416. """
  417. Returns the name of my class, to prefix log entries with.
  418. """
  419. return reflect.qual(self.factory.__class__)
  420. def getHost(self):
  421. """
  422. Returns an IPv4Address.
  423. This indicates the server's address.
  424. """
  425. host, port = self.socket.getsockname()[:2]
  426. return self._addressType('TCP', host, port)
  427. def cbAccept(self, rc, data, evt):
  428. self.handleAccept(rc, evt)
  429. if not (self.disconnecting or self.disconnected):
  430. self.doAccept()
  431. def handleAccept(self, rc, evt):
  432. if self.disconnecting or self.disconnected:
  433. return False
  434. # possible errors:
  435. # (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED)
  436. if rc:
  437. log.msg("Could not accept new connection -- %s (%s)" %
  438. (errno.errorcode.get(rc, 'unknown error'), rc))
  439. return False
  440. else:
  441. evt.newskt.setsockopt(
  442. socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
  443. struct.pack('P', self.socket.fileno()))
  444. family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(),
  445. evt.buff)
  446. if not _PY3:
  447. # In _makesockaddr(), we use the Win32 API which
  448. # gives us an address of the form: (unicode host, port).
  449. # Only on Python 2 do we need to convert it to a
  450. # non-unicode str.
  451. # On Python 3, we leave it alone as unicode.
  452. lAddr = (nativeString(lAddr[0]), lAddr[1])
  453. rAddr = (nativeString(rAddr[0]), rAddr[1])
  454. assert family == self.addressFamily
  455. protocol = self.factory.buildProtocol(
  456. self._addressType('TCP', rAddr[0], rAddr[1]))
  457. if protocol is None:
  458. evt.newskt.close()
  459. else:
  460. s = self.sessionno
  461. self.sessionno = s+1
  462. transport = Server(evt.newskt, protocol,
  463. self._addressType('TCP', rAddr[0], rAddr[1]),
  464. self._addressType('TCP', lAddr[0], lAddr[1]),
  465. s, self.reactor)
  466. protocol.makeConnection(transport)
  467. return True
  468. def doAccept(self):
  469. evt = _iocp.Event(self.cbAccept, self)
  470. # see AcceptEx documentation
  471. evt.buff = buff = bytearray(2 * (self.addrLen + 16))
  472. evt.newskt = newskt = self.reactor.createSocket(self.addressFamily,
  473. self.socketType)
  474. rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt)
  475. if rc and rc != ERROR_IO_PENDING:
  476. self.handleAccept(rc, evt)