pw_protobuf: Encoder rework

Rewrites the pw_protobuf encoder to be oriented around pw_stream
compatibility.

Improvements:
 * Only the deepest nested encoder can be used at any given moment.
   Using a parent encoder triggers a crash rather than silently
   corrupting data.
 * Nested depth is bounded by scratch buffer size and stack usage rather
   than template arguments.
 * Can write to pw::stream::Writer objects without any temporary buffer.
 * Encoders can be explicitly finalized to allow returning to the parent
   encoder for further modifications without needing to create scopes to
   handle RAII behavior.
 * Proto encode interfaces can standardize on the StreamingEncoder as
   any StreamingEncoder can nest until the temporary buffer is
   exhausted.
 * Better handling of endianness.

Change-Id: I3c50abbf2fe28328a9cb1aa527cf829abe583218
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/40245
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
Pigweed-Auto-Submit: Armando Montanez <amontanez@google.com>
Reviewed-by: Wyatt Hepler <hepler@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
Reviewed-by: Keir Mierle <keir@google.com>
diff --git a/pw_protobuf/BUILD b/pw_protobuf/BUILD
index d3772ad..32b5877 100644
--- a/pw_protobuf/BUILD
+++ b/pw_protobuf/BUILD
@@ -38,6 +38,7 @@
         "decoder.cc",
         "encoder.cc",
         "find.cc",
+        "streaming_encoder.cc",
     ],
     hdrs = [
         "public/pw_protobuf/codegen.h",
@@ -45,15 +46,18 @@
         "public/pw_protobuf/encoder.h",
         "public/pw_protobuf/find.h",
         "public/pw_protobuf/serialized_size.h",
+        "public/pw_protobuf/streaming_encoder.h",
         "public/pw_protobuf/wire_format.h",
     ],
     includes = ["public"],
     deps = [
         ":config",
+        "//pw_assert",
         "//pw_bytes",
         "//pw_result",
         "//pw_span",
         "//pw_status",
+        "//pw_stream",
         "//pw_varint",
     ],
 )
@@ -78,6 +82,15 @@
 )
 
 pw_cc_test(
+    name = "streaming_encoder_test",
+    srcs = ["streaming_encoder_test.cc"],
+    deps = [
+        ":pw_protobuf",
+        "//pw_unit_test",
+    ],
+)
+
+pw_cc_test(
     name = "find_test",
     srcs = ["find_test.cc"],
     deps = [
diff --git a/pw_protobuf/BUILD.gn b/pw_protobuf/BUILD.gn
index ceb9947..82b3d60 100644
--- a/pw_protobuf/BUILD.gn
+++ b/pw_protobuf/BUILD.gn
@@ -45,9 +45,11 @@
   public_configs = [ ":public_include_path" ]
   public_deps = [
     ":config",
+    dir_pw_assert,
     dir_pw_bytes,
     dir_pw_result,
     dir_pw_status,
+    dir_pw_stream,
     dir_pw_varint,
   ]
   public = [
@@ -56,12 +58,14 @@
     "public/pw_protobuf/encoder.h",
     "public/pw_protobuf/find.h",
     "public/pw_protobuf/serialized_size.h",
+    "public/pw_protobuf/streaming_encoder.h",
     "public/pw_protobuf/wire_format.h",
   ]
   sources = [
     "decoder.cc",
     "encoder.cc",
     "find.cc",
+    "streaming_encoder.cc",
   ]
 }
 
@@ -69,6 +73,7 @@
   sources = [
     "decoding.rst",
     "docs.rst",
+    "encoding.rst",
   ]
   report_deps = [
     "size_report:decoder_full",
@@ -84,6 +89,7 @@
     ":encoder_fuzzer",
     ":find_test",
     ":varint_size_test",
+    ":streaming_encoder_test",
   ]
 }
 
@@ -97,6 +103,11 @@
   sources = [ "encoder_test.cc" ]
 }
 
+pw_test("streaming_encoder_test") {
+  deps = [ ":pw_protobuf" ]
+  sources = [ "streaming_encoder_test.cc" ]
+}
+
 pw_test("find_test") {
   deps = [ ":pw_protobuf" ]
   sources = [ "find_test.cc" ]
diff --git a/pw_protobuf/CMakeLists.txt b/pw_protobuf/CMakeLists.txt
index 6d821b5..feef667 100644
--- a/pw_protobuf/CMakeLists.txt
+++ b/pw_protobuf/CMakeLists.txt
@@ -20,10 +20,13 @@
     decoder.cc
     encoder.cc
     find.cc
+    streaming_encoder.cc
   PUBLIC_DEPS
+    pw_assert
     pw_bytes
     pw_result
     pw_status
+    pw_stream
     pw_varint
 )
 
@@ -57,6 +60,16 @@
     pw_protobuf
 )
 
+pw_add_test(pw_protobuf.streaming_encoder_test
+  SOURCES
+    streaming_encoder_test.cc
+  DEPS
+    pw_protobuf
+  GROUPS
+    modules
+    pw_protobuf
+)
+
 pw_add_test(pw_protobuf.codegen_test
   SOURCES
     codegen_test.cc
diff --git a/pw_protobuf/docs.rst b/pw_protobuf/docs.rst
index 5a02779..7459afe 100644
--- a/pw_protobuf/docs.rst
+++ b/pw_protobuf/docs.rst
@@ -66,6 +66,7 @@
 .. toctree::
   :maxdepth: 1
 
+  encoding
   decoding
 
 Comparison with other protobuf libraries
diff --git a/pw_protobuf/encoding.rst b/pw_protobuf/encoding.rst
new file mode 100644
index 0000000..a7259db
--- /dev/null
+++ b/pw_protobuf/encoding.rst
@@ -0,0 +1,122 @@
+.. _module-pw_protobuf-encoding:
+
+--------------------
+pw_protobuf Encoding
+--------------------
+
+Usage
+=====
+Pigweed's protobuf encoders encode directly to the wire format of a proto rather
+than staging information to a mutable datastructure. This means any writes of a
+value are final, and can't be referenced or modified as a later step in the
+encode process.
+
+MemoryEncoder
+-------------
+A MemoryEncoder directly encodes a proto to an in-memory buffer.
+
+.. Code:: cpp
+
+  // Writes a proto response to the provided buffer, returning the encode
+  // status and number of bytes written.
+  StatusWithSize WriteProtoResponse(ByteSpan response) {
+    // All proto writes are directly written to the `response` buffer.
+    MemoryEncoder encoder(response);
+    encoder.WriteUint32(kMagicNumberField, 0x1a1a2b2b);
+    encoder.WriteString(kFavoriteFood, "cookies");
+    return StatusWithSize(encoder.status(), encoder.size());
+  }
+
+StreamingEncoder
+----------------
+pw_protobuf's StreamingEncoder class operates on pw::stream::Writer objects to
+serialized proto data. This means you can directly encode a proto to something
+like pw::sys_io without needing to build the complete message in memory first.
+
+.. Code:: cpp
+
+  #include "pw_protobuf/streaming_encoder.h"
+  #include "pw_stream/sys_io_stream.h"
+  #include "pw_bytes/span.h"
+
+  pw::stream::SysIoWriter sys_io_writer;
+  pw::protobuf::StreamingEncoder my_proto_encoder(sys_io_writer,
+                                                  pw::ByteSpan());
+
+  // Once this line returns, the field has been written to the Writer.
+  my_proto_encoder.WriteInt64(kTimestampFieldNumber, system::GetUnixEpoch());
+
+  // There's no intermediate buffering when writing a string directly to a
+  // StreamingEncoder.
+  my_proto_encoder.WriteString(kWelcomeMessageFieldNumber,
+                               "Welcome to Pigweed!");
+  if (!my_proto_encoder.status().ok()) {
+    PW_LOG_INFO("Failed to encode proto; %s", my_proto_encoder.status().str());
+  }
+
+Nested submessages
+------------------
+Writing proto messages with nested submessages requires buffering due to
+limitations of the proto format. Every proto submessage must know the size of
+the submessage before its final serialization can begin. A streaming encoder can
+be passed a scratch buffer to use when constructing nested messages. All
+submessage data is buffered to this scratch buffer until the submessage is
+finalized. Note that the contents of this scratch buffer is not necessarily
+valid proto data, so don't try to use it directly.
+
+MemoryEncoder objects use the final destination buffer rather than relying on a
+scratch buffer. Note that this means your destination buffer might need
+additional space for overhead incurred by nesting submessages. The
+``MaxScratchBufferSize()`` helper function can be useful in estimating how much
+space to allocate to account for nested submessage encoding overhead.
+
+.. Code:: cpp
+
+  #include "pw_protobuf/streaming_encoder.h"
+  #include "pw_stream/sys_io_stream.h"
+  #include "pw_bytes/span.h"
+
+  pw::stream::SysIoWriter sys_io_writer;
+  // The scratch buffer should be at least as big as the largest nested
+  // submessage. It's a good idea to be a little generous.
+  std::byte submessage_scratch_buffer[64];
+
+  // Provide the scratch buffer to the proto encoder. The buffer's lifetime must
+  // match the lifetime of the encoder.
+  pw::protobuf::StreamingEncoder my_proto_encoder(sys_io_writer,
+                                                  submessage_scratch_buffer);
+
+  StreamingEncoder nested_encoder =
+      my_proto_encoder.GetNestedEncoder(kPetsFieldNumber);
+
+  // There's no intermediate buffering when writing a string directly to a
+  // StreamingEncoder.
+  nested_encoder.WriteString(kNameFieldNumber, "Spot");
+  nested_encoder.WriteString(kPetTypeFieldNumber, "dog");
+  // Since this message is only nested one deep, the submessage is serialized to
+  // the writer as soon as we finalize the submessage.
+  PW_CHECK_OK(nested_encoder.Finalize());
+
+  {  // If a nested_encoder is destroyed it will silently Finalize().
+    StreamingEncoder nested_encoder_2 =
+        my_proto_encoder.GetNestedEncoder(kPetsFieldNumber);
+    nested_encoder_2.WriteString(kNameFieldNumber, "Slippers");
+    nested_encoder_2.WriteString(kPetTypeFieldNumber, "rabbit");
+  }  // When this scope ends, the nested encoder is serialized to the Writer.
+
+  // If an encode error occurs when encoding the nested messages, it will be
+  // reflected at the root encoder.
+  if (!my_proto_encoder.status().ok()) {
+    PW_LOG_INFO("Failed to encode proto; %s", my_proto_encoder.status().str());
+  }
+
+.. warning::
+  When a nested submessage is created, any writes to the parent encoder that
+  created the nested encoder will trigger a crash. To resume writing to
+  a parent encoder, Finalize() the submessage encoder first.
+
+Error Handling
+--------------
+While individual write calls on a proto encoder return pw::Status objects, the
+encoder tracks all status returns and "latches" onto the first error
+encountered. This status can be accessed via ``StreamingEncoder::status()``.
diff --git a/pw_protobuf/public/pw_protobuf/encoder.h b/pw_protobuf/public/pw_protobuf/encoder.h
index fbf9acf..92623eb 100644
--- a/pw_protobuf/public/pw_protobuf/encoder.h
+++ b/pw_protobuf/public/pw_protobuf/encoder.h
@@ -27,6 +27,8 @@
 namespace pw::protobuf {
 
 // A streaming protobuf encoder which encodes to a user-specified buffer.
+//
+// Warning: This encoder will soon be deprecated in favor of the MemoryEncoder.
 class Encoder {
  public:
   using SizeType = config::SizeType;
@@ -46,13 +48,6 @@
   Encoder(const Encoder& other) = delete;
   Encoder& operator=(const Encoder& other) = delete;
 
-  // Per the protobuf specification, valid field numbers range between 1 and
-  // 2**29 - 1, inclusive. The numbers 19000-19999 are reserved for internal
-  // use.
-  constexpr static uint32_t kMaxFieldNumber = (1u << 29) - 1;
-  constexpr static uint32_t kFirstReservedNumber = 19000;
-  constexpr static uint32_t kLastReservedNumber = 19999;
-
   // Writes a proto uint32 key-value pair.
   Status WriteUint32(uint32_t field_number, uint32_t value) {
     return WriteUint64(field_number, value);
@@ -275,12 +270,6 @@
   }
 
  private:
-  constexpr bool ValidFieldNumber(uint32_t field_number) const {
-    return field_number != 0 && field_number <= kMaxFieldNumber &&
-           !(field_number >= kFirstReservedNumber &&
-             field_number <= kLastReservedNumber);
-  }
-
   // Encodes the key for a proto field consisting of its number and wire type.
   Status WriteFieldKey(uint32_t field_number, WireType wire_type) {
     if (!ValidFieldNumber(field_number)) {
@@ -360,7 +349,9 @@
   Status encode_status_;
 };
 
-// A proto encoder with its own blob stack.
+// A Encoder that allocates the necessary buffers to enable nested submessages.
+//
+// Warning: This encoder will soon be deprecated in favor of the MemoryEncoder.
 template <size_t kMaxNestedDepth = 1, size_t kMaxBlobs = kMaxNestedDepth>
 class NestedEncoder : public Encoder {
  public:
diff --git a/pw_protobuf/public/pw_protobuf/streaming_encoder.h b/pw_protobuf/public/pw_protobuf/streaming_encoder.h
new file mode 100644
index 0000000..2e3d761
--- /dev/null
+++ b/pw_protobuf/public/pw_protobuf/streaming_encoder.h
@@ -0,0 +1,528 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <bit>
+#include <cstddef>
+#include <cstring>
+#include <span>
+#include <string_view>
+
+#include "pw_assert/assert.h"
+#include "pw_bytes/endian.h"
+#include "pw_bytes/span.h"
+#include "pw_protobuf/config.h"
+#include "pw_protobuf/wire_format.h"
+#include "pw_status/status.h"
+#include "pw_status/try.h"
+#include "pw_stream/memory_stream.h"
+#include "pw_stream/stream.h"
+#include "pw_varint/varint.h"
+
+namespace pw::protobuf {
+
+// Provides a size estimate to help with sizing buffers passed to
+// StreamingEncoder and MemoryEncoder objects.
+//
+// Args:
+//   max_message_size: For MemoryEncoder objects, this is the max expected size
+//     of the final proto. For StreamingEncoder objects, this should be the max
+//     size of any nested proto submessage that will be built with this encoder
+//     (recursively accumulating the size from the root submessage). If your
+//     proto will encode many large submessages, this value should just be the
+//     size of the largest one.
+//  max_nested_depth: The max number of nested submessage encoders that are
+//     expected to be open simultaneously to encode this proto message.
+static constexpr size_t MaxScratchBufferSize(size_t max_message_size,
+                                             size_t max_nested_depth) {
+  return max_message_size + max_nested_depth * config::kMaxVarintSize;
+}
+
+// Forward declaration. StreamingEncoder and MemoryEncoder are very tightly
+// coupled.
+class MemoryEncoder;
+
+// A protobuf encoder that encodes serialized proto data to a
+// pw::stream::Writer.
+class StreamingEncoder {
+ public:
+  // The StreamingEncoder will serialize proto data to the pw::stream::Writer
+  // provided through the constructor. The scratch buffer provided is for
+  // internal use ONLY and should not be considered valid proto data.
+  //
+  // If a StreamingEncoder object will be writing nested proto messages, it must
+  // provide a scratch buffer large enough to hold the largest submessage some
+  // additional overhead incurred by the encoder's implementation. It's a good
+  // idea to be generous when sizing this buffer. MaxScratchBufferSize() can be
+  // helpful in providing an estimated size for this buffer. The scratch buffer
+  // must exist for the lifetime of the StreamingEncoder object.
+  //
+  // StreamingEncoder objects that do not write nested proto messages can
+  // provide a zero-length scratch buffer.
+  constexpr StreamingEncoder(stream::Writer& writer, ByteSpan scratch_buffer)
+      : writer_(writer),
+        status_(OkStatus()),
+        parent_(nullptr),
+        nested_field_number_(0),
+        memory_writer_(scratch_buffer) {}
+  ~StreamingEncoder() { Finalize(); }
+
+  // Disallow copy/assign to avoid confusion about who owns the buffer.
+  StreamingEncoder(const StreamingEncoder& other) = delete;
+  StreamingEncoder& operator=(const StreamingEncoder& other) = delete;
+
+  // It's not safe to move an encoder as it could cause another encoder's
+  // parent_ pointer to become invalid.
+  StreamingEncoder(StreamingEncoder&& other) = delete;
+  StreamingEncoder& operator=(StreamingEncoder&& other) = delete;
+
+  // Forwards the conservative write limit of the underlying pw::stream::Writer.
+  //
+  // Precondition: Encoder has no active child encoder.
+  size_t ConservativeWriteLimit() const {
+    PW_ASSERT(!nested_encoder_open());
+    return writer_.ConservativeWriteLimit();
+  }
+
+  // Creates a nested encoder with the provided field number. Once this is
+  // called, the parent encoder is locked and not available for use until the
+  // nested encoder is finalized (either explicitly or through destruction).
+  //
+  // Precondition: Encoder has no active child encoder.
+  StreamingEncoder GetNestedEncoder(uint32_t field_number);
+
+  // Closes the proto encoder. If this encoder is a nested one, the parent is
+  // unlocked and proto encoding may resume on the parent. This is automatically
+  // called on object destruction.
+  //
+  // Precondition: Encoder has no active child encoder.
+  //
+  // Returns:
+  //   OutOfRange: Insufficient space reserved for the submessage. This
+  //     usually means config::kMaxVarintSize was set too small.
+  Status Finalize();
+
+  Status status() const {
+    if (nested_encoder_open()) {
+      return Status::Unavailable();
+    }
+    return status_;
+  }
+
+  // Writes a proto uint32 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteUint32(uint32_t field_number, uint32_t value) {
+    return WriteUint64(field_number, value);
+  }
+
+  // Writes a repeated uint32 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedUint32(uint32_t field_number,
+                           std::span<const uint32_t> values) {
+    return WritePackedVarints(field_number, values, VarintEncodeType::kNormal);
+  }
+
+  // Writes a proto uint64 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteUint64(uint32_t field_number, uint64_t value) {
+    return WriteVarintField(field_number, value);
+  }
+
+  // Writes a repeated uint64 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedUint64(uint64_t field_number,
+                           std::span<const uint64_t> values) {
+    return WritePackedVarints(field_number, values, VarintEncodeType::kNormal);
+  }
+
+  // Writes a proto int32 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteInt32(uint32_t field_number, int32_t value) {
+    return WriteUint64(field_number, value);
+  }
+
+  // Writes a repeated int32 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedInt32(uint32_t field_number,
+                          std::span<const int32_t> values) {
+    return WritePackedVarints(
+        field_number,
+        std::span(reinterpret_cast<const uint32_t*>(values.data()),
+                  values.size()),
+        VarintEncodeType::kNormal);
+  }
+
+  // Writes a proto int64 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteInt64(uint32_t field_number, int64_t value) {
+    return WriteUint64(field_number, value);
+  }
+
+  // Writes a repeated int64 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedInt64(uint32_t field_number,
+                          std::span<const int64_t> values) {
+    return WritePackedVarints(
+        field_number,
+        std::span(reinterpret_cast<const uint64_t*>(values.data()),
+                  values.size()),
+        VarintEncodeType::kNormal);
+  }
+
+  // Writes a proto sint32 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteSint32(uint32_t field_number, int32_t value) {
+    return WriteUint64(field_number, varint::ZigZagEncode(value));
+  }
+
+  // Writes a repeated sint32 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedSint32(uint32_t field_number,
+                           std::span<const int32_t> values) {
+    return WritePackedVarints(
+        field_number,
+        std::span(reinterpret_cast<const uint32_t*>(values.data()),
+                  values.size()),
+        VarintEncodeType::kZigZag);
+  }
+
+  // Writes a proto sint64 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteSint64(uint32_t field_number, int64_t value) {
+    return WriteUint64(field_number, varint::ZigZagEncode(value));
+  }
+
+  // Writes a repeated sint64 using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedSint64(uint32_t field_number,
+                           std::span<const int64_t> values) {
+    return WritePackedVarints(
+        field_number,
+        std::span(reinterpret_cast<const uint64_t*>(values.data()),
+                  values.size()),
+        VarintEncodeType::kZigZag);
+  }
+
+  // Writes a proto bool key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteBool(uint32_t field_number, bool value) {
+    return WriteUint32(field_number, static_cast<uint32_t>(value));
+  }
+
+  // Writes a proto fixed32 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteFixed32(uint32_t field_number, uint32_t value) {
+    std::array<std::byte, sizeof(value)> data =
+        bytes::CopyInOrder(std::endian::little, value);
+    return WriteFixed(field_number, data);
+  }
+
+  // Writes a repeated fixed32 field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedFixed32(uint32_t field_number,
+                            std::span<const uint32_t> values) {
+    return WritePackedFixed(
+        field_number, std::as_bytes(values), sizeof(uint32_t));
+  }
+
+  // Writes a proto fixed64 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteFixed64(uint32_t field_number, uint64_t value) {
+    std::array<std::byte, sizeof(value)> data =
+        bytes::CopyInOrder(std::endian::little, value);
+    return WriteFixed(field_number, data);
+  }
+
+  // Writes a repeated fixed64 field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedFixed64(uint32_t field_number,
+                            std::span<const uint64_t> values) {
+    return WritePackedFixed(
+        field_number, std::as_bytes(values), sizeof(uint64_t));
+  }
+
+  // Writes a proto sfixed32 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteSfixed32(uint32_t field_number, int32_t value) {
+    return WriteFixed32(field_number, static_cast<uint32_t>(value));
+  }
+
+  // Writes a repeated sfixed32 field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedSfixed32(uint32_t field_number,
+                             std::span<const int32_t> values) {
+    return WritePackedFixed(
+        field_number, std::as_bytes(values), sizeof(int32_t));
+  }
+
+  // Writes a proto sfixed64 key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteSfixed64(uint32_t field_number, int64_t value) {
+    return WriteFixed64(field_number, static_cast<uint64_t>(value));
+  }
+
+  // Writes a repeated sfixed64 field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedSfixed64(uint32_t field_number,
+                             std::span<const int64_t> values) {
+    return WritePackedFixed(
+        field_number, std::as_bytes(values), sizeof(int64_t));
+  }
+
+  // Writes a proto float key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteFloat(uint32_t field_number, float value) {
+    static_assert(sizeof(float) == sizeof(uint32_t),
+                  "Float and uint32_t are not the same size");
+    uint32_t integral_value;
+    std::memcpy(&integral_value, &value, sizeof(value));
+    std::array<std::byte, sizeof(value)> data =
+        bytes::CopyInOrder(std::endian::little, integral_value);
+    return WriteFixed(field_number, data);
+  }
+
+  // Writes a repeated float field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedFloat(uint32_t field_number,
+                          std::span<const float> values) {
+    return WritePackedFixed(field_number, std::as_bytes(values), sizeof(float));
+  }
+
+  // Writes a proto double key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteDouble(uint32_t field_number, double value) {
+    static_assert(sizeof(double) == sizeof(uint64_t),
+                  "Double and uint64_t are not the same size");
+    uint64_t integral_value;
+    std::memcpy(&integral_value, &value, sizeof(value));
+    std::array<std::byte, sizeof(value)> data =
+        bytes::CopyInOrder(std::endian::little, integral_value);
+    return WriteFixed(field_number, data);
+  }
+
+  // Writes a repeated double field using packed encoding.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WritePackedDouble(uint32_t field_number,
+                           std::span<const double> values) {
+    return WritePackedFixed(
+        field_number, std::as_bytes(values), sizeof(double));
+  }
+
+  // Writes a proto `bytes` field as a key-value pair. This can also be used to
+  // write a pre-encoded nested submessage directly without using a nested
+  // encoder.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteBytes(uint32_t field_number, ConstByteSpan value) {
+    return WriteLengthDelimitedField(field_number, value);
+  }
+
+  // Writes a proto string key-value pair.
+  //
+  // Precondition: Encoder has no active child encoder.
+  Status WriteString(uint32_t field_number, std::string_view value) {
+    return WriteBytes(field_number, std::as_bytes(std::span(value)));
+  }
+
+ private:
+  friend class MemoryEncoder;
+
+  enum class VarintEncodeType {
+    kNormal,
+    kZigZag,
+  };
+
+  constexpr StreamingEncoder(StreamingEncoder& parent, ByteSpan scratch_buffer)
+      : writer_(memory_writer_),
+        status_(scratch_buffer.empty() ? Status::ResourceExhausted()
+                                       : OkStatus()),
+        parent_(&parent),
+        nested_field_number_(0),
+        memory_writer_(scratch_buffer) {}
+
+  bool nested_encoder_open() const { return nested_field_number_ != 0; }
+
+  // Finalization logic for nested encoders that call Finalize(). While
+  // Finalize() is called on the child encoder, FinalizeNestedMessage() is
+  // called on the parent encoder.
+  Status FinalizeNestedMessage(StreamingEncoder& nested);
+
+  // Implementation for encoding all varint field types.
+  Status WriteVarintField(uint32_t field_number, uint64_t value);
+
+  // Implementation for encoding all length-delimited field types.
+  Status WriteLengthDelimitedField(uint32_t field_number, ConstByteSpan data);
+
+  // Implementation for encoding all fixed-length integer types.
+  Status WriteFixed(uint32_t field_number, ConstByteSpan data);
+
+  // Encodes a base-128 varint to the buffer.
+  Status WriteVarint(uint64_t value);
+
+  Status WriteZigzagVarint(int64_t value) {
+    return WriteVarint(varint::ZigZagEncode(value));
+  }
+
+  // Writes a list of varints to the buffer in length-delimited packed encoding.
+  template <typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
+  Status WritePackedVarints(uint32_t field_number,
+                            std::span<T> values,
+                            VarintEncodeType encode_type) {
+    static_assert(std::is_same<T, const uint32_t>::value ||
+                      std::is_same<T, const int32_t>::value ||
+                      std::is_same<T, const uint64_t>::value ||
+                      std::is_same<T, const int64_t>::value,
+                  "Packed varints must be of type uint32_t, int32_t, uint64_t, "
+                  "or int64_t");
+
+    size_t payload_size = 0;
+    for (T val : values) {
+      if (encode_type == VarintEncodeType::kZigZag) {
+        int64_t integer =
+            static_cast<int64_t>(static_cast<std::make_signed_t<T>>(val));
+        payload_size += varint::EncodedSize(varint::ZigZagEncode(integer));
+      } else {
+        uint64_t integer = static_cast<uint64_t>(val);
+        payload_size += varint::EncodedSize(integer);
+      }
+    }
+
+    if (!UpdateStatusForWrite(field_number, WireType::kDelimited, payload_size)
+             .ok()) {
+      return status_;
+    }
+
+    WriteVarint(MakeKey(field_number, WireType::kDelimited));
+    WriteVarint(payload_size);
+    for (T value : values) {
+      if (encode_type == VarintEncodeType::kZigZag) {
+        WriteZigzagVarint(static_cast<std::make_signed_t<T>>(value));
+      } else {
+        WriteVarint(value);
+      }
+    }
+
+    return status_;
+  }
+
+  // Writes a list of fixed-size types to the buffer in length-delimited
+  // packed encoding. Only float, double, uint32_t, int32_t, uint64_t, and
+  // int64_t are permitted
+  Status WritePackedFixed(uint32_t field_number,
+                          std::span<const std::byte> values,
+                          size_t elem_size);
+
+  // Checks if a write is invalid or will cause the encoder to enter an error
+  // state, and preemptively sets this encoder's status to that error to block
+  // the write. Only the first error encountered is tracked.
+  //
+  // Precondition: Encoder has no active child encoder.
+  //
+  // Returns:
+  //   InvalidArgument: The field number provided was invalid.
+  //   ResourceExhausted: The requested write would have exceeded the
+  //     stream::Writer's conservative write limit.
+  //   Other: If any Write() operations on the stream::Writer caused an error,
+  //     that error will be repeated here.
+  Status UpdateStatusForWrite(uint32_t field_number,
+                              WireType type,
+                              size_t data_size);
+
+  // All proto encode operations are directly written to this writer.
+  stream::Writer& writer_;
+
+  // The current encoder status. This status is only updated to reflect the
+  // first error encountered. Any further write operations are blocked when the
+  // encoder enters an error state.
+  Status status_;
+
+  // If this is a nested encoder, this points to the encoder that created it.
+  // For user-created MemoryEncoders, parent_ points to this object as an
+  // optimization for the MemoryEncoder and nested encoders to use the same
+  // underlying buffer.
+  StreamingEncoder* parent_;
+
+  // If an encoder has a child encoder open, this is the field number of that
+  // submessage. Otherwise, this is 0 to indicate no child encoder is open.
+  uint32_t nested_field_number_;
+
+  // This memory writer is used for staging proto submessages to the
+  // scratch_buffer.
+  stream::MemoryWriter memory_writer_;
+};
+
+// A protobuf encoder that writes directly to a provided buffer. This will
+// eventually replace the original pw::protobuf::Encoder. If you want to encode
+// a proto into an in-memory buffer, use this.
+//
+// Example:
+//
+//   // Writes a proto response to the provided buffer, returning the encode
+//   // status and number of bytes written.
+//   StatusWithSize WriteProtoResponse(ByteSpan response) {
+//     // All proto writes are directly written to the `response` buffer.
+//     MemoryEncoder encoder(response);
+//     encoder.WriteUint32(kMagicNumberField, 0x1a1a2b2b);
+//     encoder.WriteString(kFavoriteFood, "cookies");
+//     return StatusWithSize(encoder.status(), encoder.size());
+//   }
+//
+// Note: Avoid using a MemoryEncoder reference as an argument for a function.
+// The StreamingEncoder is more generic.
+class MemoryEncoder : public StreamingEncoder {
+ public:
+  constexpr MemoryEncoder(ByteSpan dest) : StreamingEncoder(*this, dest) {}
+  ~MemoryEncoder() { Finalize(); }
+
+  // Disallow copy/assign to avoid confusion about who owns the buffer.
+  MemoryEncoder(const MemoryEncoder& other) = delete;
+  MemoryEncoder& operator=(const MemoryEncoder& other) = delete;
+
+  // It's not safe to move an encoder as it could cause another encoder's
+  // parent_ pointer to become invalid.
+  MemoryEncoder(MemoryEncoder&& other) = delete;
+  MemoryEncoder& operator=(MemoryEncoder&& other) = delete;
+
+  const std::byte* data() const { return memory_writer_.data(); }
+  size_t size() const { return memory_writer_.bytes_written(); }
+};
+
+}  // namespace pw::protobuf
diff --git a/pw_protobuf/public/pw_protobuf/wire_format.h b/pw_protobuf/public/pw_protobuf/wire_format.h
index a4b73a1..fa6071c 100644
--- a/pw_protobuf/public/pw_protobuf/wire_format.h
+++ b/pw_protobuf/public/pw_protobuf/wire_format.h
@@ -17,6 +17,13 @@
 
 namespace pw::protobuf {
 
+// Per the protobuf specification, valid field numbers range between 1 and
+// 2**29 - 1, inclusive. The numbers 19000-19999 are reserved for internal
+// use.
+constexpr static uint32_t kMaxFieldNumber = (1u << 29) - 1;
+constexpr static uint32_t kFirstReservedNumber = 19000;
+constexpr static uint32_t kLastReservedNumber = 19999;
+
 enum class WireType {
   kVarint = 0,
   kFixed64 = 1,
@@ -32,4 +39,10 @@
   return (field_number << kFieldNumberShift | static_cast<uint32_t>(wire_type));
 }
 
+constexpr bool ValidFieldNumber(uint32_t field_number) {
+  return field_number != 0 && field_number <= kMaxFieldNumber &&
+         !(field_number >= kFirstReservedNumber &&
+           field_number <= kLastReservedNumber);
+}
+
 }  // namespace pw::protobuf
diff --git a/pw_protobuf/streaming_encoder.cc b/pw_protobuf/streaming_encoder.cc
new file mode 100644
index 0000000..8311a77
--- /dev/null
+++ b/pw_protobuf/streaming_encoder.cc
@@ -0,0 +1,215 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#include "pw_protobuf/streaming_encoder.h"
+
+#include <cstddef>
+#include <cstring>
+#include <span>
+
+#include "pw_assert/check.h"
+#include "pw_bytes/span.h"
+#include "pw_protobuf/wire_format.h"
+#include "pw_status/status.h"
+#include "pw_status/try.h"
+#include "pw_stream/memory_stream.h"
+#include "pw_stream/stream.h"
+#include "pw_varint/varint.h"
+
+namespace pw::protobuf {
+
+StreamingEncoder StreamingEncoder::GetNestedEncoder(uint32_t field_number) {
+  PW_CHECK(!nested_encoder_open());
+  nested_field_number_ = field_number;
+
+  // Pass the unused space of the scratch buffer to the nested encoder to use
+  // as their scratch buffer.
+  size_t key_size =
+      varint::EncodedSize(MakeKey(field_number, WireType::kDelimited));
+  size_t reserved_size = key_size + config::kMaxVarintSize;
+  size_t max_size = std::min(memory_writer_.ConservativeWriteLimit(),
+                             writer_.ConservativeWriteLimit());
+
+  ByteSpan nested_buffer;
+  if (reserved_size < max_size) {
+    nested_buffer = ByteSpan(
+        memory_writer_.data() + reserved_size + memory_writer_.bytes_written(),
+        max_size - reserved_size);
+  } else {
+    nested_buffer = ByteSpan();
+  }
+  return StreamingEncoder(*this, nested_buffer);
+}
+
+Status StreamingEncoder::Finalize() {
+  // If an encoder has no parent, finalize is a no-op.
+  if (parent_ == nullptr) {
+    return OkStatus();
+  }
+
+  PW_CHECK(!nested_encoder_open(),
+           "Tried to finalize a proto encoder when it has an active submessage "
+           "encoder that hasn't been finalized!");
+  // MemoryWriters with their parent set to themselves are externally
+  // created by users. It's not valid or necessary to call
+  // FinalizeNestedMessage() on themselves.
+  if (parent_ == this) {
+    return OkStatus();
+  }
+
+  return parent_->FinalizeNestedMessage(*this);
+}
+
+Status StreamingEncoder::FinalizeNestedMessage(StreamingEncoder& nested) {
+  PW_DCHECK_PTR_EQ(
+      nested.parent_,
+      this,
+      "FinalizeNestedMessage() called on the wrong Encoder parent");
+
+  // Make the nested encoder look like it has an open child to block writes for
+  // the remainder of the object's life.
+  nested.nested_field_number_ = kFirstReservedNumber;
+  nested.parent_ = nullptr;
+  // Temporarily cache the field number of the child so we can re-enable
+  // writing to this encoder.
+  uint32_t temp_field_number = nested_field_number_;
+  nested_field_number_ = 0;
+
+  // TODO(amontanez): If a submessage fails, we could optionally discard
+  // it and continue happily. For now, we'll always invalidate the entire
+  // encoder if a single submessage fails.
+  status_.Update(nested.status_);
+  PW_TRY(status_);
+
+  if (varint::EncodedSize(nested.memory_writer_.bytes_written()) >
+      config::kMaxVarintSize) {
+    status_ = Status::OutOfRange();
+    return status_;
+  }
+
+  status_ = WriteLengthDelimitedField(temp_field_number,
+                                      nested.memory_writer_.WrittenData());
+  return status_;
+}
+
+Status StreamingEncoder::WriteVarintField(uint32_t field_number,
+                                          uint64_t value) {
+  PW_TRY(UpdateStatusForWrite(
+      field_number, WireType::kVarint, varint::EncodedSize(value)));
+
+  WriteVarint(MakeKey(field_number, WireType::kVarint));
+  return WriteVarint(value);
+}
+
+Status StreamingEncoder::WriteLengthDelimitedField(uint32_t field_number,
+                                                   ConstByteSpan data) {
+  PW_TRY(UpdateStatusForWrite(field_number, WireType::kDelimited, data.size()));
+  WriteVarint(MakeKey(field_number, WireType::kDelimited));
+  WriteVarint(data.size_bytes());
+  if (Status status = writer_.Write(data); !status.ok()) {
+    status_ = status;
+  }
+  return status_;
+}
+
+Status StreamingEncoder::WriteFixed(uint32_t field_number, ConstByteSpan data) {
+  WireType type =
+      data.size() == sizeof(uint32_t) ? WireType::kFixed32 : WireType::kFixed64;
+
+  PW_TRY(UpdateStatusForWrite(field_number, type, data.size()));
+
+  WriteVarint(MakeKey(field_number, type));
+  if (Status status = writer_.Write(data); !status.ok()) {
+    status_ = status;
+  }
+  return status_;
+}
+
+// Encodes a base-128 varint to the buffer. This function assumes the caller
+// has already checked UpdateStatusForWrite() to ensure the writer's
+// conservative write limit indicates the Writer has sufficient buffer space.
+Status StreamingEncoder::WriteVarint(uint64_t value) {
+  if (!status_.ok()) {
+    return status_;
+  }
+
+  std::array<std::byte, varint::kMaxVarint64SizeBytes> varint_encode_buffer;
+  size_t varint_size = pw::varint::EncodeLittleEndianBase128(
+      value, std::span(varint_encode_buffer));
+
+  if (Status status =
+          writer_.Write(std::span(varint_encode_buffer).first(varint_size));
+      !status.ok()) {
+    status_ = status;
+    return status_;
+  }
+
+  return OkStatus();
+}
+
+Status StreamingEncoder::WritePackedFixed(uint32_t field_number,
+                                          std::span<const std::byte> values,
+                                          size_t elem_size) {
+  if (values.empty()) {
+    return status_;
+  }
+
+  PW_CHECK_NOTNULL(values.data());
+  PW_DCHECK(elem_size == sizeof(uint32_t) || elem_size == sizeof(uint64_t));
+
+  PW_TRY(UpdateStatusForWrite(
+      field_number, WireType::kDelimited, values.size_bytes()));
+  WriteVarint(MakeKey(field_number, WireType::kDelimited));
+  WriteVarint(values.size_bytes());
+
+  for (auto val_start = values.begin(); val_start != values.end();
+       val_start += elem_size) {
+    // Allocates 8 bytes so both 4-byte and 8-byte types can be encoded as
+    // little-endian for serialization.
+    std::array<std::byte, sizeof(uint64_t)> data;
+    if (std::endian::native == std::endian::little) {
+      std::copy(val_start, val_start + elem_size, std::begin(data));
+    } else {
+      std::reverse_copy(val_start, val_start + elem_size, std::begin(data));
+    }
+    status_.Update(writer_.Write(std::span(data).first(elem_size)));
+    PW_TRY(status_);
+  }
+  return status_;
+}
+
+Status StreamingEncoder::UpdateStatusForWrite(uint32_t field_number,
+                                              WireType type,
+                                              size_t data_size) {
+  PW_CHECK(!nested_encoder_open());
+  if (!status_.ok()) {
+    return status_;
+  }
+  if (!ValidFieldNumber(field_number)) {
+    status_ = Status::InvalidArgument();
+    return status_;
+  }
+
+  size_t size = varint::EncodedSize(MakeKey(field_number, type));
+  if (type == WireType::kDelimited) {
+    size += varint::EncodedSize(data_size);
+  }
+  size += data_size;
+
+  if (size > writer_.ConservativeWriteLimit()) {
+    status_ = Status::ResourceExhausted();
+  }
+  return status_;
+}
+
+}  // namespace pw::protobuf
diff --git a/pw_protobuf/streaming_encoder_test.cc b/pw_protobuf/streaming_encoder_test.cc
new file mode 100644
index 0000000..13dd102
--- /dev/null
+++ b/pw_protobuf/streaming_encoder_test.cc
@@ -0,0 +1,408 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include "pw_protobuf/streaming_encoder.h"
+
+#include <span>
+
+#include "gtest/gtest.h"
+#include "pw_bytes/span.h"
+#include "pw_stream/memory_stream.h"
+
+namespace pw::protobuf {
+namespace {
+
+using stream::MemoryWriter;
+
+// The tests in this file use the following proto message schemas.
+//
+//   message TestProto {
+//     uint32 magic_number = 1;
+//     sint32 ziggy = 2;
+//     fixed64 cycles = 3;
+//     float ratio = 4;
+//     string error_message = 5;
+//     NestedProto nested = 6;
+//   }
+//
+//   message NestedProto {
+//     string hello = 1;
+//     uint32 id = 2;
+//     repeated DoubleNestedProto pair = 3;
+//   }
+//
+//   message DoubleNestedProto {
+//     string key = 1;
+//     string value = 2;
+//   }
+//
+
+constexpr uint32_t kTestProtoMagicNumberField = 1;
+constexpr uint32_t kTestProtoZiggyField = 2;
+constexpr uint32_t kTestProtoCyclesField = 3;
+constexpr uint32_t kTestProtoRatioField = 4;
+constexpr uint32_t kTestProtoErrorMessageField = 5;
+constexpr uint32_t kTestProtoNestedField = 6;
+
+constexpr uint32_t kNestedProtoHelloField = 1;
+constexpr uint32_t kNestedProtoIdField = 2;
+constexpr uint32_t kNestedProtoPairField = 3;
+
+constexpr uint32_t kDoubleNestedProtoKeyField = 1;
+constexpr uint32_t kDoubleNestedProtoValueField = 2;
+
+TEST(StreamingEncoder, EncodePrimitives) {
+  // TestProto tp;
+  // tp.magic_number = 42;
+  // tp.ziggy = -13;
+  // tp.cycles = 0xdeadbeef8badf00d;
+  // tp.ratio = 1.618034;
+  // tp.error_message = "broken 💩";
+
+  // Hand-encoded version of the above.
+  // clang-format off
+  constexpr uint8_t encoded_proto[] = {
+    // magic_number [varint k=1]
+    0x08, 0x2a,
+    // ziggy [varint k=2]
+    0x10, 0x19,
+    // cycles [fixed64 k=3]
+    0x19, 0x0d, 0xf0, 0xad, 0x8b, 0xef, 0xbe, 0xad, 0xde,
+    // ratio [fixed32 k=4]
+    0x25, 0xbd, 0x1b, 0xcf, 0x3f,
+    // error_message [delimited k=5],
+    0x2a, 0x0b, 'b', 'r', 'o', 'k', 'e', 'n', ' ',
+    // poop!
+    0xf0, 0x9f, 0x92, 0xa9,
+  };
+  // clang-format on
+
+  std::byte encode_buffer[32];
+  std::byte dest_buffer[32];
+  // This writer isn't necessary, it's just the most testable way to exercise
+  // a stream interface. Use a MemoryEncoder when encoding a proto directly to
+  // an in-memory buffer.
+  MemoryWriter writer(dest_buffer);
+  StreamingEncoder encoder(writer, encode_buffer);
+
+  EXPECT_EQ(encoder.WriteUint32(kTestProtoMagicNumberField, 42), OkStatus());
+  EXPECT_EQ(writer.bytes_written(), 2u);
+  EXPECT_EQ(encoder.WriteSint32(kTestProtoZiggyField, -13), OkStatus());
+  EXPECT_EQ(encoder.WriteFixed64(kTestProtoCyclesField, 0xdeadbeef8badf00d),
+            OkStatus());
+  EXPECT_EQ(encoder.WriteFloat(kTestProtoRatioField, 1.618034), OkStatus());
+  EXPECT_EQ(encoder.WriteString(kTestProtoErrorMessageField, "broken 💩"),
+            OkStatus());
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result = writer.WrittenData();
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, EncodeInsufficientSpace) {
+  std::byte encode_buffer[12];
+  MemoryEncoder encoder(encode_buffer);
+
+  // 2 bytes.
+  EXPECT_EQ(encoder.WriteUint32(kTestProtoMagicNumberField, 42), OkStatus());
+  // 2 bytes.
+  EXPECT_EQ(encoder.WriteSint32(kTestProtoZiggyField, -13), OkStatus());
+  // 9 bytes; not enough space! The encoder will start writing the field but
+  // should rollback when it realizes it doesn't have enough space.
+  EXPECT_EQ(encoder.WriteFixed64(kTestProtoCyclesField, 0xdeadbeef8badf00d),
+            Status::ResourceExhausted());
+  // Any further write operations should fail.
+  EXPECT_EQ(encoder.WriteFloat(kTestProtoRatioField, 1.618034),
+            Status::ResourceExhausted());
+
+  ASSERT_EQ(encoder.status(), Status::ResourceExhausted());
+}
+
+TEST(StreamingEncoder, EncodeInvalidArguments) {
+  std::byte encode_buffer[12];
+  MemoryEncoder encoder(encode_buffer);
+
+  EXPECT_EQ(encoder.WriteUint32(kTestProtoMagicNumberField, 42), OkStatus());
+  // Invalid proto field numbers.
+  EXPECT_EQ(encoder.WriteUint32(0, 1337), Status::InvalidArgument());
+
+  // TODO(amontanez): Does it make sense to support this?
+  // encoder.Clear();
+
+  EXPECT_EQ(encoder.WriteString(1u << 31, "ha"), Status::InvalidArgument());
+
+  // TODO(amontanez): Does it make sense to support this?
+  // encoder.Clear();
+
+  EXPECT_EQ(encoder.WriteBool(19091, false), Status::InvalidArgument());
+  ASSERT_EQ(encoder.status(), Status::InvalidArgument());
+}
+
+TEST(StreamingEncoder, Nested) {
+  // This is the largest complete submessage in this test.
+  constexpr size_t kLargestSubmessageSize = 0x30;
+  constexpr size_t kScratchBufferSize =
+      MaxScratchBufferSize(kLargestSubmessageSize, 2);
+  std::byte encode_buffer[kScratchBufferSize];
+  std::byte dest_buffer[128];
+  MemoryWriter writer(dest_buffer);
+  StreamingEncoder encoder(writer, encode_buffer);
+
+  // TestProto test_proto;
+  // test_proto.magic_number = 42;
+  EXPECT_EQ(encoder.WriteUint32(kTestProtoMagicNumberField, 42), OkStatus());
+
+  {
+    // NestedProto& nested_proto = test_proto.nested;
+    StreamingEncoder nested_proto =
+        encoder.GetNestedEncoder(kTestProtoNestedField);
+    // nested_proto.hello = "world";
+    EXPECT_EQ(nested_proto.WriteString(kNestedProtoHelloField, "world"),
+              OkStatus());
+
+    {
+      // DoubleNestedProto& double_nested_proto = nested_proto.append_pair();
+      StreamingEncoder double_nested_proto =
+          nested_proto.GetNestedEncoder(kNestedProtoPairField);
+      // double_nested_proto.key = "version";
+      EXPECT_EQ(double_nested_proto.WriteString(kDoubleNestedProtoKeyField,
+                                                "version"),
+                OkStatus());
+      // double_nested_proto.value = "2.9.1";
+      EXPECT_EQ(double_nested_proto.WriteString(kDoubleNestedProtoValueField,
+                                                "2.9.1"),
+                OkStatus());
+
+      EXPECT_EQ(double_nested_proto.Finalize(), OkStatus());
+    }  // end DoubleNestedProto
+
+    // nested_proto.id = 999;
+    EXPECT_EQ(nested_proto.WriteUint32(kNestedProtoIdField, 999), OkStatus());
+
+    {
+      // DoubleNestedProto& double_nested_proto = nested_proto.append_pair();
+      StreamingEncoder double_nested_proto =
+          nested_proto.GetNestedEncoder(kNestedProtoPairField);
+      // double_nested_proto.key = "device";
+      EXPECT_EQ(
+          double_nested_proto.WriteString(kDoubleNestedProtoKeyField, "device"),
+          OkStatus());
+      // double_nested_proto.value = "left-soc";
+      EXPECT_EQ(double_nested_proto.WriteString(kDoubleNestedProtoValueField,
+                                                "left-soc"),
+                OkStatus());
+      // Rely on destructor for finalization.
+    }  // end DoubleNestedProto
+
+    EXPECT_EQ(nested_proto.Finalize(), OkStatus());
+  }  // end NestedProto
+
+  // test_proto.ziggy = -13;
+  EXPECT_EQ(encoder.WriteSint32(kTestProtoZiggyField, -13), OkStatus());
+
+  // clang-format off
+  constexpr uint8_t encoded_proto[] = {
+    // magic_number
+    0x08, 0x2a,
+    // nested header (key, size)
+    0x32, 0x30,
+    // nested.hello
+    0x0a, 0x05, 'w', 'o', 'r', 'l', 'd',
+    // nested.pair[0] header (key, size)
+    0x1a, 0x10,
+    // nested.pair[0].key
+    0x0a, 0x07, 'v', 'e', 'r', 's', 'i', 'o', 'n',
+    // nested.pair[0].value
+    0x12, 0x05, '2', '.', '9', '.', '1',
+    // nested.id
+    0x10, 0xe7, 0x07,
+    // nested.pair[1] header (key, size)
+    0x1a, 0x12,
+    // nested.pair[1].key
+    0x0a, 0x06, 'd', 'e', 'v', 'i', 'c', 'e',
+    // nested.pair[1].value
+    0x12, 0x08, 'l', 'e', 'f', 't', '-', 's', 'o', 'c',
+    // ziggy
+    0x10, 0x19
+  };
+  // clang-format on
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result = ConstByteSpan(writer.data(), writer.bytes_written());
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, RepeatedField) {
+  std::byte encode_buffer[32];
+  MemoryEncoder encoder(encode_buffer);
+
+  // repeated uint32 values = 1;
+  constexpr uint32_t values[] = {0, 50, 100, 150, 200};
+  for (int i = 0; i < 5; ++i) {
+    encoder.WriteUint32(1, values[i]);
+  }
+
+  constexpr uint8_t encoded_proto[] = {
+      0x08, 0x00, 0x08, 0x32, 0x08, 0x64, 0x08, 0x96, 0x01, 0x08, 0xc8, 0x01};
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result(encoder);
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, PackedVarint) {
+  std::byte encode_buffer[32];
+  MemoryEncoder encoder(encode_buffer);
+
+  // repeated uint32 values = 1;
+  constexpr uint32_t values[] = {0, 50, 100, 150, 200};
+  encoder.WritePackedUint32(1, values);
+
+  constexpr uint8_t encoded_proto[] = {
+      0x0a, 0x07, 0x00, 0x32, 0x64, 0x96, 0x01, 0xc8, 0x01};
+  //  key   size  v[0]  v[1]  v[2]  v[3]        v[4]
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result(encoder);
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, PackedVarintInsufficientSpace) {
+  std::byte encode_buffer[8];
+  MemoryEncoder encoder(encode_buffer);
+
+  constexpr uint32_t values[] = {0, 50, 100, 150, 200};
+  encoder.WritePackedUint32(1, values);
+
+  EXPECT_EQ(encoder.status(), Status::ResourceExhausted());
+}
+
+TEST(StreamingEncoder, PackedFixed) {
+  std::byte encode_buffer[32];
+  MemoryEncoder encoder(encode_buffer);
+
+  // repeated fixed32 values = 1;
+  constexpr uint32_t values[] = {0, 50, 100, 150, 200};
+  encoder.WritePackedFixed32(1, values);
+
+  // repeated fixed64 values64 = 2;
+  constexpr uint64_t values64[] = {0x0102030405060708};
+  encoder.WritePackedFixed64(2, values64);
+
+  constexpr uint8_t encoded_proto[] = {
+      0x0a, 0x14, 0x00, 0x00, 0x00, 0x00, 0x32, 0x00, 0x00, 0x00, 0x64,
+      0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0xc8, 0x00, 0x00, 0x00,
+      0x12, 0x08, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01};
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result(encoder);
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, PackedZigzag) {
+  std::byte encode_buffer[32];
+  MemoryEncoder encoder(encode_buffer);
+
+  // repeated sint32 values = 1;
+  constexpr int32_t values[] = {-100, -25, -1, 0, 1, 25, 100};
+  encoder.WritePackedSint32(1, values);
+
+  constexpr uint8_t encoded_proto[] = {
+      0x0a, 0x09, 0xc7, 0x01, 0x31, 0x01, 0x00, 0x02, 0x32, 0xc8, 0x01};
+
+  ASSERT_EQ(encoder.status(), OkStatus());
+  ConstByteSpan result(encoder);
+  EXPECT_EQ(result.size(), sizeof(encoded_proto));
+  EXPECT_EQ(std::memcmp(result.data(), encoded_proto, sizeof(encoded_proto)),
+            0);
+}
+
+TEST(StreamingEncoder, ParentUnavailable) {
+  std::byte encode_buffer[32];
+  MemoryEncoder parent(encode_buffer);
+  {
+    StreamingEncoder child = parent.GetNestedEncoder(kTestProtoNestedField);
+    ASSERT_EQ(parent.status(), Status::Unavailable());
+    ASSERT_EQ(child.status(), OkStatus());
+  }
+  ASSERT_EQ(parent.status(), OkStatus());
+}
+
+TEST(StreamingEncoder, NestedEncoderRequiresBuffer) {
+  MemoryEncoder parent((ByteSpan()));
+  {
+    StreamingEncoder child = parent.GetNestedEncoder(kTestProtoNestedField);
+
+    ASSERT_EQ(child.status(), Status::ResourceExhausted());
+  }
+  ASSERT_EQ(parent.status(), Status::ResourceExhausted());
+}
+
+TEST(StreamingEncoder, WriteTooBig) {
+  constexpr size_t kTempBufferSize = 32;
+  constexpr size_t kWriteSize = 2;
+  std::byte encode_buffer[32];
+  MemoryEncoder encoder(encode_buffer);
+  // Each write is 2 bytes. Ensure we can write 16 times.
+  for (size_t i = 0; i < kTempBufferSize; i += kWriteSize) {
+    ASSERT_EQ(encoder.WriteUint32(1, 12), OkStatus());
+  }
+  ASSERT_EQ(encoder.size(), kTempBufferSize);
+  ASSERT_EQ(encoder.WriteUint32(1, 12), Status::ResourceExhausted());
+}
+
+TEST(StreamingEncoder, EmptyChildWrites) {
+  std::byte encode_buffer[32];
+  MemoryEncoder parent(encode_buffer);
+  { StreamingEncoder child = parent.GetNestedEncoder(kTestProtoNestedField); }
+  ASSERT_EQ(parent.status(), OkStatus());
+  const size_t kExpectedSize =
+      varint::EncodedSize(
+          MakeKey(kTestProtoNestedField, WireType::kDelimited)) +
+      varint::EncodedSize(0);
+  ASSERT_EQ(parent.size(), kExpectedSize);
+}
+
+TEST(StreamingEncoder, ChildUnavailableAfterFinalize) {
+  std::byte encode_buffer[32];
+  MemoryEncoder parent(encode_buffer);
+  {
+    StreamingEncoder child = parent.GetNestedEncoder(kTestProtoNestedField);
+    child.Finalize();
+    ASSERT_EQ(child.status(), Status::Unavailable());
+  }
+}
+
+TEST(StreamingEncoder, NestedStatusPropagates) {
+  std::byte encode_buffer[32];
+  MemoryEncoder parent(encode_buffer);
+  {
+    StreamingEncoder child = parent.GetNestedEncoder(kTestProtoNestedField);
+    ASSERT_EQ(child.WriteUint32(0, 0), Status::InvalidArgument());
+  }
+  ASSERT_EQ(parent.status(), Status::InvalidArgument());
+}
+
+}  // namespace
+}  // namespace pw::protobuf