test_pb.py 62 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for Perspective Broker module.
  5. TODO: update protocol level tests to use new connection API, leaving
  6. only specific tests for old API.
  7. """
  8. # issue1195 TODOs: replace pump.pump() with something involving Deferreds.
  9. # Clean up warning suppression.
  10. from __future__ import absolute_import, division
  11. import sys, os, time, gc, weakref
  12. from collections import deque
  13. from io import BytesIO as StringIO
  14. from zope.interface import implementer, Interface
  15. from twisted.trial import unittest
  16. from twisted.spread import pb, util, publish, jelly
  17. from twisted.internet import protocol, main, reactor, address
  18. from twisted.internet.error import ConnectionRefusedError
  19. from twisted.internet.defer import Deferred, gatherResults, succeed
  20. from twisted.protocols.policies import WrappingFactory
  21. from twisted.python import failure, log
  22. from twisted.python.compat import iterbytes, range, _PY3
  23. from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
  24. from twisted.cred import portal, checkers, credentials
  25. from twisted.test.proto_helpers import _FakeConnector
  26. class Dummy(pb.Viewable):
  27. def view_doNothing(self, user):
  28. if isinstance(user, DummyPerspective):
  29. return 'hello world!'
  30. else:
  31. return 'goodbye, cruel world!'
  32. class DummyPerspective(pb.Avatar):
  33. """
  34. An L{IPerspective} avatar which will be used in some tests.
  35. """
  36. def perspective_getDummyViewPoint(self):
  37. return Dummy()
  38. @implementer(portal.IRealm)
  39. class DummyRealm(object):
  40. def requestAvatar(self, avatarId, mind, *interfaces):
  41. for iface in interfaces:
  42. if iface is pb.IPerspective:
  43. return iface, DummyPerspective(avatarId), lambda: None
  44. class IOPump:
  45. """
  46. Utility to pump data between clients and servers for protocol testing.
  47. Perhaps this is a utility worthy of being in protocol.py?
  48. """
  49. def __init__(self, client, server, clientIO, serverIO):
  50. self.client = client
  51. self.server = server
  52. self.clientIO = clientIO
  53. self.serverIO = serverIO
  54. def flush(self):
  55. """
  56. Pump until there is no more input or output or until L{stop} is called.
  57. This does not run any timers, so don't use it with any code that calls
  58. reactor.callLater.
  59. """
  60. # failsafe timeout
  61. self._stop = False
  62. timeout = time.time() + 5
  63. while not self._stop and self.pump():
  64. if time.time() > timeout:
  65. return
  66. def stop(self):
  67. """
  68. Stop a running L{flush} operation, even if data remains to be
  69. transferred.
  70. """
  71. self._stop = True
  72. def pump(self):
  73. """
  74. Move data back and forth.
  75. Returns whether any data was moved.
  76. """
  77. self.clientIO.seek(0)
  78. self.serverIO.seek(0)
  79. cData = self.clientIO.read()
  80. sData = self.serverIO.read()
  81. self.clientIO.seek(0)
  82. self.serverIO.seek(0)
  83. self.clientIO.truncate()
  84. self.serverIO.truncate()
  85. self.client.transport._checkProducer()
  86. self.server.transport._checkProducer()
  87. for byte in iterbytes(cData):
  88. self.server.dataReceived(byte)
  89. for byte in iterbytes(sData):
  90. self.client.dataReceived(byte)
  91. if cData or sData:
  92. return 1
  93. else:
  94. return 0
  95. def connectServerAndClient(test, clientFactory, serverFactory):
  96. """
  97. Create a server and a client and connect the two with an
  98. L{IOPump}.
  99. @param test: the test case where the client and server will be
  100. used.
  101. @type test: L{twisted.trial.unittest.TestCase}
  102. @param clientFactory: The factory that creates the client object.
  103. @type clientFactory: L{twisted.spread.pb.PBClientFactory}
  104. @param serverFactory: The factory that creates the server object.
  105. @type serverFactory: L{twisted.spread.pb.PBServerFactory}
  106. @return: a 3-tuple of (client, server, pump)
  107. @rtype: (L{twisted.spread.pb.Broker}, L{twisted.spread.pb.Broker},
  108. L{IOPump})
  109. """
  110. addr = ('127.0.0.1',)
  111. clientBroker = clientFactory.buildProtocol(addr)
  112. serverBroker = serverFactory.buildProtocol(addr)
  113. clientTransport = StringIO()
  114. serverTransport = StringIO()
  115. clientBroker.makeConnection(protocol.FileWrapper(clientTransport))
  116. serverBroker.makeConnection(protocol.FileWrapper(serverTransport))
  117. pump = IOPump(clientBroker, serverBroker, clientTransport, serverTransport)
  118. def maybeDisconnect(broker):
  119. if not broker.disconnected:
  120. broker.connectionLost(failure.Failure(main.CONNECTION_DONE))
  121. def disconnectClientFactory():
  122. # There's no connector, just a FileWrapper mediated by the
  123. # IOPump. Fortunately PBClientFactory.clientConnectionLost
  124. # doesn't do anything with the connector so we can get away
  125. # with passing None here.
  126. clientFactory.clientConnectionLost(
  127. connector=None,
  128. reason=failure.Failure(main.CONNECTION_DONE))
  129. test.addCleanup(maybeDisconnect, clientBroker)
  130. test.addCleanup(maybeDisconnect, serverBroker)
  131. test.addCleanup(disconnectClientFactory)
  132. # Establish the connection
  133. pump.pump()
  134. return clientBroker, serverBroker, pump
  135. class _ReconnectingFakeConnectorState(object):
  136. """
  137. Manages connection notifications for a
  138. L{_ReconnectingFakeConnector} instance.
  139. @ivar notifications: pending L{Deferreds} that will fire when the
  140. L{_ReconnectingFakeConnector}'s connect method is called
  141. """
  142. def __init__(self):
  143. self.notifications = deque()
  144. def notifyOnConnect(self):
  145. """
  146. Connection notification.
  147. @return: A L{Deferred} that fires when this instance's
  148. L{twisted.internet.interfaces.IConnector.connect} method
  149. is called.
  150. @rtype: L{Deferred}
  151. """
  152. notifier = Deferred()
  153. self.notifications.appendleft(notifier)
  154. return notifier
  155. def notifyAll(self):
  156. """
  157. Fire all pending notifications.
  158. """
  159. while self.notifications:
  160. self.notifications.pop().callback(self)
  161. class _ReconnectingFakeConnector(_FakeConnector):
  162. """
  163. A fake L{IConnector} that can fire L{Deferred}s when its
  164. C{connect} method is called.
  165. """
  166. def __init__(self, address, state):
  167. """
  168. @param address: An L{IAddress} provider that represents this
  169. connector's destination.
  170. @type address: An L{IAddress} provider.
  171. @param state: The state instance
  172. @type state: L{_ReconnectingFakeConnectorState}
  173. """
  174. super(_ReconnectingFakeConnector, self).__init__(address)
  175. self._state = state
  176. def connect(self):
  177. """
  178. A C{connect} implementation that calls C{reconnectCallback}
  179. """
  180. super(_ReconnectingFakeConnector, self).connect()
  181. self._state.notifyAll()
  182. def connectedServerAndClient(test, realm=None):
  183. """
  184. Connect a client and server L{Broker} together with an L{IOPump}
  185. @param realm: realm to use, defaulting to a L{DummyRealm}
  186. @returns: a 3-tuple (client, server, pump).
  187. """
  188. realm = realm or DummyRealm()
  189. checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(guest=b'guest')
  190. serverFactory = pb.PBServerFactory(portal.Portal(realm, [checker]))
  191. clientFactory = pb.PBClientFactory()
  192. return connectServerAndClient(test, clientFactory, serverFactory)
  193. class SimpleRemote(pb.Referenceable):
  194. def remote_thunk(self, arg):
  195. self.arg = arg
  196. return arg + 1
  197. def remote_knuth(self, arg):
  198. raise Exception()
  199. class NestedRemote(pb.Referenceable):
  200. def remote_getSimple(self):
  201. return SimpleRemote()
  202. class SimpleCopy(pb.Copyable):
  203. def __init__(self):
  204. self.x = 1
  205. self.y = {"Hello":"World"}
  206. self.z = ['test']
  207. class SimpleLocalCopy(pb.RemoteCopy):
  208. pass
  209. pb.setUnjellyableForClass(SimpleCopy, SimpleLocalCopy)
  210. class SimpleFactoryCopy(pb.Copyable):
  211. """
  212. @cvar allIDs: hold every created instances of this class.
  213. @type allIDs: C{dict}
  214. """
  215. allIDs = {}
  216. def __init__(self, id):
  217. self.id = id
  218. SimpleFactoryCopy.allIDs[id] = self
  219. def createFactoryCopy(state):
  220. """
  221. Factory of L{SimpleFactoryCopy}, getting a created instance given the
  222. C{id} found in C{state}.
  223. """
  224. stateId = state.get("id", None)
  225. if stateId is None:
  226. raise RuntimeError("factory copy state has no 'id' member %s" %
  227. (repr(state),))
  228. if not stateId in SimpleFactoryCopy.allIDs:
  229. raise RuntimeError("factory class has no ID: %s" %
  230. (SimpleFactoryCopy.allIDs,))
  231. inst = SimpleFactoryCopy.allIDs[stateId]
  232. if not inst:
  233. raise RuntimeError("factory method found no object with id")
  234. return inst
  235. pb.setUnjellyableFactoryForClass(SimpleFactoryCopy, createFactoryCopy)
  236. class NestedCopy(pb.Referenceable):
  237. def remote_getCopy(self):
  238. return SimpleCopy()
  239. def remote_getFactory(self, value):
  240. return SimpleFactoryCopy(value)
  241. class SimpleCache(pb.Cacheable):
  242. def __init___(self):
  243. self.x = 1
  244. self.y = {"Hello":"World"}
  245. self.z = ['test']
  246. class NestedComplicatedCache(pb.Referenceable):
  247. def __init__(self):
  248. self.c = VeryVeryComplicatedCacheable()
  249. def remote_getCache(self):
  250. return self.c
  251. class VeryVeryComplicatedCacheable(pb.Cacheable):
  252. def __init__(self):
  253. self.x = 1
  254. self.y = 2
  255. self.foo = 3
  256. def setFoo4(self):
  257. self.foo = 4
  258. self.observer.callRemote('foo',4)
  259. def getStateToCacheAndObserveFor(self, perspective, observer):
  260. self.observer = observer
  261. return {"x": self.x,
  262. "y": self.y,
  263. "foo": self.foo}
  264. def stoppedObserving(self, perspective, observer):
  265. log.msg("stopped observing")
  266. observer.callRemote("end")
  267. if observer == self.observer:
  268. self.observer = None
  269. class RatherBaroqueCache(pb.RemoteCache):
  270. def observe_foo(self, newFoo):
  271. self.foo = newFoo
  272. def observe_end(self):
  273. log.msg("the end of things")
  274. pb.setUnjellyableForClass(VeryVeryComplicatedCacheable, RatherBaroqueCache)
  275. class SimpleLocalCache(pb.RemoteCache):
  276. def setCopyableState(self, state):
  277. self.__dict__.update(state)
  278. def checkMethod(self):
  279. return self.check
  280. def checkSelf(self):
  281. return self
  282. def check(self):
  283. return 1
  284. pb.setUnjellyableForClass(SimpleCache, SimpleLocalCache)
  285. class NestedCache(pb.Referenceable):
  286. def __init__(self):
  287. self.x = SimpleCache()
  288. def remote_getCache(self):
  289. return [self.x,self.x]
  290. def remote_putCache(self, cache):
  291. return (self.x is cache)
  292. class Observable(pb.Referenceable):
  293. def __init__(self):
  294. self.observers = []
  295. def remote_observe(self, obs):
  296. self.observers.append(obs)
  297. def remote_unobserve(self, obs):
  298. self.observers.remove(obs)
  299. def notify(self, obj):
  300. for observer in self.observers:
  301. observer.callRemote('notify', self, obj)
  302. class DeferredRemote(pb.Referenceable):
  303. def __init__(self):
  304. self.run = 0
  305. def runMe(self, arg):
  306. self.run = arg
  307. return arg + 1
  308. def dontRunMe(self, arg):
  309. assert 0, "shouldn't have been run!"
  310. def remote_doItLater(self):
  311. """
  312. Return a L{Deferred} to be fired on client side. When fired,
  313. C{self.runMe} is called.
  314. """
  315. d = Deferred()
  316. d.addCallbacks(self.runMe, self.dontRunMe)
  317. self.d = d
  318. return d
  319. class Observer(pb.Referenceable):
  320. notified = 0
  321. obj = None
  322. def remote_notify(self, other, obj):
  323. self.obj = obj
  324. self.notified = self.notified + 1
  325. other.callRemote('unobserve',self)
  326. class NewStyleCopy(pb.Copyable, pb.RemoteCopy, object):
  327. def __init__(self, s):
  328. self.s = s
  329. pb.setUnjellyableForClass(NewStyleCopy, NewStyleCopy)
  330. class NewStyleCopy2(pb.Copyable, pb.RemoteCopy, object):
  331. allocated = 0
  332. initialized = 0
  333. value = 1
  334. def __new__(self):
  335. NewStyleCopy2.allocated += 1
  336. inst = object.__new__(self)
  337. inst.value = 2
  338. return inst
  339. def __init__(self):
  340. NewStyleCopy2.initialized += 1
  341. pb.setUnjellyableForClass(NewStyleCopy2, NewStyleCopy2)
  342. class NewStyleCacheCopy(pb.Cacheable, pb.RemoteCache, object):
  343. def getStateToCacheAndObserveFor(self, perspective, observer):
  344. return self.__dict__
  345. pb.setUnjellyableForClass(NewStyleCacheCopy, NewStyleCacheCopy)
  346. class Echoer(pb.Root):
  347. def remote_echo(self, st):
  348. return st
  349. def remote_echoWithKeywords(self, st, **kw):
  350. return (st, kw)
  351. class CachedReturner(pb.Root):
  352. def __init__(self, cache):
  353. self.cache = cache
  354. def remote_giveMeCache(self, st):
  355. return self.cache
  356. class NewStyleTests(unittest.SynchronousTestCase):
  357. def setUp(self):
  358. """
  359. Create a pb server using L{Echoer} protocol and connect a client to it.
  360. """
  361. self.serverFactory = pb.PBServerFactory(Echoer())
  362. clientFactory = pb.PBClientFactory()
  363. client, self.server, self.pump = connectServerAndClient(
  364. test=self,
  365. clientFactory=clientFactory,
  366. serverFactory=self.serverFactory)
  367. self.ref = self.successResultOf(clientFactory.getRootObject())
  368. def tearDown(self):
  369. """
  370. Close client and server connections, reset values of L{NewStyleCopy2}
  371. class variables.
  372. """
  373. NewStyleCopy2.allocated = 0
  374. NewStyleCopy2.initialized = 0
  375. NewStyleCopy2.value = 1
  376. def test_newStyle(self):
  377. """
  378. Create a new style object, send it over the wire, and check the result.
  379. """
  380. orig = NewStyleCopy("value")
  381. d = self.ref.callRemote("echo", orig)
  382. self.pump.flush()
  383. def cb(res):
  384. self.assertIsInstance(res, NewStyleCopy)
  385. self.assertEqual(res.s, "value")
  386. self.assertFalse(res is orig) # no cheating :)
  387. d.addCallback(cb)
  388. return d
  389. def test_alloc(self):
  390. """
  391. Send a new style object and check the number of allocations.
  392. """
  393. orig = NewStyleCopy2()
  394. self.assertEqual(NewStyleCopy2.allocated, 1)
  395. self.assertEqual(NewStyleCopy2.initialized, 1)
  396. d = self.ref.callRemote("echo", orig)
  397. self.pump.flush()
  398. def cb(res):
  399. # Receiving the response creates a third one on the way back
  400. self.assertIsInstance(res, NewStyleCopy2)
  401. self.assertEqual(res.value, 2)
  402. self.assertEqual(NewStyleCopy2.allocated, 3)
  403. self.assertEqual(NewStyleCopy2.initialized, 1)
  404. self.assertIsNot(res, orig) # No cheating :)
  405. # Sending the object creates a second one on the far side
  406. d.addCallback(cb)
  407. return d
  408. def test_newStyleWithKeywords(self):
  409. """
  410. Create a new style object with keywords,
  411. send it over the wire, and check the result.
  412. """
  413. orig = NewStyleCopy("value1")
  414. d = self.ref.callRemote("echoWithKeywords", orig,
  415. keyword1="one", keyword2="two")
  416. self.pump.flush()
  417. def cb(res):
  418. self.assertIsInstance(res, tuple)
  419. self.assertIsInstance(res[0], NewStyleCopy)
  420. self.assertIsInstance(res[1], dict)
  421. self.assertEqual(res[0].s, "value1")
  422. self.assertIsNot(res[0], orig)
  423. self.assertEqual(res[1], {"keyword1": "one", "keyword2": "two"})
  424. d.addCallback(cb)
  425. return d
  426. class ConnectionNotifyServerFactory(pb.PBServerFactory):
  427. """
  428. A server factory which stores the last connection and fires a
  429. L{Deferred} on connection made. This factory can handle only one
  430. client connection.
  431. @ivar protocolInstance: the last protocol instance.
  432. @type protocolInstance: C{pb.Broker}
  433. @ivar connectionMade: the deferred fired upon connection.
  434. @type connectionMade: C{Deferred}
  435. """
  436. protocolInstance = None
  437. def __init__(self, root):
  438. """
  439. Initialize the factory.
  440. """
  441. pb.PBServerFactory.__init__(self, root)
  442. self.connectionMade = Deferred()
  443. def clientConnectionMade(self, protocol):
  444. """
  445. Store the protocol and fire the connection deferred.
  446. """
  447. self.protocolInstance = protocol
  448. d, self.connectionMade = self.connectionMade, None
  449. if d is not None:
  450. d.callback(None)
  451. class NewStyleCachedTests(unittest.TestCase):
  452. def setUp(self):
  453. """
  454. Create a pb server using L{CachedReturner} protocol and connect a
  455. client to it.
  456. """
  457. self.orig = NewStyleCacheCopy()
  458. self.orig.s = "value"
  459. self.server = reactor.listenTCP(0,
  460. ConnectionNotifyServerFactory(CachedReturner(self.orig)))
  461. clientFactory = pb.PBClientFactory()
  462. reactor.connectTCP("localhost", self.server.getHost().port,
  463. clientFactory)
  464. def gotRoot(ref):
  465. self.ref = ref
  466. d1 = clientFactory.getRootObject().addCallback(gotRoot)
  467. d2 = self.server.factory.connectionMade
  468. return gatherResults([d1, d2])
  469. def tearDown(self):
  470. """
  471. Close client and server connections.
  472. """
  473. self.server.factory.protocolInstance.transport.loseConnection()
  474. self.ref.broker.transport.loseConnection()
  475. return self.server.stopListening()
  476. def test_newStyleCache(self):
  477. """
  478. A new-style cacheable object can be retrieved and re-retrieved over a
  479. single connection. The value of an attribute of the cacheable can be
  480. accessed on the receiving side.
  481. """
  482. d = self.ref.callRemote("giveMeCache", self.orig)
  483. def cb(res, again):
  484. self.assertIsInstance(res, NewStyleCacheCopy)
  485. self.assertEqual("value", res.s)
  486. # no cheating :)
  487. self.assertIsNot(self.orig, res)
  488. if again:
  489. # Save a reference so it stays alive for the rest of this test
  490. self.res = res
  491. # And ask for it again to exercise the special re-jelly logic in
  492. # Cacheable.
  493. return self.ref.callRemote("giveMeCache", self.orig)
  494. d.addCallback(cb, True)
  495. d.addCallback(cb, False)
  496. return d
  497. class BrokerTests(unittest.TestCase):
  498. thunkResult = None
  499. def tearDown(self):
  500. try:
  501. # from RemotePublished.getFileName
  502. os.unlink('None-None-TESTING.pub')
  503. except OSError:
  504. pass
  505. def thunkErrorBad(self, error):
  506. self.fail("This should cause a return value, not %s" % (error,))
  507. def thunkResultGood(self, result):
  508. self.thunkResult = result
  509. def thunkErrorGood(self, tb):
  510. pass
  511. def thunkResultBad(self, result):
  512. self.fail("This should cause an error, not %s" % (result,))
  513. def test_reference(self):
  514. c, s, pump = connectedServerAndClient(test=self)
  515. class X(pb.Referenceable):
  516. def remote_catch(self,arg):
  517. self.caught = arg
  518. class Y(pb.Referenceable):
  519. def remote_throw(self, a, b):
  520. a.callRemote('catch', b)
  521. s.setNameForLocal("y", Y())
  522. y = c.remoteForName("y")
  523. x = X()
  524. z = X()
  525. y.callRemote('throw', x, z)
  526. pump.pump()
  527. pump.pump()
  528. pump.pump()
  529. self.assertIs(x.caught, z, "X should have caught Z")
  530. # make sure references to remote methods are equals
  531. self.assertEqual(y.remoteMethod('throw'), y.remoteMethod('throw'))
  532. def test_result(self):
  533. c, s, pump = connectedServerAndClient(test=self)
  534. for x, y in (c, s), (s, c):
  535. # test reflexivity
  536. foo = SimpleRemote()
  537. x.setNameForLocal("foo", foo)
  538. bar = y.remoteForName("foo")
  539. self.expectedThunkResult = 8
  540. bar.callRemote('thunk',self.expectedThunkResult - 1
  541. ).addCallbacks(self.thunkResultGood, self.thunkErrorBad)
  542. # Send question.
  543. pump.pump()
  544. # Send response.
  545. pump.pump()
  546. # Shouldn't require any more pumping than that...
  547. self.assertEqual(self.thunkResult, self.expectedThunkResult,
  548. "result wasn't received.")
  549. def refcountResult(self, result):
  550. self.nestedRemote = result
  551. def test_tooManyRefs(self):
  552. l = []
  553. e = []
  554. c, s, pump = connectedServerAndClient(test=self)
  555. foo = NestedRemote()
  556. s.setNameForLocal("foo", foo)
  557. x = c.remoteForName("foo")
  558. for igno in range(pb.MAX_BROKER_REFS + 10):
  559. if s.transport.closed or c.transport.closed:
  560. break
  561. x.callRemote("getSimple").addCallbacks(l.append, e.append)
  562. pump.pump()
  563. expected = (pb.MAX_BROKER_REFS - 1)
  564. self.assertTrue(s.transport.closed, "transport was not closed")
  565. self.assertEqual(len(l), expected,
  566. "expected %s got %s" % (expected, len(l)))
  567. def test_copy(self):
  568. c, s, pump = connectedServerAndClient(test=self)
  569. foo = NestedCopy()
  570. s.setNameForLocal("foo", foo)
  571. x = c.remoteForName("foo")
  572. x.callRemote('getCopy'
  573. ).addCallbacks(self.thunkResultGood, self.thunkErrorBad)
  574. pump.pump()
  575. pump.pump()
  576. self.assertEqual(self.thunkResult.x, 1)
  577. self.assertEqual(self.thunkResult.y['Hello'], 'World')
  578. self.assertEqual(self.thunkResult.z[0], 'test')
  579. def test_observe(self):
  580. c, s, pump = connectedServerAndClient(test=self)
  581. # this is really testing the comparison between remote objects, to make
  582. # sure that you can *UN*observe when you have an observer architecture.
  583. a = Observable()
  584. b = Observer()
  585. s.setNameForLocal("a", a)
  586. ra = c.remoteForName("a")
  587. ra.callRemote('observe',b)
  588. pump.pump()
  589. a.notify(1)
  590. pump.pump()
  591. pump.pump()
  592. a.notify(10)
  593. pump.pump()
  594. pump.pump()
  595. self.assertIsNotNone(b.obj, "didn't notify")
  596. self.assertEqual(b.obj, 1, 'notified too much')
  597. def test_defer(self):
  598. c, s, pump = connectedServerAndClient(test=self)
  599. d = DeferredRemote()
  600. s.setNameForLocal("d", d)
  601. e = c.remoteForName("d")
  602. pump.pump(); pump.pump()
  603. results = []
  604. e.callRemote('doItLater').addCallback(results.append)
  605. pump.pump(); pump.pump()
  606. self.assertFalse(d.run, "Deferred method run too early.")
  607. d.d.callback(5)
  608. self.assertEqual(d.run, 5, "Deferred method run too late.")
  609. pump.pump(); pump.pump()
  610. self.assertEqual(results[0], 6, "Incorrect result.")
  611. def test_refcount(self):
  612. c, s, pump = connectedServerAndClient(test=self)
  613. foo = NestedRemote()
  614. s.setNameForLocal("foo", foo)
  615. bar = c.remoteForName("foo")
  616. bar.callRemote('getSimple'
  617. ).addCallbacks(self.refcountResult, self.thunkErrorBad)
  618. # send question
  619. pump.pump()
  620. # send response
  621. pump.pump()
  622. # delving into internal structures here, because GC is sort of
  623. # inherently internal.
  624. rluid = self.nestedRemote.luid
  625. self.assertIn(rluid, s.localObjects)
  626. del self.nestedRemote
  627. # nudge the gc
  628. if sys.hexversion >= 0x2000000:
  629. gc.collect()
  630. # try to nudge the GC even if we can't really
  631. pump.pump()
  632. pump.pump()
  633. pump.pump()
  634. self.assertNotIn(rluid, s.localObjects)
  635. def test_cache(self):
  636. c, s, pump = connectedServerAndClient(test=self)
  637. obj = NestedCache()
  638. obj2 = NestedComplicatedCache()
  639. vcc = obj2.c
  640. s.setNameForLocal("obj", obj)
  641. s.setNameForLocal("xxx", obj2)
  642. o2 = c.remoteForName("obj")
  643. o3 = c.remoteForName("xxx")
  644. coll = []
  645. o2.callRemote("getCache"
  646. ).addCallback(coll.append).addErrback(coll.append)
  647. o2.callRemote("getCache"
  648. ).addCallback(coll.append).addErrback(coll.append)
  649. complex = []
  650. o3.callRemote("getCache").addCallback(complex.append)
  651. o3.callRemote("getCache").addCallback(complex.append)
  652. pump.flush()
  653. # `worst things first'
  654. self.assertEqual(complex[0].x, 1)
  655. self.assertEqual(complex[0].y, 2)
  656. self.assertEqual(complex[0].foo, 3)
  657. vcc.setFoo4()
  658. pump.flush()
  659. self.assertEqual(complex[0].foo, 4)
  660. self.assertEqual(len(coll), 2)
  661. cp = coll[0][0]
  662. self.assertIdentical(cp.checkMethod().__self__ if _PY3 else
  663. cp.checkMethod().im_self, cp,
  664. "potential refcounting issue")
  665. self.assertIdentical(cp.checkSelf(), cp,
  666. "other potential refcounting issue")
  667. col2 = []
  668. o2.callRemote('putCache',cp).addCallback(col2.append)
  669. pump.flush()
  670. # The objects were the same (testing lcache identity)
  671. self.assertTrue(col2[0])
  672. # test equality of references to methods
  673. self.assertEqual(o2.remoteMethod("getCache"),
  674. o2.remoteMethod("getCache"))
  675. # now, refcounting (similar to testRefCount)
  676. luid = cp.luid
  677. baroqueLuid = complex[0].luid
  678. self.assertIn(luid, s.remotelyCachedObjects,
  679. "remote cache doesn't have it")
  680. del coll
  681. del cp
  682. pump.flush()
  683. del complex
  684. del col2
  685. # extra nudge...
  686. pump.flush()
  687. # del vcc.observer
  688. # nudge the gc
  689. if sys.hexversion >= 0x2000000:
  690. gc.collect()
  691. # try to nudge the GC even if we can't really
  692. pump.flush()
  693. # The GC is done with it.
  694. self.assertNotIn(luid, s.remotelyCachedObjects,
  695. "Server still had it after GC")
  696. self.assertNotIn(luid, c.locallyCachedObjects,
  697. "Client still had it after GC")
  698. self.assertNotIn(baroqueLuid, s.remotelyCachedObjects,
  699. "Server still had complex after GC")
  700. self.assertNotIn(baroqueLuid, c.locallyCachedObjects,
  701. "Client still had complex after GC")
  702. self.assertIsNone(vcc.observer, "observer was not removed")
  703. def test_publishable(self):
  704. try:
  705. os.unlink('None-None-TESTING.pub') # from RemotePublished.getFileName
  706. except OSError:
  707. pass # Sometimes it's not there.
  708. c, s, pump = connectedServerAndClient(test=self)
  709. foo = GetPublisher()
  710. # foo.pub.timestamp = 1.0
  711. s.setNameForLocal("foo", foo)
  712. bar = c.remoteForName("foo")
  713. accum = []
  714. bar.callRemote('getPub').addCallbacks(accum.append, self.thunkErrorBad)
  715. pump.flush()
  716. obj = accum.pop()
  717. self.assertEqual(obj.activateCalled, 1)
  718. self.assertEqual(obj.isActivated, 1)
  719. self.assertEqual(obj.yayIGotPublished, 1)
  720. # timestamp's dirty, we don't have a cache file
  721. self.assertEqual(obj._wasCleanWhenLoaded, 0)
  722. c, s, pump = connectedServerAndClient(test=self)
  723. s.setNameForLocal("foo", foo)
  724. bar = c.remoteForName("foo")
  725. bar.callRemote('getPub').addCallbacks(accum.append, self.thunkErrorBad)
  726. pump.flush()
  727. obj = accum.pop()
  728. # timestamp's clean, our cache file is up-to-date
  729. self.assertEqual(obj._wasCleanWhenLoaded, 1)
  730. def gotCopy(self, val):
  731. self.thunkResult = val.id
  732. def test_factoryCopy(self):
  733. c, s, pump = connectedServerAndClient(test=self)
  734. ID = 99
  735. obj = NestedCopy()
  736. s.setNameForLocal("foo", obj)
  737. x = c.remoteForName("foo")
  738. x.callRemote('getFactory', ID
  739. ).addCallbacks(self.gotCopy, self.thunkResultBad)
  740. pump.pump()
  741. pump.pump()
  742. pump.pump()
  743. self.assertEqual(self.thunkResult, ID,
  744. "ID not correct on factory object %s" % (self.thunkResult,))
  745. bigString = b"helloworld" * 50
  746. callbackArgs = None
  747. callbackKeyword = None
  748. def finishedCallback(*args, **kw):
  749. global callbackArgs, callbackKeyword
  750. callbackArgs = args
  751. callbackKeyword = kw
  752. class Pagerizer(pb.Referenceable):
  753. def __init__(self, callback, *args, **kw):
  754. self.callback, self.args, self.kw = callback, args, kw
  755. def remote_getPages(self, collector):
  756. util.StringPager(collector, bigString, 100,
  757. self.callback, *self.args, **self.kw)
  758. self.args = self.kw = None
  759. class FilePagerizer(pb.Referenceable):
  760. pager = None
  761. def __init__(self, filename, callback, *args, **kw):
  762. self.filename = filename
  763. self.callback, self.args, self.kw = callback, args, kw
  764. def remote_getPages(self, collector):
  765. self.pager = util.FilePager(collector, open(self.filename, 'rb'),
  766. self.callback, *self.args, **self.kw)
  767. self.args = self.kw = None
  768. class PagingTests(unittest.TestCase):
  769. """
  770. Test pb objects sending data by pages.
  771. """
  772. def setUp(self):
  773. """
  774. Create a file used to test L{util.FilePager}.
  775. """
  776. self.filename = self.mktemp()
  777. with open(self.filename, 'wb') as f:
  778. f.write(bigString)
  779. def test_pagingWithCallback(self):
  780. """
  781. Test L{util.StringPager}, passing a callback to fire when all pages
  782. are sent.
  783. """
  784. c, s, pump = connectedServerAndClient(test=self)
  785. s.setNameForLocal("foo", Pagerizer(finishedCallback, 'hello', value=10))
  786. x = c.remoteForName("foo")
  787. l = []
  788. util.getAllPages(x, "getPages").addCallback(l.append)
  789. while not l:
  790. pump.pump()
  791. self.assertEqual(b''.join(l[0]), bigString,
  792. "Pages received not equal to pages sent!")
  793. self.assertEqual(callbackArgs, ('hello',),
  794. "Completed callback not invoked")
  795. self.assertEqual(callbackKeyword, {'value': 10},
  796. "Completed callback not invoked")
  797. def test_pagingWithoutCallback(self):
  798. """
  799. Test L{util.StringPager} without a callback.
  800. """
  801. c, s, pump = connectedServerAndClient(test=self)
  802. s.setNameForLocal("foo", Pagerizer(None))
  803. x = c.remoteForName("foo")
  804. l = []
  805. util.getAllPages(x, "getPages").addCallback(l.append)
  806. while not l:
  807. pump.pump()
  808. self.assertEqual(b''.join(l[0]), bigString,
  809. "Pages received not equal to pages sent!")
  810. def test_emptyFilePaging(self):
  811. """
  812. Test L{util.FilePager}, sending an empty file.
  813. """
  814. filenameEmpty = self.mktemp()
  815. open(filenameEmpty, 'w').close()
  816. c, s, pump = connectedServerAndClient(test=self)
  817. pagerizer = FilePagerizer(filenameEmpty, None)
  818. s.setNameForLocal("bar", pagerizer)
  819. x = c.remoteForName("bar")
  820. l = []
  821. util.getAllPages(x, "getPages").addCallback(l.append)
  822. ttl = 10
  823. while not l and ttl > 0:
  824. pump.pump()
  825. ttl -= 1
  826. if not ttl:
  827. self.fail('getAllPages timed out')
  828. self.assertEqual(b''.join(l[0]), b'',
  829. "Pages received not equal to pages sent!")
  830. def test_filePagingWithCallback(self):
  831. """
  832. Test L{util.FilePager}, passing a callback to fire when all pages
  833. are sent, and verify that the pager doesn't keep chunks in memory.
  834. """
  835. c, s, pump = connectedServerAndClient(test=self)
  836. pagerizer = FilePagerizer(self.filename, finishedCallback,
  837. 'frodo', value = 9)
  838. s.setNameForLocal("bar", pagerizer)
  839. x = c.remoteForName("bar")
  840. l = []
  841. util.getAllPages(x, "getPages").addCallback(l.append)
  842. while not l:
  843. pump.pump()
  844. self.assertEqual(b''.join(l[0]), bigString,
  845. "Pages received not equal to pages sent!")
  846. self.assertEqual(callbackArgs, ('frodo',),
  847. "Completed callback not invoked")
  848. self.assertEqual(callbackKeyword, {'value': 9},
  849. "Completed callback not invoked")
  850. self.assertEqual(pagerizer.pager.chunks, [])
  851. def test_filePagingWithoutCallback(self):
  852. """
  853. Test L{util.FilePager} without a callback.
  854. """
  855. c, s, pump = connectedServerAndClient(test=self)
  856. pagerizer = FilePagerizer(self.filename, None)
  857. s.setNameForLocal("bar", pagerizer)
  858. x = c.remoteForName("bar")
  859. l = []
  860. util.getAllPages(x, "getPages").addCallback(l.append)
  861. while not l:
  862. pump.pump()
  863. self.assertEqual(b''.join(l[0]), bigString,
  864. "Pages received not equal to pages sent!")
  865. self.assertEqual(pagerizer.pager.chunks, [])
  866. class DumbPublishable(publish.Publishable):
  867. def getStateToPublish(self):
  868. return {"yayIGotPublished": 1}
  869. class DumbPub(publish.RemotePublished):
  870. def activated(self):
  871. self.activateCalled = 1
  872. class GetPublisher(pb.Referenceable):
  873. def __init__(self):
  874. self.pub = DumbPublishable("TESTING")
  875. def remote_getPub(self):
  876. return self.pub
  877. pb.setUnjellyableForClass(DumbPublishable, DumbPub)
  878. class DisconnectionTests(unittest.TestCase):
  879. """
  880. Test disconnection callbacks.
  881. """
  882. def error(self, *args):
  883. raise RuntimeError("I shouldn't have been called: %s" % (args,))
  884. def gotDisconnected(self):
  885. """
  886. Called on broker disconnect.
  887. """
  888. self.gotCallback = 1
  889. def objectDisconnected(self, o):
  890. """
  891. Called on RemoteReference disconnect.
  892. """
  893. self.assertEqual(o, self.remoteObject)
  894. self.objectCallback = 1
  895. def test_badSerialization(self):
  896. c, s, pump = connectedServerAndClient(test=self)
  897. pump.pump()
  898. s.setNameForLocal("o", BadCopySet())
  899. g = c.remoteForName("o")
  900. l = []
  901. g.callRemote("setBadCopy", BadCopyable()).addErrback(l.append)
  902. pump.flush()
  903. self.assertEqual(len(l), 1)
  904. def test_disconnection(self):
  905. c, s, pump = connectedServerAndClient(test=self)
  906. pump.pump()
  907. s.setNameForLocal("o", SimpleRemote())
  908. # get a client reference to server object
  909. r = c.remoteForName("o")
  910. pump.pump()
  911. pump.pump()
  912. pump.pump()
  913. # register and then unregister disconnect callbacks
  914. # making sure they get unregistered
  915. c.notifyOnDisconnect(self.error)
  916. self.assertIn(self.error, c.disconnects)
  917. c.dontNotifyOnDisconnect(self.error)
  918. self.assertNotIn(self.error, c.disconnects)
  919. r.notifyOnDisconnect(self.error)
  920. self.assertIn(r._disconnected, c.disconnects)
  921. self.assertIn(self.error, r.disconnectCallbacks)
  922. r.dontNotifyOnDisconnect(self.error)
  923. self.assertNotIn(r._disconnected, c.disconnects)
  924. self.assertNotIn(self.error, r.disconnectCallbacks)
  925. # register disconnect callbacks
  926. c.notifyOnDisconnect(self.gotDisconnected)
  927. r.notifyOnDisconnect(self.objectDisconnected)
  928. self.remoteObject = r
  929. # disconnect
  930. c.connectionLost(failure.Failure(main.CONNECTION_DONE))
  931. self.assertTrue(self.gotCallback)
  932. self.assertTrue(self.objectCallback)
  933. class FreakOut(Exception):
  934. pass
  935. class BadCopyable(pb.Copyable):
  936. def getStateToCopyFor(self, p):
  937. raise FreakOut()
  938. class BadCopySet(pb.Referenceable):
  939. def remote_setBadCopy(self, bc):
  940. return None
  941. class LocalRemoteTest(util.LocalAsRemote):
  942. reportAllTracebacks = 0
  943. def sync_add1(self, x):
  944. return x + 1
  945. def async_add(self, x=0, y=1):
  946. return x + y
  947. def async_fail(self):
  948. raise RuntimeError()
  949. @implementer(pb.IPerspective)
  950. class MyPerspective(pb.Avatar):
  951. """
  952. @ivar loggedIn: set to C{True} when the avatar is logged in.
  953. @type loggedIn: C{bool}
  954. @ivar loggedOut: set to C{True} when the avatar is logged out.
  955. @type loggedOut: C{bool}
  956. """
  957. loggedIn = loggedOut = False
  958. def __init__(self, avatarId):
  959. self.avatarId = avatarId
  960. def perspective_getAvatarId(self):
  961. """
  962. Return the avatar identifier which was used to access this avatar.
  963. """
  964. return self.avatarId
  965. def perspective_getViewPoint(self):
  966. return MyView()
  967. def perspective_add(self, a, b):
  968. """
  969. Add the given objects and return the result. This is a method
  970. unavailable on L{Echoer}, so it can only be invoked by authenticated
  971. users who received their avatar from L{TestRealm}.
  972. """
  973. return a + b
  974. def logout(self):
  975. self.loggedOut = True
  976. class TestRealm(object):
  977. """
  978. A realm which repeatedly gives out a single instance of L{MyPerspective}
  979. for non-anonymous logins and which gives out a new instance of L{Echoer}
  980. for each anonymous login.
  981. @ivar lastPerspective: The L{MyPerspective} most recently created and
  982. returned from C{requestAvatar}.
  983. @ivar perspectiveFactory: A one-argument callable which will be used to
  984. create avatars to be returned from C{requestAvatar}.
  985. """
  986. perspectiveFactory = MyPerspective
  987. lastPerspective = None
  988. def requestAvatar(self, avatarId, mind, interface):
  989. """
  990. Verify that the mind and interface supplied have the expected values
  991. (this should really be done somewhere else, like inside a test method)
  992. and return an avatar appropriate for the given identifier.
  993. """
  994. assert interface == pb.IPerspective
  995. assert mind == "BRAINS!"
  996. if avatarId is checkers.ANONYMOUS:
  997. return pb.IPerspective, Echoer(), lambda: None
  998. else:
  999. self.lastPerspective = self.perspectiveFactory(avatarId)
  1000. self.lastPerspective.loggedIn = True
  1001. return (
  1002. pb.IPerspective, self.lastPerspective,
  1003. self.lastPerspective.logout)
  1004. class MyView(pb.Viewable):
  1005. def view_check(self, user):
  1006. return isinstance(user, MyPerspective)
  1007. class LeakyRealm(TestRealm):
  1008. """
  1009. A realm which hangs onto a reference to the mind object in its logout
  1010. function.
  1011. """
  1012. def __init__(self, mindEater):
  1013. """
  1014. Create a L{LeakyRealm}.
  1015. @param mindEater: a callable that will be called with the C{mind}
  1016. object when it is available
  1017. """
  1018. self._mindEater = mindEater
  1019. def requestAvatar(self, avatarId, mind, interface):
  1020. self._mindEater(mind)
  1021. persp = self.perspectiveFactory(avatarId)
  1022. return (pb.IPerspective, persp, lambda : (mind, persp.logout()))
  1023. class NewCredLeakTests(unittest.TestCase):
  1024. """
  1025. Tests to try to trigger memory leaks.
  1026. """
  1027. def test_logoutLeak(self):
  1028. """
  1029. The server does not leak a reference when the client disconnects
  1030. suddenly, even if the cred logout function forms a reference cycle with
  1031. the perspective.
  1032. """
  1033. # keep a weak reference to the mind object, which we can verify later
  1034. # evaluates to None, thereby ensuring the reference leak is fixed.
  1035. self.mindRef = None
  1036. def setMindRef(mind):
  1037. self.mindRef = weakref.ref(mind)
  1038. clientBroker, serverBroker, pump = connectedServerAndClient(
  1039. test=self, realm=LeakyRealm(setMindRef)
  1040. )
  1041. # log in from the client
  1042. connectionBroken = []
  1043. root = clientBroker.remoteForName("root")
  1044. d = root.callRemote("login", b'guest')
  1045. def cbResponse(x):
  1046. challenge, challenger = x
  1047. mind = SimpleRemote()
  1048. return challenger.callRemote("respond",
  1049. pb.respond(challenge, b'guest'), mind)
  1050. d.addCallback(cbResponse)
  1051. def connectionLost(_):
  1052. pump.stop() # don't try to pump data anymore - it won't work
  1053. connectionBroken.append(1)
  1054. serverBroker.connectionLost(failure.Failure(RuntimeError("boom")))
  1055. d.addCallback(connectionLost)
  1056. # flush out the response and connectionLost
  1057. pump.flush()
  1058. self.assertEqual(connectionBroken, [1])
  1059. # and check for lingering references - requestAvatar sets mindRef
  1060. # to a weakref to the mind; this object should be gc'd, and thus
  1061. # the ref should return None
  1062. gc.collect()
  1063. self.assertIsNone(self.mindRef())
  1064. class NewCredTests(unittest.TestCase):
  1065. """
  1066. Tests related to the L{twisted.cred} support in PB.
  1067. """
  1068. def setUp(self):
  1069. """
  1070. Create a portal with no checkers and wrap it around a simple test
  1071. realm. Set up a PB server on a TCP port which serves perspectives
  1072. using that portal.
  1073. """
  1074. self.realm = TestRealm()
  1075. self.portal = portal.Portal(self.realm)
  1076. self.serverFactory = ConnectionNotifyServerFactory(self.portal)
  1077. self.clientFactory = pb.PBClientFactory()
  1078. def establishClientAndServer(self, _ignored=None):
  1079. """
  1080. Connect a client obtained from C{clientFactory} and a server
  1081. obtained from the current server factory via an L{IOPump},
  1082. then assign them to the appropriate instance variables
  1083. @ivar clientFactory: the broker client factory
  1084. @ivar clientFactory: L{pb.PBClientFactory} instance
  1085. @ivar client: the client broker
  1086. @type client: L{pb.Broker}
  1087. @ivar server: the server broker
  1088. @type server: L{pb.Broker}
  1089. @ivar pump: the IOPump connecting the client and server
  1090. @type pump: L{IOPump}
  1091. @ivar connector: A connector whose connect method recreates
  1092. the above instance variables
  1093. @type connector: L{twisted.internet.base.IConnector}
  1094. """
  1095. self.client, self.server, self.pump = connectServerAndClient(
  1096. self, self.clientFactory, self.serverFactory)
  1097. self.connectorState = _ReconnectingFakeConnectorState()
  1098. self.connector = _ReconnectingFakeConnector(
  1099. address.IPv4Address('TCP', '127.0.0.1', 4321),
  1100. self.connectorState)
  1101. self.connectorState.notifyOnConnect().addCallback(
  1102. self.establishClientAndServer)
  1103. def completeClientLostConnection(
  1104. self, reason=failure.Failure(main.CONNECTION_DONE)):
  1105. """
  1106. Asserts that the client broker's transport was closed and then
  1107. mimics the event loop by calling the broker's connectionLost
  1108. callback with C{reason}, followed by C{self.clientFactory}'s
  1109. C{clientConnectionLost}
  1110. @param reason: (optional) the reason to pass to the client
  1111. broker's connectionLost callback
  1112. @type reason: L{Failure}
  1113. """
  1114. self.assertTrue(self.client.transport.closed)
  1115. # simulate the reactor calling back the client's
  1116. # connectionLost after the loseConnection implied by
  1117. # clientFactory.disconnect
  1118. self.client.connectionLost(reason)
  1119. self.clientFactory.clientConnectionLost(self.connector, reason)
  1120. def test_getRootObject(self):
  1121. """
  1122. Assert that L{PBClientFactory.getRootObject}'s Deferred fires with
  1123. a L{RemoteReference}, and that disconnecting it runs its
  1124. disconnection callbacks.
  1125. """
  1126. self.establishClientAndServer()
  1127. rootObjDeferred = self.clientFactory.getRootObject()
  1128. def gotRootObject(rootObj):
  1129. self.assertIsInstance(rootObj, pb.RemoteReference)
  1130. return rootObj
  1131. def disconnect(rootObj):
  1132. disconnectedDeferred = Deferred()
  1133. rootObj.notifyOnDisconnect(disconnectedDeferred.callback)
  1134. self.clientFactory.disconnect()
  1135. self.completeClientLostConnection()
  1136. return disconnectedDeferred
  1137. rootObjDeferred.addCallback(gotRootObject)
  1138. rootObjDeferred.addCallback(disconnect)
  1139. return rootObjDeferred
  1140. def test_deadReferenceError(self):
  1141. """
  1142. Test that when a connection is lost, calling a method on a
  1143. RemoteReference obtained from it raises L{DeadReferenceError}.
  1144. """
  1145. self.establishClientAndServer()
  1146. rootObjDeferred = self.clientFactory.getRootObject()
  1147. def gotRootObject(rootObj):
  1148. disconnectedDeferred = Deferred()
  1149. rootObj.notifyOnDisconnect(disconnectedDeferred.callback)
  1150. def lostConnection(ign):
  1151. self.assertRaises(
  1152. pb.DeadReferenceError,
  1153. rootObj.callRemote, 'method')
  1154. disconnectedDeferred.addCallback(lostConnection)
  1155. self.clientFactory.disconnect()
  1156. self.completeClientLostConnection()
  1157. return disconnectedDeferred
  1158. return rootObjDeferred.addCallback(gotRootObject)
  1159. def test_clientConnectionLost(self):
  1160. """
  1161. Test that if the L{reconnecting} flag is passed with a True value then
  1162. a remote call made from a disconnection notification callback gets a
  1163. result successfully.
  1164. """
  1165. class ReconnectOnce(pb.PBClientFactory):
  1166. reconnectedAlready = False
  1167. def clientConnectionLost(self, connector, reason):
  1168. reconnecting = not self.reconnectedAlready
  1169. self.reconnectedAlready = True
  1170. result = pb.PBClientFactory.clientConnectionLost(
  1171. self, connector, reason, reconnecting)
  1172. if reconnecting:
  1173. connector.connect()
  1174. return result
  1175. self.clientFactory = ReconnectOnce()
  1176. self.establishClientAndServer()
  1177. rootObjDeferred = self.clientFactory.getRootObject()
  1178. def gotRootObject(rootObj):
  1179. self.assertIsInstance(rootObj, pb.RemoteReference)
  1180. d = Deferred()
  1181. rootObj.notifyOnDisconnect(d.callback)
  1182. # request a disconnection
  1183. self.clientFactory.disconnect()
  1184. self.completeClientLostConnection()
  1185. def disconnected(ign):
  1186. d = self.clientFactory.getRootObject()
  1187. def gotAnotherRootObject(anotherRootObj):
  1188. self.assertIsInstance(anotherRootObj, pb.RemoteReference)
  1189. d = Deferred()
  1190. anotherRootObj.notifyOnDisconnect(d.callback)
  1191. self.clientFactory.disconnect()
  1192. self.completeClientLostConnection()
  1193. return d
  1194. return d.addCallback(gotAnotherRootObject)
  1195. return d.addCallback(disconnected)
  1196. return rootObjDeferred.addCallback(gotRootObject)
  1197. def test_immediateClose(self):
  1198. """
  1199. Test that if a Broker loses its connection without receiving any bytes,
  1200. it doesn't raise any exceptions or log any errors.
  1201. """
  1202. self.establishClientAndServer()
  1203. serverProto = self.serverFactory.buildProtocol(('127.0.0.1', 12345))
  1204. serverProto.makeConnection(protocol.FileWrapper(StringIO()))
  1205. serverProto.connectionLost(failure.Failure(main.CONNECTION_DONE))
  1206. def test_loginConnectionRefused(self):
  1207. """
  1208. L{PBClientFactory.login} returns a L{Deferred} which is errbacked
  1209. with the L{ConnectionRefusedError} if the underlying connection is
  1210. refused.
  1211. """
  1212. clientFactory = pb.PBClientFactory()
  1213. loginDeferred = clientFactory.login(
  1214. credentials.UsernamePassword(b"foo", b"bar"))
  1215. clientFactory.clientConnectionFailed(
  1216. None,
  1217. failure.Failure(
  1218. ConnectionRefusedError("Test simulated refused connection")))
  1219. return self.assertFailure(loginDeferred, ConnectionRefusedError)
  1220. def test_loginLogout(self):
  1221. """
  1222. Test that login can be performed with IUsernamePassword credentials and
  1223. that when the connection is dropped the avatar is logged out.
  1224. """
  1225. self.portal.registerChecker(
  1226. checkers.InMemoryUsernamePasswordDatabaseDontUse(user=b'pass'))
  1227. creds = credentials.UsernamePassword(b"user", b"pass")
  1228. # NOTE: real code probably won't need anything where we have the
  1229. # "BRAINS!" argument, passing None is fine. We just do it here to
  1230. # test that it is being passed. It is used to give additional info to
  1231. # the realm to aid perspective creation, if you don't need that,
  1232. # ignore it.
  1233. mind = "BRAINS!"
  1234. loginCompleted = Deferred()
  1235. d = self.clientFactory.login(creds, mind)
  1236. def cbLogin(perspective):
  1237. self.assertTrue(self.realm.lastPerspective.loggedIn)
  1238. self.assertIsInstance(perspective, pb.RemoteReference)
  1239. return loginCompleted
  1240. def cbDisconnect(ignored):
  1241. self.clientFactory.disconnect()
  1242. self.completeClientLostConnection()
  1243. d.addCallback(cbLogin)
  1244. d.addCallback(cbDisconnect)
  1245. def cbLogout(ignored):
  1246. self.assertTrue(self.realm.lastPerspective.loggedOut)
  1247. d.addCallback(cbLogout)
  1248. self.establishClientAndServer()
  1249. self.pump.flush()
  1250. # The perspective passed to cbLogin has gone out of scope.
  1251. # Ensure its __del__ runs...
  1252. gc.collect()
  1253. # ...and send its decref message to the server
  1254. self.pump.flush()
  1255. # Now allow the client to disconnect.
  1256. loginCompleted.callback(None)
  1257. return d
  1258. def test_logoutAfterDecref(self):
  1259. """
  1260. If a L{RemoteReference} to an L{IPerspective} avatar is decrefed and
  1261. there remain no other references to the avatar on the server, the
  1262. avatar is garbage collected and the logout method called.
  1263. """
  1264. loggedOut = Deferred()
  1265. class EventPerspective(pb.Avatar):
  1266. """
  1267. An avatar which fires a Deferred when it is logged out.
  1268. """
  1269. def __init__(self, avatarId):
  1270. pass
  1271. def logout(self):
  1272. loggedOut.callback(None)
  1273. self.realm.perspectiveFactory = EventPerspective
  1274. self.portal.registerChecker(
  1275. checkers.InMemoryUsernamePasswordDatabaseDontUse(foo=b'bar'))
  1276. d = self.clientFactory.login(
  1277. credentials.UsernamePassword(b'foo', b'bar'), "BRAINS!")
  1278. def cbLoggedIn(avatar):
  1279. # Just wait for the logout to happen, as it should since the
  1280. # reference to the avatar will shortly no longer exists.
  1281. return loggedOut
  1282. d.addCallback(cbLoggedIn)
  1283. def cbLoggedOut(ignored):
  1284. # Verify that the server broker's _localCleanup dict isn't growing
  1285. # without bound.
  1286. self.assertEqual(self.serverFactory.protocolInstance._localCleanup, {})
  1287. d.addCallback(cbLoggedOut)
  1288. self.establishClientAndServer()
  1289. # complete authentication
  1290. self.pump.flush()
  1291. # _PortalAuthChallenger and our Avatar should be dead by now;
  1292. # force a collection to trigger their __del__s
  1293. gc.collect()
  1294. # push their decref messages through
  1295. self.pump.flush()
  1296. return d
  1297. def test_concurrentLogin(self):
  1298. """
  1299. Two different correct login attempts can be made on the same root
  1300. object at the same time and produce two different resulting avatars.
  1301. """
  1302. self.portal.registerChecker(
  1303. checkers.InMemoryUsernamePasswordDatabaseDontUse(
  1304. foo=b'bar', baz=b'quux'))
  1305. firstLogin = self.clientFactory.login(
  1306. credentials.UsernamePassword(b'foo', b'bar'), "BRAINS!")
  1307. secondLogin = self.clientFactory.login(
  1308. credentials.UsernamePassword(b'baz', b'quux'), "BRAINS!")
  1309. d = gatherResults([firstLogin, secondLogin])
  1310. def cbLoggedIn(result):
  1311. (first, second) = result
  1312. return gatherResults([
  1313. first.callRemote('getAvatarId'),
  1314. second.callRemote('getAvatarId')])
  1315. d.addCallback(cbLoggedIn)
  1316. def cbAvatarIds(x):
  1317. first, second = x
  1318. self.assertEqual(first, b'foo')
  1319. self.assertEqual(second, b'baz')
  1320. d.addCallback(cbAvatarIds)
  1321. self.establishClientAndServer()
  1322. self.pump.flush()
  1323. return d
  1324. def test_badUsernamePasswordLogin(self):
  1325. """
  1326. Test that a login attempt with an invalid user or invalid password
  1327. fails in the appropriate way.
  1328. """
  1329. self.portal.registerChecker(
  1330. checkers.InMemoryUsernamePasswordDatabaseDontUse(user=b'pass'))
  1331. firstLogin = self.clientFactory.login(
  1332. credentials.UsernamePassword(b'nosuchuser', b'pass'))
  1333. secondLogin = self.clientFactory.login(
  1334. credentials.UsernamePassword(b'user', b'wrongpass'))
  1335. self.assertFailure(firstLogin, UnauthorizedLogin)
  1336. self.assertFailure(secondLogin, UnauthorizedLogin)
  1337. d = gatherResults([firstLogin, secondLogin])
  1338. def cleanup(ignore):
  1339. errors = self.flushLoggedErrors(UnauthorizedLogin)
  1340. self.assertEqual(len(errors), 2)
  1341. d.addCallback(cleanup)
  1342. self.establishClientAndServer()
  1343. self.pump.flush()
  1344. return d
  1345. def test_anonymousLogin(self):
  1346. """
  1347. Verify that a PB server using a portal configured with a checker which
  1348. allows IAnonymous credentials can be logged into using IAnonymous
  1349. credentials.
  1350. """
  1351. self.portal.registerChecker(checkers.AllowAnonymousAccess())
  1352. d = self.clientFactory.login(credentials.Anonymous(), "BRAINS!")
  1353. def cbLoggedIn(perspective):
  1354. return perspective.callRemote('echo', 123)
  1355. d.addCallback(cbLoggedIn)
  1356. d.addCallback(self.assertEqual, 123)
  1357. self.establishClientAndServer()
  1358. self.pump.flush()
  1359. return d
  1360. def test_anonymousLoginNotPermitted(self):
  1361. """
  1362. Verify that without an anonymous checker set up, anonymous login is
  1363. rejected.
  1364. """
  1365. self.portal.registerChecker(
  1366. checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
  1367. d = self.clientFactory.login(credentials.Anonymous(), "BRAINS!")
  1368. self.assertFailure(d, UnhandledCredentials)
  1369. def cleanup(ignore):
  1370. errors = self.flushLoggedErrors(UnhandledCredentials)
  1371. self.assertEqual(len(errors), 1)
  1372. d.addCallback(cleanup)
  1373. self.establishClientAndServer()
  1374. self.pump.flush()
  1375. return d
  1376. def test_anonymousLoginWithMultipleCheckers(self):
  1377. """
  1378. Like L{test_anonymousLogin} but against a portal with a checker for
  1379. both IAnonymous and IUsernamePassword.
  1380. """
  1381. self.portal.registerChecker(checkers.AllowAnonymousAccess())
  1382. self.portal.registerChecker(
  1383. checkers.InMemoryUsernamePasswordDatabaseDontUse(user=b'pass'))
  1384. d = self.clientFactory.login(credentials.Anonymous(), "BRAINS!")
  1385. def cbLogin(perspective):
  1386. return perspective.callRemote('echo', 123)
  1387. d.addCallback(cbLogin)
  1388. d.addCallback(self.assertEqual, 123)
  1389. self.establishClientAndServer()
  1390. self.pump.flush()
  1391. return d
  1392. def test_authenticatedLoginWithMultipleCheckers(self):
  1393. """
  1394. Like L{test_anonymousLoginWithMultipleCheckers} but check that
  1395. username/password authentication works.
  1396. """
  1397. self.portal.registerChecker(checkers.AllowAnonymousAccess())
  1398. self.portal.registerChecker(
  1399. checkers.InMemoryUsernamePasswordDatabaseDontUse(user=b'pass'))
  1400. d = self.clientFactory.login(
  1401. credentials.UsernamePassword(b'user', b'pass'), "BRAINS!")
  1402. def cbLogin(perspective):
  1403. return perspective.callRemote('add', 100, 23)
  1404. d.addCallback(cbLogin)
  1405. d.addCallback(self.assertEqual, 123)
  1406. self.establishClientAndServer()
  1407. self.pump.flush()
  1408. return d
  1409. def test_view(self):
  1410. """
  1411. Verify that a viewpoint can be retrieved after authenticating with
  1412. cred.
  1413. """
  1414. self.portal.registerChecker(
  1415. checkers.InMemoryUsernamePasswordDatabaseDontUse(user=b'pass'))
  1416. d = self.clientFactory.login(
  1417. credentials.UsernamePassword(b"user", b"pass"), "BRAINS!")
  1418. def cbLogin(perspective):
  1419. return perspective.callRemote("getViewPoint")
  1420. d.addCallback(cbLogin)
  1421. def cbView(viewpoint):
  1422. return viewpoint.callRemote("check")
  1423. d.addCallback(cbView)
  1424. d.addCallback(self.assertTrue)
  1425. self.establishClientAndServer()
  1426. self.pump.flush()
  1427. return d
  1428. @implementer(pb.IPerspective)
  1429. class NonSubclassingPerspective:
  1430. def __init__(self, avatarId):
  1431. pass
  1432. # IPerspective implementation
  1433. def perspectiveMessageReceived(self, broker, message, args, kwargs):
  1434. args = broker.unserialize(args, self)
  1435. kwargs = broker.unserialize(kwargs, self)
  1436. return broker.serialize((message, args, kwargs))
  1437. # Methods required by TestRealm
  1438. def logout(self):
  1439. self.loggedOut = True
  1440. class NSPTests(unittest.TestCase):
  1441. """
  1442. Tests for authentication against a realm where the L{IPerspective}
  1443. implementation is not a subclass of L{Avatar}.
  1444. """
  1445. def setUp(self):
  1446. self.realm = TestRealm()
  1447. self.realm.perspectiveFactory = NonSubclassingPerspective
  1448. self.portal = portal.Portal(self.realm)
  1449. self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
  1450. self.checker.addUser(b"user", b"pass")
  1451. self.portal.registerChecker(self.checker)
  1452. self.factory = WrappingFactory(pb.PBServerFactory(self.portal))
  1453. self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
  1454. self.addCleanup(self.port.stopListening)
  1455. self.portno = self.port.getHost().port
  1456. def test_NSP(self):
  1457. """
  1458. An L{IPerspective} implementation which does not subclass
  1459. L{Avatar} can expose remote methods for the client to call.
  1460. """
  1461. factory = pb.PBClientFactory()
  1462. d = factory.login(credentials.UsernamePassword(b'user', b'pass'),
  1463. "BRAINS!")
  1464. reactor.connectTCP('127.0.0.1', self.portno, factory)
  1465. d.addCallback(lambda p: p.callRemote('ANYTHING', 'here', bar='baz'))
  1466. d.addCallback(self.assertEqual,
  1467. ('ANYTHING', ('here',), {'bar': 'baz'}))
  1468. def cleanup(ignored):
  1469. factory.disconnect()
  1470. for p in self.factory.protocols:
  1471. p.transport.loseConnection()
  1472. d.addCallback(cleanup)
  1473. return d
  1474. class IForwarded(Interface):
  1475. """
  1476. Interface used for testing L{util.LocalAsyncForwarder}.
  1477. """
  1478. def forwardMe():
  1479. """
  1480. Simple synchronous method.
  1481. """
  1482. def forwardDeferred():
  1483. """
  1484. Simple asynchronous method.
  1485. """
  1486. @implementer(IForwarded)
  1487. class Forwarded:
  1488. """
  1489. Test implementation of L{IForwarded}.
  1490. @ivar forwarded: set if C{forwardMe} is called.
  1491. @type forwarded: C{bool}
  1492. @ivar unforwarded: set if C{dontForwardMe} is called.
  1493. @type unforwarded: C{bool}
  1494. """
  1495. forwarded = False
  1496. unforwarded = False
  1497. def forwardMe(self):
  1498. """
  1499. Set a local flag to test afterwards.
  1500. """
  1501. self.forwarded = True
  1502. def dontForwardMe(self):
  1503. """
  1504. Set a local flag to test afterwards. This should not be called as it's
  1505. not in the interface.
  1506. """
  1507. self.unforwarded = True
  1508. def forwardDeferred(self):
  1509. """
  1510. Asynchronously return C{True}.
  1511. """
  1512. return succeed(True)
  1513. class SpreadUtilTests(unittest.TestCase):
  1514. """
  1515. Tests for L{twisted.spread.util}.
  1516. """
  1517. def test_sync(self):
  1518. """
  1519. Call a synchronous method of a L{util.LocalAsRemote} object and check
  1520. the result.
  1521. """
  1522. o = LocalRemoteTest()
  1523. self.assertEqual(o.callRemote("add1", 2), 3)
  1524. def test_async(self):
  1525. """
  1526. Call an asynchronous method of a L{util.LocalAsRemote} object and check
  1527. the result.
  1528. """
  1529. o = LocalRemoteTest()
  1530. o = LocalRemoteTest()
  1531. d = o.callRemote("add", 2, y=4)
  1532. self.assertIsInstance(d, Deferred)
  1533. d.addCallback(self.assertEqual, 6)
  1534. return d
  1535. def test_asyncFail(self):
  1536. """
  1537. Test an asynchronous failure on a remote method call.
  1538. """
  1539. o = LocalRemoteTest()
  1540. d = o.callRemote("fail")
  1541. def eb(f):
  1542. self.assertIsInstance(f, failure.Failure)
  1543. f.trap(RuntimeError)
  1544. d.addCallbacks(lambda res: self.fail("supposed to fail"), eb)
  1545. return d
  1546. def test_remoteMethod(self):
  1547. """
  1548. Test the C{remoteMethod} facility of L{util.LocalAsRemote}.
  1549. """
  1550. o = LocalRemoteTest()
  1551. m = o.remoteMethod("add1")
  1552. self.assertEqual(m(3), 4)
  1553. def test_localAsyncForwarder(self):
  1554. """
  1555. Test a call to L{util.LocalAsyncForwarder} using L{Forwarded} local
  1556. object.
  1557. """
  1558. f = Forwarded()
  1559. lf = util.LocalAsyncForwarder(f, IForwarded)
  1560. lf.callRemote("forwardMe")
  1561. self.assertTrue(f.forwarded)
  1562. lf.callRemote("dontForwardMe")
  1563. self.assertFalse(f.unforwarded)
  1564. rr = lf.callRemote("forwardDeferred")
  1565. l = []
  1566. rr.addCallback(l.append)
  1567. self.assertEqual(l[0], 1)
  1568. class PBWithSecurityOptionsTests(unittest.TestCase):
  1569. """
  1570. Test security customization.
  1571. """
  1572. def test_clientDefaultSecurityOptions(self):
  1573. """
  1574. By default, client broker should use C{jelly.globalSecurity} as
  1575. security settings.
  1576. """
  1577. factory = pb.PBClientFactory()
  1578. broker = factory.buildProtocol(None)
  1579. self.assertIs(broker.security, jelly.globalSecurity)
  1580. def test_serverDefaultSecurityOptions(self):
  1581. """
  1582. By default, server broker should use C{jelly.globalSecurity} as
  1583. security settings.
  1584. """
  1585. factory = pb.PBServerFactory(Echoer())
  1586. broker = factory.buildProtocol(None)
  1587. self.assertIs(broker.security, jelly.globalSecurity)
  1588. def test_clientSecurityCustomization(self):
  1589. """
  1590. Check that the security settings are passed from the client factory to
  1591. the broker object.
  1592. """
  1593. security = jelly.SecurityOptions()
  1594. factory = pb.PBClientFactory(security=security)
  1595. broker = factory.buildProtocol(None)
  1596. self.assertIs(broker.security, security)
  1597. def test_serverSecurityCustomization(self):
  1598. """
  1599. Check that the security settings are passed from the server factory to
  1600. the broker object.
  1601. """
  1602. security = jelly.SecurityOptions()
  1603. factory = pb.PBServerFactory(Echoer(), security=security)
  1604. broker = factory.buildProtocol(None)
  1605. self.assertIs(broker.security, security)