blob: 78a8e4eb052b85138d8750ffbe73d920c7e06146 [file] [log] [blame]
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Utils to decode logs."""
import logging
from pw_log.log_decoder import LogStreamDecoder
from pw_log.proto import log_pb2
import pw_rpc
import pw_status
_LOG = logging.getLogger(__name__)
class LogStreamHandler:
"""Handles an RPC Log Stream.
Args:
rpcs: RPC services to request RPC Log Streams.
decoder: LogStreamDecoder
"""
def __init__(
self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
) -> None:
self.rpcs = rpcs
self._decoder = decoder
def listen_to_logs(self) -> None:
"""Requests Logs streamed over RPC.
The RPCs remain open until the server cancels or closes them, either
with a response or error packet.
"""
def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None:
self._decoder.parse_log_entries_proto(log_entries_proto)
self.rpcs.pw.log.Logs.Listen.open(
on_next=on_log_entries,
on_completed=lambda _, status: self.handle_log_stream_completed(
status
),
on_error=lambda _, error: self.handle_log_stream_error(error),
)
def handle_log_stream_error(self, error: pw_status.Status) -> None:
"""Resets the log stream RPC on error to avoid losing logs.
Override this function to change default behavior.
"""
_LOG.error(
'Log stream error: %s from source %s',
error,
self.source_name,
)
# Only re-request logs if the RPC was not cancelled by the client.
if error != pw_status.Status.CANCELLED:
self.listen_to_logs()
def handle_log_stream_completed(self, status: pw_status.Status) -> None:
"""Resets the log stream RPC on completed to avoid losing logs.
Override this function to change default behavior.
"""
_LOG.debug(
'Log stream completed with status: %s for source: %s',
status,
self.source_name,
)
self.listen_to_logs()
@property
def source_name(self) -> str:
return self._decoder.source_name