-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAwsClient.py
More file actions
72 lines (56 loc) · 2.04 KB
/
AwsClient.py
File metadata and controls
72 lines (56 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
import AWSIoTPythonSDK
import logging
import time
import json
import datetime
import os
import sys
from time import sleep
from signal import pause
class AwsClient:
    """Publish coffee-scale readings to an AWS IoT device shadow.

    Creating an instance reads the settings file and the certificate
    directory (both resolved relative to the current working directory),
    connects to AWS IoT and creates the shadow handler that the publish
    methods use.
    """

    # Connection parameters are class attributes so that a subclass (or a
    # test) can override them without touching the constructor signature.
    CERT_DIR = "cert"
    ROOT_CA_PATH = "cert/AmazonRootCA1.pem"
    SETTINGS_PATH = "./settings.conf"
    PORT = 8883
    CLIENT_ID = "coffee-panic-client"
    # Timeout handed to shadowUpdate (the SDK documents it in seconds).
    SHADOW_UPDATE_TIMEOUT = 5

    def __init__(self):
        """Connect to AWS IoT and create the device shadow handler.

        Reads ``host_name`` and ``thing_name`` from the settings file.
        Exits the process when no certificate / private key pair is found
        (see ``_get_cert_files``).
        """
        certificate_path, private_key_path = self._get_cert_files()
        settings = self._get_settings()

        # Init the shadow client and point it at the configured endpoint.
        client = AWSIoTMQTTShadowClient(self.CLIENT_ID)
        client.configureEndpoint(settings['host_name'], self.PORT)
        client.configureCredentials(
            self.ROOT_CA_PATH, private_key_path, certificate_path)

        # Connection configuration.
        client.configureAutoReconnectBackoffTime(1, 32, 20)
        client.configureConnectDisconnectTimeout(10)  # 10 sec
        client.configureMQTTOperationTimeout(5)  # 5 sec

        # Connect to AWS IoT.
        client.connect()
        self.shadow_handler = client.createShadowHandlerWithName(
            settings['thing_name'], True)

    def _get_settings(self):
        """Return the parsed JSON content of the settings file."""
        with open(self.SETTINGS_PATH) as config_file:
            return json.load(config_file)

    def _get_cert_files(self):
        """Locate the device certificate and private key in the cert directory.

        Returns:
            A ``(certificate_path, private_key_path)`` tuple.

        Exits the process with an explanatory message when either file is
        missing.
        """
        certificate_path = None
        private_key_path = None
        # Sorted so that the choice is deterministic when several files
        # match (os.listdir returns entries in arbitrary order).
        for name in sorted(os.listdir(self.CERT_DIR)):
            if name.endswith('pem.crt'):
                certificate_path = os.path.join(self.CERT_DIR, name)
            elif name.endswith('public.pem.key'):
                # AWS IoT also issues a "*-public.pem.key"; it must never be
                # used as the private key for the TLS connection.
                continue
            elif name.endswith('.pem.key'):
                private_key_path = os.path.join(self.CERT_DIR, name)
        if not private_key_path or not certificate_path:
            sys.exit(
                "You need to add a certificate and private key file to the "
                "cert directory\n"
                "Certificate files can be generated in AWS IoT")
        return (certificate_path, private_key_path)

    def publish_weight(self, scale_reading):
        """Publish a scale reading; thin alias for ``publish_coffee_message``."""
        self.publish_coffee_message(scale_reading)

    def shadow_callback(self, *args):
        """Handle the outcome of a shadow update request.

        The AWS IoT SDK documents the callback arguments as
        ``(payload, responseStatus, token)``. Any status other than
        ``"accepted"`` is logged so failed updates are not silently lost;
        the callback never raises.
        """
        if len(args) >= 2 and args[1] != "accepted":
            logging.getLogger(__name__).warning(
                "Shadow update was not accepted (status=%s)", args[1])

    def publish_coffee_message(self, scale_reading):
        """Report ``scale_reading`` to the device shadow.

        The reading is sent as the shadow's ``state.reported`` document with
        two extra keys: ``unit`` (always ``"gram"``) and ``time`` (the current
        local time as a string).

        Args:
            scale_reading: mapping of JSON-serialisable values. It is copied,
                so the caller's object is left unmodified.
        """
        # Copy first: adding keys to the caller's dict would leak 'unit' and
        # 'time' into whatever the caller does with the reading afterwards.
        message = dict(scale_reading)
        message['unit'] = "gram"
        message['time'] = str(datetime.datetime.now())
        message_json = json.dumps({'state': {"reported": message}})
        self.shadow_handler.shadowUpdate(
            message_json, self.shadow_callback, self.SHADOW_UPDATE_TIMEOUT)