#
#    Copyright (c) 2023 Project CHIP Authors
#    All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
import logging

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.clusters.Types import NullValue
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, utc_time_in_matter_epoch
from mobly import asserts

# We don't have a good pipe between the c++ enums in CommissioningDelegate and python
# so this is hardcoded.
# I realize this is dodgy, not sure how to cross the enum from c++ to python cleanly
kCheckForMatchingFabric = 3
kConfigureUTCTime = 6
kConfigureTimeZone = 7
kConfigureDSTOffset = 8
kConfigureDefaultNTP = 9
kConfigureTrustedTimeSource = 19


class TestCommissioningTimeSync(MatterBaseTest):
    def setup_class(self):
        self.commissioner = None
        self.commissioned = False
        return super().setup_class()

    async def destroy_current_commissioner(self):
        if self.commissioner:
            if self.commissioned:
                fabricidx = await self.read_single_attribute_check_success(dev_ctrl=self.commissioner, cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)
                cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=fabricidx)
                await self.send_single_cmd(cmd=cmd)
            self.commissioner.Shutdown()
        self.commissioner = None
        self.commissioned = False

    @async_test_body
    async def teardown_test(self):
        await self.destroy_current_commissioner()
        return super().teardown_test()

    async def commission_and_base_checks(self):
        params = await self.default_controller.OpenCommissioningWindow(
            nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)
        await self.commissioner.CommissionOnNetwork(
            nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
            filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
        self.commissioned = True

        # Check the feature map - if we have a time cluster, we want UTC time to be set
        features = await self.read_single_attribute(dev_ctrl=self.default_controller, node_id=self.dut_node_id,
                                                    endpoint=0, attribute=Clusters.TimeSynchronization.Attributes.FeatureMap)
        self.supports_time = True
        if isinstance(features, Clusters.Attribute.ValueDecodeFailure):
            asserts.assert_true(isinstance(features.Reason, InteractionModelError), InteractionModelError,
                                f'Unexpected exception from reading time cluster feature map {features.Reason}')
            asserts.assert_equal(features.Reason.status, Status.UnsupportedCluster,
                                 f'Unexpected error response from reading time cluster feature map {features.Reason}')
            self.supports_time = False

        asserts.assert_equal(self.commissioner.CheckStageSuccessful(
            kConfigureUTCTime), self.supports_time, 'UTC time stage incorrect')

        if self.supports_time:
            self.supports_time_zone = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeZone)
            self.supports_default_ntp = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPClient)
            self.supports_trusted_time_source = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeSyncClient)
        else:
            self.supports_time_zone = False
            self.supports_default_ntp = False
            self.supports_trusted_time_source = False

    async def create_commissioner(self):
        if self.commissioner:
            await self.destroy_current_commissioner()
        new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
        new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
        self.commissioner = new_fabric_admin.NewController(nodeId=112233, useTestCommissioner=True)

        self.commissioner.ResetCommissioningParameters()
        self.commissioner.ResetTestCommissioner()

        # If the app we're testing against doesn't have a time cluster, we still want to run this
        # tests and we want it to succeed, so catch the unsupported cluster error here and ignore
        try:
            cmd = Clusters.TimeSynchronization.Commands.SetDefaultNTP(defaultNTP=NullValue)
            await self.send_single_cmd(cmd=cmd)
            cmd = Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(NullValue)
            await self.send_single_cmd(cmd=cmd)
        except InteractionModelError as e:
            if e.status == Status.UnsupportedCluster:
                pass

    async def commission_stages(self, time_zone: bool, dst: bool, default_ntp: bool, trusted_time_source: bool):
        await self.create_commissioner()

        logging.info(
            f'Running Commissioning test - time_zone: {time_zone}, dst: {dst}, default_ntp: {default_ntp}, trusted_time_source: {trusted_time_source}')

        if time_zone:
            self.commissioner.SetTimeZone(offset=3600, validAt=0)
        if dst:
            six_months = 1.577e+13  # in us
            dst_valid_until = utc_time_in_matter_epoch() + int(six_months)
            self.commissioner.SetDSTOffset(offset=3600, validStarting=0, validUntil=dst_valid_until)
        if default_ntp:
            self.commissioner.SetDefaultNTP("fe80::1")
        if trusted_time_source:
            self.commissioner.SetTrustedTimeSource(self.commissioner.nodeId, 0)

        await self.commission_and_base_checks()

        should_set_time_zone = bool(self.supports_time_zone and time_zone)
        should_set_dst = bool(self.supports_time_zone and time_zone and dst)
        should_set_default_ntp = bool(self.supports_default_ntp and default_ntp)
        should_set_trusted_time = bool(self.supports_trusted_time_source and trusted_time_source)

        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTimeZone),
                             should_set_time_zone, 'Incorrect value for time zone stage check')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDSTOffset),
                             should_set_dst, 'Incorrect value for kConfigureDSTOffset stage')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP),
                             should_set_default_ntp, 'Incorrect value for kConfigureDefaultNTP stage')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTrustedTimeSource),
                             should_set_trusted_time, 'Incorrect value for kConfigureTrustedTimeSource stage')

        if should_set_time_zone:
            received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TimeZone)
            expected = [Clusters.TimeSynchronization.Structs.TimeZoneStruct(offset=3600, validAt=0)]
            asserts.assert_equal(received, expected, "Time zone was not correctly set by commissioner")

        if should_set_dst:
            received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.DSTOffset)
            expected = [Clusters.TimeSynchronization.Structs.DSTOffsetStruct(
                offset=3600, validStarting=0, validUntil=dst_valid_until)]
            asserts.assert_equal(received, expected, "DST was not set correctly by the commissioner")

        if should_set_trusted_time:
            fabric_idx = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=self.commissioner)
            received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TrustedTimeSource)
            expected = Clusters.TimeSynchronization.Structs.TrustedTimeSourceStruct(
                fabricIndex=fabric_idx, nodeID=self.commissioner.nodeId, endpoint=0)
            asserts.assert_equal(received, expected, "Trusted Time source was not set properly")

        if should_set_default_ntp:
            received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.DefaultNTP)
            expected = "fe80::1"
            asserts.assert_equal(received, expected, "Default NTP was not set properly")

    @async_test_body
    async def test_CommissioningAllBasic(self):
        # We want to assess all combos (ie, all flags in the range of 0b0000 to 0b1111)
        for i in range(0, 0xF):
            time_zone = bool(i & 0x1)
            dst = bool(i & 0x2)
            default_ntp = bool(i & 0x4)
            trusted_time_source = bool(i & 0x8)
            await self.commission_stages(time_zone, dst, default_ntp, trusted_time_source)

    @async_test_body
    async def test_CommissioningPreSetValues(self):

        await self.create_commissioner()

        # If we're running this against a node that doesn't have a time cluster, this test doesn't apply
        # and the remaining cases are covered in the base case above.
        try:
            trusted_time_source = Clusters.TimeSynchronization.Structs.FabricScopedTrustedTimeSourceStruct(
                nodeID=0x5555, endpoint=0)
            cmd = Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(trusted_time_source)
            await self.send_single_cmd(cmd)

            cmd = Clusters.TimeSynchronization.Commands.SetDefaultNTP("fe80::02")
            await self.send_single_cmd(cmd)
        except InteractionModelError as e:
            if e.status == Status.UnsupportedCluster:
                await self.destroy_current_commissioner()
                return

        self.commissioner.SetTimeZone(offset=3600, validAt=0)
        six_months = 1.577e+13  # in us
        self.commissioner.SetDSTOffset(offset=3600, validStarting=0, validUntil=utc_time_in_matter_epoch() + int(six_months))
        self.commissioner.SetDefaultNTP("fe80::1")
        self.commissioner.SetTrustedTimeSource(self.commissioner.nodeId, 0)

        await self.commission_and_base_checks()
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTimeZone),
                             self.supports_time_zone, 'Incorrect value for time zone stage check')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDSTOffset),
                             self.supports_time_zone, 'Incorrect value for kConfigureDSTOffset stage')
        asserts.assert_false(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP), 'kConfigureDefaultNTP incorrectly set')
        asserts.assert_false(self.commissioner.CheckStageSuccessful(
            kConfigureTrustedTimeSource), 'kConfigureTrustedTimeSource incorrectly set')

    @async_test_body
    async def test_FabricCheckStage(self):
        await self.create_commissioner()

        # This was moved into a different stage when the time sync stuff was added
        asserts.assert_equal(self.commissioner.GetFabricCheckResult(), -1, "Fabric check result is already set")
        self.commissioner.SetCheckMatchingFabric(True)
        await self.commission_and_base_checks()
        asserts.assert_true(self.commissioner.CheckStageSuccessful(
            kCheckForMatchingFabric), "Did not run check for matching fabric stage")
        asserts.assert_equal(self.commissioner.GetFabricCheckResult(), 0, "Fabric check result did not get set by pairing delegate")

        # Let's try it again with no check
        await self.create_commissioner()
        asserts.assert_equal(self.commissioner.GetFabricCheckResult(), -1, "Fabric check result is already set")
        self.commissioner.SetCheckMatchingFabric(False)
        await self.commission_and_base_checks()
        asserts.assert_false(self.commissioner.CheckStageSuccessful(
            kCheckForMatchingFabric), "Incorrectly ran check for matching fabric stage")
        asserts.assert_equal(self.commissioner.GetFabricCheckResult(), -1, "Fabric check result incorrectly set")

    @async_test_body
    async def test_TimeZoneName(self):
        await self.create_commissioner()
        self.commissioner.SetTimeZone(offset=3600, validAt=0, name="test")
        await self.commission_and_base_checks()
        asserts.assert_true(self.commissioner.CheckStageSuccessful(kConfigureTimeZone), 'Time zone was not successfully set')

        received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TimeZone)
        expected = [Clusters.TimeSynchronization.Structs.TimeZoneStruct(offset=3600, validAt=0, name="test")]
        asserts.assert_equal(received, expected, "Time zone was not correctly set by commissioner")

        await self.create_commissioner()
        # name is max 64 per the spec
        sixty_five_byte_string = "x" * 65
        self.commissioner.SetTimeZone(offset=3600, validAt=0, name=sixty_five_byte_string)
        await self.commission_and_base_checks()
        asserts.assert_true(self.commissioner.CheckStageSuccessful(kConfigureTimeZone), 'Time zone was not successfully set')

        received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TimeZone)
        expected = [Clusters.TimeSynchronization.Structs.TimeZoneStruct(offset=3600, validAt=0, name=None)]
        asserts.assert_equal(received, expected, "Commissioner did not ignore too-long name")

        await self.create_commissioner()
        # name is max 64 per the spec
        sixty_four_byte_string = "x" * 64
        self.commissioner.SetTimeZone(offset=3600, validAt=0, name=sixty_four_byte_string)
        await self.commission_and_base_checks()
        asserts.assert_true(self.commissioner.CheckStageSuccessful(kConfigureTimeZone), 'Time zone was not successfully set')

        received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TimeZone)
        expected = [Clusters.TimeSynchronization.Structs.TimeZoneStruct(offset=3600, validAt=0, name=sixty_four_byte_string)]
        asserts.assert_equal(received, expected, "Time zone 64 byte name was not correctly set")

    @async_test_body
    async def test_DefaultNtpSize(self):
        await self.create_commissioner()
        too_long_name = "x." + "x" * 127
        self.commissioner.SetDefaultNTP(too_long_name)
        await self.commission_and_base_checks()
        asserts.assert_false(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP),
                             'Commissioner attempted to set default NTP to a too long value')

        await self.create_commissioner()
        just_fits_name = "x." + "x" * 126
        self.commissioner.SetDefaultNTP(just_fits_name)
        await self.commission_and_base_checks()
        asserts.assert_true(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP),
                            'Commissioner did not correctly set default NTP')
        received = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.DefaultNTP)
        asserts.assert_equal(received, just_fits_name, 'Commissioner incorrectly set default NTP name')


# TODO(cecille): Test - Add hooks to change the time zone response to indicate no DST is needed
# TODO(cecille): Test - Set commissioningParameters TimeZone and DST list size to > node list size to ensure they get truncated
if __name__ == "__main__":
    default_matter_test_main()
