test_sip.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781
  1. # -*- test-case-name: twisted.test.test_sip -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Session Initialization Protocol tests.
  6. """
  7. from twisted.cred import portal, checkers
  8. from twisted.internet import defer, reactor
  9. from twisted.protocols import sip
  10. from twisted.trial import unittest
  11. try:
  12. from twisted.internet.asyncioreactor import AsyncioSelectorReactor
  13. except:
  14. AsyncioSelectorReactor = None
  15. from zope.interface import implementer
  16. # request, prefixed by random CRLFs
  17. request1 = "\n\r\n\n\r" + """\
  18. INVITE sip:foo SIP/2.0
  19. From: mo
  20. To: joe
  21. Content-Length: 4
  22. abcd""".replace("\n", "\r\n")
  23. # request, no content-length
  24. request2 = """INVITE sip:foo SIP/2.0
  25. From: mo
  26. To: joe
  27. 1234""".replace("\n", "\r\n")
  28. # request, with garbage after
  29. request3 = """INVITE sip:foo SIP/2.0
  30. From: mo
  31. To: joe
  32. Content-Length: 4
  33. 1234
  34. lalalal""".replace("\n", "\r\n")
  35. # three requests
  36. request4 = """INVITE sip:foo SIP/2.0
  37. From: mo
  38. To: joe
  39. Content-Length: 0
  40. INVITE sip:loop SIP/2.0
  41. From: foo
  42. To: bar
  43. Content-Length: 4
  44. abcdINVITE sip:loop SIP/2.0
  45. From: foo
  46. To: bar
  47. Content-Length: 4
  48. 1234""".replace("\n", "\r\n")
  49. # response, no content
  50. response1 = """SIP/2.0 200 OK
  51. From: foo
  52. To:bar
  53. Content-Length: 0
  54. """.replace("\n", "\r\n")
  55. # short header version
  56. request_short = """\
  57. INVITE sip:foo SIP/2.0
  58. f: mo
  59. t: joe
  60. l: 4
  61. abcd""".replace("\n", "\r\n")
  62. request_natted = """\
  63. INVITE sip:foo SIP/2.0
  64. Via: SIP/2.0/UDP 10.0.0.1:5060;rport
  65. """.replace("\n", "\r\n")
  66. # multiline headers (example from RFC 3621).
  67. response_multiline = """\
  68. SIP/2.0 200 OK
  69. Via: SIP/2.0/UDP server10.biloxi.com
  70. ;branch=z9hG4bKnashds8;received=192.0.2.3
  71. Via: SIP/2.0/UDP bigbox3.site3.atlanta.com
  72. ;branch=z9hG4bK77ef4c2312983.1;received=192.0.2.2
  73. Via: SIP/2.0/UDP pc33.atlanta.com
  74. ;branch=z9hG4bK776asdhds ;received=192.0.2.1
  75. To: Bob <sip:bob@biloxi.com>;tag=a6c85cf
  76. From: Alice <sip:alice@atlanta.com>;tag=1928301774
  77. Call-ID: a84b4c76e66710@pc33.atlanta.com
  78. CSeq: 314159 INVITE
  79. Contact: <sip:bob@192.0.2.4>
  80. Content-Type: application/sdp
  81. Content-Length: 0
  82. \n""".replace("\n", "\r\n")
  83. class TestRealm:
  84. def requestAvatar(self, avatarId, mind, *interfaces):
  85. return sip.IContact, None, lambda: None
  86. class MessageParsingTests(unittest.TestCase):
  87. def setUp(self):
  88. self.l = []
  89. self.parser = sip.MessagesParser(self.l.append)
  90. def feedMessage(self, message):
  91. self.parser.dataReceived(message)
  92. self.parser.dataDone()
  93. def validateMessage(self, m, method, uri, headers, body):
  94. """
  95. Validate Requests.
  96. """
  97. self.assertEqual(m.method, method)
  98. self.assertEqual(m.uri.toString(), uri)
  99. self.assertEqual(m.headers, headers)
  100. self.assertEqual(m.body, body)
  101. self.assertEqual(m.finished, 1)
  102. def testSimple(self):
  103. l = self.l
  104. self.feedMessage(request1)
  105. self.assertEqual(len(l), 1)
  106. self.validateMessage(
  107. l[0], "INVITE", "sip:foo",
  108. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  109. "abcd")
  110. def testTwoMessages(self):
  111. l = self.l
  112. self.feedMessage(request1)
  113. self.feedMessage(request2)
  114. self.assertEqual(len(l), 2)
  115. self.validateMessage(
  116. l[0], "INVITE", "sip:foo",
  117. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  118. "abcd")
  119. self.validateMessage(l[1], "INVITE", "sip:foo",
  120. {"from": ["mo"], "to": ["joe"]},
  121. "1234")
  122. def testGarbage(self):
  123. l = self.l
  124. self.feedMessage(request3)
  125. self.assertEqual(len(l), 1)
  126. self.validateMessage(
  127. l[0], "INVITE", "sip:foo",
  128. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  129. "1234")
  130. def testThreeInOne(self):
  131. l = self.l
  132. self.feedMessage(request4)
  133. self.assertEqual(len(l), 3)
  134. self.validateMessage(
  135. l[0], "INVITE", "sip:foo",
  136. {"from": ["mo"], "to": ["joe"], "content-length": ["0"]},
  137. "")
  138. self.validateMessage(
  139. l[1], "INVITE", "sip:loop",
  140. {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
  141. "abcd")
  142. self.validateMessage(
  143. l[2], "INVITE", "sip:loop",
  144. {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
  145. "1234")
  146. def testShort(self):
  147. l = self.l
  148. self.feedMessage(request_short)
  149. self.assertEqual(len(l), 1)
  150. self.validateMessage(
  151. l[0], "INVITE", "sip:foo",
  152. {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
  153. "abcd")
  154. def testSimpleResponse(self):
  155. l = self.l
  156. self.feedMessage(response1)
  157. self.assertEqual(len(l), 1)
  158. m = l[0]
  159. self.assertEqual(m.code, 200)
  160. self.assertEqual(m.phrase, "OK")
  161. self.assertEqual(
  162. m.headers,
  163. {"from": ["foo"], "to": ["bar"], "content-length": ["0"]})
  164. self.assertEqual(m.body, "")
  165. self.assertEqual(m.finished, 1)
  166. def test_multiLine(self):
  167. """
  168. A header may be split across multiple lines. Subsequent lines begin
  169. with C{" "} or C{"\\t"}.
  170. """
  171. l = self.l
  172. self.feedMessage(response_multiline)
  173. self.assertEqual(len(l), 1)
  174. m = l[0]
  175. self.assertEqual(
  176. m.headers['via'][0],
  177. "SIP/2.0/UDP server10.biloxi.com;"
  178. "branch=z9hG4bKnashds8;received=192.0.2.3")
  179. self.assertEqual(
  180. m.headers['via'][1],
  181. "SIP/2.0/UDP bigbox3.site3.atlanta.com;"
  182. "branch=z9hG4bK77ef4c2312983.1;received=192.0.2.2")
  183. self.assertEqual(
  184. m.headers['via'][2],
  185. "SIP/2.0/UDP pc33.atlanta.com;"
  186. "branch=z9hG4bK776asdhds ;received=192.0.2.1")
  187. class MessageParsingFeedDataCharByCharTests(MessageParsingTests):
  188. """
  189. Same as base class, but feed data char by char.
  190. """
  191. def feedMessage(self, message):
  192. for c in message:
  193. self.parser.dataReceived(c)
  194. self.parser.dataDone()
  195. class MakeMessageTests(unittest.TestCase):
  196. def testRequest(self):
  197. r = sip.Request("INVITE", "sip:foo")
  198. r.addHeader("foo", "bar")
  199. self.assertEqual(
  200. r.toString(),
  201. "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\r\n")
  202. def testResponse(self):
  203. r = sip.Response(200, "OK")
  204. r.addHeader("foo", "bar")
  205. r.addHeader("Content-Length", "4")
  206. r.bodyDataReceived("1234")
  207. self.assertEqual(
  208. r.toString(),
  209. "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-Length: 4\r\n\r\n1234")
  210. def testStatusCode(self):
  211. r = sip.Response(200)
  212. self.assertEqual(r.toString(), "SIP/2.0 200 OK\r\n\r\n")
  213. class ViaTests(unittest.TestCase):
  214. def checkRoundtrip(self, v):
  215. s = v.toString()
  216. self.assertEqual(s, sip.parseViaHeader(s).toString())
  217. def testExtraWhitespace(self):
  218. v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
  219. v2 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
  220. self.assertEqual(v1.transport, v2.transport)
  221. self.assertEqual(v1.host, v2.host)
  222. self.assertEqual(v1.port, v2.port)
  223. def test_complex(self):
  224. """
  225. Test parsing a Via header with one of everything.
  226. """
  227. s = ("SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1"
  228. " ;branch=a7c6a8dlze (Example)")
  229. v = sip.parseViaHeader(s)
  230. self.assertEqual(v.transport, "UDP")
  231. self.assertEqual(v.host, "first.example.com")
  232. self.assertEqual(v.port, 4000)
  233. self.assertIsNone(v.rport)
  234. self.assertIsNone(v.rportValue)
  235. self.assertFalse(v.rportRequested)
  236. self.assertEqual(v.ttl, 16)
  237. self.assertEqual(v.maddr, "224.2.0.1")
  238. self.assertEqual(v.branch, "a7c6a8dlze")
  239. self.assertEqual(v.hidden, 0)
  240. self.assertEqual(v.toString(),
  241. "SIP/2.0/UDP first.example.com:4000"
  242. ";ttl=16;branch=a7c6a8dlze;maddr=224.2.0.1")
  243. self.checkRoundtrip(v)
  244. def test_simple(self):
  245. """
  246. Test parsing a simple Via header.
  247. """
  248. s = "SIP/2.0/UDP example.com;hidden"
  249. v = sip.parseViaHeader(s)
  250. self.assertEqual(v.transport, "UDP")
  251. self.assertEqual(v.host, "example.com")
  252. self.assertEqual(v.port, 5060)
  253. self.assertIsNone(v.rport)
  254. self.assertIsNone(v.rportValue)
  255. self.assertFalse(v.rportRequested)
  256. self.assertIsNone(v.ttl)
  257. self.assertIsNone(v.maddr)
  258. self.assertIsNone(v.branch)
  259. self.assertTrue(v.hidden)
  260. self.assertEqual(v.toString(),
  261. "SIP/2.0/UDP example.com:5060;hidden")
  262. self.checkRoundtrip(v)
  263. def testSimpler(self):
  264. v = sip.Via("example.com")
  265. self.checkRoundtrip(v)
  266. def test_deprecatedRPort(self):
  267. """
  268. Setting rport to True is deprecated, but still produces a Via header
  269. with the expected properties.
  270. """
  271. v = sip.Via("foo.bar", rport=True)
  272. warnings = self.flushWarnings(
  273. offendingFunctions=[self.test_deprecatedRPort])
  274. self.assertEqual(len(warnings), 1)
  275. self.assertEqual(
  276. warnings[0]['message'],
  277. 'rport=True is deprecated since Twisted 9.0.')
  278. self.assertEqual(
  279. warnings[0]['category'],
  280. DeprecationWarning)
  281. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
  282. self.assertTrue(v.rport)
  283. self.assertTrue(v.rportRequested)
  284. self.assertIsNone(v.rportValue)
  285. def test_rport(self):
  286. """
  287. An rport setting of None should insert the parameter with no value.
  288. """
  289. v = sip.Via("foo.bar", rport=None)
  290. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
  291. self.assertTrue(v.rportRequested)
  292. self.assertIsNone(v.rportValue)
  293. def test_rportValue(self):
  294. """
  295. An rport numeric setting should insert the parameter with the number
  296. value given.
  297. """
  298. v = sip.Via("foo.bar", rport=1)
  299. self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport=1")
  300. self.assertFalse(v.rportRequested)
  301. self.assertEqual(v.rportValue, 1)
  302. self.assertEqual(v.rport, 1)
  303. def testNAT(self):
  304. s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345"
  305. v = sip.parseViaHeader(s)
  306. self.assertEqual(v.transport, "UDP")
  307. self.assertEqual(v.host, "10.0.0.1")
  308. self.assertEqual(v.port, 5060)
  309. self.assertEqual(v.received, "22.13.1.5")
  310. self.assertEqual(v.rport, 12345)
  311. self.assertNotEqual(v.toString().find("rport=12345"), -1)
  312. def test_unknownParams(self):
  313. """
  314. Parsing and serializing Via headers with unknown parameters should work.
  315. """
  316. s = "SIP/2.0/UDP example.com:5060;branch=a12345b;bogus;pie=delicious"
  317. v = sip.parseViaHeader(s)
  318. self.assertEqual(v.toString(), s)
  319. class URLTests(unittest.TestCase):
  320. def testRoundtrip(self):
  321. for url in [
  322. "sip:j.doe@big.com",
  323. "sip:j.doe:secret@big.com;transport=tcp",
  324. "sip:j.doe@big.com?subject=project",
  325. "sip:example.com",
  326. ]:
  327. self.assertEqual(sip.parseURL(url).toString(), url)
  328. def testComplex(self):
  329. s = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;"
  330. "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d")
  331. url = sip.parseURL(s)
  332. for k, v in [("username", "user"), ("password", "pass"),
  333. ("host", "hosta"), ("port", 123),
  334. ("transport", "udp"), ("usertype", "phone"),
  335. ("method", "foo"), ("ttl", 12),
  336. ("maddr", "1.2.3.4"), ("other", ["blah", "goo=bar"]),
  337. ("headers", {"a": "b", "c": "d"})]:
  338. self.assertEqual(getattr(url, k), v)
  339. class ParseTests(unittest.TestCase):
  340. def testParseAddress(self):
  341. for address, name, urls, params in [
  342. ('"A. G. Bell" <sip:foo@example.com>',
  343. "A. G. Bell", "sip:foo@example.com", {}),
  344. ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}),
  345. ("sip:foo@example.com", "", "sip:foo@example.com", {}),
  346. ("<sip:foo@example.com>", "", "sip:foo@example.com", {}),
  347. ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo",
  348. "sip:foo@example.com", {"tag": "bar", "foo": "baz"}),
  349. ]:
  350. gname, gurl, gparams = sip.parseAddress(address)
  351. self.assertEqual(name, gname)
  352. self.assertEqual(gurl.toString(), urls)
  353. self.assertEqual(gparams, params)
  354. @implementer(sip.ILocator)
  355. class DummyLocator:
  356. def getAddress(self, logicalURL):
  357. return defer.succeed(sip.URL("server.com", port=5060))
  358. @implementer(sip.ILocator)
  359. class FailingLocator:
  360. def getAddress(self, logicalURL):
  361. return defer.fail(LookupError())
  362. class ProxyTests(unittest.TestCase):
  363. def setUp(self):
  364. self.proxy = sip.Proxy("127.0.0.1")
  365. self.proxy.locator = DummyLocator()
  366. self.sent = []
  367. self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
  368. def testRequestForward(self):
  369. r = sip.Request("INVITE", "sip:foo")
  370. r.addHeader("via", sip.Via("1.2.3.4").toString())
  371. r.addHeader("via", sip.Via("1.2.3.5").toString())
  372. r.addHeader("foo", "bar")
  373. r.addHeader("to", "<sip:joe@server.com>")
  374. r.addHeader("contact", "<sip:joe@1.2.3.5>")
  375. self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
  376. self.assertEqual(len(self.sent), 1)
  377. dest, m = self.sent[0]
  378. self.assertEqual(dest.port, 5060)
  379. self.assertEqual(dest.host, "server.com")
  380. self.assertEqual(m.uri.toString(), "sip:foo")
  381. self.assertEqual(m.method, "INVITE")
  382. self.assertEqual(m.headers["via"],
  383. ["SIP/2.0/UDP 127.0.0.1:5060",
  384. "SIP/2.0/UDP 1.2.3.4:5060",
  385. "SIP/2.0/UDP 1.2.3.5:5060"])
  386. def testReceivedRequestForward(self):
  387. r = sip.Request("INVITE", "sip:foo")
  388. r.addHeader("via", sip.Via("1.2.3.4").toString())
  389. r.addHeader("foo", "bar")
  390. r.addHeader("to", "<sip:joe@server.com>")
  391. r.addHeader("contact", "<sip:joe@1.2.3.4>")
  392. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  393. dest, m = self.sent[0]
  394. self.assertEqual(m.headers["via"],
  395. ["SIP/2.0/UDP 127.0.0.1:5060",
  396. "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"])
  397. def testResponseWrongVia(self):
  398. # first via must match proxy's address
  399. r = sip.Response(200)
  400. r.addHeader("via", sip.Via("foo.com").toString())
  401. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  402. self.assertEqual(len(self.sent), 0)
  403. def testResponseForward(self):
  404. r = sip.Response(200)
  405. r.addHeader("via", sip.Via("127.0.0.1").toString())
  406. r.addHeader("via", sip.Via("client.com", port=1234).toString())
  407. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  408. self.assertEqual(len(self.sent), 1)
  409. dest, m = self.sent[0]
  410. self.assertEqual((dest.host, dest.port), ("client.com", 1234))
  411. self.assertEqual(m.code, 200)
  412. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:1234"])
  413. def testReceivedResponseForward(self):
  414. r = sip.Response(200)
  415. r.addHeader("via", sip.Via("127.0.0.1").toString())
  416. r.addHeader(
  417. "via",
  418. sip.Via("10.0.0.1", received="client.com").toString())
  419. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  420. self.assertEqual(len(self.sent), 1)
  421. dest, m = self.sent[0]
  422. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  423. def testResponseToUs(self):
  424. r = sip.Response(200)
  425. r.addHeader("via", sip.Via("127.0.0.1").toString())
  426. l = []
  427. self.proxy.gotResponse = lambda *a: l.append(a)
  428. self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
  429. self.assertEqual(len(l), 1)
  430. m, addr = l[0]
  431. self.assertEqual(len(m.headers.get("via", [])), 0)
  432. self.assertEqual(m.code, 200)
  433. def testLoop(self):
  434. r = sip.Request("INVITE", "sip:foo")
  435. r.addHeader("via", sip.Via("1.2.3.4").toString())
  436. r.addHeader("via", sip.Via("127.0.0.1").toString())
  437. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  438. self.assertEqual(self.sent, [])
  439. def testCantForwardRequest(self):
  440. r = sip.Request("INVITE", "sip:foo")
  441. r.addHeader("via", sip.Via("1.2.3.4").toString())
  442. r.addHeader("to", "<sip:joe@server.com>")
  443. self.proxy.locator = FailingLocator()
  444. self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
  445. self.assertEqual(len(self.sent), 1)
  446. dest, m = self.sent[0]
  447. self.assertEqual((dest.host, dest.port), ("1.2.3.4", 5060))
  448. self.assertEqual(m.code, 404)
  449. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"])
  450. class RegistrationTests(unittest.TestCase):
  451. def setUp(self):
  452. self.proxy = sip.RegisterProxy(host="127.0.0.1")
  453. self.registry = sip.InMemoryRegistry("bell.example.com")
  454. self.proxy.registry = self.proxy.locator = self.registry
  455. self.sent = []
  456. self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
  457. def tearDown(self):
  458. for d, uri in self.registry.users.values():
  459. d.cancel()
  460. del self.proxy
  461. def register(self):
  462. r = sip.Request("REGISTER", "sip:bell.example.com")
  463. r.addHeader("to", "sip:joe@bell.example.com")
  464. r.addHeader("contact", "sip:joe@client.com:1234")
  465. r.addHeader("via", sip.Via("client.com").toString())
  466. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  467. def unregister(self):
  468. r = sip.Request("REGISTER", "sip:bell.example.com")
  469. r.addHeader("to", "sip:joe@bell.example.com")
  470. r.addHeader("contact", "*")
  471. r.addHeader("via", sip.Via("client.com").toString())
  472. r.addHeader("expires", "0")
  473. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  474. def testRegister(self):
  475. self.register()
  476. dest, m = self.sent[0]
  477. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  478. self.assertEqual(m.code, 200)
  479. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
  480. self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
  481. self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
  482. #
  483. # XX: See http://tm.tl/8886
  484. #
  485. if type(reactor) != AsyncioSelectorReactor:
  486. self.assertTrue(
  487. int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598))
  488. self.assertEqual(len(self.registry.users), 1)
  489. dc, uri = self.registry.users["joe"]
  490. self.assertEqual(uri.toString(), "sip:joe@client.com:5060")
  491. d = self.proxy.locator.getAddress(sip.URL(username="joe",
  492. host="bell.example.com"))
  493. d.addCallback(lambda desturl : (desturl.host, desturl.port))
  494. d.addCallback(self.assertEqual, ('client.com', 5060))
  495. return d
  496. def testUnregister(self):
  497. self.register()
  498. self.unregister()
  499. dest, m = self.sent[1]
  500. self.assertEqual((dest.host, dest.port), ("client.com", 5060))
  501. self.assertEqual(m.code, 200)
  502. self.assertEqual(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
  503. self.assertEqual(m.headers["to"], ["sip:joe@bell.example.com"])
  504. self.assertEqual(m.headers["contact"], ["sip:joe@client.com:5060"])
  505. self.assertEqual(m.headers["expires"], ["0"])
  506. self.assertEqual(self.registry.users, {})
  507. def addPortal(self):
  508. r = TestRealm()
  509. p = portal.Portal(r)
  510. c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
  511. c.addUser('userXname@127.0.0.1', 'passXword')
  512. p.registerChecker(c)
  513. self.proxy.portal = p
  514. def testFailedAuthentication(self):
  515. self.addPortal()
  516. self.register()
  517. self.assertEqual(len(self.registry.users), 0)
  518. self.assertEqual(len(self.sent), 1)
  519. dest, m = self.sent[0]
  520. self.assertEqual(m.code, 401)
  521. def testWrongDomainRegister(self):
  522. r = sip.Request("REGISTER", "sip:wrong.com")
  523. r.addHeader("to", "sip:joe@bell.example.com")
  524. r.addHeader("contact", "sip:joe@client.com:1234")
  525. r.addHeader("via", sip.Via("client.com").toString())
  526. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  527. self.assertEqual(len(self.sent), 0)
  528. def testWrongToDomainRegister(self):
  529. r = sip.Request("REGISTER", "sip:bell.example.com")
  530. r.addHeader("to", "sip:joe@foo.com")
  531. r.addHeader("contact", "sip:joe@client.com:1234")
  532. r.addHeader("via", sip.Via("client.com").toString())
  533. self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
  534. self.assertEqual(len(self.sent), 0)
  535. def testWrongDomainLookup(self):
  536. self.register()
  537. url = sip.URL(username="joe", host="foo.com")
  538. d = self.proxy.locator.getAddress(url)
  539. self.assertFailure(d, LookupError)
  540. return d
  541. def testNoContactLookup(self):
  542. self.register()
  543. url = sip.URL(username="jane", host="bell.example.com")
  544. d = self.proxy.locator.getAddress(url)
  545. self.assertFailure(d, LookupError)
  546. return d
  547. class Client(sip.Base):
  548. def __init__(self):
  549. sip.Base.__init__(self)
  550. self.received = []
  551. self.deferred = defer.Deferred()
  552. def handle_response(self, response, addr):
  553. self.received.append(response)
  554. self.deferred.callback(self.received)
  555. class LiveTests(unittest.TestCase):
  556. def setUp(self):
  557. self.proxy = sip.RegisterProxy(host="127.0.0.1")
  558. self.registry = sip.InMemoryRegistry("bell.example.com")
  559. self.proxy.registry = self.proxy.locator = self.registry
  560. self.serverPort = reactor.listenUDP(
  561. 0, self.proxy, interface="127.0.0.1")
  562. self.client = Client()
  563. self.clientPort = reactor.listenUDP(
  564. 0, self.client, interface="127.0.0.1")
  565. self.serverAddress = (self.serverPort.getHost().host,
  566. self.serverPort.getHost().port)
  567. def tearDown(self):
  568. for d, uri in self.registry.users.values():
  569. d.cancel()
  570. d1 = defer.maybeDeferred(self.clientPort.stopListening)
  571. d2 = defer.maybeDeferred(self.serverPort.stopListening)
  572. return defer.gatherResults([d1, d2])
  573. def testRegister(self):
  574. p = self.clientPort.getHost().port
  575. r = sip.Request("REGISTER", "sip:bell.example.com")
  576. r.addHeader("to", "sip:joe@bell.example.com")
  577. r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
  578. r.addHeader("via", sip.Via("127.0.0.1", port=p).toString())
  579. self.client.sendMessage(
  580. sip.URL(host="127.0.0.1", port=self.serverAddress[1]), r)
  581. d = self.client.deferred
  582. def check(received):
  583. self.assertEqual(len(received), 1)
  584. r = received[0]
  585. self.assertEqual(r.code, 200)
  586. d.addCallback(check)
  587. return d
  588. def test_amoralRPort(self):
  589. """
  590. rport is allowed without a value, apparently because server
  591. implementors might be too stupid to check the received port
  592. against 5060 and see if they're equal, and because client
  593. implementors might be too stupid to bind to port 5060, or set a
  594. value on the rport parameter they send if they bind to another
  595. port.
  596. """
  597. p = self.clientPort.getHost().port
  598. r = sip.Request("REGISTER", "sip:bell.example.com")
  599. r.addHeader("to", "sip:joe@bell.example.com")
  600. r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
  601. r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString())
  602. warnings = self.flushWarnings(
  603. offendingFunctions=[self.test_amoralRPort])
  604. self.assertEqual(len(warnings), 1)
  605. self.assertEqual(
  606. warnings[0]['message'],
  607. 'rport=True is deprecated since Twisted 9.0.')
  608. self.assertEqual(
  609. warnings[0]['category'],
  610. DeprecationWarning)
  611. self.client.sendMessage(sip.URL(host="127.0.0.1",
  612. port=self.serverAddress[1]),
  613. r)
  614. d = self.client.deferred
  615. def check(received):
  616. self.assertEqual(len(received), 1)
  617. r = received[0]
  618. self.assertEqual(r.code, 200)
  619. d.addCallback(check)
  620. return d