123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- # coding=utf-8
- #
- # This file is part of Hypothesis, which may be found at
- # https://github.com/HypothesisWorks/hypothesis-python
- #
- # Most of this work is copyright (C) 2013-2018 David R. MacIver
- # (david@drmaciver.com), but it contains contributions by others. See
- # CONTRIBUTING.rst for a full list of people who may hold copyright, and
- # consult the git log if you need to determine who owns an individual
- # contribution.
- #
- # This Source Code Form is subject to the terms of the Mozilla Public License,
- # v. 2.0. If a copy of the MPL was not distributed with this file, You can
- # obtain one at http://mozilla.org/MPL/2.0/.
- #
- # END HEADER
- from __future__ import division, print_function, absolute_import
- import enum
- import math
- from collections import Sequence, OrderedDict
- from hypothesis._settings import note_deprecation
- from hypothesis.internal.compat import floor, hbytes, bit_length, \
- int_from_bytes
- from hypothesis.internal.floats import int_to_float
def integer_range(data, lower, upper, center=None):
    """Draw an integer in [lower, upper], shrinking towards ``center``.

    ``center`` defaults to ``lower`` and is clamped into the range. When
    the center lies strictly inside the range, a coin flip decides which
    side of it we draw on; an offset is then rejection-sampled so the
    result is uniform over the chosen side.
    """
    assert lower <= upper
    if lower == upper:
        return int(lower)

    center = lower if center is None else center
    center = min(max(center, lower), upper)

    # Decide which side of the center to draw on. When the center sits on
    # an endpoint only one side exists, so no choice bit is consumed.
    if center == upper:
        go_up = False
    elif center == lower:
        go_up = True
    else:
        go_up = boolean(data)

    span = (upper - center) if go_up else (center - lower)
    assert span > 0

    # Rejection-sample an offset in [0, span], discarding failed draws so
    # that they do not pollute the recorded example structure.
    bits = bit_length(span)
    while True:
        data.start_example()
        offset = data.draw_bits(bits)
        data.stop_example(discard=offset > span)
        if offset <= span:
            break

    result = center + offset if go_up else center - offset
    assert lower <= result <= upper
    return int(result)
def centered_integer_range(data, lower, upper, center):
    """Draw an integer in [lower, upper] shrinking towards ``center``.

    Thin convenience wrapper around :func:`integer_range` with an
    explicit center.
    """
    return integer_range(data, lower, upper, center=center)
def check_sample(values):
    """Normalise ``values`` into a tuple suitable for sampling.

    Emits a deprecation note for multi-dimensional numpy arrays (which
    would yield array slices rather than elements) and for collections
    without a stable iteration order, since those break example replay.
    """
    try:
        from numpy import ndarray
    except ImportError:  # pragma: no cover
        pass
    else:
        if isinstance(values, ndarray):
            if values.ndim != 1:
                note_deprecation((
                    'Only one-dimensional arrays are supported for sampling, '
                    'and the given value has {ndim} dimensions (shape '
                    '{shape}). This array would give samples of array slices '
                    'instead of elements! Use np.ravel(values) to convert '
                    'to a one-dimensional array, or tuple(values) if you '
                    'want to sample slices. Sampling a multi-dimensional '
                    'array will be an error in a future version of Hypothesis.'
                ).format(ndim=values.ndim, shape=values.shape))
            return tuple(values)
    if not isinstance(values, (OrderedDict, Sequence, enum.EnumMeta)):
        note_deprecation(
            ('Cannot sample from %r, not a sequence. ' % (values,)) +
            'Hypothesis goes to some length to ensure that sampling an '
            'element from a collection (with `sampled_from` or `choices`) is '
            'replayable and can be minimised. To replay a saved example, '
            'the sampled values must have the same iteration order on every '
            'run - ruling out sets, dicts, etc due to hash randomisation. '
            'Most cases can simply use `sorted(values)`, but mixed types or '
            'special values such as math.nan require careful handling - and '
            'note that when simplifying an example, Hypothesis treats '
            'earlier values as simpler.')
    return tuple(values)
def choice(data, values):
    """Draw one element of ``values``, shrinking towards earlier elements."""
    last_index = len(values) - 1
    return values[integer_range(data, 0, last_index)]
def getrandbits(data, n):
    """Draw ``n`` uniformly random bits from the data stream as an int."""
    # Round up to whole bytes, then mask off any excess high bits.
    n_bytes = (n + 7) // 8
    return int_from_bytes(data.draw_bytes(n_bytes)) & ((1 << n) - 1)
# An IEEE-754 double bit pattern with exponent field 0b1111111111 (= 1023,
# i.e. unbiased exponent 0) and an empty mantissa — exactly 1.0. OR-ing
# random bits into the 52-bit mantissa yields a float in [1.0, 2.0).
FLOAT_PREFIX = 0b1111111111 << 52
# Largest value the "prefix | mantissa - 1" construction can produce (all
# mantissa bits set); used by fractional_float to normalise into [0, 1].
# NOTE(review): relies on project helper int_to_float reinterpreting the
# integer bit pattern as a double — confirm against internal.floats.
FULL_FLOAT = int_to_float(FLOAT_PREFIX | ((2 << 53) - 1)) - 1
def fractional_float(data):
    """Draw a float in [0, 1] by filling a double's mantissa with random bits."""
    numerator = int_to_float(FLOAT_PREFIX | getrandbits(data, 52)) - 1
    return numerator / FULL_FLOAT
def geometric(data, p):
    """Draw a non-negative integer from a geometric distribution.

    Standard inversion sampling: draw u uniform in [0, 1) and return
    floor(log(1 - u) / log(1 - p)). Draws of exactly 1.0 are rejected
    and retried, so the whole draw is wrapped in a single example.
    """
    denom = math.log1p(-p)
    data.start_example()
    while True:
        probe = fractional_float(data)
        if probe >= 1.0:
            continue
        result = int(math.log1p(-probe) / denom)
        assert result >= 0, (probe, p, result)
        data.stop_example()
        return result
def boolean(data):
    """Draw a single uniformly random bit and return it as a bool."""
    return data.draw_bits(1) == 1
def biased_coin(data, p):
    """Return True with probability ``p`` (assuming a uniform generator),
    shrinking towards False.

    (The ``p <= 0`` branch always yields False and the ``p >= 1`` branch
    always yields True, so the coin is truthy with probability p.)
    """
    data.start_example()
    while True:
        # The logic here is a bit complicated and special cased to make it
        # play better with the shrinker.

        # We imagine partitioning the real interval [0, 1] into 256 equal parts
        # and looking at each part and whether its interior is wholly <= p
        # or wholly >= p. At most one part can be neither.

        # We then pick a random part. If it's wholly on one side or the other
        # of p then we use that as the answer. If p is contained in the
        # interval then we start again with a new probability that is given
        # by the fraction of that interval that was <= our previous p.

        # We then take advantage of the fact that we have control of the
        # labelling to make this shrink better, using the following tricks:

        # If p is <= 0 or >= 1 the result of this coin is certain. We make sure
        # to write a byte to the data stream anyway so that these don't cause
        # difficulties when shrinking.
        if p <= 0:
            data.write(hbytes([0]))
            result = False
        elif p >= 1:
            data.write(hbytes([1]))
            result = True
        else:
            falsey = floor(256 * (1 - p))
            truthy = floor(256 * p)
            remainder = 256 * p - truthy

            if falsey + truthy == 256:
                # p is an exact multiple of 1/256 with a power-of-two
                # denominator, so we can draw exactly the number of bits
                # needed and never land in the "partial" middle partition.
                m, n = p.as_integer_ratio()
                assert n & (n - 1) == 0, n  # n is a power of 2
                assert n > m > 0
                truthy = m
                falsey = n - m
                bits = bit_length(n) - 1
                partial = False
            else:
                bits = 8
                partial = True

            i = data.draw_bits(bits)

            # We always label the region that causes us to repeat the loop as
            # 255 so that shrinking this byte never causes us to need to draw
            # more data.
            if partial and i == 255:
                p = remainder
                continue
            if falsey == 0:
                # Every other partition is truthy, so the result is true
                result = True
            elif truthy == 0:
                # Every other partition is falsey, so the result is false
                result = False
            elif i <= 1:
                # We special case so that zero is always false and 1 is always
                # true which makes shrinking easier because we can always
                # replace a truthy block with 1. This has the slightly weird
                # property that shrinking from 2 to 1 can cause the result to
                # grow, but the shrinker always tries 0 and 1 first anyway, so
                # this will usually be fine.
                result = bool(i)
            else:
                # Originally everything in the region 0 <= i < falsey was false
                # and everything above was true. We swapped one truthy element
                # into this region, so the region becomes 0 <= i <= falsey
                # except for i = 1. We know i > 1 here, so the test for truth
                # becomes i > falsey.
                result = i > falsey
        break
    data.stop_example()
    return result
class Sampler(object):
    """Sampler based on "Succinct Sampling from Discrete Distributions" by
    Bringmann and Larsen. In general it has some advantages and disadvantages
    compared to the more normal alias method, but its big advantage for us is
    that it plays well with shrinking: The values are laid out in their natural
    order, so shrink in that order.

    Its big disadvantage is that for heavily biased distributions it can
    use a lot of memory. Solution is some mix of "don't do that then"
    and not worrying about it because Hypothesis is something of a
    memory hog anyway.
    """

    def __init__(self, weights):
        # We consider each weight expressed in terms of the average weight,
        # say t. We write the weight of i as nt + f, where n is an integer and
        # 0 <= f < 1. We then store n items for this weight which correspond
        # to drawing i unconditionally, and if f > 0 we store an additional
        # item that corresponds to drawing i with probability f. This ensures
        # that (under a uniform model) we draw i with probability proportionate
        # to its weight.

        # We then rearrange things to shrink better. The table with the whole
        # weights is kept in sorted order so that shrinking still corresponds
        # to shrinking leftwards. The fractional weights however are put in
        # a second table that is logically "to the right" of the whole weights
        # and are sorted in order of decreasing acceptance probability. This
        # ensures that shrinking lexicographically always results in drawing
        # less data.
        self.table = []       # indices drawn unconditionally
        self.extras = []      # indices drawn with a fractional acceptance
        self.acceptance = []  # per-index acceptance weight for extras
        total = sum(weights)
        n = len(weights)
        for i, x in enumerate(weights):
            whole_occurrences = floor(x * n / total)
            # NOTE(review): this looks like it should be the fractional part
            # of the normalised weight, i.e. ``x * n / total -
            # whole_occurrences``; as written the acceptance can fall outside
            # [0, 1) for unnormalised weights — confirm intended behaviour.
            acceptance = x - whole_occurrences
            self.acceptance.append(acceptance)
            for _ in range(whole_occurrences):
                self.table.append(i)
            if acceptance > 0:
                self.extras.append(i)
        self.extras.sort(key=self.acceptance.__getitem__, reverse=True)

    def sample(self, data):
        # Draw uniformly over the combined layout (table + extras); a hit in
        # ``extras`` is kept only with its acceptance probability, otherwise
        # we retry the whole draw.
        while True:
            data.start_example()
            # We always draw the acceptance data even if we don't need it,
            # because that way we keep the amount of data we use stable.
            i = integer_range(data, 0, len(self.table) + len(self.extras) - 1)
            if i < len(self.table):
                result = self.table[i]
                data.stop_example()
                return result
            else:
                result = self.extras[i - len(self.table)]
                accept = not biased_coin(data, 1 - self.acceptance[result])
                data.stop_example()
                if accept:
                    return result
class many(object):
    """Utility class for collections. Bundles up the logic we use for "should I
    keep drawing more values?" and handles starting and stopping examples in
    the right place.

    Intended usage is something like:

    elements = many(data, ...)
    while elements.more():
        add_stuff_to_result()
    """

    def __init__(self, data, min_size, max_size, average_size):
        self.min_size = min_size
        self.max_size = max_size
        self.data = data
        # Continuation probability chosen so that a geometric stopping rule
        # yields ``average_size`` elements in expectation.
        self.stopping_value = 1 - 1.0 / (1 + average_size)
        self.count = 0           # elements drawn and kept so far
        self.rejections = 0      # total calls to reject()
        self.drawn = False       # has more() been called at least once?
        self.force_stop = False  # set when rejections overwhelm kept elements
        self.rejected = False    # was the most recent element rejected?

    def more(self):
        """Should I draw another element to add to the collection?"""
        # Close the example opened for the previous element, discarding its
        # data if that element was rejected.
        if self.drawn:
            self.data.stop_example(discard=self.rejected)

        self.drawn = True
        self.rejected = False

        if self.min_size == self.max_size:
            # Fixed size: no decision to make, so consume no data for it.
            should_continue = self.count < self.min_size
        elif self.force_stop:
            should_continue = False
        else:
            # Forced continue below the minimum, forced stop at the maximum,
            # otherwise a biased coin with the geometric stopping value.
            if self.count < self.min_size:
                p_continue = 1.0
            elif self.count >= self.max_size:
                p_continue = 0.0
            else:
                p_continue = self.stopping_value
            should_continue = biased_coin(self.data, p_continue)

        if should_continue:
            self.data.start_example()
            self.count += 1
            return True
        else:
            return False

    def reject(self):
        """Reject the last example (i.e. don't count it towards our budget of
        elements because it's not going to go in the final collection)"""
        assert self.count > 0
        self.count -= 1
        self.rejections += 1
        self.rejected = True
        # Too many rejections relative to kept elements: either the test case
        # cannot reach the minimum size (mark it invalid) or we simply stop
        # the collection from growing further.
        if self.rejections > 2 * self.count:
            if self.count < self.min_size:
                self.data.mark_invalid()
            else:
                self.force_stop = True
|