1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import logging
- from OpenSSL import SSL
- from service_identity.exceptions import CertificateError
- from twisted.internet._sslverify import ClientTLSOptions, verifyHostname, VerificationError
- from twisted.internet.ssl import AcceptableCiphers
- from scrapy import twisted_version
- from scrapy.utils.ssl import x509name_to_string, get_temp_key_info
logger = logging.getLogger(__name__)

# Symbolic names for the supported SSL/TLS method choices; these are the
# keys of ``openssl_methods`` below.
METHOD_SSLv3 = 'SSLv3'
METHOD_TLS = 'TLS'
METHOD_TLSv10 = 'TLSv1.0'
METHOD_TLSv11 = 'TLSv1.1'
METHOD_TLSv12 = 'TLSv1.2'

# Maps each method name to the matching pyOpenSSL method constant.
openssl_methods = {
    # protocol negotiation (recommended)
    METHOD_TLS: SSL.SSLv23_METHOD,
    # SSL 3 (NOT recommended)
    METHOD_SSLv3: SSL.SSLv3_METHOD,
    # TLS 1.0 only
    METHOD_TLSv10: SSL.TLSv1_METHOD,
    # TLS 1.1 only; the literal 5 is used when the SSL module does not
    # expose TLSv1_1_METHOD
    METHOD_TLSv11: getattr(SSL, 'TLSv1_1_METHOD', 5),
    # TLS 1.2 only; the literal 6 is used when the SSL module does not
    # expose TLSv1_2_METHOD
    METHOD_TLSv12: getattr(SSL, 'TLSv1_2_METHOD', 6),
}
if twisted_version >= (17, 0, 0):
    def set_tlsext_host_name(connection, hostNameBytes):
        """Set the TLS host-name extension on *connection*.

        Simply delegates to the connection's own ``set_tlsext_host_name``.
        """
        connection.set_tlsext_host_name(hostNameBytes)
else:
    # Twisted releases before 17.0.0: reuse Twisted's private helper under
    # the same public name so callers do not care which branch was taken.
    from twisted.internet._sslverify import _maybeSetHostNameIndication as set_tlsext_host_name
class ScrapyClientTLSOptions(ClientTLSOptions):
    """
    SSL Client connection creator ignoring certificate verification errors
    (for genuinely invalid certificates or bugs in verification code).

    Same as Twisted's private _sslverify.ClientTLSOptions,
    except that VerificationError, CertificateError and ValueError
    exceptions are caught, so that the connection is not closed, only
    logging warnings. Also, HTTPS connection parameters logging is added.
    """

    def __init__(self, hostname, ctx, verbose_logging=False):
        """Initialise the options and remember the logging preference.

        :param hostname: forwarded unchanged to ``ClientTLSOptions``.
        :param ctx: forwarded unchanged to ``ClientTLSOptions``.
        :param verbose_logging: when true, connection parameters are logged
            at DEBUG level once the handshake is done.
        """
        super(ScrapyClientTLSOptions, self).__init__(hostname, ctx)
        self.verbose_logging = verbose_logging

    def _log_connection_details(self, connection):
        """Log cipher, certificate and temp key details at DEBUG level."""
        if hasattr(connection, 'get_cipher_name'):  # requires pyOpenSSL 0.15
            if hasattr(connection, 'get_protocol_version_name'):  # requires pyOpenSSL 16.0.0
                logger.debug('SSL connection to %s using protocol %s, cipher %s',
                             self._hostnameASCII,
                             connection.get_protocol_version_name(),
                             connection.get_cipher_name(),
                             )
            else:
                logger.debug('SSL connection to %s using cipher %s',
                             self._hostnameASCII,
                             connection.get_cipher_name(),
                             )
        server_cert = connection.get_peer_certificate()
        logger.debug('SSL connection certificate: issuer "%s", subject "%s"',
                     x509name_to_string(server_cert.get_issuer()),
                     x509name_to_string(server_cert.get_subject()),
                     )
        key_info = get_temp_key_info(connection._ssl)
        if key_info:
            logger.debug('SSL temp key: %s', key_info)

    def _identityVerifyingInfoCallback(self, connection, where, ret):
        """Handle handshake events for *connection*.

        At handshake start the host name is set on the connection; once the
        handshake is done the connection is optionally described in the log
        and the peer certificate is checked against the host name.
        Verification failures are logged as warnings and never re-raised.

        :param connection: the connection the event refers to.
        :param where: bit mask tested against ``SSL.SSL_CB_HANDSHAKE_START``
            and ``SSL.SSL_CB_HANDSHAKE_DONE``.
        :param ret: unused here.
        """
        if where & SSL.SSL_CB_HANDSHAKE_START:
            set_tlsext_host_name(connection, self._hostnameBytes)
        elif where & SSL.SSL_CB_HANDSHAKE_DONE:
            if self.verbose_logging:
                self._log_connection_details(connection)
            try:
                verifyHostname(connection, self._hostnameASCII)
            except (CertificateError, VerificationError) as e:
                # Lazy %-style arguments, consistent with the debug calls:
                # the message is only rendered if the record is emitted.
                logger.warning(
                    'Remote certificate is not valid for hostname "%s"; %s',
                    self._hostnameASCII, e)
            except ValueError as e:
                logger.warning(
                    'Ignoring error while verifying certificate '
                    'from host "%s" (exception: %r)',
                    self._hostnameASCII, e)
# Cipher selection built by Twisted's AcceptableCiphers from the OpenSSL
# cipher string 'DEFAULT'.
DEFAULT_CIPHERS = AcceptableCiphers.fromOpenSSLCipherString('DEFAULT')
|