2019/07/27

GCP: Iphoneから、端末位置情報をIoT分析環境に送信する -- python, GCP, IoTcore

Goal: 「IoTデータ分析環境立ち上げ on GCP」の環境に対して、Iphoneから位置情報データをPushするサンプル。Iphoneをエッジ端末として活用して、色々できそう。

How 

0. 前準備 Pythonista関連
   a)Pythonista のインストール
   https://apps.apple.com/jp/app/pythonista-3/id1085978097
   b)Gitを使いたかったら、stashをインストール
   https://github.com/ywangd/stash
   c)説明サイト ググったらたくさんあると思いますが、私は、以下のリンクを参照させていただきました。ありがとうございました。
   http://hitoriblog.com/?p=42145#Python-5




1. Pythonista を使って、端末の位置情報、スピード等を取得する
  a)Iphone上で、位置情報などを確認してみる
import location
import json
import datetime
address_now=location.get_location()
address_now.update(created=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
output = json.dumps(address_now)
print (output)
print(type(output))
print(type(address_now))
results = location.reverse_geocode(address_now)
print (results)




2. GCPのPub/Subにパブリッシュする。これの詳細は、前出のブログを参照

   a)GCP提供のサンプルプログラムをベースに追加加工する。

      https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/iot/api-client/mqtt_example
   をローカルPCにクローンして、Pythonの環境などをセットする。
   cloudiot_mqtt_example.pyをベースに変更を加えたものが以下。

#!/usr/bin/env python
import argparse
import datetime
import os
import random
import ssl
import time
import jwt
# --- 追加(ここから) ---
from jwt.contrib.algorithms.pycrypto import RSAAlgorithm
from jwt.contrib.algorithms.py_ecdsa import ECAlgorithm
try:
jwt.unregister_algorithm('RS256')
jwt.unregister_algorithm('ES256')
except:
pass
jwt.register_algorithm('RS256', RSAAlgorithm(RSAAlgorithm.SHA256))
jwt.register_algorithm('ES256', ECAlgorithm(ECAlgorithm.SHA256))
import location
import json
# --- 追加(ここまで) ---
import paho.mqtt.client as mqtt
# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1
# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32
# Whether to wait with exponential backoff before publishing.
should_backoff = False
# [START iot_mqtt_jwt]
def create_jwt(project_id, private_key_file, algorithm):
token = {
# The time that the token was issued at
'iat': datetime.datetime.utcnow(),
# The time the token expires.
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=60),
# The audience field should always be set to the GCP project id.
'aud': project_id
}
# Read the private key file.
with open(private_key_file, 'r') as f:
private_key = f.read()
print('Creating JWT using {} from private key file {}'.format(
algorithm, private_key_file))
return jwt.encode(token, private_key, algorithm=algorithm)
# [END iot_mqtt_jwt]
# [START iot_mqtt_config]
def error_str(rc):
"""Convert a Paho error to a human readable string."""
return '{}: {}'.format(rc, mqtt.error_string(rc))
def on_connect(unused_client, unused_userdata, unused_flags, rc):
"""Callback for when a device connects."""
print('on_connect', mqtt.connack_string(rc))
# After a successful connect, reset backoff time and stop backing off.
global should_backoff
global minimum_backoff_time
should_backoff = False
minimum_backoff_time = 1
def on_disconnect(unused_client, unused_userdata, rc):
"""Paho callback for when a device disconnects."""
print('on_disconnect', error_str(rc))
# Since a disconnect occurred, the next loop iteration will wait with
# exponential backoff.
global should_backoff
should_backoff = True
def on_publish(unused_client, unused_userdata, unused_mid):
"""Paho callback when a message is sent to the broker."""
print('on_publish')
def on_message(unused_client, unused_userdata, message):
"""Callback when the device receives a message on a subscription."""
payload = str(message.payload)
print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
payload, message.topic, str(message.qos)))
def get_client(
project_id, cloud_region, registry_id, device_id, private_key_file,
algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
"""Create our MQTT client. The client_id is a unique string that identifies
this device. For Google Cloud IoT Core, it must be in the format below."""
client = mqtt.Client(
client_id=('projects/{}/locations/{}/registries/{}/devices/{}'
.format(
project_id,
cloud_region,
registry_id,
device_id)))
# With Google Cloud IoT Core, the username field is ignored, and the
# password field is used to transmit a JWT to authorize the device.
client.username_pw_set(
username='unused',
password=create_jwt(
project_id, private_key_file, algorithm))
# Enable SSL/TLS support.
client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)
# Register message callbacks. https://eclipse.org/paho/clients/python/docs/
# describes additional callbacks that Paho supports. In this example, the
# callbacks just print to standard out.
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_message = on_message
# Connect to the Google MQTT bridge.
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)
# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)
# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)
# The topic that the device will receive commands on.
mqtt_command_topic = '/devices/{}/commands/#'.format(device_id)
# Subscribe to the commands topic, QoS 1 enables message acknowledgement.
print('Subscribing to {}'.format(mqtt_command_topic))
client.subscribe(mqtt_command_topic, qos=0)
return client
# [END iot_mqtt_config]
def parse_command_line_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=(
'Example Google Cloud IoT Core MQTT device connection code.'))
parser.add_argument(
'--project_id',
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
help='GCP cloud project name')
parser.add_argument(
'--registry_id', required=True, help='Cloud IoT Core registry id')
parser.add_argument(
'--device_id', required=True, help='Cloud IoT Core device id')
parser.add_argument(
'--private_key_file',
required=True, help='Path to private key file.')
parser.add_argument(
'--algorithm',
choices=('RS256', 'ES256'),
required=True,
help='Which encryption algorithm to use to generate the JWT.')
parser.add_argument(
'--cloud_region', default='us-central1', help='GCP cloud region')
parser.add_argument(
'--ca_certs',
default='roots.pem',
help=('CA root from https://pki.google.com/roots.pem'))
parser.add_argument(
'--num_messages',
type=int,
default=100,
help='Number of messages to publish.')
parser.add_argument(
'--message_type',
choices=('event', 'state'),
default='event',
help=('Indicates whether the message to be published is a '
'telemetry event or a device state message.'))
parser.add_argument(
'--mqtt_bridge_hostname',
default='mqtt.googleapis.com',
help='MQTT bridge hostname.')
parser.add_argument(
'--mqtt_bridge_port',
choices=(8883, 443),
default=8883,
type=int,
help='MQTT bridge port.')
parser.add_argument(
'--jwt_expires_minutes',
default=20,
type=int,
help=('Expiration time, in minutes, for JWT tokens.'))
return parser.parse_args()
# [START iot_mqtt_run]
def main():
global minimum_backoff_time
args = parse_command_line_args()
# Publish to the events or state topic based on the flag.
sub_topic = 'events' if args.message_type == 'event' else 'state'
mqtt_topic = '/devices/{}/{}'.format(args.device_id, sub_topic)
jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
args.project_id, args.cloud_region, args.registry_id, args.device_id,
args.private_key_file, args.algorithm, args.ca_certs,
args.mqtt_bridge_hostname, args.mqtt_bridge_port)
# --- 追加(ここから) ---
location.start_updates()
time.sleep(1)
# --- 追加(ここまで) ---
# Publish num_messages mesages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
# Process network events.
client.loop()
# Wait if backoff is required.
if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break
# Otherwise, wait and connect again.
delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
print('Waiting for {} before reconnecting.'.format(delay))
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)
# --- 修正(ここから) ---
outputdata = location.get_location()
outputdata.update(created=datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),dev_id=args.device_id)
payload = json.dumps(outputdata)
# --- 修正(ここまで) ---
print('Publishing message {}/{}: \'{}\''.format(
i, args.num_messages, payload))
# [START iot_mqtt_jwt_refresh]
seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
jwt_iat = datetime.datetime.utcnow()
client = get_client(
args.project_id, args.cloud_region,
args.registry_id, args.device_id, args.private_key_file,
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
args.mqtt_bridge_port)
# [END iot_mqtt_jwt_refresh]
# Publish "payload" to the MQTT topic. qos=1 means at least once
# delivery. Cloud IoT Core also supports qos=0 for at most once
# delivery.
client.publish(mqtt_topic, payload, qos=1)
# Send events every second. State should not be updated as often
time.sleep(1 if args.message_type == 'event' else 5)
# --- 追加(ここから) ---
location.stop_updates()
# --- 追加(ここまで) ---
print('Finished.')
# [END iot_mqtt_run]
if __name__ == '__main__':
main()