import logging

from OpenSSL import SSL
from service_identity.exceptions import CertificateError
from twisted.internet._sslverify import ClientTLSOptions, verifyHostname, VerificationError
from twisted.internet.ssl import AcceptableCiphers

from scrapy import twisted_version
from scrapy.utils.ssl import x509name_to_string, get_temp_key_info


logger = logging.getLogger(__name__)


# Symbolic names accepted for selecting the TLS/SSL method.
METHOD_SSLv3 = 'SSLv3'
METHOD_TLS = 'TLS'
METHOD_TLSv10 = 'TLSv1.0'
METHOD_TLSv11 = 'TLSv1.1'
METHOD_TLSv12 = 'TLSv1.2'


# Maps each symbolic method name to the matching pyOpenSSL method constant.
# The TLS 1.1 / 1.2 entries fall back to a literal integer when the installed
# pyOpenSSL does not expose the constant (presumably the raw method ids;
# verify against the pyOpenSSL version in use).
openssl_methods = {
    METHOD_TLS: SSL.SSLv23_METHOD,                      # protocol negotiation (recommended)
    METHOD_SSLv3: SSL.SSLv3_METHOD,                     # SSL 3 (NOT recommended)
    METHOD_TLSv10: SSL.TLSv1_METHOD,                    # TLS 1.0 only
    METHOD_TLSv11: getattr(SSL, 'TLSv1_1_METHOD', 5),   # TLS 1.1 only
    METHOD_TLSv12: getattr(SSL, 'TLSv1_2_METHOD', 6),   # TLS 1.2 only
}


if twisted_version >= (17, 0, 0):
    def set_tlsext_host_name(connection, hostNameBytes):
        """Set the TLS host name extension (SNI) value on ``connection``."""
        connection.set_tlsext_host_name(hostNameBytes)
else:
    # Older Twisted releases ship their own private helper; reuse it.
    from twisted.internet._sslverify import _maybeSetHostNameIndication as set_tlsext_host_name


class ScrapyClientTLSOptions(ClientTLSOptions):
    """
    Client-side TLS connection creator that tolerates certificate
    verification failures.

    It behaves like Twisted's private ``_sslverify.ClientTLSOptions`` with two
    differences:

    * ``VerificationError``, ``CertificateError`` and ``ValueError`` raised
      while checking the peer's host name are caught and reported as
      warnings, so the connection is kept open (this covers both genuinely
      invalid certificates and bugs in the verification code);
    * when ``verbose_logging`` is enabled, the negotiated HTTPS connection
      parameters are logged at DEBUG level.
    """

    def __init__(self, hostname, ctx, verbose_logging=False):
        """
        Initialise the parent options object and store the logging flag.

        :param hostname: host name handed to ``ClientTLSOptions``.
        :param ctx: SSL context handed to ``ClientTLSOptions``.
        :param verbose_logging: when true, connection details are logged
            once the handshake is done.
        """
        super(ScrapyClientTLSOptions, self).__init__(hostname, ctx)
        self.verbose_logging = verbose_logging

    def _log_connection_details(self, connection):
        """Emit DEBUG records describing the established TLS connection."""
        host = self._hostnameASCII

        if hasattr(connection, 'get_cipher_name'):  # requires pyOpenSSL 0.15
            if hasattr(connection, 'get_protocol_version_name'):  # requires pyOpenSSL 16.0.0
                logger.debug(
                    'SSL connection to %s using protocol %s, cipher %s',
                    host,
                    connection.get_protocol_version_name(),
                    connection.get_cipher_name(),
                )
            else:
                logger.debug(
                    'SSL connection to %s using cipher %s',
                    host,
                    connection.get_cipher_name(),
                )

        peer_cert = connection.get_peer_certificate()
        logger.debug(
            'SSL connection certificate: issuer "%s", subject "%s"',
            x509name_to_string(peer_cert.get_issuer()),
            x509name_to_string(peer_cert.get_subject()),
        )

        temp_key = get_temp_key_info(connection._ssl)
        if temp_key:
            logger.debug('SSL temp key: %s', temp_key)

    def _identityVerifyingInfoCallback(self, connection, where, ret):
        """
        OpenSSL info callback: send the host name at handshake start and
        check the peer certificate against it at handshake end.

        Verification problems are only logged as warnings; they are never
        propagated, so the connection is not torn down because of them.

        :param connection: the connection the event refers to.
        :param where: bit mask of ``SSL.SSL_CB_*`` flags describing the event.
        :param ret: unused by this implementation.
        """
        if where & SSL.SSL_CB_HANDSHAKE_START:
            set_tlsext_host_name(connection, self._hostnameBytes)
            return

        if not (where & SSL.SSL_CB_HANDSHAKE_DONE):
            return

        if self.verbose_logging:
            self._log_connection_details(connection)

        host = self._hostnameASCII
        try:
            verifyHostname(connection, host)
        except (CertificateError, VerificationError) as e:
            # The certificate does not match the host: warn, keep going.
            logger.warning(
                'Remote certificate is not valid for hostname "{}"; {}'.format(
                    host, e))
        except ValueError as e:
            # The verification code itself failed: warn, keep going.
            logger.warning(
                'Ignoring error while verifying certificate '
                'from host "{}" (exception: {})'.format(
                    host, repr(e)))


DEFAULT_CIPHERS = AcceptableCiphers.fromOpenSSLCipherString('DEFAULT')