test_sslverify.py 104 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068
  1. # Copyright 2005 Divmod, Inc. See LICENSE file for details
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for L{twisted.internet._sslverify}.
  6. """
  7. from __future__ import division, absolute_import
  8. import sys
  9. import itertools
  10. import datetime
  11. from zope.interface import implementer
  12. from twisted.python.reflect import requireModule
  13. skipSSL = None
  14. skipSNI = None
  15. skipNPN = None
  16. skipALPN = None
  17. if requireModule("OpenSSL"):
  18. from OpenSSL import SSL
  19. from OpenSSL.crypto import PKey, X509
  20. from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
  21. from cryptography import x509
  22. from cryptography.hazmat.backends import default_backend
  23. from cryptography.hazmat.primitives import hashes
  24. from cryptography.hazmat.primitives.asymmetric import rsa
  25. from cryptography.hazmat.primitives.serialization import (
  26. PrivateFormat, NoEncryption
  27. )
  28. from cryptography.x509.oid import NameOID
  29. from cryptography.hazmat.primitives.serialization import Encoding
  30. try:
  31. ctx = SSL.Context(SSL.SSLv23_METHOD)
  32. ctx.set_npn_advertise_callback(lambda c: None)
  33. except NotImplementedError:
  34. skipNPN = "OpenSSL 1.0.1 or greater required for NPN support"
  35. try:
  36. ctx = SSL.Context(SSL.SSLv23_METHOD)
  37. ctx.set_alpn_select_callback(lambda c: None)
  38. except NotImplementedError:
  39. skipALPN = "OpenSSL 1.0.2 or greater required for ALPN support"
  40. else:
  41. skipSSL = "OpenSSL is required for SSL tests."
  42. skipSNI = skipSSL
  43. skipNPN = skipSSL
  44. skipALPN = skipSSL
  45. from twisted.test.test_twisted import SetAsideModule
  46. from twisted.test.iosim import connectedServerAndClient
  47. from twisted.internet.error import ConnectionClosed
  48. from twisted.python.compat import nativeString, _PY3
  49. from twisted.python.filepath import FilePath
  50. from twisted.python.modules import getModule
  51. from twisted.trial import unittest, util
  52. from twisted.internet import protocol, defer, reactor
  53. from twisted.internet._idna import _idnaText
  54. from twisted.internet.error import CertificateError, ConnectionLost
  55. from twisted.internet import interfaces
  56. from incremental import Version
  57. if not skipSSL:
  58. from twisted.internet.ssl import platformTrust, VerificationError
  59. from twisted.internet import _sslverify as sslverify
  60. from twisted.protocols.tls import TLSMemoryBIOFactory
  61. # A couple of static PEM-format certificates to be used by various tests.
  62. A_HOST_CERTIFICATE_PEM = """
  63. -----BEGIN CERTIFICATE-----
  64. MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
  65. A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
  66. MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
  67. dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
  68. ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
  69. OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
  70. aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
  71. IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
  72. Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
  73. KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
  74. 8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
  75. KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
  76. VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
  77. JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
  78. S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
  79. fXzCWdG0O/3Lk2SRM0I=
  80. -----END CERTIFICATE-----
  81. """
  82. A_PEER_CERTIFICATE_PEM = """
  83. -----BEGIN CERTIFICATE-----
  84. MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
  85. A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
  86. MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
  87. dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
  88. bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
  89. MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
  90. dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
  91. aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
  92. c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
  93. MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
  94. CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
  95. JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
  96. e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
  97. vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
  98. i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
  99. yqDtGhklsWW3ZwBzEh5VEOUp
  100. -----END CERTIFICATE-----
  101. """
  102. A_KEYPAIR = getModule(__name__).filePath.sibling('server.pem').getContent()
  103. def counter(counter=itertools.count()):
  104. """
  105. Each time we're called, return the next integer in the natural numbers.
  106. """
  107. return next(counter)
  108. def makeCertificate(**kw):
  109. keypair = PKey()
  110. keypair.generate_key(TYPE_RSA, 1024)
  111. certificate = X509()
  112. certificate.gmtime_adj_notBefore(0)
  113. certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
  114. for xname in certificate.get_issuer(), certificate.get_subject():
  115. for (k, v) in kw.items():
  116. setattr(xname, k, nativeString(v))
  117. certificate.set_serial_number(counter())
  118. certificate.set_pubkey(keypair)
  119. certificate.sign(keypair, "md5")
  120. return keypair, certificate
  121. def certificatesForAuthorityAndServer(serviceIdentity=u'example.com'):
  122. """
  123. Create a self-signed CA certificate and server certificate signed by the
  124. CA.
  125. @param serviceIdentity: The identity (hostname) of the server.
  126. @type serviceIdentity: L{unicode}
  127. @return: a 2-tuple of C{(certificate_authority_certificate,
  128. server_certificate)}
  129. @rtype: L{tuple} of (L{sslverify.Certificate},
  130. L{sslverify.PrivateCertificate})
  131. """
  132. commonNameForCA = x509.Name(
  133. [x509.NameAttribute(NameOID.COMMON_NAME, u'Testing Example CA')]
  134. )
  135. commonNameForServer = x509.Name(
  136. [x509.NameAttribute(NameOID.COMMON_NAME, u'Testing Example Server')]
  137. )
  138. oneDay = datetime.timedelta(1, 0, 0)
  139. privateKeyForCA = rsa.generate_private_key(
  140. public_exponent=65537,
  141. key_size=4096,
  142. backend=default_backend()
  143. )
  144. publicKeyForCA = privateKeyForCA.public_key()
  145. caCertificate = (
  146. x509.CertificateBuilder()
  147. .subject_name(commonNameForCA)
  148. .issuer_name(commonNameForCA)
  149. .not_valid_before(datetime.datetime.today() - oneDay)
  150. .not_valid_after(datetime.datetime.today() + oneDay)
  151. .serial_number(x509.random_serial_number())
  152. .public_key(publicKeyForCA)
  153. .add_extension(
  154. x509.BasicConstraints(ca=True, path_length=9), critical=True,
  155. )
  156. .sign(
  157. private_key=privateKeyForCA, algorithm=hashes.SHA256(),
  158. backend=default_backend()
  159. )
  160. )
  161. privateKeyForServer = rsa.generate_private_key(
  162. public_exponent=65537,
  163. key_size=4096,
  164. backend=default_backend()
  165. )
  166. publicKeyForServer = privateKeyForServer.public_key()
  167. serverCertificate = (
  168. x509.CertificateBuilder()
  169. .subject_name(commonNameForServer)
  170. .issuer_name(commonNameForCA)
  171. .not_valid_before(datetime.datetime.today() - oneDay)
  172. .not_valid_after(datetime.datetime.today() + oneDay)
  173. .serial_number(x509.random_serial_number())
  174. .public_key(publicKeyForServer)
  175. .add_extension(
  176. x509.BasicConstraints(ca=False, path_length=None), critical=True,
  177. )
  178. .add_extension(
  179. x509.SubjectAlternativeName(
  180. [x509.DNSName(serviceIdentity)]
  181. ),
  182. critical=True,
  183. )
  184. .sign(
  185. private_key=privateKeyForCA, algorithm=hashes.SHA256(),
  186. backend=default_backend()
  187. )
  188. )
  189. caSelfCert = sslverify.Certificate.loadPEM(
  190. caCertificate.public_bytes(Encoding.PEM)
  191. )
  192. serverCert = sslverify.PrivateCertificate.loadPEM(
  193. b"\n".join([privateKeyForServer.private_bytes(
  194. Encoding.PEM,
  195. PrivateFormat.TraditionalOpenSSL,
  196. NoEncryption(),
  197. ),
  198. serverCertificate.public_bytes(Encoding.PEM)])
  199. )
  200. return caSelfCert, serverCert
  201. def _loopbackTLSConnection(serverOpts, clientOpts):
  202. """
  203. Common implementation code for both L{loopbackTLSConnection} and
  204. L{loopbackTLSConnectionInMemory}. Creates a loopback TLS connection
  205. using the provided server and client context factories.
  206. @param serverOpts: An OpenSSL context factory for the server.
  207. @type serverOpts: C{OpenSSLCertificateOptions}, or any class with an
  208. equivalent API.
  209. @param clientOpts: An OpenSSL context factory for the client.
  210. @type clientOpts: C{OpenSSLCertificateOptions}, or any class with an
  211. equivalent API.
  212. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
  213. @rtype: L{tuple}
  214. """
  215. class GreetingServer(protocol.Protocol):
  216. greeting = b"greetings!"
  217. def connectionMade(self):
  218. self.transport.write(self.greeting)
  219. class ListeningClient(protocol.Protocol):
  220. data = b''
  221. lostReason = None
  222. def dataReceived(self, data):
  223. self.data += data
  224. def connectionLost(self, reason):
  225. self.lostReason = reason
  226. clientFactory = TLSMemoryBIOFactory(
  227. clientOpts, isClient=True,
  228. wrappedFactory=protocol.Factory.forProtocol(GreetingServer)
  229. )
  230. serverFactory = TLSMemoryBIOFactory(
  231. serverOpts, isClient=False,
  232. wrappedFactory=protocol.Factory.forProtocol(ListeningClient)
  233. )
  234. sProto, cProto, pump = connectedServerAndClient(
  235. lambda: serverFactory.buildProtocol(None),
  236. lambda: clientFactory.buildProtocol(None)
  237. )
  238. return sProto, cProto, pump
  239. def loopbackTLSConnection(trustRoot, privateKeyFile, chainedCertFile=None):
  240. """
  241. Create a loopback TLS connection with the given trust and keys.
  242. @param trustRoot: the C{trustRoot} argument for the client connection's
  243. context.
  244. @type trustRoot: L{sslverify.IOpenSSLTrustRoot}
  245. @param privateKeyFile: The name of the file containing the private key.
  246. @type privateKeyFile: L{str} (native string; file name)
  247. @param chainedCertFile: The name of the chained certificate file.
  248. @type chainedCertFile: L{str} (native string; file name)
  249. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
  250. @rtype: L{tuple}
  251. """
  252. class ContextFactory(object):
  253. def getContext(self):
  254. """
  255. Create a context for the server side of the connection.
  256. @return: an SSL context using a certificate and key.
  257. @rtype: C{OpenSSL.SSL.Context}
  258. """
  259. ctx = SSL.Context(SSL.TLSv1_METHOD)
  260. if chainedCertFile is not None:
  261. ctx.use_certificate_chain_file(chainedCertFile)
  262. ctx.use_privatekey_file(privateKeyFile)
  263. # Let the test author know if they screwed something up.
  264. ctx.check_privatekey()
  265. return ctx
  266. serverOpts = ContextFactory()
  267. clientOpts = sslverify.OpenSSLCertificateOptions(trustRoot=trustRoot)
  268. return _loopbackTLSConnection(serverOpts, clientOpts)
  269. def loopbackTLSConnectionInMemory(trustRoot, privateKey,
  270. serverCertificate, clientProtocols=None,
  271. serverProtocols=None,
  272. clientOptions=None):
  273. """
  274. Create a loopback TLS connection with the given trust and keys. Like
  275. L{loopbackTLSConnection}, but using in-memory certificates and keys rather
  276. than writing them to disk.
  277. @param trustRoot: the C{trustRoot} argument for the client connection's
  278. context.
  279. @type trustRoot: L{sslverify.IOpenSSLTrustRoot}
  280. @param privateKey: The private key.
  281. @type privateKey: L{str} (native string)
  282. @param serverCertificate: The certificate used by the server.
  283. @type chainedCertFile: L{str} (native string)
  284. @param clientProtocols: The protocols the client is willing to negotiate
  285. using NPN/ALPN.
  286. @param serverProtocols: The protocols the server is willing to negotiate
  287. using NPN/ALPN.
  288. @param clientOptions: The type of C{OpenSSLCertificateOptions} class to
  289. use for the client. Defaults to C{OpenSSLCertificateOptions}.
  290. @return: 3-tuple of server-protocol, client-protocol, and L{IOPump}
  291. @rtype: L{tuple}
  292. """
  293. if clientOptions is None:
  294. clientOptions = sslverify.OpenSSLCertificateOptions
  295. clientCertOpts = clientOptions(
  296. trustRoot=trustRoot,
  297. acceptableProtocols=clientProtocols
  298. )
  299. serverCertOpts = sslverify.OpenSSLCertificateOptions(
  300. privateKey=privateKey,
  301. certificate=serverCertificate,
  302. acceptableProtocols=serverProtocols,
  303. )
  304. return _loopbackTLSConnection(serverCertOpts, clientCertOpts)
  305. def pathContainingDumpOf(testCase, *dumpables):
  306. """
  307. Create a temporary file to store some serializable-as-PEM objects in, and
  308. return its name.
  309. @param testCase: a test case to use for generating a temporary directory.
  310. @type testCase: L{twisted.trial.unittest.TestCase}
  311. @param dumpables: arguments are objects from pyOpenSSL with a C{dump}
  312. method, taking a pyOpenSSL file-type constant, such as
  313. L{OpenSSL.crypto.FILETYPE_PEM} or L{OpenSSL.crypto.FILETYPE_ASN1}.
  314. @type dumpables: L{tuple} of L{object} with C{dump} method taking L{int}
  315. returning L{bytes}
  316. @return: the path to a file where all of the dumpables were dumped in PEM
  317. format.
  318. @rtype: L{str}
  319. """
  320. fname = testCase.mktemp()
  321. with open(fname, "wb") as f:
  322. for dumpable in dumpables:
  323. f.write(dumpable.dump(FILETYPE_PEM))
  324. return fname
  325. class DataCallbackProtocol(protocol.Protocol):
  326. def dataReceived(self, data):
  327. d, self.factory.onData = self.factory.onData, None
  328. if d is not None:
  329. d.callback(data)
  330. def connectionLost(self, reason):
  331. d, self.factory.onLost = self.factory.onLost, None
  332. if d is not None:
  333. d.errback(reason)
  334. class WritingProtocol(protocol.Protocol):
  335. byte = b'x'
  336. def connectionMade(self):
  337. self.transport.write(self.byte)
  338. def connectionLost(self, reason):
  339. self.factory.onLost.errback(reason)
  340. class FakeContext(object):
  341. """
  342. Introspectable fake of an C{OpenSSL.SSL.Context}.
  343. Saves call arguments for later introspection.
  344. Necessary because C{Context} offers poor introspection. cf. this
  345. U{pyOpenSSL bug<https://bugs.launchpad.net/pyopenssl/+bug/1173899>}.
  346. @ivar _method: See C{method} parameter of L{__init__}.
  347. @ivar _options: L{int} of C{OR}ed values from calls of L{set_options}.
  348. @ivar _certificate: Set by L{use_certificate}.
  349. @ivar _privateKey: Set by L{use_privatekey}.
  350. @ivar _verify: Set by L{set_verify}.
  351. @ivar _verifyDepth: Set by L{set_verify_depth}.
  352. @ivar _mode: Set by L{set_mode}.
  353. @ivar _sessionID: Set by L{set_session_id}.
  354. @ivar _extraCertChain: Accumulated L{list} of all extra certificates added
  355. by L{add_extra_chain_cert}.
  356. @ivar _cipherList: Set by L{set_cipher_list}.
  357. @ivar _dhFilename: Set by L{load_tmp_dh}.
  358. @ivar _defaultVerifyPathsSet: Set by L{set_default_verify_paths}
  359. """
  360. _options = 0
  361. def __init__(self, method):
  362. self._method = method
  363. self._extraCertChain = []
  364. self._defaultVerifyPathsSet = False
  365. def set_options(self, options):
  366. self._options |= options
  367. def use_certificate(self, certificate):
  368. self._certificate = certificate
  369. def use_privatekey(self, privateKey):
  370. self._privateKey = privateKey
  371. def check_privatekey(self):
  372. return None
  373. def set_mode(self, mode):
  374. """
  375. Set the mode. See L{SSL.Context.set_mode}.
  376. @param mode: See L{SSL.Context.set_mode}.
  377. """
  378. self._mode = mode
  379. def set_verify(self, flags, callback):
  380. self._verify = flags, callback
  381. def set_verify_depth(self, depth):
  382. self._verifyDepth = depth
  383. def set_session_id(self, sessionID):
  384. self._sessionID = sessionID
  385. def add_extra_chain_cert(self, cert):
  386. self._extraCertChain.append(cert)
  387. def set_cipher_list(self, cipherList):
  388. self._cipherList = cipherList
  389. def load_tmp_dh(self, dhfilename):
  390. self._dhFilename = dhfilename
  391. def set_default_verify_paths(self):
  392. """
  393. Set the default paths for the platform.
  394. """
  395. self._defaultVerifyPathsSet = True
  396. class ClientOptionsTests(unittest.SynchronousTestCase):
  397. """
  398. Tests for L{sslverify.optionsForClientTLS}.
  399. """
  400. if skipSSL:
  401. skip = skipSSL
  402. def test_extraKeywords(self):
  403. """
  404. When passed a keyword parameter other than C{extraCertificateOptions},
  405. L{sslverify.optionsForClientTLS} raises an exception just like a
  406. normal Python function would.
  407. """
  408. error = self.assertRaises(
  409. TypeError,
  410. sslverify.optionsForClientTLS,
  411. hostname=u'alpha', someRandomThing=u'beta',
  412. )
  413. self.assertEqual(
  414. str(error),
  415. "optionsForClientTLS() got an unexpected keyword argument "
  416. "'someRandomThing'"
  417. )
  418. def test_bytesFailFast(self):
  419. """
  420. If you pass L{bytes} as the hostname to
  421. L{sslverify.optionsForClientTLS} it immediately raises a L{TypeError}.
  422. """
  423. error = self.assertRaises(
  424. TypeError,
  425. sslverify.optionsForClientTLS, b'not-actually-a-hostname.com'
  426. )
  427. expectedText = (
  428. "optionsForClientTLS requires text for host names, not " +
  429. bytes.__name__
  430. )
  431. self.assertEqual(str(error), expectedText)
  432. class OpenSSLOptionsTests(unittest.TestCase):
  433. if skipSSL:
  434. skip = skipSSL
  435. serverPort = clientConn = None
  436. onServerLost = onClientLost = None
  437. sKey = None
  438. sCert = None
  439. cKey = None
  440. cCert = None
  441. def setUp(self):
  442. """
  443. Create class variables of client and server certificates.
  444. """
  445. self.sKey, self.sCert = makeCertificate(
  446. O=b"Server Test Certificate",
  447. CN=b"server")
  448. self.cKey, self.cCert = makeCertificate(
  449. O=b"Client Test Certificate",
  450. CN=b"client")
  451. self.caCert1 = makeCertificate(
  452. O=b"CA Test Certificate 1",
  453. CN=b"ca1")[1]
  454. self.caCert2 = makeCertificate(
  455. O=b"CA Test Certificate",
  456. CN=b"ca2")[1]
  457. self.caCerts = [self.caCert1, self.caCert2]
  458. self.extraCertChain = self.caCerts
  459. def tearDown(self):
  460. if self.serverPort is not None:
  461. self.serverPort.stopListening()
  462. if self.clientConn is not None:
  463. self.clientConn.disconnect()
  464. L = []
  465. if self.onServerLost is not None:
  466. L.append(self.onServerLost)
  467. if self.onClientLost is not None:
  468. L.append(self.onClientLost)
  469. return defer.DeferredList(L, consumeErrors=True)
  470. def loopback(self, serverCertOpts, clientCertOpts,
  471. onServerLost=None, onClientLost=None, onData=None):
  472. if onServerLost is None:
  473. self.onServerLost = onServerLost = defer.Deferred()
  474. if onClientLost is None:
  475. self.onClientLost = onClientLost = defer.Deferred()
  476. if onData is None:
  477. onData = defer.Deferred()
  478. serverFactory = protocol.ServerFactory()
  479. serverFactory.protocol = DataCallbackProtocol
  480. serverFactory.onLost = onServerLost
  481. serverFactory.onData = onData
  482. clientFactory = protocol.ClientFactory()
  483. clientFactory.protocol = WritingProtocol
  484. clientFactory.onLost = onClientLost
  485. self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
  486. self.clientConn = reactor.connectSSL('127.0.0.1',
  487. self.serverPort.getHost().port, clientFactory, clientCertOpts)
  def test_constructorWithOnlyPrivateKey(self):
      """
      Supplying C{privateKey} without C{certificate} raises
      L{ValueError}; the two only make sense together.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(privateKey=self.sKey)
  def test_constructorWithOnlyCertificate(self):
      """
      Supplying C{certificate} without C{privateKey} raises
      L{ValueError}; the two only make sense together.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(certificate=self.sCert)
  def test_constructorWithCertificateAndPrivateKey(self):
      """
      When both C{privateKey} and C{certificate} are given, they are stored
      on the options object and C{extraCertChain} is an empty list.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      self.assertEqual(certOptions.privateKey, self.sKey)
      self.assertEqual(certOptions.certificate, self.sCert)
      self.assertEqual(certOptions.extraCertChain, [])
  def test_constructorDoesNotAllowVerifyWithoutCACerts(self):
      """
      Passing C{verify=True} without any C{caCerts} raises L{ValueError}.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey, certificate=self.sCert, verify=True)
  def test_constructorDoesNotAllowLegacyWithTrustRoot(self):
      """
      C{verify}, C{requireCertificate}, and C{caCerts} must not be specified
      by the caller (to be I{any} value, even the default!) when specifying
      C{trustRoot}; doing so raises L{TypeError}.
      """
      legacyCombinations = [
          dict(verify=True, trustRoot=None, caCerts=self.caCerts),
          dict(trustRoot=None, requireCertificate=True),
      ]
      for legacy in legacyCombinations:
          with self.assertRaises(TypeError):
              sslverify.OpenSSLCertificateOptions(
                  privateKey=self.sKey, certificate=self.sCert, **legacy)
  def test_constructorAllowsCACertsWithoutVerify(self):
      """
      Passing C{caCerts} without C{verify} is accepted: C{verify} stays
      false and the given C{caCerts} are stored.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          caCerts=self.caCerts,
      )
      self.assertFalse(certOptions.verify)
      self.assertEqual(self.caCerts, certOptions.caCerts)
  def test_constructorWithVerifyAndCACerts(self):
      """
      Passing both C{verify=True} and C{caCerts} stores both on the options
      object.
      """
      arguments = dict(
          privateKey=self.sKey,
          certificate=self.sCert,
          verify=True,
          caCerts=self.caCerts,
      )
      certOptions = sslverify.OpenSSLCertificateOptions(**arguments)
      self.assertTrue(certOptions.verify)
      self.assertEqual(self.caCerts, certOptions.caCerts)
  def test_constructorSetsExtraChain(self):
      """
      C{extraCertChain} is stored on the options object when it is passed
      together with C{certificate} and C{privateKey}.
      """
      chain = self.extraCertChain
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, extraCertChain=chain)
      self.assertEqual(self.extraCertChain, certOptions.extraCertChain)
  def test_constructorDoesNotAllowExtraChainWithoutPrivateKey(self):
      """
      An C{extraCertChain} passed with a C{certificate} but no
      C{privateKey} is rejected with L{ValueError}.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              certificate=self.sCert,
              extraCertChain=self.extraCertChain,
          )
  def test_constructorDoesNotAllowExtraChainWithOutPrivateKey(self):
      """
      An C{extraCertChain} passed with a C{privateKey} but no
      C{certificate} is rejected with L{ValueError}.

      Despite the method name, this case omits the I{certificate}; the
      missing-private-key case is covered by
      C{test_constructorDoesNotAllowExtraChainWithoutPrivateKey}.
      """
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              extraCertChain=self.extraCertChain,
          )
  def test_extraChainFilesAreAddedIfSupplied(self):
      """
      When C{extraCertChain} is given along with a key and certificate, the
      context produced by C{getContext} receives the private key, the
      certificate and the chain certificates.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          extraCertChain=self.extraCertChain,
      )
      # Use the recording fake so the values handed to the context can be
      # inspected afterwards.
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      self.assertEqual(self.sKey, fakeContext._privateKey)
      self.assertEqual(self.sCert, fakeContext._certificate)
      self.assertEqual(self.extraCertChain, fakeContext._extraCertChain)
  def test_extraChainDoesNotBreakPyOpenSSL(self):
      """
      With C{extraCertChain} set, C{getContext} still returns an
      L{SSL.Context} instance.
      """
      arguments = dict(
          privateKey=self.sKey,
          certificate=self.sCert,
          extraCertChain=self.extraCertChain,
      )
      realContext = sslverify.OpenSSLCertificateOptions(
          **arguments).getContext()
      self.assertIsInstance(realContext, SSL.Context)
  def test_acceptableCiphersAreAlwaysSet(self):
      """
      Without caller-supplied acceptable ciphers, the context still gets a
      cipher list: the ASCII encoding of the options' C{_cipherString}.
      The default is not compared against a fixed value because the
      effective cipher string varies with platforms.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      encodedCiphers = certOptions._cipherString.encode('ascii')
      self.assertEqual(encodedCiphers, fakeContext._cipherList)
  def test_givesMeaningfulErrorMessageIfNoCipherMatches(self):
      """
      If no valid cipher matches the caller's C{acceptableCiphers}, the
      constructor raises L{ValueError}.
      """
      # Built outside the assertion so that only the constructor call is
      # expected to raise.
      noCiphers = (
          sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString(''))
      with self.assertRaises(ValueError):
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              acceptableCiphers=noCiphers,
          )
  def test_honorsAcceptableCiphersArgument(self):
      """
      Ciphers selected by a caller-supplied C{acceptableCiphers} object end
      up as the context's cipher list.
      """
      @implementer(interfaces.IAcceptableCiphers)
      class SentinelCiphers(object):
          # Always selects one recognisable cipher, whatever is offered.
          def selectCiphers(self, _):
              return [sslverify.OpenSSLCipher(u'sentinel')]

      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          acceptableCiphers=SentinelCiphers(),
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      self.assertEqual(b'sentinel', fakeContext._cipherList)
  def test_basicSecurityOptionsAreSet(self):
      """
      A context built from default options has C{OP_NO_SSLv2},
      C{OP_NO_COMPRESSION} and C{OP_CIPHER_SERVER_PREFERENCE} set.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = SSL.OP_NO_SSLv2
      required |= SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE
      # Masking with the required bits ignores any additional options.
      self.assertEqual(required, fakeContext._options & required)
  def test_modeIsSet(self):
      """
      A context built from default options has its mode set to
      C{MODE_RELEASE_BUFFERS}.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      certOptions._contextFactory = FakeContext
      observedMode = certOptions.getContext()._mode
      self.assertEqual(SSL.MODE_RELEASE_BUFFERS, observedMode)
  def test_singleUseKeys(self):
      """
      With C{enableSingleUseKeys=True}, the context has both
      C{OP_SINGLE_DH_USE} and C{OP_SINGLE_ECDH_USE} set.
      """
      arguments = dict(
          privateKey=self.sKey,
          certificate=self.sCert,
          enableSingleUseKeys=True,
      )
      certOptions = sslverify.OpenSSLCertificateOptions(**arguments)
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      singleUse = SSL.OP_SINGLE_DH_USE | SSL.OP_SINGLE_ECDH_USE
      self.assertEqual(singleUse, fakeContext._options & singleUse)
  def test_methodIsDeprecated(self):
      """
      Passing C{method} to L{sslverify.OpenSSLCertificateOptions} emits
      exactly one L{DeprecationWarning} with the expected message.
      """
      expectedMessage = (
          "Passing method to twisted.internet.ssl.CertificateOptions "
          "was deprecated in Twisted 17.1.0. Please use a "
          "combination of insecurelyLowerMinimumTo, raiseMinimumTo, "
          "and lowerMaximumSecurityTo instead, as Twisted will "
          "correctly configure the method.")

      sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          method=SSL.SSLv23_METHOD,
      )

      emitted = self.flushWarnings([self.test_methodIsDeprecated])
      self.assertEqual(1, len(emitted))
      self.assertEqual(DeprecationWarning, emitted[0]['category'])
      self.assertEqual(expectedMessage, emitted[0]['message'])
  def test_tlsv1ByDefault(self):
      """
      With no C{method} and no C{insecurelyLowerMinimumTo},
      L{sslverify.OpenSSLCertificateOptions} defaults to a minimum of
      TLS v1.0: the context has C{OP_NO_SSLv2} and C{OP_NO_SSLv3} set
      alongside the basic security options.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          certificate=self.sCert, privateKey=self.sKey)
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsAtLeastWithMinimum(self):
      """
      Passing both C{insecurelyLowerMinimumTo} and C{raiseMinimumTo} to
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError}.
      """
      version = sslverify.TLSVersion.TLSv1_2
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              raiseMinimumTo=version,
              insecurelyLowerMinimumTo=version,
          )
      # The exception's only argument is expected to be the literal
      # "nope".
      self.assertEqual(caught.exception.args, ("nope",))
  def test_tlsProtocolsNoMethodWithAtLeast(self):
      """
      Passing both C{method} and C{raiseMinimumTo} to
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError}.
      """
      conflicting = dict(
          method=SSL.SSLv23_METHOD,
          raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey, certificate=self.sCert, **conflicting)
      # The exception's only argument is expected to be the literal
      # "nope".
      self.assertEqual(caught.exception.args, ("nope",))
  def test_tlsProtocolsNoMethodWithMinimum(self):
      """
      Passing both C{method} and C{insecurelyLowerMinimumTo} to
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError}.
      """
      conflicting = dict(
          method=SSL.SSLv23_METHOD,
          insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey, certificate=self.sCert, **conflicting)
      # The exception's only argument is expected to be the literal
      # "nope".
      self.assertEqual(caught.exception.args, ("nope",))
  def test_tlsProtocolsNoMethodWithMaximum(self):
      """
      Passing both C{method} and C{lowerMaximumSecurityTo} to
      L{sslverify.OpenSSLCertificateOptions} raises L{TypeError}.
      """
      conflicting = dict(
          method=SSL.SSLv23_METHOD,
          lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
      )
      with self.assertRaises(TypeError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey, certificate=self.sCert, **conflicting)
      # The exception's only argument is expected to be the literal
      # "nope".
      self.assertEqual(caught.exception.args, ("nope",))
  def test_tlsVersionRangeInOrder(self):
      """
      An C{insecurelyLowerMinimumTo} that is higher than
      C{lowerMaximumSecurityTo} raises L{ValueError} with an explanatory
      message.
      """
      expectedArgs = (
          ("insecurelyLowerMinimumTo needs to be lower than "
           "lowerMaximumSecurityTo"),)
      with self.assertRaises(ValueError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
              lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
          )
      self.assertEqual(caught.exception.args, expectedArgs)
  def test_tlsVersionRangeInOrderAtLeast(self):
      """
      A C{raiseMinimumTo} that is higher than C{lowerMaximumSecurityTo}
      raises L{ValueError} with an explanatory message.
      """
      expectedArgs = (
          ("raiseMinimumTo needs to be lower than "
           "lowerMaximumSecurityTo"),)
      with self.assertRaises(ValueError) as caught:
          sslverify.OpenSSLCertificateOptions(
              privateKey=self.sKey,
              certificate=self.sCert,
              raiseMinimumTo=sslverify.TLSVersion.TLSv1_0,
              lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
          )
      self.assertEqual(caught.exception.args, expectedArgs)
  def test_tlsProtocolsreduceToMaxWithoutMin(self):
      """
      When only C{lowerMaximumSecurityTo} is given (no C{raiseMinimumTo} or
      C{insecurelyLowerMinimumTo}) and it is below the default minimum, the
      minimum is lowered to that maximum: with SSLv3, the context carries
      the exclusion flags for SSLv2 and for TLSv1.0 through TLSv1.3.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          lowerMaximumSecurityTo=sslverify.TLSVersion.SSLv3,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = 0
      for flag in (SSL.OP_NO_SSLv2, SSL.OP_NO_COMPRESSION,
                   SSL.OP_CIPHER_SERVER_PREFERENCE, SSL.OP_NO_TLSv1,
                   SSL.OP_NO_TLSv1_1, SSL.OP_NO_TLSv1_2,
                   certOptions._OP_NO_TLSv1_3):
          required |= flag
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsSSLv3Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to SSLv3, the context carries the exclusion flags for SSLv2 and
      for TLSv1.0 through TLSv1.3.
      """
      onlyVersion = sslverify.TLSVersion.SSLv3
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = 0
      for flag in (SSL.OP_NO_SSLv2, SSL.OP_NO_COMPRESSION,
                   SSL.OP_CIPHER_SERVER_PREFERENCE, SSL.OP_NO_TLSv1,
                   SSL.OP_NO_TLSv1_1, SSL.OP_NO_TLSv1_2,
                   certOptions._OP_NO_TLSv1_3):
          required |= flag
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsTLSv1Point0Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to TLSv1.0, the context carries the exclusion flags for SSLv2,
      SSLv3, TLSv1.1, TLSv1.2 and TLSv1.3.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_0
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = 0
      for flag in (SSL.OP_NO_SSLv2, SSL.OP_NO_COMPRESSION,
                   SSL.OP_CIPHER_SERVER_PREFERENCE, SSL.OP_NO_SSLv3,
                   SSL.OP_NO_TLSv1_1, SSL.OP_NO_TLSv1_2,
                   certOptions._OP_NO_TLSv1_3):
          required |= flag
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsTLSv1Point1Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to TLSv1.1, the context carries the exclusion flags for SSLv2,
      SSLv3, TLSv1.0, TLSv1.2 and TLSv1.3.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_1
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = 0
      for flag in (SSL.OP_NO_SSLv2, SSL.OP_NO_COMPRESSION,
                   SSL.OP_CIPHER_SERVER_PREFERENCE, SSL.OP_NO_SSLv3,
                   SSL.OP_NO_TLSv1, SSL.OP_NO_TLSv1_2,
                   certOptions._OP_NO_TLSv1_3):
          required |= flag
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsTLSv1Point2Only(self):
      """
      With both C{insecurelyLowerMinimumTo} and C{lowerMaximumSecurityTo}
      set to TLSv1.2, the context carries the exclusion flags for SSLv2,
      SSLv3, TLSv1.0, TLSv1.1 and TLSv1.3.
      """
      onlyVersion = sslverify.TLSVersion.TLSv1_2
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          insecurelyLowerMinimumTo=onlyVersion,
          lowerMaximumSecurityTo=onlyVersion,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = 0
      for flag in (SSL.OP_NO_SSLv2, SSL.OP_NO_COMPRESSION,
                   SSL.OP_CIPHER_SERVER_PREFERENCE, SSL.OP_NO_SSLv3,
                   SSL.OP_NO_TLSv1, SSL.OP_NO_TLSv1_1,
                   certOptions._OP_NO_TLSv1_3):
          required |= flag
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsAllModernTLS(self):
      """
      With C{insecurelyLowerMinimumTo} set to TLSv1.0 and
      C{lowerMaximumSecurityTo} set to TLSv1.2, the context carries the
      exclusion flags for both SSL versions and for TLSv1.3.
      """
      versionRange = dict(
          insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_0,
          lowerMaximumSecurityTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, **versionRange)
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3
      required |= certOptions._OP_NO_TLSv1_3
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsAtLeastAllSecureTLS(self):
      """
      With C{raiseMinimumTo} set to TLSv1.2, the context carries the
      exclusion flags for both SSL versions and for TLS versions below
      1.2.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          certificate=self.sCert,
          privateKey=self.sKey,
          raiseMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3
      required |= SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
      self.assertEqual(required, fakeContext._options & required)
  def test_tlsProtocolsAtLeastWillAcceptHigherDefault(self):
      """
      A C{raiseMinimumTo} lower than Twisted's default minimum leaves the
      more secure default in place: asking for SSLv3 still yields a
      context with C{OP_NO_SSLv3} set.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          certificate=self.sCert,
          privateKey=self.sKey,
          raiseMinimumTo=sslverify.TLSVersion.SSLv3,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      # Future maintainer warning: this will break if we change our default
      # up, so you should change it to add the relevant OP_NO flags when we
      # do make that change and this test fails.
      required = SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3
      self.assertEqual(required, fakeContext._options & required)
      self.assertEqual(certOptions._defaultMinimumTLSVersion,
                       sslverify.TLSVersion.TLSv1_0)
  def test_tlsProtocolsAllSecureTLS(self):
      """
      With C{insecurelyLowerMinimumTo} set to TLSv1.2, the context carries
      the exclusion flags for both SSL versions and for TLS versions below
      1.2.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          certificate=self.sCert,
          privateKey=self.sKey,
          insecurelyLowerMinimumTo=sslverify.TLSVersion.TLSv1_2,
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      required = SSL.OP_NO_SSLv2 | SSL.OP_NO_COMPRESSION
      required |= SSL.OP_CIPHER_SERVER_PREFERENCE | SSL.OP_NO_SSLv3
      required |= SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
      self.assertEqual(required, fakeContext._options & required)
  def test_dhParams(self):
      """
      When C{dhParameters} is given, the path of its C{_dhFile} is loaded
      into the context created by C{getContext}.
      """
      class FakeDiffieHellmanParameters(object):
          # Only the attribute read by the code under test is provided.
          _dhFile = FilePath(b'dh.params')

      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          dhParameters=FakeDiffieHellmanParameters(),
      )
      certOptions._contextFactory = FakeContext
      fakeContext = certOptions.getContext()
      expectedPath = FakeDiffieHellmanParameters._dhFile.path
      self.assertEqual(expectedPath, fakeContext._dhFilename)
  def test_ecDoesNotBreakConstructor(self):
      """
      If C{_OpenSSLECCurve._getBinding} raises L{NotImplementedError}, the
      constructor still succeeds and C{_ecCurve} is L{None}.
      """
      def unavailable(curve):
          raise NotImplementedError

      self.patch(sslverify._OpenSSLECCurve, "_getBinding", unavailable)
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      self.assertIsNone(certOptions._ecCurve)
  def test_ecNeverBreaksGetContext(self):
      """
      ECDHE support is best effort only: with an unusable C{_ecCurve},
      C{getContext} still returns an L{SSL.Context}.
      """
      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      # A bare object has none of the curve API, so any use of it fails.
      certOptions._ecCurve = object()
      realContext = certOptions.getContext()
      self.assertIsInstance(realContext, SSL.Context)
  def test_ecSuccessWithRealBindings(self):
      """
      Integration test for the positive path: the options pick up the
      default curve and that curve can be added to a real context.

      The test is skipped when constructing the default curve raises
      L{NotImplementedError}.
      """
      curveName = sslverify._defaultCurveName
      try:
          expectedCurve = sslverify._OpenSSLECCurve(curveName)
      except NotImplementedError:
          raise unittest.SkipTest(
              "Underlying pyOpenSSL is not based on cryptography."
          )

      certOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert)
      self.assertEqual(expectedCurve, certOptions._ecCurve)
      # Exercise positive code path.  getContext swallows errors so we do it
      # explicitly by hand.
      certOptions._ecCurve.addECKeyToContext(certOptions.getContext())
  def test_abbreviatingDistinguishedNames(self):
      """
      Abbreviated distinguished-name fields (C{CN}, C{OU}) are equivalent
      to their full names, both at construction and as attributes, and an
      unknown abbreviation is rejected.
      """
      abbreviated = sslverify.DN(CN=b'a', OU=b'hello')
      spelledOut = sslverify.DistinguishedName(
          commonName=b'a', organizationalUnitName=b'hello')
      self.assertEqual(abbreviated, spelledOut)

      withoutEmail = sslverify.DN(CN=b'a', OU=b'hello')
      withEmail = sslverify.DN(CN=b'a', OU=b'hello', emailAddress=b'xxx')
      self.assertNotEqual(withoutEmail, withEmail)

      name = sslverify.DN(CN=b'abcdefg')
      # Attribute names are case sensitive: 'Cn' is not a known field.
      with self.assertRaises(AttributeError):
          name.Cn = b'x'
      self.assertEqual(name.CN, name.commonName)
      name.CN = b'bcdefga'
      self.assertEqual(name.CN, name.commonName)
  def testInspectDistinguishedName(self):
      """
      The C{inspect} output of a fully populated distinguished name
      contains every field's value and every field's title-cased label.
      """
      fields = dict(
          commonName=b'common name',
          organizationName=b'organization name',
          organizationalUnitName=b'organizational unit name',
          localityName=b'locality name',
          stateOrProvinceName=b'state or province name',
          countryName=b'country name',
          emailAddress=b'email address',
      )
      inspected = sslverify.DN(**fields).inspect()
      labels = (
          'common name',
          'organization name',
          'organizational unit name',
          'locality name',
          'state or province name',
          'country name',
          'email address',
      )
      for label in labels:
          complaint = "%r was not in inspect output." % (label,)
          # Lower case is the field value; title case is the field label.
          self.assertIn(label, inspected, complaint)
          self.assertIn(label.title(), inspected, complaint)
  def testInspectDistinguishedNameWithoutAllFields(self):
      """
      The C{inspect} output of a distinguished name with only
      C{localityName} set mentions that field and none of the others.
      """
      inspected = sslverify.DN(localityName=b'locality name').inspect()
      absentLabels = (
          'common name',
          'organization name',
          'organizational unit name',
          'state or province name',
          'country name',
          'email address',
      )
      for label in absentLabels:
          complaint = "%r was in inspect output." % (label,)
          self.assertNotIn(label, inspected, complaint)
          self.assertNotIn(label.title(), inspected, complaint)
      self.assertIn('locality name', inspected)
      self.assertIn('Locality Name', inspected)
  def test_inspectCertificate(self):
      """
      The C{inspect} method of L{sslverify.Certificate} returns a
      human-readable, newline-separated description of the subject, the
      issuer, the serial number, the digest and the public key hash.
      """
      certificate = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
      keyHash = certificate.getPublicKey().keyHash()
      # Maintenance Note: the algorithm used to compute the "public key hash"
      # is highly dubious and can differ between underlying versions of
      # OpenSSL (and across versions of Twisted), since it is not actually
      # the hash of the public key by itself.  If we can get the appropriate
      # APIs to get the hash of the key itself out of OpenSSL, then we should
      # be able to make it statically declared inline below again rather than
      # computing it here.

      # Subject and issuer are expected to list the same fields.
      # NOTE(review): whitespace inside these literals is reproduced as it
      # appears in this copy of the file -- confirm the leading indentation
      # against the real inspect() output.
      nameLines = [
          " Common Name: example.twistedmatrix.com",
          " Country Name: US",
          " Email Address: nobody@twistedmatrix.com",
          " Locality Name: Boston",
          " Organization Name: Twisted Matrix Labs",
          " Organizational Unit Name: Security",
          " State Or Province Name: Massachusetts",
      ]
      expectedLines = (
          ["Certificate For Subject:"] + nameLines +
          ["", "Issuer:"] + nameLines +
          ["",
           "Serial Number: 12345",
           "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
           "Public Key with Hash: " + keyHash]
      )
      self.assertEqual(certificate.inspect().split('\n'), expectedLines)
  def test_publicKeyMatching(self):
      """
      L{PublicKey.matches} returns L{True} for keys taken from two loads of
      the same certificate, and L{False} for keys taken from certificates
      with different keys.
      """
      load = sslverify.Certificate.loadPEM
      firstHostKey = load(A_HOST_CERTIFICATE_PEM).getPublicKey()
      secondHostKey = load(A_HOST_CERTIFICATE_PEM).getPublicKey()
      peerKey = load(A_PEER_CERTIFICATE_PEM).getPublicKey()
      self.assertTrue(firstHostKey.matches(secondHostKey))
      self.assertFalse(peerKey.matches(firstHostKey))
  def test_certificateOptionsSerialization(self):
      """
      C{__setstate__(__getstate__())} round-trips every constructor option,
      and the cached context is left out of the serialized state.
      """
      firstOpts = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          method=SSL.SSLv23_METHOD,
          verify=True,
          caCerts=[self.sCert],
          verifyDepth=2,
          requireCertificate=False,
          verifyOnce=False,
          enableSingleUseKeys=False,
          enableSessions=False,
          fixBrokenPeers=True,
          enableSessionTickets=True)
      # Create the context first so there is something cached that the
      # serialized state has to leave out.
      context = firstOpts.getContext()
      self.assertIs(context, firstOpts._context)
      self.assertIsNotNone(context)
      state = firstOpts.__getstate__()
      self.assertNotIn("_context", state)

      # Restore the state into a default-constructed instance.
      opts = sslverify.OpenSSLCertificateOptions()
      opts.__setstate__(state)
      self.assertEqual(opts.privateKey, self.sKey)
      self.assertEqual(opts.certificate, self.sCert)
      self.assertEqual(opts.method, SSL.SSLv23_METHOD)
      self.assertTrue(opts.verify)
      self.assertEqual(opts.caCerts, [self.sCert])
      self.assertEqual(opts.verifyDepth, 2)
      self.assertFalse(opts.requireCertificate)
      self.assertFalse(opts.verifyOnce)
      self.assertFalse(opts.enableSingleUseKeys)
      self.assertFalse(opts.enableSessions)
      self.assertTrue(opts.fixBrokenPeers)
      self.assertTrue(opts.enableSessionTickets)

  # The pattern is a raw string so that ``\.`` reaches the regex engine
  # without relying on Python's handling of unrecognized escape sequences.
  # NOTE(review): ``\.*`` matches a run of literal dots, not arbitrary
  # text; confirm the pattern matches the warning message actually emitted.
  test_certificateOptionsSerialization.suppress = [
      util.suppress(
          category=DeprecationWarning,
          message=r'twisted\.internet\._sslverify\.*__[gs]etstate__')]
  def test_certificateOptionsSessionTickets(self):
      """
      With C{enableSessionTickets=True}, the context does not have the
      OP_NO_TICKET option set.
      """
      # Numeric value of OP_NO_TICKET, spelled out as a literal.
      opNoTicket = 0x00004000
      certOptions = sslverify.OpenSSLCertificateOptions(
          enableSessionTickets=True)
      realContext = certOptions.getContext()
      # set_options(0) adds nothing; its return value is used as the
      # current option bitmask.
      currentOptions = realContext.set_options(0)
      self.assertEqual(0, currentOptions & opNoTicket)
  def test_certificateOptionsSessionTicketsDisabled(self):
      """
      Disabling session tickets (C{enableSessionTickets=False}) should set
      the OP_NO_TICKET option on the context.
      """
      opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False)
      ctx = opts.getContext()
      # set_options(0) adds nothing; its return value is used as the
      # current option bitmask.  0x00004000 is the OP_NO_TICKET bit.
      self.assertEqual(0x00004000, ctx.set_options(0) & 0x00004000)
  def test_allowedAnonymousClientConnection(self):
      """
      A client without a certificate can connect and deliver data when the
      server does not require certificates.

      @return: a L{Deferred} that fires after the received data has been
          compared with C{WritingProtocol.byte}.
      """
      received = defer.Deferred()
      serverOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert,
          requireCertificate=False)
      clientOptions = sslverify.OpenSSLCertificateOptions(
          requireCertificate=False)
      self.loopback(serverOptions, clientOptions, onData=received)

      def checkData(result):
          return self.assertEqual(result, WritingProtocol.byte)

      return received.addCallback(checkData)
  def test_refusedAnonymousClientConnection(self):
      """
      A client without a certificate is refused when the server requires
      certificates: both sides lose the connection with a failure.

      @return: a L{Deferred} that fires once both connection-lost results
          have been checked.
      """
      serverLost = defer.Deferred()
      clientLost = defer.Deferred()
      serverOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, verify=True,
          caCerts=[self.sCert], requireCertificate=True)
      clientOptions = sslverify.OpenSSLCertificateOptions(
          requireCertificate=False)
      self.loopback(serverOptions, clientOptions,
                    onServerLost=serverLost, onClientLost=clientLost)

      def afterLost(outcomes):
          # Results come back in the order the Deferreds were listed:
          # client first, then server.
          clientOutcome, serverOutcome = outcomes
          cSuccess, cResult = clientOutcome
          sSuccess, sResult = serverOutcome
          self.assertFalse(cSuccess)
          self.assertFalse(sSuccess)
          # Win32 fails to report the SSL Error, and report a connection lost
          # instead: there is a race condition so that's not totally
          # surprising (see ticket #2877 in the tracker)
          self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost))
          self.assertIsInstance(sResult.value, SSL.Error)

      bothLost = defer.DeferredList([clientLost, serverLost],
                                    consumeErrors=True)
      return bothLost.addCallback(afterLost)
  def test_failedCertificateVerification(self):
      """
      The connection fails on both sides when the verifying side trusts
      only C{self.cCert} while the server presents C{self.sCert}.

      @return: a L{Deferred} that fires once both connection-lost results
          have been checked.
      """
      serverLost = defer.Deferred()
      clientLost = defer.Deferred()
      serverOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, verify=False,
          requireCertificate=False)
      clientOptions = sslverify.OpenSSLCertificateOptions(
          verify=True, requireCertificate=False, caCerts=[self.cCert])
      self.loopback(serverOptions, clientOptions,
                    onServerLost=serverLost, onClientLost=clientLost)

      def afterLost(outcomes):
          # Only the success flags matter here; the failure values are
          # not inspected.
          (cSuccess, _), (sSuccess, _) = outcomes
          self.assertFalse(cSuccess)
          self.assertFalse(sSuccess)

      bothLost = defer.DeferredList([clientLost, serverLost],
                                    consumeErrors=True)
      return bothLost.addCallback(afterLost)
  def test_successfulCertificateVerification(self):
      """
      The connection succeeds and data arrives when the verifying side
      trusts C{self.sCert}, the certificate the server presents.

      @return: a L{Deferred} that fires after the received data has been
          compared with C{WritingProtocol.byte}.
      """
      received = defer.Deferred()
      serverOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey, certificate=self.sCert, verify=False,
          requireCertificate=False)
      clientOptions = sslverify.OpenSSLCertificateOptions(
          verify=True, requireCertificate=True, caCerts=[self.sCert])
      self.loopback(serverOptions, clientOptions, onData=received)

      def checkData(result):
          return self.assertEqual(result, WritingProtocol.byte)

      return received.addCallback(checkData)
  def test_successfulSymmetricSelfSignedCertificateVerification(self):
      """
      When each side presents its own certificate, requires one from its
      peer, and lists the peer's certificate in C{caCerts}, the connection
      succeeds and the written byte is delivered.
      """
      received = defer.Deferred()

      # Presents sCert, trusts cCert.
      firstOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.sKey,
          certificate=self.sCert,
          verify=True,
          requireCertificate=True,
          caCerts=[self.cCert],
      )
      # Presents cCert, trusts sCert.
      secondOptions = sslverify.OpenSSLCertificateOptions(
          privateKey=self.cKey,
          certificate=self.cCert,
          verify=True,
          requireCertificate=True,
          caCerts=[self.sCert],
      )
      self.loopback(firstOptions, secondOptions, onData=received)

      def checkByte(result):
          self.assertEqual(result, WritingProtocol.byte)

      return received.addCallback(checkByte)
  def test_verification(self):
      """
      Build key pairs, certificate requests and certificates from scratch
      and check that a connection using the resulting options delivers the
      written byte.

      Each side gets a self-signed certificate plus a certificate signed by
      the I{other} side's key; the options for each side come from the
      cross-signed certificate, with that side's self-signed certificate
      passed to C{options}.
      """
      def approve(dn):
          # Accept whatever distinguished name is presented for signing.
          return True

      clientDN = sslverify.DistinguishedName(commonName='client')
      clientKey = sslverify.KeyPair.generate()
      clientRequest = clientKey.certificateRequest(clientDN)

      serverDN = sslverify.DistinguishedName(commonName='server')
      serverKey = sslverify.KeyPair.generate()
      serverRequest = serverKey.certificateRequest(serverDN)

      # Self-signed certificates.
      clientSelfRequest = clientKey.certificateRequest(clientDN)
      clientSelfCert = clientKey.newCertificate(
          clientKey.signCertificateRequest(
              clientDN, clientSelfRequest, approve, 132))

      serverSelfRequest = serverKey.certificateRequest(serverDN)
      serverSelfCert = serverKey.newCertificate(
          serverKey.signCertificateRequest(
              serverDN, serverSelfRequest, approve, 516))

      # Cross-signed certificates: each request is signed by the peer's key.
      clientCert = clientKey.newCertificate(
          serverKey.signCertificateRequest(
              serverDN, clientRequest, approve, 7))
      serverCert = serverKey.newCertificate(
          clientKey.signCertificateRequest(
              clientDN, serverRequest, approve, 42))

      received = defer.Deferred()
      serverOptions = serverCert.options(serverSelfCert)
      clientOptions = clientCert.options(clientSelfCert)
      self.loopback(serverOptions, clientOptions, onData=received)

      def checkByte(result):
          self.assertEqual(result, WritingProtocol.byte)

      return received.addCallback(checkByte)
  class DeprecationTests(unittest.SynchronousTestCase):
      """
      L{sslverify.OpenSSLCertificateOptions} no longer supports the pickle
      protocol; these tests check that its pickle hooks are deprecated.
      """
      if skipSSL:
          skip = skipSSL

      def test_getstateDeprecation(self):
          """
          Calling L{sslverify.OpenSSLCertificateOptions.__getstate__} emits
          the expected deprecation.
          """
          options = sslverify.OpenSSLCertificateOptions()
          expected = (
              Version("Twisted", 15, 0, 0),
              "a real persistence system",
          )
          self.callDeprecated(expected, options.__getstate__)

      def test_setstateDeprecation(self):
          """
          Calling L{sslverify.OpenSSLCertificateOptions.__setstate__} emits
          the expected deprecation.
          """
          options = sslverify.OpenSSLCertificateOptions()
          expected = (
              Version("Twisted", 15, 0, 0),
              "a real persistence system",
          )
          self.callDeprecated(expected, options.__setstate__, {})
  class TrustRootTests(unittest.TestCase):
      """
      Tests covering the C{trustRoot} argument of
      L{sslverify.OpenSSLCertificateOptions}, L{sslverify.platformTrust},
      and how the two interact.
      """
      if skipSSL:
          skip = skipSSL

      def test_caCertsPlatformDefaults(self):
          """
          Passing L{sslverify.OpenSSLDefaultPaths} as C{trustRoot} to
          L{sslverify.OpenSSLCertificateOptions} makes C{getContext} load the
          platform-provided trusted certificates through
          C{set_default_verify_paths}.
          """
          fakeContext = FakeContext(SSL.TLSv1_METHOD)

          def contextFactory(method):
              # Always hand back the fake so its state can be inspected.
              return fakeContext

          options = sslverify.OpenSSLCertificateOptions(
              trustRoot=sslverify.OpenSSLDefaultPaths(),
          )
          options._contextFactory = contextFactory
          options.getContext()
          self.assertTrue(fakeContext._defaultVerifyPathsSet)

      def test_trustRootPlatformRejectsUntrustedCA(self):
          """
          With L{platformTrust} as the C{trustRoot}, a certificate issued by
          a freshly created CA is rejected by the SSL connection.

          This is a smoke test that verification happens at all: it should
          I{always} pass, even where the platform has no CA certificates
          installed, provided L{platformTrust} rejects completely invalid /
          unknown root CA certificates.
          """
          authorityCert, serverCert = certificatesForAuthorityAndServer()
          chainPath = pathContainingDumpOf(self, serverCert, authorityCert)
          keyPath = pathContainingDumpOf(self, serverCert.privateKey)

          sProto, cProto, pump = loopbackTLSConnection(
              trustRoot=platformTrust(),
              privateKeyFile=keyPath,
              chainedCertFile=chainPath,
          )
          clientProtocol = cProto.wrappedProtocol

          # Nothing was delivered to the client.
          self.assertEqual(clientProtocol.data, b'')

          # The connection was lost because of an L{SSL.Error}.
          self.assertEqual(clientProtocol.lostReason.type, SSL.Error)

          # Some combination of OpenSSL and PyOpenSSL is bad at reporting
          # errors, so dig the reason text out of the nested args.
          error = clientProtocol.lostReason.value
          self.assertEqual(error.args[0][0][2], 'tlsv1 alert unknown ca')

      def test_trustRootSpecificCertificate(self):
          """
          A L{Certificate} given as C{trustRoot} acts as the client's trust
          root: a server certificate issued by it is accepted and the
          server's greeting is received.
          """
          caCert, serverCert = certificatesForAuthorityAndServer()
          # A second, unrelated authority/server pair is generated here but
          # is not used by the assertions below.
          otherCa, otherServer = certificatesForAuthorityAndServer()

          keyPath = pathContainingDumpOf(self, serverCert.privateKey)
          certPath = pathContainingDumpOf(self, serverCert)
          sProto, cProto, pump = loopbackTLSConnection(
              trustRoot=caCert,
              privateKeyFile=keyPath,
              chainedCertFile=certPath,
          )
          pump.flush()

          clientProtocol = cProto.wrappedProtocol
          self.assertIsNone(clientProtocol.lostReason)
          self.assertEqual(clientProtocol.data,
                           sProto.wrappedProtocol.greeting)
  class ServiceIdentityTests(unittest.SynchronousTestCase):
      """
      Tests for verifying the identity of the peer's service through the
      C{hostname} argument of L{sslverify.OpenSSLCertificateOptions}.
      """
      if skipSSL:
          skip = skipSSL

      def serviceIdentitySetup(self, clientHostname, serverHostname,
                               serverContextSetup=lambda ctx: None,
                               validCertificate=True,
                               clientPresentsCertificate=False,
                               validClientCertificate=True,
                               serverVerifies=False,
                               buggyInfoCallback=False,
                               fakePlatformTrust=False,
                               useDefaultTrust=False):
          """
          Build server and client TLS options and connect a greeting server
          to a greeting client through them.

          The options used are also stored on the test case as
          C{self.serverOpts} and C{self.clientOpts}.

          @param clientHostname: The I{client's idea} of the server's
              hostname; passed as C{hostname} to
              L{sslverify.optionsForClientTLS}.
          @type clientHostname: L{unicode}

          @param serverHostname: The I{server's own idea} of the server's
              hostname; present in the certificate the server presents.
          @type serverHostname: L{unicode}

          @param serverContextSetup: a 1-argument callable invoked with the
              L{OpenSSL.SSL.Context} once the server options produced it.
          @type serverContextSetup: L{callable} taking
              L{OpenSSL.SSL.Context} returning L{None}.

          @param validCertificate: Is the server's certificate valid?
              L{True} if so, L{False} otherwise.
          @type validCertificate: L{bool}

          @param clientPresentsCertificate: Should the client present a
              certificate to the server?  Defaults to 'no'.
          @type clientPresentsCertificate: L{bool}

          @param validClientCertificate: If the client presents a
              certificate, should it be a valid one, i.e. signed by the same
              CA that the server is checking?  Defaults to 'yes'.
          @type validClientCertificate: L{bool}

          @param serverVerifies: Should the server verify the client's
              certificate?  Defaults to 'no'.
          @type serverVerifies: L{bool}

          @param buggyInfoCallback: Should the implementation be patched so
              that the C{info_callback} given to OpenSSL raises an exception
              (L{ZeroDivisionError})?  Defaults to 'no'.
          @type buggyInfoCallback: L{bool}

          @param fakePlatformTrust: Should C{platformTrust} be faked to
              return our fake server certificate authority, so that its use
              can be tested?  Defaults to 'no'.
          @type fakePlatformTrust: L{bool}

          @param useDefaultTrust: Should passing C{trustRoot} to
              L{ssl.optionsForClientTLS} be avoided?  Defaults to 'no'.
          @type useDefaultTrust: L{bool}

          @return: see L{connectedServerAndClient}.
          @rtype: see L{connectedServerAndClient}.
          """
          serverCA, serverCert = certificatesForAuthorityAndServer(
              serverHostname
          )
          clientCA, clientCert = certificatesForAuthorityAndServer(u'client')

          # Extra keyword arguments for the server's options.
          serverExtras = {}
          if serverVerifies:
              serverExtras['trustRoot'] = clientCA

          # Decide which certificate, if any, the client will present.
          presentedClientCert = None
          if clientPresentsCertificate and validClientCertificate:
              presentedClientCert = clientCert
          elif clientPresentsCertificate:
              # Issued by a brand new authority rather than clientCA.
              bogusCA, bogusCert = certificatesForAuthorityAndServer(
                  u'client')
              presentedClientCert = bogusCert

          serverOpts = sslverify.OpenSSLCertificateOptions(
              privateKey=serverCert.privateKey.original,
              certificate=serverCert.original,
              **serverExtras
          )
          serverContextSetup(serverOpts.getContext())

          if not validCertificate:
              # Replace the authority the client will be told to trust with
              # one that did not issue the server's certificate.
              serverCA, unrelatedServer = certificatesForAuthorityAndServer(
                  serverHostname
              )

          if buggyInfoCallback:
              def broken(*a, **k):
                  """
                  Raise an exception.

                  @param a: Arguments for an C{info_callback}

                  @param k: Keyword arguments for an C{info_callback}
                  """
                  1 / 0

              self.patch(
                  sslverify.ClientTLSOptions,
                  "_identityVerifyingInfoCallback",
                  broken,
              )

          # Keyword arguments for the client's options.
          clientArguments = {'hostname': clientHostname}
          if presentedClientCert:
              clientArguments['clientCertificate'] = presentedClientCert
          if not useDefaultTrust:
              clientArguments['trustRoot'] = serverCA
          if fakePlatformTrust:
              self.patch(sslverify, "platformTrust", lambda: serverCA)

          clientOpts = sslverify.optionsForClientTLS(**clientArguments)

          class GreetingServer(protocol.Protocol):
              greeting = b"greetings!"
              lostReason = None
              data = b''

              def connectionMade(self):
                  self.transport.write(self.greeting)

              def dataReceived(self, data):
                  self.data += data

              def connectionLost(self, reason):
                  self.lostReason = reason

          class GreetingClient(protocol.Protocol):
              greeting = b'cheerio!'
              data = b''
              lostReason = None

              def connectionMade(self):
                  self.transport.write(self.greeting)

              def dataReceived(self, data):
                  self.data += data

              def connectionLost(self, reason):
                  self.lostReason = reason

          self.serverOpts = serverOpts
          self.clientOpts = clientOpts

          clientFactory = TLSMemoryBIOFactory(
              clientOpts,
              isClient=True,
              wrappedFactory=protocol.Factory.forProtocol(GreetingClient),
          )
          serverFactory = TLSMemoryBIOFactory(
              serverOpts,
              isClient=False,
              wrappedFactory=protocol.Factory.forProtocol(GreetingServer),
          )

          def buildServer():
              return serverFactory.buildProtocol(None)

          def buildClient():
              return clientFactory.buildProtocol(None)

          return connectedServerAndClient(buildServer, buildClient)

      def test_invalidHostname(self):
          """
          A server certificate carrying a hostname other than the one the
          client expects causes the connection to be dropped immediately.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"wrong-host.example.com",
              u"correct-host.example.com",
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'')
          self.assertEqual(server.data, b'')

          self.assertIsInstance(client.lostReason.value, VerificationError)
          self.assertIsInstance(server.lostReason.value, ConnectionClosed)

      def test_validHostname(self):
          """
          A valid certificate carrying the expected hostname lets the
          connection proceed normally.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'greetings!')
          self.assertIsNone(client.lostReason)
          self.assertIsNone(server.lostReason)

      def test_validHostnameInvalidCertificate(self):
          """
          An invalid certificate aborts the connection with an OpenSSL
          error even though the hostname it carries is perfectly valid.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              validCertificate=False,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'')
          self.assertEqual(server.data, b'')

          self.assertIsInstance(client.lostReason.value, SSL.Error)
          self.assertIsInstance(server.lostReason.value, SSL.Error)

      def test_realCAsBetterNotSignOurBogusTestCerts(self):
          """
          With the platform's default trust in use, our dinky certificate
          should I{really} fail.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              validCertificate=False,
              useDefaultTrust=True,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'')
          self.assertEqual(server.data, b'')

          self.assertIsInstance(client.lostReason.value, SSL.Error)
          self.assertIsInstance(server.lostReason.value, SSL.Error)

      def test_butIfTheyDidItWouldWork(self):
          """
          L{ssl.optionsForClientTLS} should use L{ssl.platformTrust} by
          default, so once that is faked out to return our own authority the
          connection is trusted again.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              useDefaultTrust=True,
              fakePlatformTrust=True,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'greetings!')
          self.assertIsNone(client.lostReason)
          self.assertIsNone(server.lostReason)

      def test_clientPresentsCertificate(self):
          """
          Communication proceeds when the server verifies and the client
          presents a valid certificate for that verification by passing it
          to L{sslverify.optionsForClientTLS}.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              validCertificate=True,
              serverVerifies=True,
              clientPresentsCertificate=True,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'greetings!')
          self.assertIsNone(client.lostReason)
          self.assertIsNone(server.lostReason)

      def test_clientPresentsBadCertificate(self):
          """
          The connection fails with an SSL error when the server verifies
          and the client presents an invalid certificate for that
          verification by passing it to L{sslverify.optionsForClientTLS}.
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              validCertificate=True,
              serverVerifies=True,
              validClientCertificate=False,
              clientPresentsCertificate=True,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'')

          self.assertIsInstance(client.lostReason.value, SSL.Error)
          self.assertIsInstance(server.lostReason.value, SSL.Error)

      def test_hostnameIsIndicated(self):
          """
          Giving the C{hostname} argument to L{CertificateOptions} also sets
          the U{Server Name Extension
          <https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS
          indication field to the correct value.
          """
          seenNames = []

          def setupServerContext(ctx):
              def servername_received(conn):
                  seenNames.append(conn.get_servername().decode("ascii"))
              ctx.set_tlsext_servername_callback(servername_received)

          cProto, sProto, pump = self.serviceIdentitySetup(
              u"valid.example.com",
              u"valid.example.com",
              setupServerContext
          )
          self.assertEqual(seenNames, [u"valid.example.com"])

      if skipSNI is not None:
          test_hostnameIsIndicated.skip = skipSNI

      def test_hostnameEncoding(self):
          """
          Hostnames are encoded as IDNA.
          """
          seenNames = []
          hello = u"h\N{LATIN SMALL LETTER A WITH ACUTE}llo.example.com"

          def setupServerContext(ctx):
              def servername_received(conn):
                  seenNames.append(_idnaText(conn.get_servername()))
              ctx.set_tlsext_servername_callback(servername_received)

          cProto, sProto, pump = self.serviceIdentitySetup(
              hello, hello, setupServerContext
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(seenNames, [hello])
          self.assertEqual(client.data, b'greetings!')
          self.assertIsNone(client.lostReason)
          self.assertIsNone(server.lostReason)

      if skipSNI is not None:
          test_hostnameEncoding.skip = skipSNI

      def test_fallback(self):
          """
          L{sslverify.simpleVerifyHostname} compares the commonName of the
          subject of a connection's certificate with the given hostname by
          string equality: it does nothing on a match and raises
          L{VerificationError} otherwise.
          """
          name = 'something.example.com'

          class Connection(object):
              def get_peer_certificate(self):
                  """
                  Fake of L{OpenSSL.SSL.Connection.get_peer_certificate}.

                  @return: A certificate with a known common name.
                  @rtype: L{OpenSSL.crypto.X509}
                  """
                  certificate = X509()
                  certificate.get_subject().commonName = name
                  return certificate

          connection = Connection()

          matched = sslverify.simpleVerifyHostname(
              connection, u'something.example.com')
          self.assertIs(matched, None)

          self.assertRaises(
              sslverify.SimpleVerificationError,
              sslverify.simpleVerifyHostname, connection, u'nonsense'
          )

      def test_surpriseFromInfoCallback(self):
          """
          pyOpenSSL isn't always so great about reporting errors.  If one
          occurs in the verification info callback, it should be logged and
          the connection should be shut down (if possible, anyway; the
          app_data could be clobbered but there's no point testing for
          that).
          """
          cProto, sProto, pump = self.serviceIdentitySetup(
              u"correct-host.example.com",
              u"correct-host.example.com",
              buggyInfoCallback=True,
          )
          client = cProto.wrappedProtocol
          server = sProto.wrappedProtocol

          self.assertEqual(client.data, b'')
          self.assertEqual(server.data, b'')

          self.assertIsInstance(client.lostReason.value, ZeroDivisionError)
          self.assertIsInstance(
              server.lostReason.value, (ConnectionClosed, SSL.Error))

          logged = self.flushLoggedErrors(ZeroDivisionError)
          self.assertTrue(logged)
  def negotiateProtocol(serverProtocols,
                        clientProtocols,
                        clientOptions=None):
      """
      Set up an in-memory TLS connection and negotiate a next protocol
      over it.

      @param serverProtocols: The protocols the server is willing to
          negotiate.

      @param clientProtocols: The protocols the client is willing to
          negotiate.

      @param clientOptions: The type of C{OpenSSLCertificateOptions} class
          to use for the client.  Defaults to C{OpenSSLCertificateOptions}.

      @return: A L{tuple} of the negotiated protocol and the reason the
          connection was lost.
      """
      authority, server = certificatesForAuthorityAndServer()

      # The client trusts only the freshly generated authority.
      trustRoot = sslverify.OpenSSLCertificateAuthorities(
          [authority.original])

      connection = loopbackTLSConnectionInMemory(
          trustRoot=trustRoot,
          privateKey=server.privateKey.original,
          serverCertificate=server.original,
          clientProtocols=clientProtocols,
          serverProtocols=serverProtocols,
          clientOptions=clientOptions,
      )
      sProto, cProto, pump = connection
      pump.flush()

      negotiated = cProto.negotiatedProtocol
      lostReason = cProto.wrappedProtocol.lostReason
      return (negotiated, lostReason)
  class NPNOrALPNTests(unittest.TestCase):
      """
      Protocol selection through NPN and ALPN.

      These tests only run on platforms that have a PyOpenSSL version
      >= 0.15, and OpenSSL version 1.0.1 or later.
      """
      if skipSSL:
          skip = skipSSL
      elif skipNPN:
          skip = skipNPN

      def test_nextProtocolMechanismsNPNIsSupported(self):
          """
          Where at least NPN is available on the platform, NPN is part of
          the set of supported negotiation protocols.
          """
          mechanisms = sslverify.protocolNegotiationMechanisms()
          npnSupported = (
              sslverify.ProtocolNegotiationSupport.NPN in mechanisms)
          self.assertTrue(npnSupported)

      def test_NPNAndALPNSuccess(self):
          """
          With both ALPN and NPN in use and overlapping protocol choices on
          client and server, a protocol is negotiated successfully, and it
          is the first one in the list.
          """
          shared = [b'h2', b'http/1.1']
          negotiated, reason = negotiateProtocol(
              clientProtocols=shared,
              serverProtocols=shared,
          )
          self.assertEqual(negotiated, b'h2')
          self.assertIsNone(reason)

      def test_NPNAndALPNDifferent(self):
          """
          With differing protocol lists on client and server, only the
          common element is chosen.
          """
          negotiated, reason = negotiateProtocol(
              clientProtocols=[b'spdy/3', b'http/1.1'],
              serverProtocols=[b'h2', b'http/1.1', b'spdy/2'],
          )
          self.assertEqual(negotiated, b'http/1.1')
          self.assertIsNone(reason)

      def test_NPNAndALPNNoAdvertise(self):
          """
          If one peer advertises no protocols at all, the connection is set
          up with no next protocol.
          """
          negotiated, reason = negotiateProtocol(
              clientProtocols=[b'h2', b'http/1.1'],
              serverProtocols=[],
          )
          self.assertIsNone(negotiated)
          self.assertIsNone(reason)

      def test_NPNAndALPNNoOverlap(self):
          """
          If the client's and server's protocols do not overlap, the
          connection fails.
          """
          offeredByServer = [b'h2', b'http/1.1']
          offeredByClient = [b'spdy/3']
          negotiated, reason = negotiateProtocol(
              serverProtocols=offeredByServer,
              clientProtocols=offeredByClient,
          )
          self.assertIsNone(negotiated)
          self.assertEqual(reason.type, SSL.Error)
  class ALPNTests(unittest.TestCase):
      """
      Protocol selection through ALPN.

      These tests only run on platforms that have a PyOpenSSL version
      >= 0.15, and OpenSSL version 1.0.2 or later.

      Only the ALPN specific logic is covered here, as any platform that
      has ALPN will also have NPN and so will run the NPNAndALPNTest suite
      as well.
      """
      if skipSSL:
          skip = skipSSL
      elif skipALPN:
          skip = skipALPN

      def test_nextProtocolMechanismsALPNIsSupported(self):
          """
          Where ALPN is available on a platform,
          protocolNegotiationMechanisms includes ALPN in the supported
          protocols.
          """
          mechanisms = sslverify.protocolNegotiationMechanisms()
          alpnSupported = (
              sslverify.ProtocolNegotiationSupport.ALPN in mechanisms)
          self.assertTrue(alpnSupported)
  class NPNAndALPNAbsentTests(unittest.TestCase):
      """
      On platforms without NPN/ALPN support, NPN/ALPN operations fail.

      These tests only run on platforms that have a PyOpenSSL version
      < 0.15, or an OpenSSL version earlier than 1.0.1
      """
      if skipSSL:
          skip = skipSSL
      elif not skipNPN:
          skip = "NPN/ALPN is present on this platform"

      def test_nextProtocolMechanismsNoNegotiationSupported(self):
          """
          Where neither NPN nor ALPN is available on a platform, no
          negotiation protocols are supported.
          """
          mechanisms = sslverify.protocolNegotiationMechanisms()
          self.assertFalse(mechanisms)

      def test_NPNAndALPNNotImplemented(self):
          """
          Using acceptableProtocols on a platform that supports neither NPN
          nor ALPN raises a NotImplementedError.
          """
          wanted = [b'h2', b'http/1.1']
          self.assertRaises(
              NotImplementedError,
              negotiateProtocol,
              serverProtocols=wanted,
              clientProtocols=wanted,
          )

      def test_NegotiatedProtocolReturnsNone(self):
          """
          negotiatedProtocol returns L{None} even when NPN/ALPN aren't
          supported.  This works because, as neither are supported,
          negotiation isn't even attempted.
          """
          negotiated, reason = negotiateProtocol(
              clientProtocols=None,
              serverProtocols=None,
          )
          self.assertIsNone(negotiated)
          self.assertIsNone(reason)
  1910. class _NotSSLTransport:
  1911. def getHandle(self):
  1912. return self
  1913. class _MaybeSSLTransport:
  1914. def getHandle(self):
  1915. return self
  1916. def get_peer_certificate(self):
  1917. return None
  1918. def get_host_certificate(self):
  1919. return None
  1920. class _ActualSSLTransport:
  1921. def getHandle(self):
  1922. return self
  1923. def get_host_certificate(self):
  1924. return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
  1925. def get_peer_certificate(self):
  1926. return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
  class ConstructorsTests(unittest.TestCase):
      """
      Tests for the L{sslverify.Certificate} constructors that build a
      certificate from a transport: C{peerFromTransport} and
      C{hostFromTransport}.
      """
      if skipSSL:
          skip = skipSSL

      def test_peerFromNonSSLTransport(self):
          """
          peerFromTransport raises an exception when the transport it is
          given is not actually an SSL transport.
          """
          transport = _NotSSLTransport()
          error = self.assertRaises(
              CertificateError,
              sslverify.Certificate.peerFromTransport,
              transport,
          )
          message = str(error)
          self.assertTrue(message.startswith("non-TLS"))

      def test_peerFromBlankSSLTransport(self):
          """
          peerFromTransport raises an exception when the transport it is
          given is an SSL transport without a peer certificate.
          """
          transport = _MaybeSSLTransport()
          error = self.assertRaises(
              CertificateError,
              sslverify.Certificate.peerFromTransport,
              transport,
          )
          message = str(error)
          self.assertTrue(message.startswith("TLS"))

      def test_hostFromNonSSLTransport(self):
          """
          hostFromTransport raises an exception when the transport it is
          given is not actually an SSL transport.
          """
          transport = _NotSSLTransport()
          error = self.assertRaises(
              CertificateError,
              sslverify.Certificate.hostFromTransport,
              transport,
          )
          message = str(error)
          self.assertTrue(message.startswith("non-TLS"))

      def test_hostFromBlankSSLTransport(self):
          """
          hostFromTransport raises an exception when the transport it is
          given is an SSL transport without a host certificate.
          """
          transport = _MaybeSSLTransport()
          error = self.assertRaises(
              CertificateError,
              sslverify.Certificate.hostFromTransport,
              transport,
          )
          message = str(error)
          self.assertTrue(message.startswith("TLS"))

      def test_hostFromSSLTransport(self):
          """
          hostFromTransport creates the correct certificate when given a
          valid SSL transport.
          """
          certificate = sslverify.Certificate.hostFromTransport(
              _ActualSSLTransport())
          self.assertEqual(certificate.serialNumber(), 12345)

      def test_peerFromSSLTransport(self):
          """
          peerFromTransport creates the correct certificate when given a
          valid SSL transport.
          """
          certificate = sslverify.Certificate.peerFromTransport(
              _ActualSSLTransport())
          self.assertEqual(certificate.serialNumber(), 12346)
  1984. class MultipleCertificateTrustRootTests(unittest.TestCase):
  1985. """
  1986. Test the behavior of the trustRootFromCertificates() API call.
  1987. """
  1988. if skipSSL:
  1989. skip = skipSSL
  def test_trustRootFromCertificatesPrivatePublic(self):
      """
      Both L{sslverify.Certificate} and L{sslverify.PrivateCertificate}
      instances are accepted by L{trustRootFromCertificates}.
      """
      keyedCert = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR)
      plainCert = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)

      trust = sslverify.trustRootFromCertificates([keyedCert, plainCert])

      # Check that the returned object behaves correctly when it is used
      # as the trustRoot= param to optionsForClientTLS.
      sProto, cProto, pump = loopbackTLSConnectionInMemory(
          trustRoot=trust,
          privateKey=keyedCert.privateKey.original,
          serverCertificate=keyedCert.original,
      )

      # The connection is expected to succeed.
      client = cProto.wrappedProtocol
      self.assertEqual(client.data, b'greetings!')
      self.assertIsNone(client.lostReason)
  def test_trustRootSelfSignedServerCertificate(self):
      """
      When L{trustRootFromCertificates} is given a single self-signed
      certificate, L{optionsForClientTLS} accepts client connections to a
      server using that certificate.
      """
      key, cert = makeCertificate(
          O=b"Server Test Certificate", CN=b"server")
      wrappedCert = sslverify.Certificate(cert)
      wrappedKey = sslverify.KeyPair(key)
      selfSigned = sslverify.PrivateCertificate.fromCertificateAndKeyPair(
          wrappedCert,
          wrappedKey,
      )

      trust = sslverify.trustRootFromCertificates([selfSigned])

      # This exact certificate is trusted, so connecting to the server
      # that presents it should succeed.
      sProto, cProto, pump = loopbackTLSConnectionInMemory(
          trustRoot=trust,
          privateKey=selfSigned.privateKey.original,
          serverCertificate=selfSigned.original,
      )

      client = cProto.wrappedProtocol
      self.assertEqual(client.data, b'greetings!')
      self.assertIsNone(client.lostReason)
  def test_trustRootCertificateAuthorityTrustsConnection(self):
      """
      When L{trustRootFromCertificates} is given certificate A,
      L{optionsForClientTLS} accepts client connections to a server with
      certificate B where B is signed by A.
      """
      authorityCert, issuedCert = certificatesForAuthorityAndServer()
      trust = sslverify.trustRootFromCertificates([authorityCert])

      # The CA's certificate is listed as trusted, so connecting to a
      # server presenting a certificate it signed should succeed.
      sProto, cProto, pump = loopbackTLSConnectionInMemory(
          trustRoot=trust,
          privateKey=issuedCert.privateKey.original,
          serverCertificate=issuedCert.original,
      )

      client = cProto.wrappedProtocol
      self.assertEqual(client.data, b'greetings!')
      self.assertIsNone(client.lostReason)
  def test_trustRootFromCertificatesUntrusted(self):
      """
      When L{trustRootFromCertificates} is given certificate A,
      L{optionsForClientTLS} refuses connections to a server whose
      certificate B was not signed by A.
      """
      serverKey, serverX509 = makeCertificate(
          O=b"Server Test Certificate", CN=b"server")
      serverCertificate = (
          sslverify.PrivateCertificate.fromCertificateAndKeyPair(
              sslverify.Certificate(serverX509),
              sslverify.KeyPair(serverKey),
          ))

      # A second, unrelated certificate: only element [1] (the
      # certificate) of the generated pair is used.
      unrelatedPair = makeCertificate(
          O=b"CA Test Certificate", CN=b"unknown CA")
      unrelated = sslverify.Certificate(unrelatedPair[1])
      trustRoot = sslverify.trustRootFromCertificates([unrelated])

      # The only trusted certificate did not sign the server's
      # certificate, so this connection should be rejected.
      _serverProto, clientProto, _pump = loopbackTLSConnectionInMemory(
          trustRoot=trustRoot,
          privateKey=serverCertificate.privateKey.original,
          serverCertificate=serverCertificate.original,
      )

      # A failed connection means no application data was delivered.
      self.assertEqual(clientProto.wrappedProtocol.data, b'')

      # The failure is an L{SSL.Error}.
      self.assertEqual(
          clientProto.wrappedProtocol.lostReason.type, SSL.Error)

      # Some combination of OpenSSL and PyOpenSSL is bad at reporting
      # errors, so the reason text is dug out of the nested error args.
      sslError = clientProto.wrappedProtocol.lostReason.value
      reasonText = sslError.args[0][0][2]
      self.assertEqual(reasonText, 'tlsv1 alert unknown ca')
  def test_trustRootFromCertificatesOpenSSLObjects(self):
      """
      L{trustRootFromCertificates} raises L{TypeError} when the list it is
      given contains a raw L{OpenSSL.crypto.X509} instance.
      """
      # C{.original} is the object wrapped by the certificate; per the
      # contract above it is the raw X509 that must be rejected.
      rawX509 = sslverify.PrivateCertificate.loadPEM(A_KEYPAIR).original

      error = self.assertRaises(
          TypeError, sslverify.trustRootFromCertificates, [rawX509])

      expectedMessage = (
          "certificates items must be twisted.internet.ssl.CertBase "
          "instances")
      self.assertEqual(expectedMessage, error.args[0])
  class OpenSSLCipherTests(unittest.TestCase):
      """
      Tests for twisted.internet._sslverify.OpenSSLCipher.

      @cvar cipherName: The cipher name every test constructs its ciphers
          from.
      """
      if skipSSL:
          skip = skipSSL

      cipherName = u'CIPHER-STRING'

      def test_constructorSetsFullName(self):
          """
          The constructor's first argument is exposed as C{fullName}.
          """
          cipher = sslverify.OpenSSLCipher(self.cipherName)
          self.assertEqual(self.cipherName, cipher.fullName)

      def test_repr(self):
          """
          Evaluating C{repr(cipher)} rebuilds an equal cipher.
          """
          cipher = sslverify.OpenSSLCipher(self.cipherName)
          # eval() only ever sees the repr of the object built just above,
          # with OpenSSLCipher as the sole name in its globals.
          namespace = {'OpenSSLCipher': sslverify.OpenSSLCipher}
          rebuilt = eval(repr(cipher), namespace)
          self.assertEqual(cipher, rebuilt)

      def test_eqSameClass(self):
          """
          Two ciphers of the same type with the same C{fullName} compare
          equal.
          """
          first = sslverify.OpenSSLCipher(self.cipherName)
          second = sslverify.OpenSSLCipher(self.cipherName)
          self.assertEqual(first, second)

      def test_eqSameNameDifferentType(self):
          """
          An object of another type is not equal to a cipher even when its
          C{fullName} matches.
          """
          sharedName = self.cipherName

          class DifferentCipher(object):
              fullName = sharedName

          realCipher = sslverify.OpenSSLCipher(self.cipherName)
          self.assertNotEqual(realCipher, DifferentCipher())
  class ExpandCipherStringTests(unittest.TestCase):
      """
      Tests for twisted.internet._sslverify._expandCipherString.
      """
      if skipSSL:
          skip = skipSSL

      def test_doesNotStumbleOverEmptyList(self):
          """
          An empty L{list} is returned when the cipher string expands to
          nothing.
          """
          expanded = sslverify._expandCipherString(
              u'', SSL.SSLv23_METHOD, 0)
          self.assertEqual([], expanded)

      def test_doesNotSwallowOtherSSLErrors(self):
          """
          Only the "no cipher matches" error is swallowed; any other SSL
          error propagates to the caller.
          """
          def failingSetCipherList(_):
              # Unfortunately, there seems to be no way to trigger a real
              # SSL error artificially.
              raise SSL.Error([['', '', '']])

          fakeContext = FakeContext(SSL.SSLv23_METHOD)
          fakeContext.set_cipher_list = failingSetCipherList
          # Make sslverify hand out the rigged context instead of building
          # a real one.
          self.patch(sslverify.SSL, 'Context', lambda _: fakeContext)

          self.assertRaises(
              SSL.Error,
              sslverify._expandCipherString,
              u'ALL', SSL.SSLv23_METHOD, 0,
          )

      def test_returnsListOfICiphers(self):
          """
          L{sslverify._expandCipherString} always returns a L{list} whose
          items all provide L{interfaces.ICipher}.
          """
          expanded = sslverify._expandCipherString(
              u'ALL', SSL.SSLv23_METHOD, 0)
          self.assertIsInstance(expanded, list)

          # Collect the offenders so a failure shows exactly which items
          # do not provide the interface.
          nonCiphers = [
              item for item in expanded
              if not interfaces.ICipher.providedBy(item)
          ]
          self.assertEqual([], nonCiphers)
  class AcceptableCiphersTests(unittest.TestCase):
      """
      Tests for twisted.internet._sslverify.OpenSSLAcceptableCiphers.
      """
      if skipSSL:
          skip = skipSSL

      def test_selectOnEmptyListReturnsEmptyList(self):
          """
          With no ciphers available there is nothing to select.
          """
          acceptable = sslverify.OpenSSLAcceptableCiphers([])
          self.assertEqual([], acceptable.selectCiphers([]))

      def test_selectReturnsOnlyFromAvailable(self):
          """
          Selection yields only ciphers that are both desired and
          available.
          """
          desired = [
              sslverify.OpenSSLCipher('A'),
              sslverify.OpenSSLCipher('B'),
          ]
          acceptable = sslverify.OpenSSLAcceptableCiphers(desired)

          # 'B' is the only name present in both lists.
          onlyCommon = [sslverify.OpenSSLCipher('B')]
          available = [
              sslverify.OpenSSLCipher('B'),
              sslverify.OpenSSLCipher('C'),
          ]
          self.assertEqual(onlyCommon, acceptable.selectCiphers(available))

      def test_fromOpenSSLCipherStringExpandsToListOfCiphers(self):
          """
          L{sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString}
          expands the given string into a list of ciphers.
          """
          acceptable = (
              sslverify.OpenSSLAcceptableCiphers.fromOpenSSLCipherString(
                  'ALL'))
          self.assertIsInstance(acceptable._ciphers, list)

          everyItemIsCipher = all(
              sslverify.ICipher.providedBy(item)
              for item in acceptable._ciphers
          )
          self.assertTrue(everyItemIsCipher)
  class DiffieHellmanParametersTests(unittest.TestCase):
      """
      Tests for
      twisted.internet._sslverify.OpenSSLDiffieHellmanParameters.

      @cvar filePath: The path handed to C{fromFile} by the tests.
      """
      if skipSSL:
          skip = skipSSL

      filePath = FilePath(b'dh.params')

      def test_fromFile(self):
          """
          C{fromFile} returns an instance that remembers the path it was
          given in C{_dhFile}.
          """
          factory = sslverify.OpenSSLDiffieHellmanParameters.fromFile
          parameters = factory(self.filePath)
          self.assertEqual(self.filePath, parameters._dhFile)
  class FakeECKey(object):
      """
      An introspectable stand-in for an EC key.

      @ivar _nid: The free form nid this key was constructed with.
      """

      def __init__(self, nid):
          """
          @param nid: an arbitrary object, stored unchanged as C{_nid}.
          """
          self._nid = nid
  class FakeNID(object):
      """
      An introspectable stand-in for a NID.

      @ivar _snName: The free form sn name this NID was constructed with.
      """

      def __init__(self, snName):
          """
          @param snName: an arbitrary object, stored unchanged as
              C{_snName}.
          """
          self._snName = snName
  class FakeLib(object):
      """
      An introspectable fake of cryptography's lib object.

      @ivar _createdKeys: The keys created by this instance that have not
          been freed yet.
      @type _createdKeys: L{set} of L{FakeECKey}

      @cvar NID_undef: A symbolic constant for undefined NIDs.
      @type NID_undef: L{FakeNID}
      """
      NID_undef = FakeNID("undef")

      def __init__(self):
          self._createdKeys = set()

      def OBJ_sn2nid(self, snName):
          """
          Wrap C{snName} in a fresh L{FakeNID}.

          @param snName: a free form name that gets passed to the
              constructor of L{FakeNID}.

          @return: a new L{FakeNID}.
          @rtype: L{FakeNID}.
          """
          return FakeNID(snName)

      def EC_KEY_new_by_curve_name(self, nid):
          """
          Build a L{FakeECKey}, record it in C{_createdKeys} and hand it
          back.

          @param nid: an arbitrary object that is passed to the constructor
              of L{FakeECKey}.

          @return: a new L{FakeECKey}
          @rtype: L{FakeECKey}
          """
          freshKey = FakeECKey(nid)
          self._createdKeys.add(freshKey)
          return freshKey

      def EC_KEY_free(self, key):
          """
          Forget C{key}, i.e. remove it from C{_createdKeys}.

          @param key: a key object to be freed; i.e. removed from
              C{_createdKeys}.

          @raises ValueError: If C{key} is not in C{_createdKeys} and thus
              not created by us.
          """
          try:
              self._createdKeys.remove(key)
          except KeyError:
              # set.remove signals a missing member with KeyError; report
              # it in terms of the faked API instead.
              raise ValueError("Unallocated EC key attempted to free.")

      def SSL_CTX_set_tmp_ecdh(self, ffiContext, key):
          """
          Accept the call and do nothing.

          @param ffiContext: ignored
          @param key: ignored
          """
  class FakeLibTests(unittest.TestCase):
      """
      Tests for L{FakeLib}, the fake of cryptography's lib object used by
      the EC curve tests.
      """

      def test_objSn2Nid(self):
          """
          L{FakeLib.OBJ_sn2nid} returns a L{FakeNID} carrying the name it
          was given.
          """
          # Go through FakeLib itself: constructing FakeNID directly would
          # leave OBJ_sn2nid completely unexercised.
          nid = FakeLib().OBJ_sn2nid("test")
          self.assertIsInstance(nid, FakeNID)
          self.assertEqual("test", nid._snName)

      def test_emptyKeys(self):
          """
          A new L{FakeLib} has an empty set for created keys.
          """
          self.assertEqual(set(), FakeLib()._createdKeys)

      def test_newKey(self):
          """
          If a new key is created, it's added to C{_createdKeys}.
          """
          lib = FakeLib()
          key = lib.EC_KEY_new_by_curve_name(FakeNID("name"))
          self.assertEqual(set([key]), lib._createdKeys)

      def test_freeUnknownKey(self):
          """
          Raise L{ValueError} if an unknown key is attempted to be freed.
          """
          # This key was never handed out by the FakeLib under test.
          key = FakeECKey(object())
          self.assertRaises(
              ValueError,
              FakeLib().EC_KEY_free, key
          )

      def test_freeKnownKey(self):
          """
          Freeing an allocated key removes it from C{_createdKeys}.
          """
          lib = FakeLib()
          key = lib.EC_KEY_new_by_curve_name(FakeNID("name"))
          lib.EC_KEY_free(key)
          self.assertEqual(set(), lib._createdKeys)
  class FakeFFI(object):
      """
      A stand-in for cryptography's ffi object.

      @cvar NULL: Symbolic constant for CFFI's NULL objects; a unique
          sentinel shared by every instance.
      """
      NULL = object()
  class FakeBinding(object):
      """
      A stand-in for cryptography's binding object.

      @ivar lib: the lib object exposed by this binding.
      @type lib: L{FakeLib}

      @ivar ffi: the ffi object exposed by this binding.
      @type ffi: L{FakeFFI}
      """

      def __init__(self, lib=None, ffi=None):
          """
          @param lib: the lib to expose; any falsy value is replaced by a
              new L{FakeLib}.
          @param ffi: the ffi to expose; any falsy value is replaced by a
              new L{FakeFFI}.
          """
          self.lib = lib if lib else FakeLib()
          self.ffi = ffi if ffi else FakeFFI()
  class ECCurveTests(unittest.TestCase):
      """
      Tests for twisted.internet._sslverify.OpenSSLECCurve.
      """
      if skipSSL:
          skip = skipSSL

      def _bindTo(self, lib):
          """
          Patch C{_OpenSSLECCurve._getBinding} so that it returns a
          L{FakeBinding} wrapping C{lib}.

          @param lib: the lib object the patched binding exposes.
          """
          self.patch(sslverify._OpenSSLECCurve,
                     "_getBinding",
                     lambda self: FakeBinding(lib=lib))

      def test_missingBinding(self):
          """
          Raise L{NotImplementedError} if pyOpenSSL is not based on
          cryptography.
          """
          def noBinding(self):
              raise NotImplementedError

          self.patch(sslverify._OpenSSLECCurve, "_getBinding", noBinding)
          self.assertRaises(
              NotImplementedError,
              sslverify._OpenSSLECCurve, sslverify._defaultCurveName,
          )

      def test_nonECbinding(self):
          """
          Raise L{NotImplementedError} if pyOpenSSL is based on
          cryptography but cryptography lacks required EC methods.
          """
          # Assigned on the instance, so this is a plain function: its one
          # parameter receives the call's argument, not the FakeLib.
          def lacksMethod(self):
              raise AttributeError

          fakeLib = FakeLib()
          fakeLib.OBJ_sn2nid = lacksMethod
          self._bindTo(fakeLib)
          self.assertRaises(
              NotImplementedError,
              sslverify._OpenSSLECCurve, sslverify._defaultCurveName,
          )

      def test_wrongName(self):
          """
          Raise L{ValueError} on unknown sn names.
          """
          fakeLib = FakeLib()
          # Every lookup answers with the "undefined" NID, whatever name
          # is asked for.
          fakeLib.OBJ_sn2nid = lambda self: FakeLib.NID_undef
          self._bindTo(fakeLib)
          self.assertRaises(
              ValueError,
              sslverify._OpenSSLECCurve, u"doesNotExist",
          )

      def test_keyFails(self):
          """
          Raise L{EnvironmentError} if key creation fails.
          """
          fakeLib = FakeLib()
          # Key creation reports failure by returning the NULL sentinel.
          fakeLib.EC_KEY_new_by_curve_name = (
              lambda *a, **kw: FakeFFI.NULL)
          self._bindTo(fakeLib)

          curve = sslverify._OpenSSLECCurve(sslverify._defaultCurveName)
          self.assertRaises(
              EnvironmentError,
              curve.addECKeyToContext, object()
          )

      def test_keyGetsFreed(self):
          """
          Don't leak a key when adding it to a context.
          """
          fakeLib = FakeLib()
          self._bindTo(fakeLib)
          curve = sslverify._OpenSSLECCurve(sslverify._defaultCurveName)

          fakeContext = FakeContext(None)
          fakeContext._context = None
          curve.addECKeyToContext(fakeContext)

          # Keys still recorded in _createdKeys were created but never
          # freed.
          self.assertEqual(set(), fakeLib._createdKeys)
  class KeyPairTests(unittest.TestCase):
      """
      Tests for L{sslverify.KeyPair}.

      @ivar sKey: The key element of the certificate generated in
          L{setUp}.
      """
      if skipSSL:
          skip = skipSSL

      def setUp(self):
          """
          Generate a test certificate and keep its key.
          """
          generated = makeCertificate(
              O=b"Server Test Certificate",
              CN=b"server")
          # makeCertificate's result is indexed here; element 0 is the key.
          self.sKey = generated[0]

      def test_getstateDeprecation(self):
          """
          L{sslverify.KeyPair.__getstate__} is deprecated.
          """
          keyPair = sslverify.KeyPair(self.sKey)
          self.callDeprecated(
              (Version("Twisted", 15, 0, 0), "a real persistence system"),
              keyPair.__getstate__)

      def test_setstateDeprecation(self):
          """
          L{sslverify.KeyPair.__setstate__} is deprecated.
          """
          # The state comes from one KeyPair and is restored into another,
          # freshly built one.
          dumpedState = sslverify.KeyPair(self.sKey).dump()
          target = sslverify.KeyPair(self.sKey)
          self.callDeprecated(
              (Version("Twisted", 15, 0, 0), "a real persistence system"),
              target.__setstate__, dumpedState)
  class SelectVerifyImplementationTests(unittest.SynchronousTestCase):
      """
      Tests for L{_selectVerifyImplementation}.
      """
      if skipSSL is not None:
          skip = skipSSL

      def test_dependencyMissing(self):
          """
          When I{service_identity} cannot be imported,
          L{_selectVerifyImplementation} falls back to
          L{simpleVerifyHostname} and L{SimpleVerificationError}.
          """
          with SetAsideModule("service_identity"):
              # A None entry in sys.modules makes the import fail.
              sys.modules["service_identity"] = None

              selected = sslverify._selectVerifyImplementation()
              fallback = (
                  sslverify.simpleVerifyHostname,
                  sslverify.SimpleVerificationError,
              )
              self.assertEqual(fallback, selected)

      # The warning emitted on this path is checked by
      # test_dependencyMissingWarning; silence it for this test.
      test_dependencyMissing.suppress = [
          util.suppress(
              message=(
                  "You do not have a working installation of the "
                  "service_identity module"),
          ),
      ]

      def test_dependencyMissingWarning(self):
          """
          When I{service_identity} cannot be imported,
          L{_selectVerifyImplementation} emits a L{UserWarning} telling the
          user the exact import error.
          """
          with SetAsideModule("service_identity"):
              sys.modules["service_identity"] = None

              sslverify._selectVerifyImplementation()

          # Exactly one UserWarning is expected; the unpacking fails
          # otherwise.
          userWarnings = [
              emitted for emitted in self.flushWarnings()
              if emitted["category"] == UserWarning
          ]
          [warning] = userWarnings

          # The text of the ImportError differs between Python 2 and 3.
          if _PY3:
              importError = (
                  "'import of 'service_identity' halted; None in "
                  "sys.modules'")
          else:
              importError = "'No module named service_identity'"

          template = (
              "You do not have a working installation of the "
              "service_identity module: {message}. Please install it from "
              "<https://pypi.python.org/pypi/service_identity> "
              "and make sure all of its dependencies are satisfied. "
              "Without the service_identity module, Twisted can perform only "
              "rudimentary TLS client hostname verification. Many valid "
              "certificate/hostname mappings may be rejected.")
          expectedMessage = template.format(message=importError)

          # Make sure we're abusing the warning system to a sufficient
          # degree: there is no filename or line number that makes sense
          # for this warning to "blame" for the problem. It is a system
          # misconfiguration. So the location information should be blank
          # (or as blank as we can make it).
          observed = (
              warning["message"], warning["filename"], warning["lineno"])
          self.assertEqual(observed, (expectedMessage, "", 0))