pw_multisink: Lock multisink operations

Guards all multisink transactions with a lock. The new configuration
option PW_MULTISINK_LOCK_INTERRUPT_SAFE allows the project to select
the type of lock used to guard transactions. By default, it is enabled
and makes use of an interrupt spin-lock. If disabled, a mutex is used
instead.

Change-Id: I71ab2729d130c524da27e0d06beb0c3fdf73d145
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/45720
Commit-Queue: Prashanth Swaminathan <prashanthsw@google.com>
Pigweed-Auto-Submit: Prashanth Swaminathan <prashanthsw@google.com>
Reviewed-by: Ewout van Bekkum <ewout@google.com>
diff --git a/pw_multisink/BUILD b/pw_multisink/BUILD
index 11f772c..f7840a2 100644
--- a/pw_multisink/BUILD
+++ b/pw_multisink/BUILD
@@ -29,6 +29,7 @@
         "multisink.cc",
     ],
     hdrs = [
+        "public/pw_multisink/config.h",
         "public/pw_multisink/drain.h",
         "public/pw_multisink/multisink.h",
     ],
@@ -36,6 +37,9 @@
     deps = [
         "//pw_assert",
         "//pw_bytes",
+        "//pw_sync:interrupt_spin_lock",
+        "//pw_sync:lock_annotations",
+        "//pw_sync:mutex",
         "//pw_result",
         "//pw_ring_buffer",
         "//pw_varint",
diff --git a/pw_multisink/BUILD.gn b/pw_multisink/BUILD.gn
index c8446d0..dac134c 100644
--- a/pw_multisink/BUILD.gn
+++ b/pw_multisink/BUILD.gn
@@ -26,6 +26,7 @@
 pw_source_set("pw_multisink") {
   public_configs = [ ":default_config" ]
   public = [
+    "public/pw_multisink/config.h",
     "public/pw_multisink/drain.h",
     "public/pw_multisink/multisink.h",
   ]
@@ -34,6 +35,9 @@
     "$dir_pw_result",
     "$dir_pw_ring_buffer",
     "$dir_pw_status",
+    "$dir_pw_sync:interrupt_spin_lock",
+    "$dir_pw_sync:lock_annotations",
+    "$dir_pw_sync:mutex",
   ]
   deps = [
     "$dir_pw_assert",
diff --git a/pw_multisink/docs.rst b/pw_multisink/docs.rst
index cf406f3..7052e14 100644
--- a/pw_multisink/docs.rst
+++ b/pw_multisink/docs.rst
@@ -1,9 +1,25 @@
 .. _module-pw_multisink:
 
-------------
+============
 pw_multisink
-------------
+============
 This is an module that forwards messages to multiple attached sinks, which
 consume messages asynchronously. It is not ready for use and is under
 construction.
 
+Module Configuration Options
+============================
+The following configurations can be adjusted via compile-time configuration
+of this module, see the
+:ref:`module documentation <module-structure-compile-time-configuration>` for
+more details.
+
+.. c:macro:: PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+  Whether an interrupt-safe lock is used to guard multisink read/write operations.
+
+  By default, this option is enabled and the multisink uses an interrupt spin-lock
+  to guard its transactions. If disabled, a mutex is used instead.
+
+  Disabling this will alter the entry precondition of the multisink, requiring that
+  it not be called from an interrupt context.
diff --git a/pw_multisink/drain.cc b/pw_multisink/drain.cc
index e4548c5..cc684f1 100644
--- a/pw_multisink/drain.cc
+++ b/pw_multisink/drain.cc
@@ -13,15 +13,19 @@
 // the License.
 #include "pw_multisink/drain.h"
 
+#include "pw_assert/check.h"
+
 namespace pw {
 namespace multisink {
 
 Result<ConstByteSpan> Drain::GetEntry(ByteSpan entry,
                                       uint32_t& drop_count_out) {
+  PW_DCHECK_NOTNULL(multisink_);
   uint32_t entry_sequence_id = 0;
   drop_count_out = 0;
+
   const Result<ConstByteSpan> result =
-      MultiSink::GetEntry(*this, entry, entry_sequence_id);
+      multisink_->GetEntry(*this, entry, entry_sequence_id);
 
   // Exit immediately if the result isn't OK or OUT_OF_RANGE, as the
   // entry_sequence_id cannot be used for computation. Later invocations to
diff --git a/pw_multisink/multisink.cc b/pw_multisink/multisink.cc
index a956071..c1eca3c 100644
--- a/pw_multisink/multisink.cc
+++ b/pw_multisink/multisink.cc
@@ -28,17 +28,15 @@
                                           uint32_t& sequence_id_out) {
   size_t bytes_read = 0;
 
-  // Exit immediately if there's no multisink attached to this drain.
-  if (drain.multisink_ == nullptr) {
-    return Status::FailedPrecondition();
-  }
+  std::lock_guard lock(lock_);
+  PW_DCHECK_PTR_EQ(drain.multisink_, this);
 
   const Status status =
       drain.reader_.PeekFrontWithPreamble(buffer, sequence_id_out, bytes_read);
   if (status.IsOutOfRange()) {
     // If the drain has caught up, report the last handled sequence ID so that
     // it can still process any dropped entries.
-    sequence_id_out = drain.multisink_->sequence_id_ - 1;
+    sequence_id_out = sequence_id_ - 1;
     return status;
   }
   PW_CHECK(drain.reader_.PopFront().ok());
@@ -46,14 +44,16 @@
 }
 
 Status MultiSink::AttachDrain(Drain& drain) {
-  PW_DCHECK(drain.multisink_ == nullptr);
+  std::lock_guard lock(lock_);
+  PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
   drain.multisink_ = this;
   drain.last_handled_sequence_id_ = sequence_id_ - 1;
   return ring_buffer_.AttachReader(drain.reader_);
 }
 
 Status MultiSink::DetachDrain(Drain& drain) {
-  PW_DCHECK(drain.multisink_ == this);
+  std::lock_guard lock(lock_);
+  PW_DCHECK_PTR_EQ(drain.multisink_, this);
   drain.multisink_ = nullptr;
   return ring_buffer_.DetachReader(drain.reader_);
 }
diff --git a/pw_multisink/public/pw_multisink/config.h b/pw_multisink/public/pw_multisink/config.h
new file mode 100644
index 0000000..98de4a7
--- /dev/null
+++ b/pw_multisink/public/pw_multisink/config.h
@@ -0,0 +1,41 @@
+// Copyright 2021 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+#pragma once
+
+// PW_MULTISINK_LOCK_INTERRUPT_SAFE controls whether an interrupt-safe lock is
+// used when reading and writing from the underlying ring-buffer. This is
+// enabled by default, using an interrupt spin-lock instead of a mutex.
+// Disabling this alters the entry precondition of the multisink, requiring that
+// it not be invoked from an interrupt context.
+#if !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE)
+#define PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE 1
+#endif  // !defined(PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE)
+
+#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+#include "pw_sync/interrupt_spin_lock.h"
+#else  // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+#include "pw_sync/mutex.h"
+#endif  // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+namespace pw {
+namespace multisink {
+
+#if PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+using LockType = pw::sync::InterruptSpinLock;
+#else   // !PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+using LockType = pw::sync::Mutex;
+#endif  // PW_MULTISINK_CONFIG_LOCK_INTERRUPT_SAFE
+
+}  // namespace multisink
+}  // namespace pw
diff --git a/pw_multisink/public/pw_multisink/multisink.h b/pw_multisink/public/pw_multisink/multisink.h
index 097e4a3..d86ad35 100644
--- a/pw_multisink/public/pw_multisink/multisink.h
+++ b/pw_multisink/public/pw_multisink/multisink.h
@@ -13,10 +13,14 @@
 // the License.
 #pragma once
 
+#include <mutex>
+
 #include "pw_bytes/span.h"
+#include "pw_multisink/config.h"
 #include "pw_result/result.h"
 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
 #include "pw_status/status.h"
+#include "pw_sync/lock_annotations.h"
 
 namespace pw {
 namespace multisink {
@@ -25,10 +29,12 @@
 // An asynchronous single-writer multi-reader queue that ensures readers can
 // poll for dropped message counts, which is useful for logging or similar
 // scenarios where readers need to be aware of the input message sequence.
+//
+// This class is thread-safe but NOT IRQ-safe when
+// PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
+//
 // TODO(pwbug/342): Support notifying readers when the queue is readable,
 // rather than requiring them to poll to check for new entries.
-// TODO(pwbug/343): Add thread-safety, separate from the thread-safety work
-// planned for the underlying ring buffer.
 class MultiSink {
  public:
   // Constructs a multisink using a ring buffer backed by the provided buffer.
@@ -42,12 +48,16 @@
   // The sequence ID of the multisink will always increment as a result of
   // calling HandleEntry, regardless of whether pushing the entry succeeds.
   //
+  // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
+  // function must not be called from an interrupt context.
+  //
   // Return values:
   // Ok - Entry was successfully pushed to the ring buffer.
   // InvalidArgument - Size of data to write is zero bytes.
   // OutOfRange - Size of data is greater than buffer size.
   // FailedPrecondition - Buffer was not initialized.
-  Status HandleEntry(ConstByteSpan entry) {
+  Status HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_) {
+    std::lock_guard lock(lock_);
     return ring_buffer_.PushBack(entry, sequence_id_++);
   }
 
@@ -56,7 +66,10 @@
   // before being sent to the multisink (e.g. the writer failed to encode
   // the message). This API increments the sequence ID of the multisink by
   // the provided `drop_count`.
-  void HandleDropped(uint32_t drop_count = 1) { sequence_id_ += drop_count; }
+  void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_) {
+    std::lock_guard lock(lock_);
+    sequence_id_ += drop_count;
+  }
 
   // Attach a drain to the multisink. Drains may not be associated with more
   // than one multisink at a time. Entries pushed before the drain was attached
@@ -66,7 +79,7 @@
   // Return values:
   // Ok - Drain was successfully attached.
   // InvalidArgument - Drain is currently associated with another multisink.
-  Status AttachDrain(Drain& drain);
+  Status AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
 
   // Detaches a drain from the multisink. Drains may only be detached if they
   // were previously attached to this multisink.
@@ -74,11 +87,14 @@
   // Return values:
   // Ok - Drain was successfully detached.
   // InvalidArgument - Drain is not currently associated with this multisink.
-  Status DetachDrain(Drain& drain);
+  Status DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
 
   // Removes all data from the internal buffer. The multisink's sequence ID is
   // not modified, so readers may interpret this event as droppping entries.
-  void Clear() { ring_buffer_.Clear(); }
+  void Clear() PW_LOCKS_EXCLUDED(lock_) {
+    std::lock_guard lock(lock_);
+    ring_buffer_.Clear();
+  }
 
  protected:
   friend Drain;
@@ -94,13 +110,15 @@
   // the next available entry.
   // DataLoss - An entry was read from the multisink, but did not contains an
   // encoded sequence ID.
-  static Result<ConstByteSpan> GetEntry(Drain& drain,
-                                        ByteSpan buffer,
-                                        uint32_t& sequence_id_out);
+  Result<ConstByteSpan> GetEntry(Drain& drain,
+                                 ByteSpan buffer,
+                                 uint32_t& sequence_id_out)
+      PW_LOCKS_EXCLUDED(lock_);
 
  private:
-  ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_;
-  uint32_t sequence_id_ = 0;
+  ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
+  uint32_t sequence_id_ PW_GUARDED_BY(lock_);
+  LockType lock_;
 };
 
 }  // namespace multisink