| # Run a fuzz test to verify robustness against corrupted/malicious data. |
| |
| import sys |
| import time |
| import zipfile |
| import random |
| import subprocess |
| |
| Import("env", "malloc_env") |
| |
| def set_pkgname(src, dst, pkgname): |
| data = open(str(src)).read() |
| placeholder = '// package name placeholder' |
| assert placeholder in data |
| data = data.replace(placeholder, 'package %s;' % pkgname) |
| open(str(dst), 'w').write(data) |
| |
| # We want both pointer and static versions of the AllTypes message |
| # Prefix them with package name. |
| env.Command("alltypes_static.proto", "#alltypes/alltypes.proto", |
| lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_static')) |
| env.Command("alltypes_pointer.proto", "#alltypes/alltypes.proto", |
| lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_pointer')) |
| |
| env.NanopbProto(["alltypes_pointer", "alltypes_pointer.options"]) |
| env.NanopbProto(["alltypes_static", "alltypes_static.options"]) |
| |
| # Do the same for proto3 versions |
| env.Command("alltypes_proto3_static.proto", "#alltypes_proto3/alltypes.proto", |
| lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_static')) |
| env.Command("alltypes_proto3_pointer.proto", "#alltypes_proto3/alltypes.proto", |
| lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_pointer')) |
| |
| env.NanopbProto(["alltypes_proto3_pointer", "alltypes_proto3_pointer.options"]) |
| env.NanopbProto(["alltypes_proto3_static", "alltypes_proto3_static.options"]) |
| |
| # And also a callback version |
| env.Command("alltypes_callback.proto", "#alltypes/alltypes.proto", |
| lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_callback')) |
| env.NanopbProto(["alltypes_callback", "alltypes_callback.options"]) |
| |
| common_objs = [env.Object("random_data.c"), |
| env.Object("validation.c"), |
| env.Object("flakystream.c"), |
| env.Object("alltypes_pointer.pb.c"), |
| env.Object("alltypes_static.pb.c"), |
| env.Object("alltypes_callback.pb.c"), |
| env.Object("alltypes_proto3_pointer.pb.c"), |
| env.Object("alltypes_proto3_static.pb.c"), |
| "$COMMON/malloc_wrappers.o"] |
| objs_malloc = ["$COMMON/pb_encode_with_malloc.o", |
| "$COMMON/pb_decode_with_malloc.o", |
| "$COMMON/pb_common_with_malloc.o"] + common_objs |
| objs_static = ["$COMMON/pb_encode.o", |
| "$COMMON/pb_decode.o", |
| "$COMMON/pb_common.o"] + common_objs |
| |
| fuzz = malloc_env.Program(["fuzztest.c"] + objs_malloc) |
| |
| # Run the stand-alone fuzz tester |
| seed = int(time.time()) |
| if env.get('EMBEDDED'): |
| iterations = 100 |
| else: |
| iterations = 1000 |
| env.RunTest(fuzz, ARGS = [str(seed), str(iterations)]) |
| |
| generate_message = malloc_env.Program(["generate_message.c"] + objs_static) |
| |
| # Test the message generator |
| env.RunTest(generate_message, ARGS = [str(seed)]) |
| env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"]) |
| |
| # Run against the latest corpus from ossfuzz |
| # This allows quick testing against regressions and also lets us more |
| # completely test slow embedded targets. To reduce runtime, only a subset |
| # of the corpus is fuzzed each time. |
| def run_against_corpus(target, source, env): |
| corpus = zipfile.ZipFile(str(source[1]), 'r') |
| count = 0 |
| args = [str(source[0])] |
| |
| if "TEST_RUNNER" in env: |
| args = [env["TEST_RUNNER"]] + args |
| |
| if "FUZZTEST_CORPUS_SAMPLESIZE" in env: |
| samplesize = int(env["FUZZTEST_CORPUS_SAMPLESIZE"]) |
| elif env.get('EMBEDDED'): |
| samplesize = 100 |
| else: |
| samplesize = 4096 |
| |
| files = [n for n in corpus.namelist() if not n.endswith('/')] |
| files = random.sample(files, min(samplesize, len(files))) |
| for filename in files: |
| sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename)) |
| sys.stdout.flush() |
| |
| count += 1 |
| |
| maxsize = env.get('CPPDEFINES', {}).get('FUZZTEST_BUFSIZE', 256*1024) |
| data_in = corpus.read(filename)[:maxsize] |
| |
| try: |
| process = subprocess.Popen(args, stdin=subprocess.PIPE, |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| stdout, stderr = process.communicate(input = data_in) |
| result = process.wait() |
| except OSError as e: |
| if e.errno == 22: |
| print("Warning: OSError 22 when running with input " + filename) |
| result = process.wait() |
| else: |
| raise |
| |
| if result != 0: |
| stdout += stderr |
| print(stdout) |
| print('\033[31m[FAIL]\033[0m Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1])) |
| return result |
| |
| open(str(target[0]), 'w').write(str(count)) |
| print('\033[32m[ OK ]\033[0m Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)") |
| |
| env.Command("corpus.zip.fuzzed", [fuzz, "corpus.zip"], run_against_corpus) |
| env.Command("regressions.zip.fuzzed", [fuzz, "regressions.zip"], run_against_corpus) |
| |
| # Build separate fuzzers for each test case. |
| # Having them separate speeds up control flow based fuzzer engines. |
| # These are mainly used by oss-fuzz project. |
| env_proto2_static = env.Clone() |
| env_proto2_static.Append(CPPDEFINES = {'FUZZTEST_PROTO2_STATIC': '1'}) |
| env_proto2_static.Program("fuzztest_proto2_static", |
| [env_proto2_static.Object("fuzztest_proto2_static.o", "fuzztest.c")] + objs_static) |
| |
| env_proto2_pointer = malloc_env.Clone() |
| env_proto2_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO2_POINTER': '1'}) |
| env_proto2_pointer.Program("fuzztest_proto2_pointer", |
| [env_proto2_pointer.Object("fuzztest_proto2_pointer.o", "fuzztest.c")] + objs_malloc) |
| |
| env_proto3_static = env.Clone() |
| env_proto3_static.Append(CPPDEFINES = {'FUZZTEST_PROTO3_STATIC': '1'}) |
| env_proto3_static.Program("fuzztest_proto3_static", |
| [env_proto3_static.Object("fuzztest_proto3_static.o", "fuzztest.c")] + objs_static) |
| |
| env_proto3_pointer = malloc_env.Clone() |
| env_proto3_pointer.Append(CPPDEFINES = {'FUZZTEST_PROTO3_POINTER': '1'}) |
| env_proto3_pointer.Program("fuzztest_proto3_pointer", |
| [env_proto3_pointer.Object("fuzztest_proto3_pointer.o", "fuzztest.c")] + objs_malloc) |
| |
| env_io_errors = malloc_env.Clone() |
| env_io_errors.Append(CPPDEFINES = {'FUZZTEST_IO_ERRORS': '1'}) |
| env_io_errors.Program("fuzztest_io_errors", |
| [env_io_errors.Object("fuzztest_io_errors.o", "fuzztest.c")] + objs_malloc) |
| |