import logging
import re
from collections import namedtuple
from datetime import time

import six
from six.moves.urllib.parse import (ParseResult, quote, urlparse,
                                    urlunparse)

logger = logging.getLogger(__name__)

_Rule = namedtuple('Rule', ['field', 'value'])
RequestRate = namedtuple(
    'RequestRate', ['requests', 'seconds', 'start_time', 'end_time'])

# Accepted directive names. The disallow set deliberately includes common
# misspellings seen in real-world robots.txt files.
_DISALLOW_DIRECTIVE = {'disallow', 'dissallow', 'dissalow', 'disalow', 'diasllow', 'disallaw'}
_ALLOW_DIRECTIVE = {'allow'}
_USER_AGENT_DIRECTIVE = {'user-agent', 'useragent', 'user agent'}
_SITEMAP_DIRECTIVE = {'sitemap', 'sitemaps', 'site-map'}
_CRAWL_DELAY_DIRECTIVE = {'crawl-delay', 'crawl delay'}
_REQUEST_RATE_DIRECTIVE = {'request-rate', 'request rate'}
_HOST_DIRECTIVE = {'host'}

_WILDCARDS = {'*', '$'}
_HEX_DIGITS = set('0123456789ABCDEFabcdef')

__all__ = ['RequestRate', 'Protego']


def _is_valid_directive_field(field):
    return any([field in _DISALLOW_DIRECTIVE,
                field in _ALLOW_DIRECTIVE,
                field in _USER_AGENT_DIRECTIVE,
                field in _SITEMAP_DIRECTIVE,
                field in _CRAWL_DELAY_DIRECTIVE,
                field in _REQUEST_RATE_DIRECTIVE,
                field in _HOST_DIRECTIVE])


def _enforce_path(pattern):
    if pattern.startswith('/'):
        return pattern

    return '/' + pattern


class _URLPattern(object):
    """Internal class which represents a URL pattern."""

    def __init__(self, pattern):
        self._pattern = pattern
        self.priority = len(pattern)
        self._contains_asterisk = '*' in self._pattern
        self._contains_dollar = self._pattern.endswith('$')

        if self._contains_asterisk:
            self._pattern_before_asterisk = self._pattern[:self._pattern.find('*')]
        elif self._contains_dollar:
            self._pattern_before_dollar = self._pattern[:-1]

        self._pattern_compiled = False

    def match(self, url):
        """Return True if the pattern matches the given URL, otherwise return False."""
        # check if pattern is already compiled
        if self._pattern_compiled:
            return self._pattern.match(url)

        if not self._contains_asterisk:
            if not self._contains_dollar:
                # answer directly for patterns without wildcards
                return url.startswith(self._pattern)

            # pattern only contains the $ wildcard.
            return url == self._pattern_before_dollar

        if not url.startswith(self._pattern_before_asterisk):
            return False

        self._pattern = self._prepare_pattern_for_regex(self._pattern)
        self._pattern = re.compile(self._pattern)
        self._pattern_compiled = True
        return self._pattern.match(url)

    def _prepare_pattern_for_regex(self, pattern):
        """Return equivalent regex pattern for the given URL pattern."""
        pattern = re.sub(r'\*+', '*', pattern)
        s = re.split(r'(\*|\$$)', pattern)
        for index, substr in enumerate(s):
            if substr not in _WILDCARDS:
                s[index] = re.escape(substr)
            elif s[index] == '*':
                s[index] = '.*?'
        pattern = ''.join(s)
        return pattern
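
# Illustrative note (not part of the original source): _prepare_pattern_for_regex
# collapses runs of '*' into one, escapes the literal pieces with re.escape(),
# turns each '*' into the non-greedy '.*?', and keeps a trailing '$' as the
# end-of-string anchor. For example, the robots.txt pattern '/foo*bar$' would
# become a regex roughly equivalent to '/foo.*?bar$' (the exact escaping of the
# literal parts depends on the Python version's re.escape()).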


class _RuleSet(object):
    """Internal class which stores rules for a user agent."""

    def __init__(self, parser_instance):
        self.user_agent = None
        self._rules = []
        self._crawl_delay = None
        self._req_rate = None
        self._parser_instance = parser_instance

    def applies_to(self, robotname):
        """Return matching score."""
        robotname = robotname.strip().lower()
        if self.user_agent == '*':
            return 1
        if self.user_agent in robotname:
            return len(self.user_agent)
        return 0
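
    # Illustrative example (not part of the original source): for a query with
    # robotname 'ExampleBot/1.0', a rule set whose user_agent is 'examplebot'
    # scores 10 (the length of the matched agent string), the wildcard '*'
    # rule set scores 1, and unrelated rule sets score 0, so the most specific
    # match wins in _get_matching_rule_set().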

    def _unquote(self, url, ignore='', errors='replace'):
        """Replace %xy escapes by their single-character equivalent."""
        if '%' not in url:
            return url

        def hex_to_byte(h):
            """Replace a %xx escape with the equivalent binary sequence."""
            if six.PY2:
                return chr(int(h, 16))
            return bytes.fromhex(h)

        # ignore contains %xy escapes for characters that are not
        # meant to be converted back.
        ignore = {'{:02X}'.format(ord(c)) for c in ignore}

        parts = url.split('%')
        parts[0] = parts[0].encode('utf-8')
        for i in range(1, len(parts)):
            if len(parts[i]) >= 2:
                # %xy is a valid escape only if x and y are hexadecimal digits.
                if set(parts[i][:2]).issubset(_HEX_DIGITS):
                    # make sure that all %xy escapes are in uppercase.
                    hexcode = parts[i][:2].upper()
                    leftover = parts[i][2:]
                    if hexcode not in ignore:
                        parts[i] = hex_to_byte(hexcode) + leftover.encode('utf-8')
                        continue
                    else:
                        parts[i] = hexcode + leftover

            # add back the '%' we removed during splitting.
            parts[i] = b'%' + parts[i].encode('utf-8')

        return b''.join(parts).decode('utf-8', errors)
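
    # Illustrative example (not part of the original source):
    # _unquote('/a%2Fb%20c', ignore='/') would return '/a%2Fb c' -- the %20
    # escape is decoded to a space, while %2F stays encoded because '/' is
    # listed in `ignore`.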

    def hexescape(self, char):
        """Escape char as RFC 2396 specifies."""
        hex_repr = hex(ord(char))[2:].upper()
        if len(hex_repr) == 1:
            hex_repr = "0%s" % hex_repr
        return "%" + hex_repr

    def _quote_path(self, path):
        """Return percent encoded path."""
        parts = urlparse(path)
        path = self._unquote(parts.path, ignore='/%')

        # quote() does not work with unicode strings in Python 2.7.
        if six.PY2:
            path = quote(path.encode('utf-8'), safe='/%')
        else:
            path = quote(path, safe='/%')

        parts = ParseResult('', '', path, parts.params, parts.query, parts.fragment)
        path = urlunparse(parts)
        return path
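
    # Illustrative example (not part of the original source): _quote_path('/a b')
    # would return '/a%20b', while already-encoded input such as '/a%2Fb' passes
    # through unchanged because '%' and '/' are treated as safe characters.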

    def _quote_pattern(self, pattern):
        # Corner case for query-only (e.g. '/abc?') and param-only (e.g. '/abc;') patterns.
        # Save the last character, otherwise urlparse will strip it.
        last_char = ''
        if pattern[-1] == '?' or pattern[-1] == ';' or pattern[-1] == '$':
            last_char = pattern[-1]
            pattern = pattern[:-1]

        parts = urlparse(pattern)
        pattern = self._unquote(parts.path, ignore='/*$%')

        # quote() does not work with unicode strings in Python 2.7.
        if six.PY2:
            pattern = quote(pattern.encode('utf-8'), safe='/*%')
        else:
            pattern = quote(pattern, safe='/*%')

        parts = ParseResult('', '', pattern + last_char, parts.params, parts.query, parts.fragment)
        pattern = urlunparse(parts)
        return pattern

    def allow(self, pattern):
        if '$' in pattern:
            self.allow(pattern.replace('$', self.hexescape('$')))

        pattern = self._quote_pattern(pattern)
        if not pattern:
            return
        self._rules.append(_Rule(field='allow', value=_URLPattern(pattern)))

        # If index.html is allowed, we interpret this as / being allowed too.
        if pattern.endswith('/index.html'):
            self.allow(pattern[:-10] + '$')

    def disallow(self, pattern):
        if '$' in pattern:
            self.disallow(pattern.replace('$', self.hexescape('$')))

        pattern = self._quote_pattern(pattern)
        if not pattern:
            return
        self._rules.append(_Rule(field='disallow', value=_URLPattern(pattern)))

    def finalize_rules(self):
        # Longest (most specific) patterns come first; for patterns of equal
        # length, allow is placed before disallow so that it wins the tie in can_fetch().
        self._rules.sort(key=lambda r: (r.value.priority, r.field == 'allow'), reverse=True)

    def can_fetch(self, url):
        """Return True if the URL can be fetched, False otherwise."""
        url = self._quote_path(url)
        allowed = True
        for rule in self._rules:
            if rule.value.match(url):
                if rule.field == 'disallow':
                    allowed = False
                break
        return allowed

    @property
    def crawl_delay(self):
        """Get & set crawl delay for the rule set."""
        return self._crawl_delay

    @crawl_delay.setter
    def crawl_delay(self, delay):
        try:
            delay = float(delay)
        except ValueError:
            # Value is malformed, do nothing.
            logger.debug("Malformed rule at line {}: cannot set crawl delay to '{}'. "
                         "Ignoring this rule.".format(self._parser_instance._total_line_seen, delay))
            return

        self._crawl_delay = delay

    @property
    def request_rate(self):
        """Get & set request rate for the rule set."""
        return self._req_rate

    @request_rate.setter
    def request_rate(self, value):
        try:
            parts = value.split()
            if len(parts) == 2:
                rate, time_period = parts
            else:
                rate, time_period = parts[0], ''

            requests, seconds = rate.split('/')
            time_unit = seconds[-1].lower()
            requests, seconds = int(requests), int(seconds[:-1])

            if time_unit == 'm':
                seconds *= 60
            elif time_unit == 'h':
                seconds *= 3600
            elif time_unit == 'd':
                seconds *= 86400

            start_time = None
            end_time = None
            if time_period:
                start_time, end_time = time_period.split('-')
                start_time = time(int(start_time[:2]), int(start_time[-2:]))
                end_time = time(int(end_time[:2]), int(end_time[-2:]))
        except Exception:
            # Value is malformed, do nothing.
            logger.debug("Malformed rule at line {}: cannot set request rate using '{}'. "
                         "Ignoring this rule.".format(self._parser_instance._total_line_seen, value))
            return

        self._req_rate = RequestRate(requests, seconds, start_time, end_time)
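
    # Illustrative example (not part of the original source): a directive such
    # as "Request-rate: 1/10s 0600-1800" reaches the setter above as the value
    # '1/10s 0600-1800' and is stored as
    # RequestRate(requests=1, seconds=10, start_time=time(6, 0), end_time=time(18, 0)).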


class Protego(object):

    def __init__(self):
        # A dict mapping user agents (specified in robots.txt) to rule sets.
        self._user_agents = {}

        # Preferred host specified in the robots.txt
        self._host = None

        # A list of sitemaps specified in the robots.txt
        self._sitemap_list = []

        # A memoization table mapping user agents (used in queries) to matched rule sets.
        self._matched_rule_set = {}

        self._total_line_seen = 0
        self._invalid_directive_seen = 0
        self._total_directive_seen = 0

    @classmethod
    def parse(cls, content):
        o = cls()
        o._parse_robotstxt(content)
        return o

    def _parse_robotstxt(self, content):
        lines = content.splitlines()

        # A list containing rule sets corresponding to user
        # agents of the current record group.
        current_rule_sets = []

        # Last encountered rule, irrespective of whether it was valid or not.
        previous_rule_field = None

        for line in lines:
            self._total_line_seen += 1

            # Remove the comment portion of the line.
            hash_pos = line.find('#')
            if hash_pos != -1:
                line = line[0: hash_pos].strip()

            # Whitespace at the beginning and at the end of the line is ignored.
            line = line.strip()
            if not line:
                continue

            # Format for a valid robots.txt rule is "<field>:<value>".
            if line.find(':') != -1:
                field, value = line.split(':', 1)
            else:
                # We will be generous here and give it a second chance.
                parts = line.split(' ')
                if len(parts) < 2:
                    continue

                possible_field = parts[0]
                for i in range(1, len(parts)):
                    if _is_valid_directive_field(possible_field):
                        field, value = possible_field, ' '.join(parts[i:])
                        break
                    possible_field += ' ' + parts[i]
                else:
                    continue

            field = field.strip().lower()
            value = value.strip()

            # Ignore rules with no value part (e.g. "Disallow: ", "Allow: ").
            if not value:
                previous_rule_field = field
                continue

            # Ignore rules without a corresponding user agent.
            if not current_rule_sets and field not in _USER_AGENT_DIRECTIVE:
                logger.debug("Rule at line {} without any user agent to enforce it on.".format(self._total_line_seen))
                continue

            self._total_directive_seen += 1

            if field in _USER_AGENT_DIRECTIVE:
                if previous_rule_field and previous_rule_field not in _USER_AGENT_DIRECTIVE:
                    current_rule_sets = []

                # Wildcards are not supported in the user agent values.
                # We will be generous here and remove all the wildcards.
                user_agent = value.strip().lower()
                user_agent_without_asterisk = None
                if user_agent != '*' and '*' in user_agent:
                    user_agent_without_asterisk = user_agent.replace('*', '')

                for user_agent in [user_agent, user_agent_without_asterisk]:
                    if not user_agent:
                        continue

                    # If this user agent was encountered before, merge the new rules into its rule set.
                    rule_set = self._user_agents.get(user_agent, None)
                    if rule_set and rule_set not in current_rule_sets:
                        current_rule_sets.append(rule_set)

                    if not rule_set:
                        rule_set = _RuleSet(self)
                        rule_set.user_agent = user_agent
                        self._user_agents[user_agent] = rule_set
                        current_rule_sets.append(rule_set)

            elif field in _ALLOW_DIRECTIVE:
                for rule_set in current_rule_sets:
                    rule_set.allow(_enforce_path(value))

            elif field in _DISALLOW_DIRECTIVE:
                for rule_set in current_rule_sets:
                    rule_set.disallow(_enforce_path(value))

            elif field in _SITEMAP_DIRECTIVE:
                self._sitemap_list.append(value)

            elif field in _CRAWL_DELAY_DIRECTIVE:
                for rule_set in current_rule_sets:
                    rule_set.crawl_delay = value

            elif field in _REQUEST_RATE_DIRECTIVE:
                for rule_set in current_rule_sets:
                    rule_set.request_rate = value

            elif field in _HOST_DIRECTIVE:
                self._host = value

            else:
                self._invalid_directive_seen += 1

            previous_rule_field = field

        for user_agent in self._user_agents.values():
            user_agent.finalize_rules()

    def _get_matching_rule_set(self, user_agent):
        """Return the rule set with the highest matching score."""
        if not self._user_agents:
            return None

        if user_agent in self._matched_rule_set:
            return self._matched_rule_set[user_agent]

        score_rule_set_pairs = ((rs.applies_to(user_agent), rs) for rs in self._user_agents.values())
        match_score, matched_rule_set = max(score_rule_set_pairs, key=lambda p: p[0])

        if not match_score:
            self._matched_rule_set[user_agent] = None
            return None

        self._matched_rule_set[user_agent] = matched_rule_set
        return matched_rule_set

    def can_fetch(self, url, user_agent):
        """Return True if the user agent can fetch the URL, otherwise return False."""
        matched_rule_set = self._get_matching_rule_set(user_agent)
        if not matched_rule_set:
            return True
        return matched_rule_set.can_fetch(url)

    def crawl_delay(self, user_agent):
        """Return the crawl delay specified for the user agent as a float.
        If nothing is specified, return None.
        """
        matched_rule_set = self._get_matching_rule_set(user_agent)
        if not matched_rule_set:
            return None
        return matched_rule_set.crawl_delay

    def request_rate(self, user_agent):
        """Return the request rate specified for the user agent as a named tuple
        RequestRate(requests, seconds, start_time, end_time). If nothing is
        specified, return None.
        """
        matched_rule_set = self._get_matching_rule_set(user_agent)
        if not matched_rule_set:
            return None
        return matched_rule_set.request_rate

    @property
    def sitemaps(self):
        """Return an iterator over the sitemap links specified in the robots.txt."""
        return iter(self._sitemap_list)

    @property
    def preferred_host(self):
        """Return the preferred host specified in the robots.txt."""
        return self._host

    @property
    def _valid_directive_seen(self):
        return self._total_directive_seen - self._invalid_directive_seen
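

# Minimal usage sketch (not part of the original module; the robots.txt text
# and the 'examplebot' user agent below are made-up examples):
if __name__ == '__main__':
    robotstxt = (
        "User-agent: *\n"
        "Disallow: /private/\n"
        "Crawl-delay: 5\n"
        "Sitemap: https://example.com/sitemap.xml\n"
    )
    rp = Protego.parse(robotstxt)
    print(rp.can_fetch('https://example.com/index.html', 'examplebot'))    # True
    print(rp.can_fetch('https://example.com/private/data', 'examplebot'))  # False
    print(rp.crawl_delay('examplebot'))                                    # 5.0
    print(list(rp.sitemaps))                                               # ['https://example.com/sitemap.xml']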