最近 AWS AppSync にふれる機会がありました。
そこで今回は、AWS AppSyncのGraphQLインタフェースを使って、Pythonでquery・mutation・subscriptionを試してみましたので、メモを残します。
目次
環境
Python 3.7.3
- awscli 1.16.189
- graphqlclient 0.2.4
- paho-mqtt 1.4.0
AWS AppSync
- データソース:DynamoDB
- 既存のDynamoDBを使用
- ストリームは無効
- 認証は、
API Key
- データソース:DynamoDB
既存のDynamoDBは
- title (key)
- content
という2列を持つAppSyncToDoテーブルです。
$ aws dynamodb describe-table --table-name AppSyncToDo { "Table": { "AttributeDefinitions": [ { "AttributeName": "title", "AttributeType": "S" } ], "TableName": "AppSyncToDo", "KeySchema": [ { "AttributeName": "title", "KeyType": "HASH" } ], "TableStatus": "ACTIVE", "CreationDateTime": xxx, "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 49, "ItemCount": 2, "TableArn": "arn:aws:dynamodb:xxx", "TableId": "xxx", "LatestStreamLabel": "xxx", "LatestStreamArn": "arn:aws:dynamodb:xxx" } }
長いのでまとめ
Web上ではJavaScriptのサンプルが多いですが、Pythonでも問題なく動作しました。
以降は、試した時の流れです。
AWS AppSyncでAPIを作る
AppSyncのコンソールに入り、 Create API
します。
- Getting Started
Import DynamoDB table
を選択
- Import DynamoDB Table
- RegionとTable nameは、既存のものを選択
- Create or use an existing roleは、
New role
を選択 - Name the model は、適当に付ける
- Configure model fieldsは、Keyのtitleの他、
content
をString
で用意 - API nameは、適当に付ける
すると、以下のようなSchemeが自動的にできました。
type AppSyncToDo { title: String! content: String } type AppSyncToDoConnection { items: [AppSyncToDo] nextToken: String } input CreateAppSyncToDoInput { title: String! content: String } input DeleteAppSyncToDoInput { title: String! } type Mutation { createAppSyncToDo(input: CreateAppSyncToDoInput!): AppSyncToDo updateAppSyncToDo(input: UpdateAppSyncToDoInput!): AppSyncToDo deleteAppSyncToDo(input: DeleteAppSyncToDoInput!): AppSyncToDo } type Query { getAppSyncToDo(title: String!): AppSyncToDo listAppSyncToDos(filter: TableAppSyncToDoFilterInput, limit: Int, nextToken: String): AppSyncToDoConnection } type Subscription { onCreateAppSyncToDo(title: String, content: String): AppSyncToDo @aws_subscribe(mutations: ["createAppSyncToDo"]) onUpdateAppSyncToDo(title: String, content: String): AppSyncToDo @aws_subscribe(mutations: ["updateAppSyncToDo"]) onDeleteAppSyncToDo(title: String, content: String): AppSyncToDo @aws_subscribe(mutations: ["deleteAppSyncToDo"]) } input TableAppSyncToDoFilterInput { title: TableStringFilterInput content: TableStringFilterInput } input TableBooleanFilterInput { ne: Boolean eq: Boolean } input TableFloatFilterInput { ne: Float eq: Float le: Float lt: Float ge: Float gt: Float contains: Float notContains: Float between: [Float] } input TableIDFilterInput { ne: ID eq: ID le: ID lt: ID ge: ID gt: ID contains: ID notContains: ID between: [ID] beginsWith: ID } input TableIntFilterInput { ne: Int eq: Int le: Int lt: Int ge: Int gt: Int contains: Int notContains: Int between: [Int] } input TableStringFilterInput { ne: String eq: String le: String lt: String ge: String gt: String contains: String notContains: String between: [String] beginsWith: String } input UpdateAppSyncToDoInput { title: String! content: String }
また、Queriesも自動生成されました。queryとmutationの2つができています。
# Click the orange "Play" button and select the createAppSyncToDo # mutation to create an object in DynamoDB. # If you see an error that starts with "Unable to assume role", # wait a moment and try again. mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) { createAppSyncToDo(input: $createappsynctodoinput) { title content } } # After running createAppSyncToDo, try running the listAppSyncToDos query. query listAppSyncToDos { listAppSyncToDos { items { title content } } }
mutation用の QUERY VARIABLES にも、初期値が設定されています。
{ "createappsynctodoinput": { "title": "Hello, world!", "content": "Hello, world!" } }
試しに createAppSyncToDo
を実行してみると、結果が右に表示されました。
{ "data": { "createAppSyncToDo": { "title": "Hello, world!", "content": "Hello, world!" } } }
DynamoDBにもデータが追加されています。
$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "Hello, world!"}}' { "Item": { "content": { "S": "Hello, world!" }, "title": { "S": "Hello, world!" } } }
これで、AppSyncとDynamoDBが連携できていることが分かりました。
mutationの実行
APIができたため、次はPythonのGraphQLクライアントライブラリを使って、AppSyncにmutationを投げてデータを登録してみます。
PythonのGraphQLクライアントライブラリはGraphQLサイトにまとめられているものの他、Githubで公開されているものがあります。
https://graphql.org/code/#python
今回は、READMEにAPI Keyの渡し方が書いてあった、 python-graphql-client
を使って試してみます。
prisma/python-graphql-client: Simple GraphQL client for Python 2.7+
pip install graphqlclient
でインストール後、こんな感じでmutationを実行するスクリプトを作成します。
from graphqlclient import GraphQLClient def execute_mutation_api(gql_client, title, content): # AWS AppSyncのQueriesをそのまま貼って動作する mutation = """ mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) { createAppSyncToDo(input: $createappsynctodoinput) { title content } } """ variables = { "createappsynctodoinput": { "title": title, "content": content, } } result = gql_client.execute(mutation, variables=variables) print(result) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # 登録する execute_mutation_api(c, 'ham', 'spam')
execute_mutation_api
関数の mutation
および variables
は、Queriesに記載されている内容をそのまま貼っています。
また、 API_URL
と API_KEY
については、AppSyncのSettingsに記載されている内容を使います。
準備ができたため、スクリプトを実行してみると、ログに以下が出力されました。
{"data":{"createAppSyncToDo":{"title":"ham","content":"spam"}}}
awscliで、DynamoDBの内容を確認します。
$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "ham"}}' { "Item": { "content": { "S": "spam" }, "title": { "S": "ham" } } }
データが登録されており、mutationは成功したようです。
queryの実行
続いて、DynamoDBのデータを query
を使って取得してみます。
query内容は、AppSyncで自動的に作成された listAppSyncToDos
をそのまま使います。
def execute_query_api(gql_client): query = """ query listAppSyncToDos { listAppSyncToDos { items { title content } } } """ result = gql_client.execute(query) print(result) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # 登録した情報を取得する execute_query_api(c)
このスクリプトを実行してみます。
{"data":{"listAppSyncToDos":{"items":[ {"title":"ham","content":"spam"}, {"title":"Hello, world!","content":"Hello, world!"} ]}}}
AppSyncのコンソールから入力した内容、および、mutationで登録した内容を取得できました*1。
subscriptionの実行
onCreate系のsubscription
最後にsubscriptionを実行してみます。
AppSyncのSettingsにはhttpsのエンドポイントはあるものの、subscriptionで使われると思われるWebSocketのエンドポイントが見当たりませんでした。
いろいろ試してみたところ、AppSyncでのsubscriptionの流れは
- httpsのエンドポイントにsubscriptionをHTTPリクエストする
- MQTTのエンドポイントやその他の情報が返ってくる
- WebSocket(wss)を使って、MQTTのエンドポイントに接続
- DynamoDBでイベントが発生した時に、通知を受け取る
となるようです。
そこで、onCreate系のsubscriptionを例に、順に試してみます。
httpsのエンドポイントにsubscriptionをHTTPリクエスト & レスポンス
DynamoDBで新規作成イベントが発生した場合にtitleとcontentを受け取るsubscriptionを用意します。
def execute_subscription_api(gql_client, subscription): # Subscription APIに投げると、MQTTの接続情報が返ってくる r = gql_client.execute(subscription) # JSON文字列なので、デシリアライズしてPythonオブジェクトにする response = json.loads(r) # 中身を見てみる print(response) if __name__ == '__main__': c = GraphQLClient(API_URL) c.inject_token(API_KEY, 'X-Api-Key') # DynamoDBが更新された時の通知を1回だけ受け取る # Subscription API用のGraphQL (onCreate系) subscription = """ subscription { onCreateAppSyncToDo { title content } } """ execute_subscription_api(c, subscription)
実行してみると次のようなレスポンスが返ってきます。
{'extensions': {'subscription': { 'mqttConnections': [ {'url': 'wss://<host>.iot.<region>.amazonaws.com/mqtt?<v4_credential>', 'topics': ['path/to/onCreateAppSyncToDo/'], 'client': '<client_id>'}], 'newSubscriptions': { 'onCreateAppSyncToDo': {'topic': 'path/to/onCreateAppSyncToDo/', 'expireTime': None}}}}, 'data': {'onCreateAppSyncToDo': None}}
キー mqttConnections
の中に、MQTTのエンドポイントやtopics、Client ID が入っていました。
WebSocket(wss)を使って、MQTTのエンドポイントに接続
次はPythonのMQTTクライアントを使って、MQTTのエンドポイントに接続してみます。
今回は、MQTTクライアントとして paho.mqtt.python (paho-mqtt)
を使います。
eclipse/paho.mqtt.python: paho.mqtt.python
続いて、MQTTエンドポイント接続についてです。
レスポンスの url
を見ると、プロトコルが wss
と、セキュアなTLSによるWebSocketを使っています。また、エンドポイントはAWS IoTのようです。
TLS & AWS IoTを使うということは、接続用の証明書などを用意しないといけないのかなと思いました。
例:X.509 証明書と AWS IoT - AWS IoT
しかし、レスポンスのurlを見ると、以下にある AWS 署名バージョン 4
がクエリ文字列としてすでに追加されていました。
MQTT over WebSocket プロトコル - AWS IoT
また、AWS署名バージョン4が追加済の場合に、 paho-mqtt
を使って AWS IoTのMQTTエンドポイントと接続している例が、以下に記載されていました。
https://github.com/eclipse/paho.mqtt.python/issues/277#issuecomment-372019123
実際に試してみたところ、たしかにAWS IoTの証明書まわりは不要でした。
そこで、subscription関数に、
paho-mqtt
を使って、MQTTエンドポイントに接続- 接続できたら、レスポンスにあった
topic
をsubscribeする - topicからメッセージが送られてきたら、メッセージ内容を出力して、接続を切断(disconnect)する
という実装を追加してみました。
def execute_subscription_api(gql_client): ... def on_connect(client, userdata, flags, respons_code): print('connected') # 接続できたのでsubscribeする client.subscribe(topic) def on_message(client, userdata, msg): # メッセージを表示する print(f'{msg.topic} {str(msg.payload)}') # メッセージを受信したので、今回は切断してみる # これがないと、再びメッセージを待ち続ける client.disconnect() # Subscribeするのに必要な情報を取得する client_id = response['extensions']['subscription']['mqttConnections'][0]['client'] topic = response['extensions']['subscription']['mqttConnections'][0]['topics'][0] # URLはparseして、扱いやすくする url = response['extensions']['subscription']['mqttConnections'][0]['url'] urlparts = urlparse(url) # ヘッダーとして、netloc(ネットワーク上の位置)を設定 headers = { 'Host': '{0:s}'.format(urlparts.netloc), } # 送信時、ClientIDを指定した上でWebSocketで送信しないと、通信できないので注意 mqtt_client = MQTTClient(client_id=client_id, transport='websockets') # 接続時のコールバックメソッドを登録する mqtt_client.on_connect = on_connect # データ受信時のコールバックメソッドを登録する mqtt_client.on_message = on_message # ヘッダやパスを指定する mqtt_client.ws_set_options(path=f'{urlparts.path}?{urlparts.query}', headers=headers) # TLSを有効にする mqtt_client.tls_set() # wssで接続するため、443ポートに投げる mqtt_client.connect(urlparts.netloc, port=443) # 受信するのを待つ mqtt_client.loop_forever()
ポイントは、MQTTのクライアントを生成する際
MQTTClient(client_id=client_id, transport='websockets')
と、
- client_idに、レスポンスの
client
の値を指定 - transportとして、
websockets
を指定
の2つとなります。
次に、スクリプトを実行してみると、コンソールに connected
が表示されたままになりました。うまくいっているようです。
DynamoDBでイベントが発生した時に、通知を受け取る
最後に、AppSyncのコンソールから以下のデータを1件登録してみます。
{ "createappsynctodoinput": { "title": "new", "content": "new content" } }
すると、コンソールが進み、以下のログを出して終了しました*2。
path/to/onCreateAppSyncToDo/ b'{"data":{"onCreateAppSyncToDo": {"title":"new", "content":"new content", "__typename":"AppSyncToDo"}}}'
onCreate系のsubscriptionができているようです。
ちなみに、一番最初で見たとおり、DynamoDBのストリームは無効化してあります。しかし、AppSyncではDynamoDBの変更を検知し、クライアント側に通知が来ました。これは更新系・削除系でも同じでした。
onUpdate系のsubscription
同様にして、onUpdate系を試してみます。subscriptionはこんな感じです。
update_subscription = """ subscription { onUpdateAppSyncToDo { title content } } """ execute_subscription_api(c, update_subscription)
MQTTで接続後、AppSyncコンソールのQueriesを更新系のmutationに差し替えて実行します。
mutation updateAppSyncToDo($updateappsynctodoinput: UpdateAppSyncToDoInput!) { updateAppSyncToDo(input: $updateappsynctodoinput) { title content } }
QUERY VARIABLESも変更します。
{ "updateappsynctodoinput": { "title": "new", "content": "update" } }
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、更新系のsubscriptionも受信できました。
path/to/onUpdateAppSyncToDo/ b'{"data":{"onUpdateAppSyncToDo": {"title":"new", "content":"update", "__typename":"AppSyncToDo"}}}'
onDelete系のsubscription
onDelete系も試してみます。Python上では以下のsubscriptionを作成します。
delete_subscription = """ subscription { onDeleteAppSyncToDo { title content } } """ execute_subscription_api(c, delete_subscription)
MQTTで接続後、AppSyncコンソールのQueriesを削除系のmutationに差し替えて実行します。
mutation deleteAppSyncToDo($deleteappsynctodoinput: DeleteAppSyncToDoInput!) { deleteAppSyncToDo(input: $deleteappsynctodoinput) { title content } }
QUERY VARIABLESも変更します。
{ "deleteappsynctodoinput": { "title": "new" } }
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、削除系のsubscriptionも受信できました。
{ "data": { "deleteAppSyncToDo": { "title": "new", "content": "update" } } }
以上より、Pythonで、 AWS AppSyncのquery・mutation・subscriptionをすべて試すことができました。
ソースコード
GitHubにあげました。 query_mutation_subscription
ディレクトリの中が今回のソースコードです。
https://github.com/thinkAmi-sandbox/AWS_AppSync_python_client-sample
その他
PythonのGraphQLクライアントのみでsubscription
今回、subscriptionではMQTTクライアントも併用していました。
GraphQLクライアントだけでできればいいなーとは思ったのですが、今のところ対応しているクライアントは無さそうです。
- What is the chance to add Subscription ? · Issue #11 · prisma/python-graphql-client
- websocket / subscriptions support? · Issue #26 · profusion/sgqlc
- Are websockets supported? · Issue #42 · graphql-python/gql
- websocket / subscriptions support? · Issue #21 · graphql-python/gql-next
- python-graphql-client/client.py at master · hsdp/python-graphql-client
もしくは、WebSocketだけのクライアント実装がありました(HTTPは話せない)
イベント情報
来週(2019/7/4)、ぎーらぼで AWS Expert Online (AWS AppSync関連) のイベントがあります。
AWS Expert Online at JAWS-UG長野 - connpass