#
#    Copyright (c) 2023 Project CHIP Authors
#    All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
import logging

import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.clusters.Types import NullValue
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, utc_time_in_matter_epoch
from mobly import asserts

# We don't have a good pipe between the c++ enums in CommissioningDelegate and python
# so this is hardcoded.
# I realize this is dodgy, not sure how to cross the enum from c++ to python cleanly
kConfigureUTCTime = 6
kConfigureTimeZone = 7
kConfigureDSTOffset = 8
kConfigureDefaultNTP = 9
kConfigureTrustedTimeSource = 19


class TestCommissioningTimeSync(MatterBaseTest):
    def setup_class(self):
        self.commissioner = None
        self.commissioned = False
        return super().setup_class()

    async def destroy_current_commissioner(self):
        if self.commissioner:
            if self.commissioned:
                fabricidx = await self.read_single_attribute_check_success(dev_ctrl=self.commissioner, cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex)
                cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=fabricidx)
                await self.send_single_cmd(cmd=cmd)
            self.commissioner.Shutdown()
        self.commissioner = None
        self.commissioned = False

    @async_test_body
    async def teardown_test(self):
        await self.destroy_current_commissioner()
        return super().teardown_test()

    async def commission_and_base_checks(self):
        params = self.default_controller.OpenCommissioningWindow(
            nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1)
        errcode = self.commissioner.CommissionOnNetwork(
            nodeId=self.dut_node_id, setupPinCode=params.setupPinCode,
            filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
        asserts.assert_true(errcode.is_success, 'Commissioning did not complete successfully')
        self.commissioned = True

        # Check the feature map - if we have a time cluster, we want UTC time to be set
        features = await self.read_single_attribute(dev_ctrl=self.default_controller, node_id=self.dut_node_id,
                                                    endpoint=0, attribute=Clusters.TimeSynchronization.Attributes.FeatureMap)
        self.supports_time = True
        if isinstance(features, Clusters.Attribute.ValueDecodeFailure):
            asserts.assert_true(isinstance(features.Reason, InteractionModelError), InteractionModelError,
                                f'Unexpected exception from reading time cluster feature map {features.Reason}')
            asserts.assert_equal(features.Reason.status, Status.UnsupportedCluster,
                                 f'Unexpected error response from reading time cluster feature map {features.Reason}')
            self.supports_time = False

        asserts.assert_equal(self.commissioner.CheckStageSuccessful(
            kConfigureUTCTime), self.supports_time, 'UTC time stage incorrect')

        if self.supports_time:
            self.supports_time_zone = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeZone)
            self.supports_default_ntp = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kNTPClient)
            self.supports_trusted_time_source = bool(features & Clusters.TimeSynchronization.Bitmaps.Feature.kTimeSyncClient)
        else:
            self.supports_time_zone = False
            self.supports_default_ntp = False
            self.supports_trusted_time_source = False

    async def create_commissioner(self):
        if self.commissioner:
            self.destroy_current_commissioner()
        new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
        new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
        self.commissioner = new_fabric_admin.NewController(nodeId=112233, useTestCommissioner=True)

        self.commissioner.ResetCommissioningParameters()
        self.commissioner.ResetTestCommissioner()

        # If the app we're testing against doesn't have a time cluster, we still want to run this
        # tests and we want it to succeed, so catch the unsupported cluster error here and ignore
        try:
            cmd = Clusters.TimeSynchronization.Commands.SetDefaultNTP(defaultNTP=NullValue)
            await self.send_single_cmd(cmd=cmd)
            cmd = Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(NullValue)
            await self.send_single_cmd(cmd=cmd)
        except InteractionModelError as e:
            if e.status == Status.UnsupportedCluster:
                pass

    async def commission_stages(self, time_zone: bool, dst: bool, default_ntp: bool, trusted_time_source: bool):
        await self.create_commissioner()

        logging.info(
            f'Running Commissioning test - time_zone: {time_zone}, dst: {dst}, default_ntp: {default_ntp}, trusted_time_source: {trusted_time_source}')

        if time_zone:
            self.commissioner.SetTimeZone(offset=3600, validAt=0)
        if dst:
            six_months = 1.577e+13  # in us
            self.commissioner.SetDSTOffset(offset=3600, validStarting=0, validUntil=utc_time_in_matter_epoch() + int(six_months))
        if default_ntp:
            self.commissioner.SetDefaultNTP("fe80::1")
        if trusted_time_source:
            self.commissioner.SetTrustedTimeSource(self.commissioner.nodeId, 0)

        await self.commission_and_base_checks()

        should_set_time_zone = bool(self.supports_time_zone and time_zone)
        should_set_dst = bool(self.supports_time_zone and time_zone and dst)
        should_set_default_ntp = bool(self.supports_default_ntp and default_ntp)
        should_set_trusted_time = bool(self.supports_trusted_time_source and trusted_time_source)

        print(f'{should_set_time_zone} {should_set_dst} {should_set_default_ntp} {should_set_trusted_time}')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTimeZone),
                             should_set_time_zone, 'Incorrect value for time zone stage check')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDSTOffset),
                             should_set_dst, 'Incorrect value for kConfigureDSTOffset stage')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP),
                             should_set_default_ntp, 'Incorrect value for kConfigureDefaultNTP stage')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTrustedTimeSource),
                             should_set_trusted_time, 'Incorrect value for kConfigureTrustedTimeSource stage')

    @async_test_body
    async def test_CommissioningAllBasic(self):
        # We want to assess all combos (ie, all flags in the range of 0b0000 to 0b1111)
        for i in range(0, 0xF):
            time_zone = bool(i & 0x1)
            dst = bool(i & 0x2)
            default_ntp = bool(i & 0x4)
            trusted_time_source = bool(i & 0x8)
            await self.commission_stages(time_zone, dst, default_ntp, trusted_time_source)

    @async_test_body
    async def test_CommissioningPreSetValues(self):

        await self.create_commissioner()

        # If we're running this against a node that doesn't have a time cluster, this test doesn't apply
        # and the remaining cases are covered in the base case above.
        try:
            trusted_time_source = Clusters.TimeSynchronization.Structs.FabricScopedTrustedTimeSourceStruct(
                nodeID=0x5555, endpoint=0)
            cmd = Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(trusted_time_source)
            await self.send_single_cmd(cmd)

            cmd = Clusters.TimeSynchronization.Commands.SetDefaultNTP("fe80::02")
            await self.send_single_cmd(cmd)
        except InteractionModelError as e:
            if e.status == Status.UnsupportedCluster:
                await self.destroy_current_commissioner()
                return

        self.commissioner.SetTimeZone(offset=3600, validAt=0)
        six_months = 1.577e+13  # in us
        self.commissioner.SetDSTOffset(offset=3600, validStarting=0, validUntil=utc_time_in_matter_epoch() + int(six_months))
        self.commissioner.SetDefaultNTP("fe80::1")
        self.commissioner.SetTrustedTimeSource(self.commissioner.nodeId, 0)

        await self.commission_and_base_checks()
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureTimeZone),
                             self.supports_time_zone, 'Incorrect value for time zone stage check')
        asserts.assert_equal(self.commissioner.CheckStageSuccessful(kConfigureDSTOffset),
                             self.supports_time_zone, 'Incorrect value for kConfigureDSTOffset stage')
        asserts.assert_false(self.commissioner.CheckStageSuccessful(kConfigureDefaultNTP), 'kConfigureDefaultNTP incorrectly set')
        asserts.assert_false(self.commissioner.CheckStageSuccessful(
            kConfigureTrustedTimeSource), 'kConfigureTrustedTimeSource incorrectly set')


# TODO(cecille): Test - Add hooks to change the time zone response to indicate no DST is needed
# TODO(cecille): Test - Set commissioningParameters TimeZone and DST list size to > node list size to ensure they get truncated
if __name__ == "__main__":
    default_matter_test_main()
