[Linux] Add test logic to verify the subscription state in the Linux tv-casting-app. (#33398)

* [Linux] Add test logic to verify the subscription state in the Linux tv-casting-app.

* Fix timeout error caused by extract_value_from_string function, increased timeout values to prevent flaky test, and addressed PR comments from @sharadb-amazon.

* Refined logging messages format to improve readability.

* Addressed PR comments from @andy31415.

* Fixed CI check error.

* Trivial comment change to trigger Darwin Tests CI check to run again.
diff --git a/.github/workflows/examples-linux-tv-casting-app.yaml b/.github/workflows/examples-linux-tv-casting-app.yaml
index 679f4be..dfae4ee 100644
--- a/.github/workflows/examples-linux-tv-casting-app.yaml
+++ b/.github/workflows/examples-linux-tv-casting-app.yaml
@@ -67,7 +67,7 @@
               run: |
                   ./scripts/run_in_build_env.sh \
                     "python3 ./scripts/tests/run_tv_casting_test.py"
-              timeout-minutes: 1
+              timeout-minutes: 2 # Comment this out to debug if GitHub Action times out.
 
             - name: Uploading Size Reports
               uses: ./.github/actions/upload-size-reports
diff --git a/scripts/tests/run_tv_casting_test.py b/scripts/tests/run_tv_casting_test.py
index c2492c5..074d459 100644
--- a/scripts/tests/run_tv_casting_test.py
+++ b/scripts/tests/run_tv_casting_test.py
@@ -16,6 +16,7 @@
 
 import logging
 import os
+import re
 import subprocess
 import sys
 import tempfile
@@ -31,11 +32,14 @@
 TV_APP_MAX_START_WAIT_SEC = 2
 
 # The maximum amount of time to commission the Linux tv-casting-app and the tv-app before timeout.
-COMMISSIONING_STAGE_MAX_WAIT_SEC = 10
+COMMISSIONING_STAGE_MAX_WAIT_SEC = 15
 
 # The maximum amount of time to test that the launchURL is sent from the Linux tv-casting-app and received on the tv-app before timeout.
 TEST_LAUNCHURL_MAX_WAIT_SEC = 10
 
+# The maximum amount of time to verify the subscription state in the Linux tv-casting-app output before timeout.
+VERIFY_SUBSCRIPTION_STATE_MAX_WAIT_SEC = 10
+
 # File names of logs for the Linux tv-casting-app and the Linux tv-app.
 LINUX_TV_APP_LOGS = 'Linux-tv-app-logs.txt'
 LINUX_TV_CASTING_APP_LOGS = 'Linux-tv-casting-app-logs.txt'
@@ -46,6 +50,10 @@
 PRODUCT_ID = 0x8001  # Test product id
 DEVICE_TYPE_CASTING_VIDEO_PLAYER = 0x23    # Device type library 10.3: Casting Video Player
 
+# Values to verify the subscription state against from the `ReportDataMessage` in the Linux tv-casting-app output.
+CLUSTER_MEDIA_PLAYBACK = '0x506'  # Application Cluster Spec 6.10.3 Cluster ID: Media Playback
+ATTRIBUTE_CURRENT_PLAYBACK_STATE = '0x0000_0000'  # Application Cluster Spec 6.10.6 Attribute ID: Current State of Playback
+
 
 class ProcessManager:
     """A context manager for managing subprocesses.
@@ -68,10 +76,30 @@
         self.process.wait()
 
 
+class LogValueExtractor:
+    """A utility class for extracting values from log lines.
+
+    This class provides a centralized way to extract values from log lines and manage the error handling and logging process.
+    """
+
+    def __init__(self, casting_state: str, log_paths: List[str]):
+        self.casting_state = casting_state
+        self.log_paths = log_paths
+
+    def extract_from(self, line: str, value_name: str):
+        if value_name in line:
+            try:
+                return extract_value_from_string(line, value_name, self.casting_state, self.log_paths)
+            except ValueError:
+                logging.error(f'Failed to extract `{value_name}` value from line: {line}')
+                handle_casting_failure(self.casting_state, self.log_paths)
+        return None
+
+
 def dump_temporary_logs_to_console(log_file_path: str):
     """Dump log file to the console; log file will be removed once the function exits."""
     """Write the entire content of `log_file_path` to the console."""
-    print('\nDumping logs from: ', log_file_path)
+    print(f'\nDumping logs from: {log_file_path}')
 
     with open(log_file_path, 'r') as file:
         for line in file:
@@ -80,36 +108,74 @@
 
 def handle_casting_failure(casting_state: str, log_file_paths: List[str]):
     """Log '{casting_state} failed!' as error, dump log files to console, exit on error."""
-    logging.error(casting_state + ' failed!')
+    logging.error(f'{casting_state} failed!')
 
     for log_file_path in log_file_paths:
         try:
             dump_temporary_logs_to_console(log_file_path)
         except Exception as e:
-            logging.exception(f"Failed to dump {log_file_path}: {e}")
+            logging.exception(f'Failed to dump {log_file_path}: {e}')
 
     sys.exit(1)
 
 
-def extract_value_from_string(line: str) -> str:
+def extract_value_from_string(line: str, value_name: str, casting_state: str, log_paths) -> str:
     """Extract and return value from given input string.
 
-    The string is expected to be in the following format as it is received
-    from the Linux tv-casting-app output:
-    \x1b[0;34m[1713741926895] [7276:9521344] [DIS] Vendor ID: 65521\x1b[0m
-    The integer value to be extracted here is 65521.
-    Or:
-    \x1b[0;34m[1714583616179] [7029:2386956] [SVR] 	device Name: Test TV casting app\x1b[0m
-    The substring to be extracted here is 'Test TV casting app'.
-    """
-    value = line.split(':')[-1].strip().replace('\x1b[0m', '')
+    Some string examples as they are received from the Linux tv-casting-app and/or tv-app output:
+    1. On 'darwin' machines:
+        \x1b[0;34m[1715206773402] [20056:2842184] [DMG]  Cluster = 0x506,\x1b[0m
+        The substring to be extracted here is '0x506'.
 
-    return value
+        Or:
+        \x1b[0;32m[1714582264602] [77989:2286038] [SVR] Discovered Commissioner #0\x1b[0m
+        The integer value to be extracted here is '0'.
+
+        Or:
+        \x1b[0;34m[1713741926895] [7276:9521344] [DIS] Vendor ID: 65521\x1b[0m
+        The integer value to be extracted here is '65521'.
+
+        Or:
+        \x1b[0;34m[1714583616179] [7029:2386956] [SVR] 	device Name: Test TV casting app\x1b[0m
+        The substring to be extracted here is 'Test TV casting app'.
+
+    2. On 'linux' machines:
+        [1716224960.316809][6906:6906] CHIP:DMG: \t\t\t\t\tCluster = 0x506,\n
+        [1716224958.576320][6906:6906] CHIP:SVR: Discovered Commissioner #0
+        [1716224958.576407][6906:6906] CHIP:DIS: \tVendor ID: 65521\n
+        [1716224959.580746][6906:6906] CHIP:SVR: \tdevice Name: Test TV casting app\n
+    """
+    log_line_pattern = ''
+    if sys.platform == 'darwin':
+        log_line_pattern = r'\x1b\[0;\d+m\[\d+\] \[\d+:\d+\] \[[A-Z]{1,3}\] (.+)\x1b\[0m'
+    elif sys.platform == 'linux':
+        log_line_pattern = r'\[\d+\.\d+\]\[\d+:\d+\] [A-Z]{1,4}:[A-Z]{1,3}: (.+)'
+
+    log_line_match = re.search(log_line_pattern, line)
+
+    if log_line_match:
+        log_text_of_interest = log_line_match.group(1)
+
+        if '=' in log_text_of_interest:
+            delimiter = '='
+        elif '#' in log_text_of_interest:
+            delimiter = '#'
+        else:
+            delimiter = ':'
+
+        return log_text_of_interest.split(delimiter)[-1].strip(' ,')
+    else:
+        raise ValueError(f'Could not extract {value_name} from the following line: {line}')
 
 
 def validate_value(casting_state: str, expected_value: Union[str, int], log_paths: List[str], line: str, value_name: str) -> Optional[str]:
     """Validate a value in a string against an expected value during a given casting state."""
-    value = extract_value_from_string(line)
+    log_value_extractor = LogValueExtractor(casting_state, log_paths)
+    value = log_value_extractor.extract_from(line, value_name)
+    if not value:
+        logging.error(f'Failed to extract {value_name} value from the following line: {line}')
+        logging.error(f'Failed to validate against the expected {value_name} value: {expected_value}!')
+        handle_casting_failure(casting_state, log_paths)
 
     if isinstance(expected_value, int):
         value = int(value)
@@ -154,8 +220,8 @@
     while True:
         # Check if we exceeded the maximum wait time for initiating 'cast request' from the Linux tv-casting-app to the Linux tv-app.
         if time.time() - start_wait_time > COMMISSIONING_STAGE_MAX_WAIT_SEC:
-            logging.error('The command `cast request ' + valid_discovered_commissioner_number +
-                          '` was not issued to the Linux tv-casting-app process within the timeout.')
+            logging.error(
+                f'The command `cast request {valid_discovered_commissioner_number}` was not issued to the Linux tv-casting-app process within the timeout.')
             return False
 
         tv_casting_app_output_line = tv_casting_app_process.stdout.readline()
@@ -170,13 +236,16 @@
                 next_line = tv_casting_app_process.stdout.readline()
                 linux_tv_casting_app_log_file.write(next_line)
                 linux_tv_casting_app_log_file.flush()
-                logging.info('Sent `' + next_line.rstrip('\n') + '` to the Linux tv-casting-app process.')
+                next_line = next_line.rstrip('\n')
+                logging.info(f'Sent `{next_line}` to the Linux tv-casting-app process.')
+
                 return True
 
 
-def extract_device_info_from_tv_casting_app(tv_casting_app_info: Tuple[subprocess.Popen, TextIO]) -> Tuple[Optional[str], Optional[int], Optional[int]]:
+def extract_device_info_from_tv_casting_app(tv_casting_app_info: Tuple[subprocess.Popen, TextIO], casting_state: str, log_paths: List[str]) -> Tuple[Optional[str], Optional[int], Optional[int]]:
     """Extract device information from the 'Identification Declaration' block in the Linux tv-casting-app output."""
     tv_casting_app_process, linux_tv_casting_app_log_file = tv_casting_app_info
+    log_value_extractor = LogValueExtractor(casting_state, log_paths)
 
     device_name = None
     vendor_id = None
@@ -186,14 +255,12 @@
         linux_tv_casting_app_log_file.write(line)
         linux_tv_casting_app_log_file.flush()
 
-        if 'device Name' in line:
-            device_name = extract_value_from_string(line)
-        elif 'vendor id' in line:
-            vendor_id = extract_value_from_string(line)
-            vendor_id = int(vendor_id)
-        elif 'product id' in line:
-            product_id = extract_value_from_string(line)
-            product_id = int(product_id)
+        if value := log_value_extractor.extract_from(line, 'device Name'):
+            device_name = value
+        elif value := log_value_extractor.extract_from(line, 'vendor id'):
+            vendor_id = int(value)
+        elif value := log_value_extractor.extract_from(line, 'product id'):
+            product_id = int(value)
 
         if device_name and vendor_id and product_id:
             break
@@ -211,7 +278,8 @@
     while True:
         # Check if we exceeded the maximum wait time for validating the device information from the Linux tv-app to the corresponding values from the Linux tv-app.
         if time.time() - start_wait_time > COMMISSIONING_STAGE_MAX_WAIT_SEC:
-            logging.erro('The device information from the Linux tv-app output was not validated against the corresponding values from the Linux tv-casting-app output within the timeout.')
+            logging.error(
+                'The device information from the Linux tv-app output was not validated against the corresponding values from the Linux tv-casting-app output within the timeout.')
             return False
 
         tv_app_line = tv_app_process.stdout.readline()
@@ -221,7 +289,7 @@
             linux_tv_app_log_file.flush()
 
             if 'Identification Declaration Start' in tv_app_line:
-                logging.info('"Identification Declaration" block from the Linux tv-app output:')
+                logging.info('Found the `Identification Declaration` block in the Linux tv-app output:')
                 logging.info(tv_app_line.rstrip('\n'))
                 parsing_identification_block = True
             elif parsing_identification_block:
@@ -266,8 +334,8 @@
                 tv_app_line = tv_app_process.stdout.readline()
                 linux_tv_app_log_file.write(tv_app_line)
                 linux_tv_app_log_file.flush()
-
-                logging.info('Sent `' + tv_app_line.rstrip('\n') + '` to the Linux tv-app process.')
+                tv_app_line = tv_app_line.rstrip('\n')
+                logging.info(f'Sent `{tv_app_line}` to the Linux tv-app process.')
                 return True
 
 
@@ -310,6 +378,55 @@
                 return True
 
 
+def parse_tv_casting_app_for_report_data_msg(tv_casting_app_info: Tuple[subprocess.Popen, TextIO], log_paths: List[str]):
+    """Parse the Linux tv-casting-app for `ReportDataMessage` block and return the first message block with valid `Cluster` and `Attribute` values."""
+    tv_casting_app_process, linux_tv_casting_app_log_file = tv_casting_app_info
+    log_value_extractor = LogValueExtractor('Testing subscription', log_paths)
+
+    continue_parsing = False
+    report_data_message = []
+
+    start_wait_time = time.time()
+
+    while True:
+        # Check if we exceeded the maximum wait time to parse the Linux tv-casting-app output for `ReportDataMessage` block.
+        if time.time() - start_wait_time > VERIFY_SUBSCRIPTION_STATE_MAX_WAIT_SEC:
+            logging.error(
+                'The relevant `ReportDataMessage` block for the MediaPlayback:CurrentState subscription was not found in the Linux tv-casting-app process within the timeout.')
+            report_data_message.clear()
+            return report_data_message
+
+        tv_casting_line = tv_casting_app_process.stdout.readline()
+
+        if tv_casting_line:
+            linux_tv_casting_app_log_file.write(tv_casting_line)
+            linux_tv_casting_app_log_file.flush()
+
+            if 'ReportDataMessage =' in tv_casting_line:
+                report_data_message.append(tv_casting_line.rstrip('\n'))
+                continue_parsing = True
+            elif continue_parsing:
+                report_data_message.append(tv_casting_line.rstrip('\n'))
+
+                if cluster_value := log_value_extractor.extract_from(tv_casting_line, 'Cluster ='):
+                    if cluster_value != CLUSTER_MEDIA_PLAYBACK:
+                        report_data_message.clear()
+                        continue_parsing = False
+
+                elif attribute_value := log_value_extractor.extract_from(tv_casting_line, 'Attribute ='):
+                    if attribute_value != ATTRIBUTE_CURRENT_PLAYBACK_STATE:
+                        report_data_message.clear()
+                        continue_parsing = False
+
+                elif 'InteractionModelRevision' in tv_casting_line:
+                    # Capture the closing brace `}` of the `ReportDataMessage` block.
+                    tv_casting_line = tv_casting_app_process.stdout.readline()
+                    linux_tv_casting_app_log_file.write(tv_casting_line)
+                    linux_tv_casting_app_log_file.flush()
+                    report_data_message.append(tv_casting_line.rstrip('\n'))
+                    return report_data_message
+
+
 def parse_tv_app_output_for_launchUrl_msg_success(tv_app_info: Tuple[subprocess.Popen, TextIO], log_paths: List[str]):
     """Parse the Linux tv-app output for the relevant string indicating that the launchUrl was received."""
 
@@ -359,7 +476,7 @@
             linux_tv_casting_app_log_file.flush()
 
             if 'InvokeResponseMessage =' in tv_casting_line:
-                logging.info('Found the InvokeResponseMessage block in the Linux tv-casting-app output:')
+                logging.info('Found the `InvokeResponseMessage` block in the Linux tv-casting-app output:')
                 logging.info(tv_casting_line.rstrip('\n'))
                 continue_parsing_invoke_response_msg_block = True
 
@@ -395,11 +512,11 @@
         linux_tv_casting_app_log_file.flush()
 
         # Fail fast if "No commissioner discovered" string found.
-        if "No commissioner discovered" in line:
+        if 'No commissioner discovered' in line:
             logging.error(line.rstrip('\n'))
             handle_casting_failure('Discovery', log_paths)
 
-        elif "Discovered Commissioner" in line:
+        elif 'Discovered Commissioner' in line:
             valid_discovered_commissioner = line.rstrip('\n')
 
         elif valid_discovered_commissioner:
@@ -415,7 +532,7 @@
 
         # A valid commissioner has VENDOR_ID, PRODUCT_ID, and DEVICE TYPE in its list of entries.
         if valid_vendor_id and valid_product_id and valid_device_type:
-            logging.info('Found a valid commissioner in the Linux tv-casting-app logs:')
+            logging.info('Found a valid commissioner in the Linux tv-casting-app output:')
             logging.info(valid_discovered_commissioner)
             logging.info(valid_vendor_id)
             logging.info(valid_product_id)
@@ -433,7 +550,14 @@
         handle_casting_failure('Commissioning', log_paths)
 
     # Extract the values from the 'Identification Declaration' block in the tv-casting-app output that we want to validate against.
-    expected_device_name, expected_vendor_id, expected_product_id = extract_device_info_from_tv_casting_app(tv_casting_app_info)
+    expected_device_name, expected_vendor_id, expected_product_id = extract_device_info_from_tv_casting_app(
+        tv_casting_app_info, 'Commissioning', log_paths)
+
+    if not expected_device_name or not expected_vendor_id or not expected_product_id:
+        logging.error('There is an error with the expected device info values that were extracted from the `Identification Declaration` block.')
+        logging.error(
+            f'expected_device_name: {expected_device_name}, expected_vendor_id: {expected_vendor_id}, expected_product_id: {expected_product_id}')
+        handle_casting_failure('Commissioning', log_paths)
 
     if not validate_identification_declaration_message_on_tv_app(tv_app_info, expected_device_name, expected_vendor_id, expected_product_id, log_paths):
         handle_casting_failure('Commissioning', log_paths)
@@ -445,6 +569,23 @@
         handle_casting_failure('Commissioning', log_paths)
 
 
+def test_subscription_fn(tv_casting_app_info: Tuple[subprocess.Popen, TextIO], log_paths: List[str]):
+    """Test the subscription state of the Linux tv-casting-app by validating the `ReportDataMessage` block."""
+
+    valid_report_data_msg = parse_tv_casting_app_for_report_data_msg(tv_casting_app_info, log_paths)
+
+    if valid_report_data_msg:
+        logging.info('Found the `ReportDataMessage` block in the Linux tv-casting-app output:')
+
+        for line in valid_report_data_msg:
+            logging.info(line)
+
+        logging.info('Testing subscription success!\n')
+        valid_report_data_msg.clear()
+    else:
+        handle_casting_failure('Testing subscription', log_paths)
+
+
 def test_launchUrl_fn(tv_casting_app_info: Tuple[subprocess.Popen, TextIO], tv_app_info: Tuple[subprocess.Popen, TextIO], log_paths: List[str]):
     """Test that the Linux tv-casting-app sent the launchUrl and that the Linux tv-app received the launchUrl."""
 
@@ -454,7 +595,7 @@
     if not parse_tv_casting_app_output_for_launchUrl_msg_success(tv_casting_app_info, log_paths):
         handle_casting_failure('Testing launchUrl', log_paths)
 
-    logging.info('Testing launchUrl success!\n')
+    logging.info('Testing launchUrl success!')
 
 
 @click.command()
@@ -499,12 +640,15 @@
                         handle_casting_failure('Discovery', log_paths)
 
                     # We need the valid discovered commissioner number to continue with commissioning.
-                    # Example string: \x1b[0;32m[1714582264602] [77989:2286038] [SVR] Discovered Commissioner #0\x1b[0m
-                    #                 The value '0' will be extracted from the string.
-                    valid_discovered_commissioner_number = valid_discovered_commissioner.split('#')[-1].replace('\x1b[0m', '')
+                    log_value_extractor = LogValueExtractor('Commissioning', log_paths)
+                    valid_discovered_commissioner_number = log_value_extractor.extract_from(
+                        valid_discovered_commissioner, 'Discovered Commissioner #')
+                    if not valid_discovered_commissioner_number:
+                        logging.error(f'Failed to find `Discovered Commissioner #` in line: {valid_discovered_commissioner}')
+                        handle_casting_failure('Commissioning', log_paths)
 
                     test_commissioning_fn(valid_discovered_commissioner_number, tv_casting_app_info, tv_app_info, log_paths)
-
+                    test_subscription_fn(tv_casting_app_info, log_paths)
                     test_launchUrl_fn(tv_casting_app_info, tv_app_info, log_paths)