blob: 49eb193711e7bb7a2c7532b462f84ef764dcd2d2 [file] [log] [blame] [edit]
// Copyright 2023 The Centipede Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "./centipede/distill.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <numeric>
#include <optional>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "./centipede/corpus_io.h"
#include "./centipede/environment.h"
#include "./centipede/feature.h"
#include "./centipede/feature_set.h"
#include "./centipede/periodic_action.h"
#include "./centipede/resource_pool.h"
#include "./centipede/rusage_profiler.h"
#include "./centipede/rusage_stats.h"
#include "./centipede/thread_pool.h"
#include "./centipede/util.h"
#include "./centipede/workdir.h"
#include "./common/blob_file.h"
#include "./common/defs.h"
#include "./common/hash.h"
#include "./common/logging.h"
#include "./common/remote_file.h"
#include "./common/status_macros.h"
namespace fuzztest::internal {
namespace {
// A corpus element. Consists of a fuzz test input and its matching features.
struct CorpusElt {
ByteArray input;
FeatureVec features;
CorpusElt(const ByteArray &input, FeatureVec features)
: input(input), features(std::move(features)) {}
// Movable, but not copyable for efficiency.
CorpusElt(const CorpusElt &) = delete;
CorpusElt &operator=(const CorpusElt &) = delete;
CorpusElt(CorpusElt &&) = default;
CorpusElt &operator=(CorpusElt &&) = default;
ByteArray PackedFeatures() const {
return PackFeaturesAndHash(input, features);
}
};
using CorpusEltVec = std::vector<CorpusElt>;
// Upper bound on how many threads may read input shards at the same time. The
// main purpose of the limit is to avoid I/O congestion.
inline constexpr size_t kMaxReadingThreads = 50;
// Upper bound on how many threads may write output shards at the same time.
// Each writing thread in turn launches up to `kMaxReadingThreads` readers.
inline constexpr size_t kMaxWritingThreads = 100;
// Process-wide ceiling on the number of threads, writers and readers combined.
// Unlike the two limits above, this one exists purely to keep the number of
// threads in the process reasonable.
inline constexpr size_t kMaxTotalThreads = 5000;
// The worst case (every writer running a full set of readers) must fit under
// the global ceiling.
static_assert(kMaxWritingThreads * kMaxReadingThreads <= kMaxTotalThreads);
// One gibibyte (2^30 bytes).
inline constexpr MemSize kGB = 1L << 30;
// The approximate total amount of RAM that all the concurrently running
// threads may lease from the shared pool. Only the RSS component is limited.
// TODO(ussuri): Replace by a function of free RSS on the system.
inline constexpr RUsageMemory kRamQuota{
    /*mem_vsize=*/0,
    /*mem_vpeak=*/0,
    /*mem_rss=*/25 * kGB,
};
// How long a thread is willing to wait for its concurrent siblings to release
// enough RAM before its lease request gives up.
inline constexpr absl::Duration kRamLeaseTimeout = absl::Hours(5);
// Returns the prefix for log messages that pertain to the output shard
// identified by `env.my_shard_index`.
std::string LogPrefix(const Environment &env) {
  std::string prefix = "DISTILL[S.";
  absl::StrAppend(&prefix, env.my_shard_index, "]: ");
  return prefix;
}
// Returns the prefix for log messages that pertain to the distillation as a
// whole rather than to one particular output shard.
std::string LogPrefix() { return "DISTILL[ALL]: "; }
// TODO(ussuri): Move the reader/writer classes to shard_reader.cc, rename it
// to corpus_io.cc, and reuse the new APIs where useful in the code base.
// A helper class for reading input corpus shards. Thread-safe: all state is
// const after construction and all methods are const.
class InputCorpusShardReader {
 public:
  explicit InputCorpusShardReader(const Environment &env)
      : workdir_{env}, log_prefix_{LogPrefix(env)} {}

  // Returns a conservative estimate of the RAM needed to hold the contents of
  // shard `shard_idx` in memory, derived from the on-disk sizes of the shard's
  // corpus and features files. The sizes are obtained through `ValueOrDie()`,
  // so a failure to query either file is not recoverable here.
  MemSize EstimateRamFootprint(size_t shard_idx) const {
    const auto corpus_path = workdir_.CorpusFilePaths().Shard(shard_idx);
    const auto features_path = workdir_.FeaturesFilePaths().Shard(shard_idx);
    const MemSize corpus_file_size = ValueOrDie(RemoteFileGetSize(corpus_path));
    const MemSize features_file_size =
        ValueOrDie(RemoteFileGetSize(features_path));
    // Conservative compression factors for the two file types. These have been
    // observed empirically for the Riegeli blob format. The legacy format is
    // approximately 1:1, but use the stricter Riegeli numbers, as the legacy
    // should be considered obsolete.
    // TODO(b/322880269): Use the actual in-memory footprint once available.
    constexpr double kMaxCorpusCompressionRatio = 5.0;
    constexpr double kMaxFeaturesCompressionRatio = 10.0;
    // The arithmetic is done in `double`; make the truncating conversion back
    // to `MemSize` explicit.
    return static_cast<MemSize>(
        corpus_file_size * kMaxCorpusCompressionRatio +
        features_file_size * kMaxFeaturesCompressionRatio);
  }

  // Reads and returns a single shard's elements, in the order in which the
  // underlying `ReadShard()` reports them. Thread-safe.
  CorpusEltVec ReadShard(size_t shard_idx) const {
    const auto corpus_path = workdir_.CorpusFilePaths().Shard(shard_idx);
    const auto features_path = workdir_.FeaturesFilePaths().Shard(shard_idx);
    FUZZTEST_VLOG(1) << log_prefix_ << "reading input shard " << shard_idx
                     << ":\n"
                     << VV(corpus_path) << "\n"
                     << VV(features_path);
    CorpusEltVec elts;
    // Read elements from the current shard. The callback receives each input
    // and its features by value, so they can be moved into the new element.
    fuzztest::internal::ReadShard(  //
        corpus_path, features_path,
        [&elts](ByteArray input, FeatureVec features) {
          elts.emplace_back(std::move(input), std::move(features));
        });
    return elts;
  }

 private:
  const WorkDir workdir_;
  const std::string log_prefix_;
};
// Writes corpus elements and their features to the pair of distilled output
// files that belong to this environment's shard. Thread-safe: every public
// method takes the internal mutex.
class CorpusShardWriter {
 public:
  // A snapshot of the writer's counters.
  struct Stats {
    // Elements submitted for writing, whether or not they were written.
    size_t num_total_elts = 0;
    // Elements that passed `PreprocessElt()` and were actually written.
    size_t num_written_elts = 0;
    // Completed `WriteBatch()` calls.
    size_t num_written_batches = 0;
  };

  // Opens both output files: for appending if `append` is true, otherwise for
  // (over)writing. A failure to open either file is fatal.
  CorpusShardWriter(const Environment &env, bool append)
      : workdir_{env},
        log_prefix_{LogPrefix(env)},
        corpus_path_{workdir_.DistilledCorpusFilePaths().MyShard()},
        features_path_{workdir_.DistilledFeaturesFilePaths().MyShard()},
        corpus_writer_{DefaultBlobFileWriterFactory()},
        feature_writer_{DefaultBlobFileWriterFactory()} {
    const char *const open_mode = append ? "a" : "w";
    FUZZTEST_CHECK_OK(corpus_writer_->Open(corpus_path_, open_mode));
    FUZZTEST_CHECK_OK(feature_writer_->Open(features_path_, open_mode));
  }

  virtual ~CorpusShardWriter() = default;

  // Submits a single element for writing.
  void WriteElt(CorpusElt elt) {
    absl::MutexLock lock(&mu_);
    WriteEltImpl(std::move(elt));
  }

  // Submits all of `elts` for writing, in order, as one batch. The lock is
  // held for the whole batch, so batches from different threads never
  // interleave.
  void WriteBatch(CorpusEltVec elts) {
    absl::MutexLock lock(&mu_);
    FUZZTEST_VLOG(1) << log_prefix_ << "writing " << elts.size()
                     << " elements to output shard:\n"
                     << VV(corpus_path_) << "\n"
                     << VV(features_path_);
    for (CorpusElt &pending : elts) {
      WriteEltImpl(std::move(pending));
    }
    ++stats_.num_written_batches;
  }

  // Returns a copy of the counters accumulated so far.
  Stats GetStats() const {
    absl::MutexLock lock(&mu_);
    return stats_;
  }

 protected:
  // A hook for derived classes, invoked (with the writer's mutex held) on each
  // element right before it is written. An override may modify the element,
  // e.g. trim its feature set, or return `std::nullopt` to have the element
  // skipped. The default implementation passes the element through unchanged.
  virtual std::optional<CorpusElt> PreprocessElt(CorpusElt elt) {
    return std::move(elt);
  }

 private:
  // Counts `elt`, runs it through `PreprocessElt()`, and, unless it has been
  // rejected, appends it to the corpus and features files.
  void WriteEltImpl(CorpusElt elt) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    ++stats_.num_total_elts;
    const std::optional<CorpusElt> accepted = PreprocessElt(std::move(elt));
    if (!accepted.has_value()) return;
    FUZZTEST_CHECK_OK(corpus_writer_->Write(accepted->input));
    FUZZTEST_CHECK_OK(feature_writer_->Write(accepted->PackedFeatures()));
    ++stats_.num_written_elts;
  }

  // Immutable after construction.
  const WorkDir workdir_;
  const std::string log_prefix_;
  const std::string corpus_path_;
  const std::string features_path_;

  // Protected by `mu_`.
  mutable absl::Mutex mu_;
  std::unique_ptr<BlobFileWriter> corpus_writer_ ABSL_GUARDED_BY(mu_);
  std::unique_ptr<BlobFileWriter> feature_writer_ ABSL_GUARDED_BY(mu_);
  Stats stats_ ABSL_GUARDED_BY(mu_);
};
// A distilling input filter:
// - Deduplicates byte-identical inputs: only the first one is allowed to pass.
// - Deduplicates feature-equivalent inputs: up to N from each equivalency set
// are allowed to pass.
// - Discards the specified set of "uninteresting" feature domains from the
// feature sets of filtered inputs.
class DistillingInputFilter {
public:
// An extension to the parent class's `Stats`.
struct Stats {
size_t num_total_elts = 0;
size_t num_byte_unique_elts = 0;
size_t num_feature_unique_elts = 0;
// The accumulated features of the distilled corpus so far, represents in
// the same compact textual form that Centipede uses in its fuzzing progress
// log messages, e.g.: "ft: 96331 cov: 81793 usr1: 5045 ...".
std::string coverage_str;
};
// `feature_equiv_redundancy` specifies how many inputs with equivalent
// feature sets are allowed to pass the filter. Any subsequent inputs with the
// equivalent set will be rejected.
// `should_discard_domains` specifies the domains that should be discarded
// from the feature set of a filtered input.
DistillingInputFilter( //
uint8_t feature_frequency_threshold,
const FeatureSet::FeatureDomainSet &domains_to_discard)
: seen_inputs_{},
seen_features_{
/*frequency_threshold=*/feature_frequency_threshold,
/*should_discard_domain=*/domains_to_discard,
} {}
std::optional<CorpusElt> FilterElt(CorpusElt elt) {
absl::MutexLock lock{&mu_};
++stats_.num_total_elts;
// Filter out approximately byte-identical inputs ("approximately" because
// we use hashes).
std::string hash = Hash(elt.input);
const auto [iter, inserted] = seen_inputs_.insert(std::move(hash));
if (!inserted) return std::nullopt;
++stats_.num_byte_unique_elts;
// Filter out feature-equivalent inputs.
seen_features_.PruneDiscardedDomains(elt.features);
if (!seen_features_.HasUnseenFeatures(elt.features)) return std::nullopt;
seen_features_.MergeFeatures(elt.features);
++stats_.num_feature_unique_elts;
return std::move(elt);
}
Stats GetStats() {
absl::MutexLock lock{&mu_};
std::stringstream ss;
ss << seen_features_;
stats_.coverage_str = std::move(ss).str();
return stats_;
}
private:
absl::Mutex mu_;
absl::flat_hash_set<std::string /*hash*/> seen_inputs_ ABSL_GUARDED_BY(mu_);
FeatureSet seen_features_ ABSL_GUARDED_BY(mu_);
Stats stats_ ABSL_GUARDED_BY(mu_);
};
// A helper class for writing distilled corpus shards: every element is passed
// through a `DistillingInputFilter` and is written only if the filter lets it
// through. Safe to use from multiple threads: `CorpusShardWriter` serializes
// all writes with its mutex, and `DistillingInputFilter::FilterElt()` takes its
// own lock. The filter is held by reference, so it must outlive the writer; it
// may be shared between several writers.
class DistilledCorpusShardWriter final : public CorpusShardWriter {
 public:
  DistilledCorpusShardWriter(  //
      const Environment &env, bool append, DistillingInputFilter &filter)
      : CorpusShardWriter{env, append}, input_filter_{filter} {}

  ~DistilledCorpusShardWriter() override = default;

 protected:
  // Delegates the keep-or-skip decision (and feature pruning) to the filter.
  std::optional<CorpusElt> PreprocessElt(CorpusElt elt) override {
    return input_filter_.FilterElt(std::move(elt));
  }

 private:
  DistillingInputFilter &input_filter_;
};
} // namespace
// Runs one independent distillation task. Reads shards in the order specified
// by `shard_indices`, distills inputs from them using `input_filter`, and
// writes the result to `WorkDir{env}.DistilledPath()`. Every task gets its own
// `env.my_shard_index`, and so every task creates its own independent distilled
// corpus file. `parallelism` is the maximum number of concurrent
// reading/writing threads. Values > 1 can cause non-determinism in which of the
// same-coverage inputs gets selected to be written to the output shard; set to
// 1 for tests.
void DistillToOneOutputShard( //
const Environment &env, //
const std::vector<size_t> &shard_indices, //
DistillingInputFilter &input_filter, //
ResourcePool<RUsageMemory> &ram_pool, //
int parallelism) {
FUZZTEST_LOG(INFO) << LogPrefix(env) << "Distilling to output shard "
<< env.my_shard_index << "; input shard indices:\n"
<< absl::StrJoin(shard_indices, ", ");
// Read and write the shards in parallel, but gate reading of each on the
// availability of free RAM to keep the peak RAM usage under control.
const size_t num_shards = shard_indices.size();
InputCorpusShardReader reader{env};
// NOTE: Always overwrite corpus and features files, never append.
DistilledCorpusShardWriter writer{env, /*append=*/false, input_filter};
{
ThreadPool threads{parallelism};
for (size_t shard_idx : shard_indices) {
threads.Schedule([shard_idx, &reader, &writer, &env, num_shards,
&ram_pool] {
const auto ram_lease = ram_pool.AcquireLeaseBlocking({
/*id=*/absl::StrCat("out_", env.my_shard_index, "/in_", shard_idx),
/*amount=*/
{/*mem_vsize=*/0, /*mem_vpeak=*/0,
/*mem_rss=*/reader.EstimateRamFootprint(shard_idx)},
/*timeout=*/kRamLeaseTimeout,
});
FUZZTEST_CHECK_OK(ram_lease.status());
CorpusEltVec shard_elts = reader.ReadShard(shard_idx);
// Reverse the order of elements. The intuition is as follows:
// * If the shard is the result of fuzzing with Centipede, the inputs
// that are closer to the end are more interesting, so we start there.
// * If the shard resulted from somethening else, the reverse order is
// not any better or worse than any other order.
std::reverse(shard_elts.begin(), shard_elts.end());
writer.WriteBatch(std::move(shard_elts));
const CorpusShardWriter::Stats shard_stats = writer.GetStats();
FUZZTEST_LOG(INFO) << LogPrefix(env)
<< "batches: " << shard_stats.num_written_batches
<< "/" << num_shards
<< " inputs: " << shard_stats.num_total_elts
<< " written: " << shard_stats.num_written_elts;
});
}
} // The threads join here.
FUZZTEST_LOG(INFO) << LogPrefix(env) << "Done distilling to output shard "
<< env.my_shard_index;
}
int Distill(const Environment &env, const DistillOptions &opts) {
RPROF_THIS_FUNCTION_WITH_TIMELAPSE( //
/*enable=*/FUZZTEST_VLOG_IS_ON(1), //
/*timelapse_interval=*/
absl::Seconds(FUZZTEST_VLOG_IS_ON(2) ? 10 : 60), //
/*also_log_timelapses=*/FUZZTEST_VLOG_IS_ON(10));
// Prepare the per-thread envs.
std::vector<Environment> envs_per_thread(env.num_threads, env);
for (size_t thread_idx = 0; thread_idx < env.num_threads; ++thread_idx) {
envs_per_thread[thread_idx].my_shard_index += thread_idx;
}
// Prepare the per-thread input shard indices. This assigns a randomized and
// shuffled subset of the input shards to each output shard writer. The subset
// sizes are roughly equal between the writers.
std::vector<std::vector<size_t>> shard_indices_per_thread(env.num_threads);
std::vector<size_t> all_shard_indices(env.total_shards);
std::iota(all_shard_indices.begin(), all_shard_indices.end(), 0);
Rng rng{GetRandomSeed(env.seed)};
std::shuffle(all_shard_indices.begin(), all_shard_indices.end(), rng);
size_t thread_idx = 0;
for (size_t shard_idx : all_shard_indices) {
shard_indices_per_thread[thread_idx].push_back(shard_idx);
thread_idx = (thread_idx + 1) % env.num_threads;
}
// Run the distillation threads in parallel.
{
// A global input filter shared by all output shard writers. The output
// shards will collectively contain a deduplicated set of byte- and
// feature-unique inputs.
DistillingInputFilter input_filter{
opts.feature_frequency_threshold,
env.MakeDomainDiscardMask(),
};
// A periodic logger of the global distillation progress. Runs on a separate
// thread.
PeriodicAction progress_logger{
[&input_filter]() {
const auto stats = input_filter.GetStats();
FUZZTEST_LOG(INFO) << LogPrefix() << stats.coverage_str
<< " inputs: " << stats.num_total_elts
<< " unique: " << stats.num_byte_unique_elts
<< " distilled: " << stats.num_feature_unique_elts;
},
// Seeing 0's at the beginning is not interesting, unless debugging.
// Likewise, increase the frequency --v >= 1 to aid debugging.
PeriodicAction::ConstDelayConstInterval(
absl::Seconds(FUZZTEST_VLOG_IS_ON(1) ? 0 : 60),
absl::Seconds(FUZZTEST_VLOG_IS_ON(1) ? 10 : 60)),
};
// The RAM pool shared between all the `DistillToOneOutputShard()` threads.
ResourcePool ram_pool{kRamQuota};
const size_t num_threads = std::min(env.num_threads, kMaxWritingThreads);
ThreadPool threads{static_cast<int>(num_threads)};
for (size_t thread_idx = 0; thread_idx < env.num_threads; ++thread_idx) {
threads.Schedule(
[&thread_env = envs_per_thread[thread_idx],
&thread_shard_indices = shard_indices_per_thread[thread_idx],
&input_filter, &progress_logger, &ram_pool]() {
DistillToOneOutputShard( //
thread_env, thread_shard_indices, input_filter, ram_pool,
kMaxReadingThreads);
// In addition to periodic progress reports, also log the progress
// after writing each output shard.
progress_logger.Nudge();
});
}
} // The threads join here.
return EXIT_SUCCESS;
}
// A test-only entry point: distills the input shards `shard_indices` into the
// single output shard of `env`, one shard at a time and in the given order, so
// that the output is deterministic.
void DistillForTests(const Environment &env,
                     const std::vector<size_t> &shard_indices) {
  constexpr uint8_t kFeatureFrequencyThreshold = 1;
  // A single thread processes the shards sequentially.
  constexpr int kSequential = 1;

  DistillingInputFilter input_filter{
      kFeatureFrequencyThreshold,
      env.MakeDomainDiscardMask(),
  };
  // The pool is as large as it can be, i.e. RAM usage is not limited.
  ResourcePool ram_pool{RUsageMemory::Max()};
  DistillToOneOutputShard(  //
      env, shard_indices, input_filter, ram_pool, /*parallelism=*/kSequential);
}
} // namespace fuzztest::internal