#!/usr/bin/env python | |
import boto3 | |
import json | |
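class Certificate():
    """Wrapper around a single AWS IoT X.509 certificate.

    Holds the certificate id and ARN together with a boto3 ``iot`` client and
    exposes create, delete, attach and detach operations for that certificate.
    An instance built without ``certId`` represents a certificate that has not
    been created yet (``id`` and ``arn`` are empty strings).
    """

    def __init__(self, certId=''):
        """Bind to an existing certificate, or start empty.

        Args:
            certId: Id of an existing certificate. When non-empty, the ARN is
                looked up with ``describe_certificate``; any client error from
                that lookup propagates to the caller.
        """
        self.id = certId
        self.arn = ''
        # The client resolves credentials and region from the ambient boto3
        # configuration; this class never sees them directly.
        self.client = boto3.client('iot')
        if self.id != '':
            result = self.client.describe_certificate(certificateId=self.id)
            self.arn = result['certificateDescription']['certificateArn']

    def create(self):
        """Create a new key pair and certificate and bind this object to it.

        Returns:
            The raw ``create_keys_and_certificate`` response. It includes the
            private key (``keyPair.PrivateKey``), which AWS IoT returns only
            in this response. Callers should write it straight to protected
            storage and keep the returned dict out of logs.

        Raises:
            AssertionError: If this object already refers to a certificate.
        """
        # Raised explicitly instead of via ``assert`` so the guard still runs
        # under ``python -O``; the exception type is unchanged for callers.
        if self.exists():
            raise AssertionError("Cert already exists")
        cert = self.create_keys_and_certificate()
        self.id = cert["certificateId"]
        self.arn = cert["certificateArn"]
        return cert

    def create_keys_and_certificate(self):
        """Ask AWS IoT for a new key pair and an already-active certificate.

        Returns:
            The raw API response, private key included (see ``create``).
        """
        return self.client.create_keys_and_certificate(setAsActive=True)

    def delete(self):
        """Deactivate the certificate, detach everything from it, delete it.

        Returns:
            True if AWS IoT reported the certificate as not found, False if
            it was deleted.
        """
        # Deactivate first: if a later detach call fails part-way, the
        # certificate is already unusable for authentication.
        try:
            self.client.update_certificate(certificateId=self.id,
                                           newStatus='INACTIVE')
        except self.client.exceptions.ResourceNotFoundException:
            return True

        # delete_certificate is rejected while policies or things are still
        # attached, so every attachment (all pages) has to be removed first.
        for policy in self.list_policies():
            self.detach_policy(policy['policyName'])
        for thing in self.list_things():
            self.detach_thing(thing)

        try:
            self.client.delete_certificate(certificateId=self.id)
        except self.client.exceptions.ResourceNotFoundException:
            return True
        # NOTE(review): id/arn are left populated after a successful delete,
        # so exists() keeps returning True for this object.
        return False

    def exists(self):
        """Return True when this object carries a certificate id."""
        return self.id != ''

    def get_arn(self):
        """Return the certificate ARN ('' when none is bound)."""
        return self.arn

    def list_policies(self):
        """Return every policy attached to this certificate.

        Follows ``nextMarker`` so that certificates with more than one page
        of policies are fully enumerated.
        """
        # NOTE(review): list_principal_policies is deprecated upstream in
        # favour of list_attached_policies. It is kept here because the
        # replacement needs a different IAM action on the caller's role.
        policies = []
        kwargs = {'principal': self.arn}
        while True:
            page = self.client.list_principal_policies(**kwargs)
            policies.extend(page.get('policies', []))
            marker = page.get('nextMarker')
            if not marker:
                return policies
            kwargs['marker'] = marker

    def attach_policy(self, policy_name):
        """Attach the named IoT policy to this certificate."""
        self.client.attach_policy(policyName=policy_name, target=self.arn)

    def detach_policy(self, policy_name):
        """Detach the named IoT policy from this certificate."""
        self.client.detach_policy(policyName=policy_name, target=self.arn)

    def list_things(self):
        """Return the names of every thing attached to this certificate.

        Follows ``nextToken`` so that all pages are returned.
        """
        things = []
        kwargs = {'principal': self.arn}
        while True:
            page = self.client.list_principal_things(**kwargs)
            things.extend(page.get('things', []))
            token = page.get('nextToken')
            if not token:
                return things
            kwargs['nextToken'] = token

    def attach_thing(self, thing_name):
        """Attach this certificate to the named thing."""
        self.client.attach_thing_principal(thingName=thing_name,
                                           principal=self.arn)

    def detach_thing(self, thing_name):
        """Detach this certificate from the named thing."""
        self.client.detach_thing_principal(thingName=thing_name,
                                           principal=self.arn)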