blob: ea5b8d272ce01ea0fa5ca527a9ef2eb16c75d6b9 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for retreiving and parsing metrics."""
from unittest import TestCase, mock, main
from pw_metric.metric_parser import parse_metrics
from pw_metric_proto import metric_service_pb2
from pw_status import Status
from pw_tokenizer import detokenize, tokens
DATABASE = tokens.Database([
tokens.TokenizedStringEntry(0x01148a48, "total_dropped"),
tokens.TokenizedStringEntry(0x03796798, "min_queue_remaining"),
tokens.TokenizedStringEntry(0x22198280, "total_created"),
tokens.TokenizedStringEntry(0x534a42f4, "max_queue_used"),
tokens.TokenizedStringEntry(0x5d087463, "pw::work_queue::WorkQueue"),
tokens.TokenizedStringEntry(0xa7c43965, "log"),
])
class TestParseMetrics(TestCase):
"""Test parsing metrics received from RPCs"""
def setUp(self) -> None:
"""Creating detokenizer and mocking RPC."""
self.detokenize = detokenize.Detokenizer(DATABASE)
self.rpc_timeout_s = 1
self.rpcs = mock.Mock()
self.rpcs.pw = mock.Mock()
self.rpcs.pw.metric = mock.Mock()
self.rpcs.pw.metric.proto = mock.Mock()
self.rpcs.pw.metric.proto.MetricService = mock.Mock()
self.rpcs.pw.metric.proto.MetricService.Get = mock.Mock()
self.rpcs.pw.metric.proto.MetricService.Get.return_value = mock.Mock()
self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = (
Status.OK)
# Creating a group and metric name for better identification.
self.log = 0xa7c43965
self.total_created = 0x22198280
self.total_dropped = 0x01148a48
self.min_queue_remaining = 0x03796798
self.metric = [
metric_service_pb2.Metric(
token_path=[self.log, self.total_created],
string_path='N/A',
as_float=3.0),
metric_service_pb2.Metric(
token_path=[self.log, self.total_dropped],
string_path='N/A',
as_float=4.0)
]
def test_invalid_detokenizer(self) -> None:
"""Test invalid detokenizer was supplied."""
self.assertEqual({},
parse_metrics(self.rpcs, None, self.rpc_timeout_s),
msg='Valid detokenizer.')
def test_bad_stream_status(self) -> None:
"""Test stream response has a status other than OK."""
self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = (
Status.ABORTED)
self.assertEqual({},
parse_metrics(self.rpcs, self.detokenize,
self.rpc_timeout_s),
msg='Stream response was not aborted.')
def test_parse_metrics(self) -> None:
"""Test metrics being parsed and recorded."""
# Loading metric into RPC.
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=self.metric)
]
self.assertEqual(
{'log': {
'total_created': 3.0,
'total_dropped': 4.0,
}},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
def test_three_metric_names(self) -> None:
"""Test creating a dictionary with three paths."""
# Creating another leaf.
self.metric.append(
metric_service_pb2.Metric(
token_path=[self.log, self.min_queue_remaining],
string_path='N/A',
as_float=1.0))
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=self.metric)
]
self.assertEqual(
{
'log': {
'total_created': 3.0,
'total_dropped': 4.0,
'min_queue_remaining': 1.0,
},
},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
def test_inserting_unknown_token(self) -> None:
# Inserting an unknown token as a group name.
self.metric.append(
metric_service_pb2.Metric(token_path=[0x007, self.total_dropped],
string_path='N/A',
as_float=1.0))
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=self.metric)
]
self.assertEqual(
{
'log': {
'total_created': 3.0,
'total_dropped': 4.0,
},
'$': {
'total_dropped': 1.0
},
},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
def test_multiple_metric_response(self) -> None:
"""Tests multiple metric responses being handled."""
# Adding more than one MetricResponses.
metric = [
metric_service_pb2.Metric(token_path=[0x007, self.total_dropped],
string_path='N/A',
as_float=1.0)
]
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=self.metric),
metric_service_pb2.MetricResponse(metrics=metric)
]
self.assertEqual(
{
'log': {
'total_created': 3.0,
'total_dropped': 4.0,
},
'$': {
'total_dropped': 1.0,
},
},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
def test_paths_longer_than_two(self) -> None:
"""Tests metric paths longer than two."""
# Path longer than two.
longest_metric = [
metric_service_pb2.Metric(token_path=[
self.log, self.total_created, self.min_queue_remaining
],
string_path='N/A',
as_float=1.0),
]
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=longest_metric),
]
self.assertEqual(
{'log': {
'total_created': {
'min_queue_remaining': 1.0
},
}},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
# Create a new leaf in log.
longest_metric.append(
metric_service_pb2.Metric(
token_path=[self.log, self.total_dropped],
string_path='N/A',
as_float=3.0))
metric = [
metric_service_pb2.Metric(token_path=[0x007, self.total_dropped],
string_path='N/A',
as_float=1.0),
metric_service_pb2.Metric(token_path=[0x007, self.total_created],
string_path='N/A',
as_float=2.0),
]
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=longest_metric),
metric_service_pb2.MetricResponse(metrics=metric),
]
self.assertEqual(
{
'log': {
'total_created': {
'min_queue_remaining': 1.0,
},
'total_dropped': 3.0,
},
'$': {
'total_dropped': 1.0,
'total_created': 2.0,
},
},
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s),
msg='Metrics are not equal.')
def test_conflicting_keys(self) -> None:
"""Tests conflicting key and value assignment."""
longest_metric = [
metric_service_pb2.Metric(token_path=[
self.log, self.total_created, self.min_queue_remaining
],
string_path='N/A',
as_float=1.0),
]
# Creates a conflict at log/total_created, should throw an error.
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=longest_metric),
metric_service_pb2.MetricResponse(metrics=self.metric),
]
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s)
self.assertRaises(ValueError, msg='Expected Value Error.')
def test_conflicting_logs(self) -> None:
"""Tests conflicting loga being streamed."""
longest_metric = [
metric_service_pb2.Metric(
token_path=[self.log, self.total_created],
string_path='N/A',
as_float=1.0),
]
# Creates a duplicate metric for log/total_created.
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=longest_metric),
metric_service_pb2.MetricResponse(metrics=self.metric),
]
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s)
self.assertRaises(ValueError, msg='Expected Value Error.')
# Duplicate metrics being loaded.
self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
metric_service_pb2.MetricResponse(metrics=self.metric),
metric_service_pb2.MetricResponse(metrics=self.metric),
]
parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s)
self.assertRaises(ValueError, msg='Expected Value Error.')
if __name__ == '__main__':
main()