Skip to content

Commit

Permalink
add BytestringProvider for fuzz_one_input
Browse files Browse the repository at this point in the history
  • Loading branch information
tybug committed Dec 29, 2024
1 parent 8250483 commit 78bc517
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 7 deletions.
3 changes: 3 additions & 0 deletions hypothesis-python/RELEASE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
RELEASE_TYPE: patch

:ref:`fuzz_one_input <fuzz_one_input>` is now implemented using an :ref:`alternative backend <alternative-backends>`. This brings Hypothesis's interpretation of the fuzzer-provided bytestring closer to the mutations the fuzzer performs, allowing those mutations to work more reliably. We hope to use this backend functionality to work towards better integration with fuzzers (see e.g. https://github.com/google/atheris/issues/20) in the future!
17 changes: 14 additions & 3 deletions hypothesis-python/src/hypothesis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@
ConjectureData,
PrimitiveProvider,
Status,
ir_to_buffer,
)
from hypothesis.internal.conjecture.engine import BUFFER_SIZE, ConjectureRunner
from hypothesis.internal.conjecture.junkdrawer import (
ensure_free_stackframes,
gc_cumulative_time,
)
from hypothesis.internal.conjecture.providers import BytestringProvider
from hypothesis.internal.conjecture.shrinker import sort_key, sort_key_ir
from hypothesis.internal.entropy import deterministic_PRNG
from hypothesis.internal.escalation import (
Expand Down Expand Up @@ -1829,21 +1831,30 @@ def fuzz_one_input(
if isinstance(buffer, io.IOBase):
buffer = buffer.read(BUFFER_SIZE)
assert isinstance(buffer, (bytes, bytearray, memoryview))
data = ConjectureData.for_buffer(buffer)
data = ConjectureData(
max_length=BUFFER_SIZE,
prefix=b"",
random=None,
provider=BytestringProvider,
provider_kw={"bytestring": buffer},
)
try:
state.execute_once(data)
except (StopTest, UnsatisfiedAssumption):
return None
except BaseException:
buffer = bytes(data.buffer)
buffer = b"".join(
ir_to_buffer(n.ir_type, n.kwargs, forced=n.value)[1]
for n in data.ir_nodes
)
known = minimal_failures.get(data.interesting_origin)
if settings.database is not None and (
known is None or sort_key(buffer) <= sort_key(known)
):
settings.database.save(database_key, buffer)
minimal_failures[data.interesting_origin] = buffer
raise
return bytes(data.buffer)
return bytes(data.provider.drawn)

fuzz_one_input.__doc__ = HypothesisHandle.fuzz_one_input.__doc__
return fuzz_one_input
Expand Down
7 changes: 5 additions & 2 deletions hypothesis-python/src/hypothesis/internal/conjecture/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2005,11 +2005,14 @@ def __init__(
provider: Union[type, PrimitiveProvider] = HypothesisProvider,
ir_tree_prefix: Optional[Sequence[Union[IRNode, NodeTemplate]]] = None,
max_length_ir: Optional[int] = None,
provider_kw: Optional[dict[str, Any]] = None,
) -> None:
from hypothesis.internal.conjecture.engine import BUFFER_SIZE_IR

if observer is None:
observer = DataObserver()
if provider_kw is None:
provider_kw = {}
assert isinstance(observer, DataObserver)
self._bytes_drawn = 0
self.observer = observer
Expand All @@ -2020,7 +2023,7 @@ def __init__(
self.__prefix = bytes(prefix)
self.__random = random

if ir_tree_prefix is None:
if ir_tree_prefix is None and type(provider) is HypothesisProvider:
assert random is not None or max_length <= len(prefix)

self.blocks = Blocks(self)
Expand All @@ -2045,7 +2048,7 @@ def __init__(
self.has_discards = False

self.provider: PrimitiveProvider = (
provider(self) if isinstance(provider, type) else provider
provider(self, **provider_kw) if isinstance(provider, type) else provider
)
assert isinstance(self.provider, PrimitiveProvider)

Expand Down
180 changes: 180 additions & 0 deletions hypothesis-python/src/hypothesis/internal/conjecture/providers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import math
from typing import Optional

from hypothesis.internal.compat import int_from_bytes
from hypothesis.internal.conjecture.data import (
BYTE_MASKS,
COLLECTION_DEFAULT_MAX_SIZE,
ConjectureData,
PrimitiveProvider,
Sampler,
bits_to_bytes,
)
from hypothesis.internal.conjecture.floats import lex_to_float
from hypothesis.internal.conjecture.utils import many
from hypothesis.internal.floats import make_float_clamper
from hypothesis.internal.intervalsets import IntervalSet


class BytestringProvider(PrimitiveProvider):
    """Primitive provider that derives every choice from a fixed bytestring.

    Each non-forced draw consumes bytes from ``bytestring`` starting at
    ``index``. The (masked) bytes actually consumed are accumulated in
    ``drawn``. Forced draws return the forced value without consuming input.
    """

    lifetime = "test_case"

    def __init__(
        self, conjecturedata: Optional["ConjectureData"], /, *, bytestring: bytes
    ):
        """Store the input bytestring and reset the read position.

        ``conjecturedata`` is forwarded to the base-class initialiser;
        ``bytestring`` is the keyword-only source of all subsequent draws.
        """
        super().__init__(conjecturedata)
        self.bytestring = bytestring
        # Offset of the next unread byte in ``bytestring``.
        self.index = 0
        # Bytes consumed so far, after masking of the leading byte.
        self.drawn = bytearray()

    def _draw_bits(self, n):
        """Read an ``n``-bit unsigned integer from the bytestring.

        Returns 0 without consuming anything when ``n == 0``. Otherwise
        consumes ``bits_to_bytes(n)`` bytes, masks the first byte with
        ``BYTE_MASKS[n % 8]`` and returns the result decoded by
        ``int_from_bytes``. When too few bytes remain, ``mark_overrun`` is
        called on the owning ConjectureData.
        """
        if n == 0:
            return 0
        n_bytes = bits_to_bytes(n)
        if self.index + n_bytes > len(self.bytestring):
            # NOTE(review): mark_overrun is expected to abort the test case
            # (i.e. not return) -- confirm against ConjectureData.
            self._cd.mark_overrun()
        buf = bytearray(self.bytestring[self.index : self.index + n_bytes])
        self.index += n_bytes

        # Discard the surplus high bits of the leading byte so that the
        # result fits in n bits.
        buf[0] &= BYTE_MASKS[n % 8]
        buf = bytes(buf)
        self.drawn += buf
        return int_from_bytes(buf)

    def draw_boolean(
        self,
        p: float = 0.5,
        *,
        forced: Optional[bool] = None,
        fake_forced: bool = False,
    ) -> bool:
        """Draw a boolean that is True with probability of roughly ``p``.

        ``forced`` is returned unchanged when given. ``p <= 0`` and ``p >= 1``
        return False / True without consuming input.
        """
        if forced is not None:
            return forced

        if p <= 0:
            return False
        if p >= 1:
            return True

        # Number of bits needed to resolve the rarer of the two outcomes.
        bits = math.ceil(-math.log2(min(p, 1 - p)))
        # we treat probabilities of under 2^-64 as effectively zero.
        if bits > 64:
            return False

        size = 2**bits
        # Draws below this threshold map to False, the remainder to True.
        falsey = math.floor(size * (1 - p))
        n = self._draw_bits(bits)
        return n >= falsey

    def draw_integer(
        self,
        min_value: Optional[int] = None,
        max_value: Optional[int] = None,
        *,
        weights: Optional[dict[int, float]] = None,
        shrink_towards: int = 0,
        forced: Optional[int] = None,
        fake_forced: bool = False,
    ) -> int:
        """Draw an integer in ``[min_value, max_value]``.

        ``forced`` is returned unchanged when given. With ``weights``, a key
        of the mapping is chosen through ``Sampler``. Missing bounds are
        replaced by a finite window: 128-bit signed when both are missing,
        otherwise ``2**64`` wide from the bound that was given.
        """
        if forced is not None:
            return forced

        if weights is not None:
            sampler = Sampler(weights.values(), observe=False)
            idx = sampler.sample(self._cd)
            return list(weights)[idx]

        if min_value is None and max_value is None:
            min_value = -(2**127)
            max_value = 2**127 - 1
        elif min_value is None:
            min_value = max_value - 2**64
        elif max_value is None:
            max_value = min_value + 2**64

        if min_value == max_value:
            return min_value

        # _draw_bits only yields non-negative values, so draw an offset into
        # the range and shift it by min_value. Comparing the raw draw against
        # the bounds would make negative values and most of any range that
        # does not start at zero unreachable.
        span = max_value - min_value
        bits = span.bit_length()
        offset = self._draw_bits(bits)
        # Rejection-sample offsets that fall beyond the end of the range.
        while offset > span:
            offset = self._draw_bits(bits)
        return min_value + offset

    def draw_float(
        self,
        *,
        min_value: float = -math.inf,
        max_value: float = math.inf,
        allow_nan: bool = True,
        smallest_nonzero_magnitude: float,
        forced: Optional[float] = None,
        fake_forced: bool = False,
    ) -> float:
        """Draw a float and clamp it into the permitted range.

        ``forced`` is returned unchanged when given. Otherwise 65 bits are
        read: the top bit selects the sign and the low 64 bits are decoded
        with ``lex_to_float``. The result is passed through the clamper built
        by ``make_float_clamper`` from the bounds.
        """
        if forced is not None:
            return forced

        # 64 bits of magnitude plus one sign bit. Drawing only 64 bits would
        # leave ``n >> 64`` permanently zero, i.e. the sign never negative.
        n = self._draw_bits(65)
        sign = -1 if n >> 64 else 1
        f = sign * lex_to_float(n & ((1 << 64) - 1))
        clamper = make_float_clamper(
            min_value,
            max_value,
            smallest_nonzero_magnitude=smallest_nonzero_magnitude,
            allow_nan=allow_nan,
        )
        return clamper(f)

    def _draw_collection(self, min_size, max_size, *, alphabet_size):
        """Draw a list of integers, each in ``[0, alphabet_size - 1]``.

        The length is governed by ``many`` with the given size bounds; every
        element comes from ``draw_integer``.
        """
        average_size = min(
            max(min_size * 2, min_size + 5),
            0.5 * (min_size + max_size),
        )
        elements = many(
            self._cd,
            min_size=min_size,
            max_size=max_size,
            average_size=average_size,
            observe=False,
        )
        values = []
        while elements.more():
            values.append(self.draw_integer(0, alphabet_size - 1))
        return values

    def draw_string(
        self,
        intervals: IntervalSet,
        *,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        forced: Optional[str] = None,
        fake_forced: bool = False,
    ) -> str:
        """Draw a string whose characters are looked up by index in ``intervals``.

        ``forced`` is returned unchanged when given.
        """
        if forced is not None:
            return forced
        values = self._draw_collection(min_size, max_size, alphabet_size=len(intervals))
        return "".join(chr(intervals[v]) for v in values)

    def draw_bytes(
        self,
        min_size: int = 0,
        max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
        *,
        forced: Optional[bytes] = None,
        fake_forced: bool = False,
    ) -> bytes:
        """Draw a bytes object, one ``draw_integer(0, 255)`` per byte.

        ``forced`` is returned unchanged when given.
        """
        if forced is not None:
            return forced
        values = self._draw_collection(min_size, max_size, alphabet_size=2**8)
        return bytes(values)
36 changes: 36 additions & 0 deletions hypothesis-python/tests/conjecture/test_provider_contract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

from hypothesis import given, strategies as st
from hypothesis.errors import StopTest
from hypothesis.internal.conjecture.data import ConjectureData, ir_value_permitted
from hypothesis.internal.conjecture.engine import BUFFER_SIZE
from hypothesis.internal.conjecture.providers import BytestringProvider

from tests.conjecture.common import ir_nodes


@given(st.binary(min_size=200), st.lists(ir_nodes()))
def test_provider_contract_bytestring(bytestring, nodes):
    """Every value drawn through BytestringProvider must be permitted.

    For each generated node, the matching ``draw_<ir_type>`` method is called
    with the node's kwargs and the result is checked with
    ``ir_value_permitted``. A ``StopTest`` raised by a draw ends the test
    early without failing it.
    """
    data = ConjectureData(
        BUFFER_SIZE,
        prefix=b"",
        random=None,
        observer=None,
        provider=BytestringProvider,
        provider_kw={"bytestring": bytestring},
    )

    for node in nodes:
        draw = getattr(data, f"draw_{node.ir_type}")
        try:
            drawn = draw(**node.kwargs)
        except StopTest:
            # The draw stopped the test case; nothing further to check.
            return
        assert ir_value_permitted(drawn, node.ir_type, node.kwargs)
4 changes: 2 additions & 2 deletions hypothesis-python/tests/cover/test_fuzz_one_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_fuzz_one_input(buffer_type):
@settings(database=db, phases=[Phase.reuse, Phase.shrink])
def test(s):
seen.append(s)
assert "\0" not in s, repr(s)
assert len(s) < 5, repr(s)

# Before running fuzz_one_input, there's nothing in `db`, and so the test passes
# (because example generation is disabled by the custom settings)
Expand Down Expand Up @@ -67,7 +67,7 @@ def test(s):
# reproduce it, *and shrink to a minimal example*.
with pytest.raises(AssertionError):
test()
assert seen[-1] == "\0"
assert seen[-1] == "0" * 5


def test_can_fuzz_with_database_eq_None():
Expand Down

0 comments on commit 78bc517

Please sign in to comment.