blob: a80a902fa420455df50fd019613114370ccdd072 [file] [log] [blame]
# Run a fuzz test to verify robustness against corrupted/malicious data.
import sys
import time
import zipfile
import subprocess
Import("env", "malloc_env")
def set_pkgname(src, dst, pkgname):
data = open(str(src)).read()
placeholder = '// package name placeholder'
assert placeholder in data
data = data.replace(placeholder, 'package %s;' % pkgname)
open(str(dst), 'w').write(data)
# We want both pointer and static versions of the AllTypes message
# Prefix them with package name.
env.Command("alltypes_static.proto", "#alltypes/alltypes.proto",
lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_static'))
env.Command("alltypes_pointer.proto", "#alltypes/alltypes.proto",
lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_pointer'))
env.NanopbProto(["alltypes_pointer", "alltypes_pointer.options"])
env.NanopbProto(["alltypes_static", "alltypes_static.options"])
# Do the same for proto3 versions
env.Command("alltypes_proto3_static.proto", "#alltypes_proto3/alltypes.proto",
lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_static'))
env.Command("alltypes_proto3_pointer.proto", "#alltypes_proto3/alltypes.proto",
lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_proto3_pointer'))
env.NanopbProto(["alltypes_proto3_pointer", "alltypes_proto3_pointer.options"])
env.NanopbProto(["alltypes_proto3_static", "alltypes_proto3_static.options"])
# And also a callback version
env.Command("alltypes_callback.proto", "#alltypes/alltypes.proto",
lambda target, source, env: set_pkgname(source[0], target[0], 'alltypes_callback'))
env.NanopbProto(["alltypes_callback", "alltypes_callback.options"])
fuzz = malloc_env.Program(["fuzztest.c",
"random_data.c",
"validation.c",
"flakystream.c",
"alltypes_pointer.pb.c",
"alltypes_static.pb.c",
"alltypes_callback.pb.c",
"alltypes_proto3_pointer.pb.c",
"alltypes_proto3_static.pb.c",
"$COMMON/pb_encode_with_malloc.o",
"$COMMON/pb_decode_with_malloc.o",
"$COMMON/pb_common_with_malloc.o",
"$COMMON/malloc_wrappers.o"])
# Run the stand-alone fuzz tester
seed = int(time.time())
if env.get('EMBEDDED'):
iterations = 100
else:
iterations = 10000
env.RunTest(fuzz, ARGS = [str(seed), str(iterations)])
generate_message = malloc_env.Program(["generate_message.c",
"random_data.c",
"alltypes_static.pb.c",
"$COMMON/pb_encode.o",
"$COMMON/pb_common.o",
"$COMMON/malloc_wrappers.o"])
# Test the message generator
env.RunTest(generate_message, ARGS = [str(seed)])
env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"])
# Run against the latest corpus from ossfuzz
# This allows quick testing against regressions and also lets us more
# completely test slow embedded targets.
def run_against_corpus(target, source, env):
corpus = zipfile.ZipFile(str(source[1]), 'r')
count = 0
args = [str(source[0])]
if env.has_key("TEST_RUNNER"):
args = [env["TEST_RUNNER"]] + args
files = [n for n in corpus.namelist() if not n.endswith('/')]
for filename in files:
sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename))
sys.stdout.flush()
count += 1
try:
process = subprocess.Popen(args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate(input = corpus.read(filename))
result = process.wait()
except OSError as e:
if e.errno == 22:
print("Warning: OSError 22 when running with input " + filename)
result = process.wait()
else:
raise
if result != 0:
stdout += stderr
print(stdout)
print('\033[31m[FAIL]\033[0m Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1]))
return result
open(str(target[0]), 'w').write(str(count))
print('\033[32m[ OK ]\033[0m Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)")
env.Command("corpus.zip.fuzzed", [fuzz, "corpus.zip"], run_against_corpus)