123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- import time
- from .exceptions import EOF, TIMEOUT
class Expecter(object):
    """Feed data read from a spawn object to a searcher and record the outcome.

    The result of every search (match, EOF, timeout or error) is written back
    to the spawn's ``buffer``, ``before``, ``after``, ``match`` and
    ``match_index`` attributes.
    """

    def __init__(self, spawn, searcher, searchwindowsize=-1):
        """Bind a spawn object to a searcher.

        'searchwindowsize' of -1 means "not specified": the value of
        spawn.searchwindowsize is used instead.
        """
        self.spawn = spawn
        self.searcher = searcher
        if searchwindowsize == -1:
            searchwindowsize = spawn.searchwindowsize
        self.searchwindowsize = searchwindowsize

    def new_data(self, data):
        """Append 'data' to the spawn's buffer and search the result.

        On a match, the spawn's before/after/match/match_index attributes are
        set, everything up to the end of the match is removed from
        spawn.buffer, and the searcher's index is returned. Without a match
        the combined data is kept in spawn.buffer and None is returned.
        """
        spawn = self.spawn
        searcher = self.searcher

        incoming = spawn.buffer + data
        freshlen = len(data)
        index = searcher.search(incoming, freshlen, self.searchwindowsize)
        if index >= 0:
            # Found a match: split the data around it.
            spawn.buffer = incoming[searcher.end:]
            spawn.before = incoming[:searcher.start]
            spawn.after = incoming[searcher.start:searcher.end]
            spawn.match = searcher.match
            spawn.match_index = index
            return index

        spawn.buffer = incoming
        return None

    def _failure_message(self, err):
        """Build the exception text for an EOF/TIMEOUT nobody asked for."""
        msg = str(self.spawn)
        msg += '\nsearcher: %s' % self.searcher
        if err is not None:
            msg = str(err) + '\n' + msg
        return msg

    def eof(self, err=None):
        """Record an end-of-file condition on the spawn.

        Returns the searcher's eof_index when EOF is one of the expected
        patterns (eof_index >= 0); otherwise raises EOF with a message built
        from the spawn, the searcher and, if given, 'err'.
        """
        spawn = self.spawn
        spawn.before = spawn.buffer
        spawn.buffer = spawn.string_type()
        spawn.after = EOF
        index = self.searcher.eof_index
        if index >= 0:
            spawn.match = EOF
            spawn.match_index = index
            return index

        # Clear the match state before str(spawn) is taken for the message.
        spawn.match = None
        spawn.match_index = None
        raise EOF(self._failure_message(err))

    def timeout(self, err=None):
        """Record a timeout condition on the spawn.

        Returns the searcher's timeout_index when TIMEOUT is one of the
        expected patterns (timeout_index >= 0); otherwise raises TIMEOUT with
        a message built from the spawn, the searcher and, if given, 'err'.
        Unlike eof(), spawn.buffer is left untouched.
        """
        spawn = self.spawn
        spawn.before = spawn.buffer
        spawn.after = TIMEOUT
        index = self.searcher.timeout_index
        if index >= 0:
            spawn.match = TIMEOUT
            spawn.match_index = index
            return index

        # Clear the match state before str(spawn) is taken for the message.
        spawn.match = None
        spawn.match_index = None
        raise TIMEOUT(self._failure_message(err))

    def errored(self):
        """Reset the spawn's match attributes after an unexpected error."""
        spawn = self.spawn
        spawn.before = spawn.buffer
        spawn.after = None
        spawn.match = None
        spawn.match_index = None

    def expect_loop(self, timeout=-1):
        """Blocking expect.

        Searches the data already in spawn.buffer, then keeps reading with
        spawn.read_nonblocking() until the searcher matches, EOF is hit or
        the time runs out. 'timeout' is in seconds; None disables the
        deadline, and a negative value (the default) times out as soon as
        the buffered data has been searched once without a match.

        Returns the index reported by the searcher, or the result of
        eof() / timeout(), both of which may raise EOF / TIMEOUT.
        """
        spawn = self.spawn

        if timeout is not None:
            end_time = time.time() + timeout

        try:
            incoming = spawn.buffer
            spawn.buffer = spawn.string_type()  # Treat buffer as new data
            while True:
                idx = self.new_data(incoming)
                # Keep reading until exception or return.
                if idx is not None:
                    return idx
                # No match at this point
                if (timeout is not None) and (timeout < 0):
                    return self.timeout()
                # Still have time left, so read more data
                incoming = spawn.read_nonblocking(spawn.maxread, timeout)
                if self.spawn.delayafterread is not None:
                    time.sleep(self.spawn.delayafterread)
                if timeout is not None:
                    timeout = end_time - time.time()
        except EOF as e:
            return self.eof(e)
        except TIMEOUT as e:
            return self.timeout(e)
        except:
            # Deliberately catches everything (including KeyboardInterrupt)
            # so the spawn's state is reset; the exception is re-raised.
            self.errored()
            raise
class searcher_string(object):
    """Plain string search helper for the spawn.expect_any() method.

    This helper class is for speed. For more powerful regex patterns
    see the helper class, searcher_re.

    Attributes:

        eof_index     - index of EOF, or -1
        timeout_index - index of TIMEOUT, or -1

    After a successful match by the search() method the following attributes
    are available:

        start - index into the buffer, first byte of match
        end   - index into the buffer, first byte after match
        match - the matching string itself
    """

    def __init__(self, strings):
        """Create a searcher_string from 'strings', an iterable whose items
        are strings or the EOF or TIMEOUT types."""
        self.eof_index = -1
        self.timeout_index = -1
        self._strings = []
        for position, item in enumerate(strings):
            if item is EOF:
                self.eof_index = position
            elif item is TIMEOUT:
                self.timeout_index = position
            else:
                self._strings.append((position, item))

    def __str__(self):
        """Return a human-readable description of the searcher, one line per
        pattern, ordered by pattern index."""
        rows = [(-1, 'searcher_string:')]
        for position, text in self._strings:
            rows.append((position, ' %d: "%s"' % (position, text)))
        if self.eof_index >= 0:
            rows.append((self.eof_index, ' %d: EOF' % self.eof_index))
        if self.timeout_index >= 0:
            rows.append((self.timeout_index,
                         ' %d: TIMEOUT' % self.timeout_index))
        return '\n'.join(line for _, line in sorted(rows))

    def search(self, buffer, freshlen, searchwindowsize=None):
        """Search 'buffer' for the first occurrence of one of the search
        strings.

        'freshlen' must indicate the number of bytes at the end of 'buffer'
        which have not been searched before; it avoids rescanning the same,
        possibly big, buffer over and over again. When 'searchwindowsize' is
        given, only that many trailing bytes of 'buffer' are searched instead.

        If there is a match this returns the index of that string, and sets
        'start', 'end' and 'match'. Otherwise, this returns -1.
        """
        earliest = None
        for position, text in self._strings:
            if searchwindowsize is not None:
                # An explicit window overrides the freshlen optimisation.
                start_at = -searchwindowsize
            else:
                # A match can only lie in the fresh data or straddle the
                # very end of the old data.
                start_at = -(freshlen + len(text))
            found = buffer.find(text, start_at)
            if found < 0:
                continue
            if earliest is not None and found >= earliest:
                # An earlier-listed string already matched at or before here.
                continue
            earliest = found
            winner = (position, text)

        if earliest is None:
            return -1

        best_index, best_match = winner
        self.match = best_match
        self.start = earliest
        self.end = earliest + len(best_match)
        return best_index
class searcher_re(object):
    """Regular expression search helper for the spawn.expect_any() method.

    This helper class is for powerful pattern matching. For speed, see the
    helper class, searcher_string.

    Attributes:

        eof_index     - index of EOF, or -1
        timeout_index - index of TIMEOUT, or -1

    After a successful match by the search() method the following attributes
    are available:

        start - index into the buffer, first byte of match
        end   - index into the buffer, first byte after match
        match - the re.match object returned by a successful re.search
    """

    def __init__(self, patterns):
        """Create an instance that searches for 'patterns', where 'patterns'
        may be a list or any other iterable of compiled regular expressions,
        or the EOF or TIMEOUT types."""
        self.eof_index = -1
        self.timeout_index = -1
        self._searches = []
        # enumerate() keeps this in line with searcher_string and does not
        # require 'patterns' to support len().
        for n, s in enumerate(patterns):
            if s is EOF:
                self.eof_index = n
                continue
            if s is TIMEOUT:
                self.timeout_index = n
                continue
            self._searches.append((n, s))

    def __str__(self):
        """Return a human-readable description of the searcher, one line per
        pattern, ordered by pattern index."""
        ss = []
        for n, s in self._searches:
            try:
                ss.append((n, ' %d: re.compile("%s")' % (n, s.pattern)))
            except UnicodeEncodeError:
                # for test cases that display __str__ of searches, don't throw
                # another exception just because stdout is ascii-only; fall
                # back to repr()
                ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern)))
        ss.append((-1, 'searcher_re:'))
        if self.eof_index >= 0:
            ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
        if self.timeout_index >= 0:
            ss.append((self.timeout_index, ' %d: TIMEOUT' %
                       self.timeout_index))
        ss.sort()
        return '\n'.join(text for _, text in ss)

    def search(self, buffer, freshlen, searchwindowsize=None):
        """Search 'buffer' for the first occurrence of one of the regular
        expressions.

        'freshlen' must indicate the number of bytes at the end of 'buffer'
        which have not been searched before; it is accepted for interface
        parity with searcher_string but not used, because the length of a
        regex match cannot be predicted. When 'searchwindowsize' is given,
        the search starts that many bytes before the end of 'buffer'.

        If there is a match this returns the index of that pattern, and sets
        'start', 'end' and 'match'. Otherwise, returns -1.
        """
        first_match = None
        if searchwindowsize is None:
            searchstart = 0
        else:
            searchstart = max(0, len(buffer) - searchwindowsize)
        for index, s in self._searches:
            match = s.search(buffer, searchstart)
            if match is None:
                continue
            n = match.start()
            # Strict '<' keeps the earlier-listed pattern on a tie.
            if first_match is None or n < first_match:
                first_match = n
                the_match = match
                best_index = index
        if first_match is None:
            return -1
        self.start = first_match
        self.match = the_match
        self.end = self.match.end()
        return best_index
|