mctp/dispatch: deferred recv via DispatchOutcome and drive_pending
diff --git a/services/mctp/api/src/wire.rs b/services/mctp/api/src/wire.rs index 5633092..8be7ca5 100644 --- a/services/mctp/api/src/wire.rs +++ b/services/mctp/api/src/wire.rs
@@ -542,6 +542,18 @@ } } +/// Extract the `timeout_millis` field from a `Recv` request. +/// +/// The 4-byte little-endian timeout follows the 12-byte request header. +/// Returns 0 if the buffer is too short (no timeout → wait forever). +pub fn get_recv_timeout(buf: &[u8]) -> u32 { + let start = MctpRequestHeader::SIZE; + if buf.len() < start + 4 { + return 0; + } + u32::from_le_bytes(buf[start..start + 4].try_into().unwrap()) +} + // ============================================================================ // Tests // ============================================================================
diff --git a/services/mctp/server/src/dispatch.rs b/services/mctp/server/src/dispatch.rs index 15a11f6..2f6f90c 100644 --- a/services/mctp/server/src/dispatch.rs +++ b/services/mctp/server/src/dispatch.rs
@@ -9,32 +9,51 @@ use openprot_mctp_api::wire::{ self, flags, MctpOp, MctpRequestHeader, }; -use openprot_mctp_api::ResponseCode; +use openprot_mctp_api::{Handle, ResponseCode}; -use crate::{Sender, Server}; +use crate::{RecvResult, Sender, Server}; + +/// Outcome of [`dispatch_mctp_op`]. +pub enum DispatchOutcome { + /// Response is ready; `response[..n]` bytes are filled. + Reply(usize), + /// No message was available for `Recv`; the call has been registered + /// as pending. The platform must store its reply token keyed on + /// `handle` and call [`drive_pending`] when new data arrives or on + /// each timer tick. + Pending { + /// The handle whose recv was deferred. + handle: Handle, + }, +} /// Dispatch an IPC request to the MCTP server. /// /// Decodes the request header, calls the appropriate `Server` method, -/// and encodes the response into `response`. Returns the response length. +/// and encodes the response into `response`. /// -/// This is the MCTP equivalent of `dispatch_i2c_op` in the I2C server. +/// `now_millis` is the current monotonic time used to set recv deadlines. +/// +/// Returns `DispatchOutcome::Reply(n)` when a response is immediately +/// available, or `DispatchOutcome::Pending { handle }` when a `Recv` +/// has been registered and will be fulfilled later by [`drive_pending`]. pub fn dispatch_mctp_op<S: Sender, const N: usize>( request: &[u8], response: &mut [u8], server: &mut Server<S, N>, recv_buf: &mut [u8], -) -> usize { + now_millis: u64, +) -> DispatchOutcome { let header = match MctpRequestHeader::from_bytes(request) { Some(h) => h, - None => return encode_error(response, ResponseCode::BadArgument), + None => return DispatchOutcome::Reply(encode_error(response, ResponseCode::BadArgument)), }; let Some(op) = header.operation() else { - return encode_error(response, ResponseCode::BadArgument); + return DispatchOutcome::Reply(encode_error(response, ResponseCode::BadArgument)); }; - match op { + let n = match op { MctpOp::SetEid => match server.set_eid(header.eid) { Ok(()) => encode_success(response), Err(e) => encode_error(response, e.code), @@ -59,7 +78,7 @@ }, MctpOp::Recv => { - let handle = openprot_mctp_api::Handle(header.handle); + let handle = Handle(header.handle); match server.try_recv(handle, recv_buf) { Some(meta) => { @@ -75,17 +94,16 @@ .unwrap_or_else(|_| encode_error(response, ResponseCode::InternalError)) } None => { - // No message available yet. - // In a real Pigweed server, we'd register a pending recv - // and respond later. For now, return TimedOut. - encode_error(response, ResponseCode::TimedOut) + let timeout = wire::get_recv_timeout(request); + let _ = server.register_recv(handle, timeout, now_millis); + return DispatchOutcome::Pending { handle }; } } } MctpOp::Send => { let handle = if header.flags & flags::HAS_HANDLE != 0 { - Some(openprot_mctp_api::Handle(header.handle)) + Some(Handle(header.handle)) } else { None }; @@ -110,12 +128,49 @@ } MctpOp::Unbind => { - let handle = openprot_mctp_api::Handle(header.handle); + let handle = Handle(header.handle); match server.unbind(handle) { Ok(()) => encode_success(response), Err(e) => encode_error(response, e.code), } } + }; + + DispatchOutcome::Reply(n) +} + +/// Drive pending receive calls to completion. +/// +/// Call this on timer ticks and after feeding inbound packets to the server. +/// For each handle that is now ready (message arrived or timed out), +/// `on_ready(handle, response_len)` is called with `response` filled. +/// The platform must look up its stored reply token for `handle` and send +/// the response through it. +pub fn drive_pending<S: Sender, const N: usize>( + server: &mut Server<S, N>, + now_millis: u64, + recv_buf: &mut [u8], + response: &mut [u8], + mut on_ready: impl FnMut(Handle, usize), +) { + let (_, ready) = server.update(now_millis, recv_buf); + for (handle, result) in ready { + let len = match result { + RecvResult::Message(meta) => { + let payload = &recv_buf[..meta.payload_size]; + wire::encode_recv_response( + response, + meta.msg_type, + meta.msg_ic, + meta.remote_eid, + meta.msg_tag, + payload, + ) + .unwrap_or_else(|_| encode_error(response, ResponseCode::InternalError)) + } + RecvResult::TimedOut => encode_error(response, ResponseCode::TimedOut), + }; + on_ready(handle, len); } }
diff --git a/services/mctp/server/tests/dispatch.rs b/services/mctp/server/tests/dispatch.rs index 3aab24d..6336796 100644 --- a/services/mctp/server/tests/dispatch.rs +++ b/services/mctp/server/tests/dispatch.rs
@@ -12,11 +12,25 @@ use std::cell::RefCell; use mctp::Eid; -use openprot_mctp_api::wire; -use openprot_mctp_server::{dispatch::dispatch_mctp_op, Server}; +use openprot_mctp_api::{wire, Handle}; +use openprot_mctp_server::{dispatch::{dispatch_mctp_op, drive_pending, DispatchOutcome}, Server}; use common::{transfer, BufferSender}; +// Convenience wrapper: calls dispatch_mctp_op with now_millis=0 and panics on Pending. +fn dispatch_reply<S: openprot_mctp_server::Sender, const N: usize>( + request: &[u8], + response: &mut [u8], + server: &mut Server<S, N>, + recv_buf: &mut [u8], +) -> usize { + match dispatch_mctp_op(request, response, server, recv_buf, 0) { + DispatchOutcome::Reply(n) => n, + DispatchOutcome::Pending { handle } => { + panic!("unexpected Pending for handle {:?}", handle) + } + } +} // --------------------------------------------------------------------------- // Tests @@ -35,13 +49,13 @@ // SetEid(42) let req_len = wire::encode_set_eid(&mut req, 42).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); // GetEid → 42 let req_len = wire::encode_get_eid(&mut req).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); assert_eq!(header.eid, 42); @@ -66,14 +80,14 @@ // Register listener on A for MsgType(1) let req_len = wire::encode_listener(&mut req, 1).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); let listener_handle = header.handle; // Register req on B targeting EID 8 let req_len = wire::encode_req(&mut req, 8).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); let req_handle = header.handle; @@ -90,7 +104,7 @@ payload, ) .unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); @@ -99,7 +113,7 @@ // A receives via dispatch let req_len = wire::encode_recv(&mut req, listener_handle, 0).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); assert_eq!(header.msg_type, 1); @@ -118,7 +132,7 @@ recv_payload, ) .unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); let send_header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(send_header.is_success()); @@ -127,7 +141,7 @@ // B receives the echo via dispatch let req_len = wire::encode_recv(&mut req, req_handle, 0).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); assert_eq!(header.msg_type, 1); @@ -154,7 +168,7 @@ // Two bytes — shorter than MctpRequestHeader::SIZE (12) let bad_request = [0u8; 2]; - let resp_len = dispatch_mctp_op(&bad_request, &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&bad_request, &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(!header.is_success()); @@ -176,7 +190,7 @@ // 12-byte header with opcode 0xFF (unrecognised) let mut bad_request = [0u8; 12]; bad_request[0] = 0xFF; - let resp_len = dispatch_mctp_op(&bad_request, &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&bad_request, &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(!header.is_success()); @@ -184,14 +198,12 @@ } // --------------------------------------------------------------------------- -// MctpOp::Recv when no message is ready → TimedOut +// MctpOp::Recv — deferred path // --------------------------------------------------------------------------- -/// `Recv` dispatched when no message has arrived must return `TimedOut`. +/// `Recv` when no message is ready returns `Pending`, not an immediate error. #[test] -fn dispatch_recv_no_message_returns_timed_out() { - use openprot_mctp_api::ResponseCode; - +fn dispatch_recv_returns_pending_when_no_message() { let buf = RefCell::new(Vec::new()); let sender = BufferSender { packets: &buf }; let mut server: Server<_, 16> = Server::new(Eid(8), 0, sender); @@ -202,17 +214,17 @@ // Register a listener let req_len = wire::encode_listener(&mut req, 1).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); - assert!(h.is_success()); - let listener_handle = h.handle; + let listener_handle = Handle(h.handle); - // Attempt Recv immediately — no inbound packet - let req_len = wire::encode_recv(&mut req, listener_handle, 0).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); - let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); - assert!(!header.is_success()); - assert_eq!(header.response_code(), ResponseCode::TimedOut); + // Attempt Recv immediately — no inbound packet → Pending + let req_len = wire::encode_recv(&mut req, listener_handle.0, 500).unwrap(); + let outcome = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf, 0); + assert!( + matches!(outcome, DispatchOutcome::Pending { handle } if handle == listener_handle), + "expected Pending" + ); } // --------------------------------------------------------------------------- @@ -232,14 +244,14 @@ // Allocate a listener handle let req_len = wire::encode_listener(&mut req, 1).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(h.is_success()); let listener_handle = h.handle; // Unbind it let req_len = wire::encode_unbind(&mut req, listener_handle).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); } @@ -257,7 +269,154 @@ let mut recv_buf = [0u8; 255]; let req_len = wire::encode_unbind(&mut req, 0xDEAD_BEEF).unwrap(); - let resp_len = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); let header = wire::decode_response_header(&resp[..resp_len]).unwrap(); assert!(header.is_success()); } + +// --------------------------------------------------------------------------- +// drive_pending — deferred recv delivery +// --------------------------------------------------------------------------- + +/// After a `Pending` recv, `drive_pending` fires `on_ready` once a packet arrives. +#[test] +fn dispatch_recv_resolved_by_drive_pending() { + let buf_a = RefCell::new(Vec::new()); + let sender_a = BufferSender { packets: &buf_a }; + let mut server_a: Server<_, 16> = Server::new(Eid(8), 0, sender_a); + + let buf_b = RefCell::new(Vec::new()); + let sender_b = BufferSender { packets: &buf_b }; + let mut server_b: Server<_, 16> = Server::new(Eid(42), 0, sender_b); + + let mut req = [0u8; 128]; + let mut resp = [0u8; 128]; + let mut recv_buf = [0u8; 255]; + + // Register listener on A + let req_len = wire::encode_listener(&mut req, 7).unwrap(); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); + let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); + let listener_handle = Handle(h.handle); + + // A tries recv before any message arrives → Pending + let req_len = wire::encode_recv(&mut req, listener_handle.0, 1000).unwrap(); + let outcome = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf, 0); + assert!(matches!(outcome, DispatchOutcome::Pending { .. })); + + // B allocates req handle and sends a message to A + let req_len = wire::encode_req(&mut req, 8).unwrap(); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); + let req_handle = h.handle; + + let payload = b"hello pending"; + let req_len = + wire::encode_send(&mut req, Some(req_handle), 7, None, None, false, payload).unwrap(); + dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + transfer(&buf_b, &mut server_a); + + // drive_pending should deliver the message + let mut fired_handle: Option<Handle> = None; + let mut fired_len = 0usize; + drive_pending(&mut server_a, 0, &mut recv_buf, &mut resp, |h, n| { + fired_handle = Some(h); + fired_len = n; + }); + + assert_eq!(fired_handle, Some(listener_handle)); + let header = wire::decode_response_header(&resp[..fired_len]).unwrap(); + assert!(header.is_success()); + assert_eq!(header.msg_type, 7); + assert_eq!(header.eid, 42); + let recv_payload = wire::get_response_payload(&resp[..fired_len], &header).unwrap(); + assert_eq!(recv_payload, payload); +} + +/// After a `Pending` recv with a timeout, `drive_pending` at `now > deadline` +/// fires `on_ready` with a `TimedOut` response. +#[test] +fn dispatch_recv_timeout() { + use openprot_mctp_api::ResponseCode; + + let buf = RefCell::new(Vec::new()); + let sender = BufferSender { packets: &buf }; + let mut server: Server<_, 16> = Server::new(Eid(8), 0, sender); + + let mut req = [0u8; 64]; + let mut resp = [0u8; 64]; + let mut recv_buf = [0u8; 255]; + + let req_len = wire::encode_listener(&mut req, 1).unwrap(); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server, &mut recv_buf); + let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); + let listener_handle = Handle(h.handle); + + // Register recv with 100ms timeout at now=0 + let req_len = wire::encode_recv(&mut req, listener_handle.0, 100).unwrap(); + let outcome = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server, &mut recv_buf, 0); + assert!(matches!(outcome, DispatchOutcome::Pending { .. })); + + // Advance time past deadline — no message ever arrives + let mut fired_handle: Option<Handle> = None; + let mut fired_len = 0usize; + drive_pending(&mut server, 200, &mut recv_buf, &mut resp, |h, n| { + fired_handle = Some(h); + fired_len = n; + }); + + assert_eq!(fired_handle, Some(listener_handle)); + let header = wire::decode_response_header(&resp[..fired_len]).unwrap(); + assert!(!header.is_success()); + assert_eq!(header.response_code(), ResponseCode::TimedOut); +} + +/// If a message arrives *before* `Recv` is dispatched, `dispatch_mctp_op` +/// returns `Reply` immediately without registering a pending entry. +#[test] +fn dispatch_recv_immediate_if_message_waiting() { + let buf_a = RefCell::new(Vec::new()); + let sender_a = BufferSender { packets: &buf_a }; + let mut server_a: Server<_, 16> = Server::new(Eid(8), 0, sender_a); + + let buf_b = RefCell::new(Vec::new()); + let sender_b = BufferSender { packets: &buf_b }; + let mut server_b: Server<_, 16> = Server::new(Eid(42), 0, sender_b); + + let mut req = [0u8; 128]; + let mut resp = [0u8; 128]; + let mut recv_buf = [0u8; 255]; + + // Register listener on A + let req_len = wire::encode_listener(&mut req, 3).unwrap(); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf); + let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); + let listener_handle = h.handle; + + // B sends a message to A *before* Recv is dispatched + let req_len = wire::encode_req(&mut req, 8).unwrap(); + let resp_len = dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + let h = wire::decode_response_header(&resp[..resp_len]).unwrap(); + let req_handle = h.handle; + + let payload = b"early"; + let req_len = + wire::encode_send(&mut req, Some(req_handle), 3, None, None, false, payload).unwrap(); + dispatch_reply(&req[..req_len], &mut resp, &mut server_b, &mut recv_buf); + transfer(&buf_b, &mut server_a); + + // Recv dispatched after message is already in the router → Reply, not Pending + let req_len = wire::encode_recv(&mut req, listener_handle, 1000).unwrap(); + let outcome = dispatch_mctp_op(&req[..req_len], &mut resp, &mut server_a, &mut recv_buf, 0); + let n = match outcome { + DispatchOutcome::Reply(n) => n, + DispatchOutcome::Pending { .. } => panic!("expected Reply, got Pending"), + }; + + let header = wire::decode_response_header(&resp[..n]).unwrap(); + assert!(header.is_success()); + assert_eq!(header.msg_type, 3); + assert_eq!(header.eid, 42); + let recv_payload = wire::get_response_payload(&resp[..n], &header).unwrap(); + assert_eq!(recv_payload, payload); +}