123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- # coding=utf-8
- #
- # This file is part of Hypothesis, which may be found at
- # https://github.com/HypothesisWorks/hypothesis-python
- #
- # Most of this work is copyright (C) 2013-2018 David R. MacIver
- # (david@drmaciver.com), but it contains contributions by others. See
- # CONTRIBUTING.rst for a full list of people who may hold copyright, and
- # consult the git log if you need to determine who owns an individual
- # contribution.
- #
- # This Source Code Form is subject to the terms of the Mozilla Public License,
- # v. 2.0. If a copy of the MPL was not distributed with this file, You can
- # obtain one at http://mozilla.org/MPL/2.0/.
- #
- # END HEADER
- from __future__ import division, print_function, absolute_import
- import sys
- from enum import IntEnum
- from hypothesis.errors import Frozen, StopTest, InvalidArgument
- from hypothesis.internal.compat import hbytes, hrange, text_type, \
- bit_length, benchmark_time, int_from_bytes, unicode_safe_repr
- from hypothesis.internal.coverage import IN_COVERAGE_TESTS
- from hypothesis.internal.escalation import mark_for_escalation
- class Status(IntEnum):
- OVERRUN = 0
- INVALID = 1
- VALID = 2
- INTERESTING = 3
- global_test_counter = 0
- MAX_DEPTH = 100
- class ConjectureData(object):
- @classmethod
- def for_buffer(self, buffer):
- buffer = hbytes(buffer)
- return ConjectureData(
- max_length=len(buffer),
- draw_bytes=lambda data, n:
- hbytes(buffer[data.index:data.index + n])
- )
- def __init__(self, max_length, draw_bytes):
- self.max_length = max_length
- self.is_find = False
- self._draw_bytes = draw_bytes
- self.overdraw = 0
- self.level = 0
- self.block_starts = {}
- self.blocks = []
- self.buffer = bytearray()
- self.output = u''
- self.status = Status.VALID
- self.frozen = False
- self.intervals_by_level = []
- self.interval_stack = []
- global global_test_counter
- self.testcounter = global_test_counter
- global_test_counter += 1
- self.start_time = benchmark_time()
- self.events = set()
- self.forced_indices = set()
- self.capped_indices = {}
- self.interesting_origin = None
- self.tags = set()
- self.draw_times = []
- self.__intervals = None
- self.shrinking_blocks = set()
- self.discarded = set()
- def __assert_not_frozen(self, name):
- if self.frozen:
- raise Frozen(
- 'Cannot call %s on frozen ConjectureData' % (
- name,))
- def add_tag(self, tag):
- self.tags.add(tag)
- @property
- def depth(self):
- return len(self.interval_stack)
- @property
- def index(self):
- return len(self.buffer)
- def note(self, value):
- self.__assert_not_frozen('note')
- if not isinstance(value, text_type):
- value = unicode_safe_repr(value)
- self.output += value
- def draw(self, strategy):
- if self.is_find and not strategy.supports_find:
- raise InvalidArgument((
- 'Cannot use strategy %r within a call to find (presumably '
- 'because it would be invalid after the call had ended).'
- ) % (strategy,))
- if strategy.is_empty:
- self.mark_invalid()
- if self.depth >= MAX_DEPTH:
- self.mark_invalid()
- if self.depth == 0 and not IN_COVERAGE_TESTS: # pragma: no cover
- original_tracer = sys.gettrace()
- try:
- sys.settrace(None)
- return self.__draw(strategy)
- finally:
- sys.settrace(original_tracer)
- else:
- return self.__draw(strategy)
- def __draw(self, strategy):
- at_top_level = self.depth == 0
- self.start_example()
- try:
- if not at_top_level:
- return strategy.do_draw(self)
- else:
- start_time = benchmark_time()
- try:
- return strategy.do_draw(self)
- except BaseException as e:
- mark_for_escalation(e)
- raise
- finally:
- self.draw_times.append(benchmark_time() - start_time)
- finally:
- if not self.frozen:
- self.stop_example()
- def start_example(self):
- self.__assert_not_frozen('start_example')
- self.interval_stack.append(self.index)
- self.level += 1
- def stop_example(self, discard=False):
- if self.frozen:
- return
- self.level -= 1
- while self.level >= len(self.intervals_by_level):
- self.intervals_by_level.append([])
- k = self.interval_stack.pop()
- if k != self.index:
- t = (k, self.index)
- self.intervals_by_level[self.level].append(t)
- if discard:
- self.discarded.add(t)
- def note_event(self, event):
- self.events.add(event)
- @property
- def intervals(self):
- assert self.frozen
- if self.__intervals is None:
- intervals = set(self.blocks)
- for l in self.intervals_by_level:
- intervals.update(l)
- for i in hrange(len(l) - 1):
- if (
- l[i] not in self.discarded and
- l[i + 1] not in self.discarded and
- l[i][1] == l[i + 1][0]
- ):
- intervals.add((l[i][0], l[i + 1][1]))
- for i in hrange(len(self.blocks) - 1):
- intervals.add((self.blocks[i][0], self.blocks[i + 1][1]))
- # Intervals are sorted as longest first, then by interval start.
- self.__intervals = tuple(sorted(
- set(intervals),
- key=lambda se: (se[0] - se[1], se[0])
- ))
- del self.intervals_by_level
- return self.__intervals
- def freeze(self):
- if self.frozen:
- assert isinstance(self.buffer, hbytes)
- return
- self.frozen = True
- self.finish_time = benchmark_time()
- self.buffer = hbytes(self.buffer)
- self.events = frozenset(self.events)
- del self._draw_bytes
- def draw_bits(self, n):
- self.__assert_not_frozen('draw_bits')
- if n == 0:
- result = 0
- elif n % 8 == 0:
- return int_from_bytes(self.draw_bytes(n // 8))
- else:
- n_bytes = (n // 8) + 1
- self.__check_capacity(n_bytes)
- buf = bytearray(self._draw_bytes(self, n_bytes))
- assert len(buf) == n_bytes
- mask = (1 << (n % 8)) - 1
- buf[0] &= mask
- self.capped_indices[self.index] = mask
- buf = hbytes(buf)
- self.__write(buf)
- result = int_from_bytes(buf)
- assert bit_length(result) <= n
- return result
- def write(self, string):
- self.__assert_not_frozen('write')
- self.__check_capacity(len(string))
- assert isinstance(string, hbytes)
- original = self.index
- self.__write(string)
- self.forced_indices.update(hrange(original, self.index))
- return string
- def __check_capacity(self, n):
- if self.index + n > self.max_length:
- self.overdraw = self.index + n - self.max_length
- self.status = Status.OVERRUN
- self.freeze()
- raise StopTest(self.testcounter)
- def __write(self, result):
- initial = self.index
- n = len(result)
- self.block_starts.setdefault(n, []).append(initial)
- self.blocks.append((initial, initial + n))
- assert len(result) == n
- assert self.index == initial
- self.buffer.extend(result)
- def draw_bytes(self, n):
- self.__assert_not_frozen('draw_bytes')
- if n == 0:
- return hbytes(b'')
- self.__check_capacity(n)
- result = self._draw_bytes(self, n)
- assert len(result) == n
- self.__write(result)
- return hbytes(result)
- def mark_interesting(self, interesting_origin=None):
- self.__assert_not_frozen('mark_interesting')
- self.interesting_origin = interesting_origin
- self.status = Status.INTERESTING
- self.freeze()
- raise StopTest(self.testcounter)
- def mark_invalid(self):
- self.__assert_not_frozen('mark_invalid')
- self.status = Status.INVALID
- self.freeze()
- raise StopTest(self.testcounter)
|