Pythonで、 AWS AppSyncのquery・mutation・subscriptionを試してみた

最近 AWS AppSync にふれる機会がありました。

そこで今回は、AWS AppSyncのGraphQLインタフェースを使って、Pythonでquery・mutation・subscriptionを試してみましたので、メモを残します。

 

目次

 

環境

 
既存のDynamoDBは

  • title (key)
  • content

という2列を持つAppSyncToDoテーブルです。

$ aws dynamodb describe-table --table-name AppSyncToDo
{
    "Table": {
        "AttributeDefinitions": [
            {
                "AttributeName": "title",
                "AttributeType": "S"
            }
        ],
        "TableName": "AppSyncToDo",
        "KeySchema": [
            {
                "AttributeName": "title",
                "KeyType": "HASH"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": xxx,
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 49,
        "ItemCount": 2,
        "TableArn": "arn:aws:dynamodb:xxx",
        "TableId": "xxx",
        "LatestStreamLabel": "xxx",
        "LatestStreamArn": "arn:aws:dynamodb:xxx"
    }
}

 

 

長いのでまとめ

Web上ではJavaScriptのサンプルが多いですが、Pythonでも問題なく動作しました。

以降は、試した時の流れです。

 

AWS AppSyncでAPIを作る

AppSyncのコンソールに入り、 Create API します。

  • Getting Started
    • Import DynamoDB table を選択
  • Import DynamoDB Table
    • RegionとTable nameは、既存のものを選択
    • Create or use an existing roleは、 New role を選択
    • Name the model は、適当に付ける
    • Configure model fieldsは、Keyのtitleの他、 contentString で用意
    • API nameは、適当に付ける

 

すると、以下のようなSchemeが自動的にできました。

type AppSyncToDo {
    title: String!
    content: String
}

type AppSyncToDoConnection {
    items: [AppSyncToDo]
    nextToken: String
}

input CreateAppSyncToDoInput {
    title: String!
    content: String
}

input DeleteAppSyncToDoInput {
    title: String!
}

type Mutation {
    createAppSyncToDo(input: CreateAppSyncToDoInput!): AppSyncToDo
    updateAppSyncToDo(input: UpdateAppSyncToDoInput!): AppSyncToDo
    deleteAppSyncToDo(input: DeleteAppSyncToDoInput!): AppSyncToDo
}

type Query {
    getAppSyncToDo(title: String!): AppSyncToDo
    listAppSyncToDos(filter: TableAppSyncToDoFilterInput, limit: Int, nextToken: String): AppSyncToDoConnection
}

type Subscription {
    onCreateAppSyncToDo(title: String, content: String): AppSyncToDo
        @aws_subscribe(mutations: ["createAppSyncToDo"])
    onUpdateAppSyncToDo(title: String, content: String): AppSyncToDo
        @aws_subscribe(mutations: ["updateAppSyncToDo"])
    onDeleteAppSyncToDo(title: String, content: String): AppSyncToDo
        @aws_subscribe(mutations: ["deleteAppSyncToDo"])
}

input TableAppSyncToDoFilterInput {
    title: TableStringFilterInput
    content: TableStringFilterInput
}

input TableBooleanFilterInput {
    ne: Boolean
    eq: Boolean
}

input TableFloatFilterInput {
    ne: Float
    eq: Float
    le: Float
    lt: Float
    ge: Float
    gt: Float
    contains: Float
    notContains: Float
    between: [Float]
}

input TableIDFilterInput {
    ne: ID
    eq: ID
    le: ID
    lt: ID
    ge: ID
    gt: ID
    contains: ID
    notContains: ID
    between: [ID]
    beginsWith: ID
}

input TableIntFilterInput {
    ne: Int
    eq: Int
    le: Int
    lt: Int
    ge: Int
    gt: Int
    contains: Int
    notContains: Int
    between: [Int]
}

input TableStringFilterInput {
    ne: String
    eq: String
    le: String
    lt: String
    ge: String
    gt: String
    contains: String
    notContains: String
    between: [String]
    beginsWith: String
}

input UpdateAppSyncToDoInput {
    title: String!
    content: String
}

 
また、Queriesも自動生成されました。queryとmutationの2つができています。

# Click the orange "Play" button and select the createAppSyncToDo
# mutation to create an object in DynamoDB.
# If you see an error that starts with "Unable to assume role",
# wait a moment and try again.
mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) {
  createAppSyncToDo(input: $createappsynctodoinput) {
    title
    content
  }
}


# After running createAppSyncToDo, try running the listAppSyncToDos query.
query listAppSyncToDos {
  listAppSyncToDos {
    items {
      title
      content
    }
  }
}

 
mutation用の QUERY VARIABLES にも、初期値が設定されています。

{
  "createappsynctodoinput": {
    "title": "Hello, world!",
    "content": "Hello, world!"
  }
}

 
試しに createAppSyncToDo を実行してみると、結果が右に表示されました。

{
  "data": {
    "createAppSyncToDo": {
      "title": "Hello, world!",
      "content": "Hello, world!"
    }
  }
}

 
DynamoDBにもデータが追加されています。

$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "Hello, world!"}}'
{
    "Item": {
        "content": {
            "S": "Hello, world!"
        },
        "title": {
            "S": "Hello, world!"
        }
    }
}

 
これで、AppSyncとDynamoDBが連携できていることが分かりました。

 

mutationの実行

APIができたため、次はPythonのGraphQLクライアントライブラリを使って、AppSyncにmutationを投げてデータを登録してみます。

PythonのGraphQLクライアントライブラリはGraphQLサイトにまとめられているものの他、Githubで公開されているものがあります。
https://graphql.org/code/#python

今回は、READMEにAPI Keyの渡し方が書いてあった、 python-graphql-client を使って試してみます。
prisma/python-graphql-client: Simple GraphQL client for Python 2.7+

 
pip install graphqlclient でインストール後、こんな感じでmutationを実行するスクリプトを作成します。

from graphqlclient import GraphQLClient

def execute_mutation_api(gql_client, title, content):
    # AWS AppSyncのQueriesをそのまま貼って動作する
    mutation = """
        mutation createAppSyncToDo($createappsynctodoinput: CreateAppSyncToDoInput!) {
            createAppSyncToDo(input: $createappsynctodoinput) {
                title
                content
              }
            }
    """

    variables = {
        "createappsynctodoinput": {
            "title": title,
            "content": content,
        }
    }

    result = gql_client.execute(mutation, variables=variables)
    print(result)
    
    
if __name__ == '__main__':

    c = GraphQLClient(API_URL)
    c.inject_token(API_KEY, 'X-Api-Key')

    # 登録する
    execute_mutation_api(c, 'ham', 'spam')

 
execute_mutation_api 関数の mutation および variables は、Queriesに記載されている内容をそのまま貼っています。

また、 API_URLAPI_KEY については、AppSyncのSettingsに記載されている内容を使います。

 
準備ができたため、スクリプトを実行してみると、ログに以下が出力されました。

{"data":{"createAppSyncToDo":{"title":"ham","content":"spam"}}}

 
awscliで、DynamoDBの内容を確認します。

$ aws dynamodb get-item --table-name AppSyncToDo --key '{"title": {"S": "ham"}}'
{
    "Item": {
        "content": {
            "S": "spam"
        },
        "title": {
            "S": "ham"
        }
    }
}

 
データが登録されており、mutationは成功したようです。

 

queryの実行

続いて、DynamoDBのデータを query を使って取得してみます。

query内容は、AppSyncで自動的に作成された listAppSyncToDos をそのまま使います。

def execute_query_api(gql_client):
    query = """
        query listAppSyncToDos {
          listAppSyncToDos {
            items {
              title
              content
            }
          }
        }
    """
    result = gql_client.execute(query)
    print(result)
    
if __name__ == '__main__':
    c = GraphQLClient(API_URL)
    c.inject_token(API_KEY, 'X-Api-Key')

    # 登録した情報を取得する
    execute_query_api(c)

 
このスクリプトを実行してみます。

{"data":{"listAppSyncToDos":{"items":[
    {"title":"ham","content":"spam"},
    {"title":"Hello, world!","content":"Hello, world!"}
]}}}

AppSyncのコンソールから入力した内容、および、mutationで登録した内容を取得できました*1

 

subscriptionの実行

onCreate系のsubscription

最後にsubscriptionを実行してみます。

AppSyncのSettingsにはhttpsのエンドポイントはあるものの、subscriptionで使われると思われるWebSocketのエンドポイントが見当たりませんでした。

 
いろいろ試してみたところ、AppSyncでのsubscriptionの流れは

  1. httpsのエンドポイントにsubscriptionをHTTPリクエストする
  2. MQTTのエンドポイントやその他の情報が返ってくる
  3. WebSocket(wss)を使って、MQTTのエンドポイントに接続
  4. DynamoDBでイベントが発生した時に、通知を受け取る

となるようです。

 
そこで、onCreate系のsubscriptionを例に、順に試してみます。

 

httpsのエンドポイントにsubscriptionをHTTPリクエスト & レスポンス

DynamoDBで新規作成イベントが発生した場合にtitleとcontentを受け取るsubscriptionを用意します。

def execute_subscription_api(gql_client, subscription):
    # Subscription APIに投げると、MQTTの接続情報が返ってくる
    r = gql_client.execute(subscription)

    # JSON文字列なので、デシリアライズしてPythonオブジェクトにする
    response = json.loads(r)

    # 中身を見てみる
    print(response)
    

if __name__ == '__main__':
    c = GraphQLClient(API_URL)
    c.inject_token(API_KEY, 'X-Api-Key')
    
    # DynamoDBが更新された時の通知を1回だけ受け取る
    # Subscription API用のGraphQL (onCreate系)
    subscription = """
        subscription {
            onCreateAppSyncToDo {
                title
                content
            }
        }
    """
    execute_subscription_api(c, subscription)

 
実行してみると次のようなレスポンスが返ってきます。

{'extensions': 
     {'subscription':
          {
              'mqttConnections': [
                  {'url': 'wss://<host>.iot.<region>.amazonaws.com/mqtt?<v4_credential>',
                   'topics': ['path/to/onCreateAppSyncToDo/'], 
                   'client': '<client_id>'}],
              'newSubscriptions': {
                  'onCreateAppSyncToDo': 
                      {'topic': 'path/to/onCreateAppSyncToDo/', 
                       'expireTime': None}}}}, 
    'data': {'onCreateAppSyncToDo': None}}

 
キー mqttConnections の中に、MQTTのエンドポイントやtopics、Client ID が入っていました。

 

WebSocket(wss)を使って、MQTTのエンドポイントに接続

次はPythonのMQTTクライアントを使って、MQTTのエンドポイントに接続してみます。

今回は、MQTTクライアントとして paho.mqtt.python (paho-mqtt) を使います。
eclipse/paho.mqtt.python: paho.mqtt.python

 
続いて、MQTTエンドポイント接続についてです。

レスポンスの url を見ると、プロトコルwss と、セキュアなTLSによるWebSocketを使っています。また、エンドポイントはAWS IoTのようです。

TLS & AWS IoTを使うということは、接続用の証明書などを用意しないといけないのかなと思いました。
例:X.509 証明書と AWS IoT - AWS IoT

 
しかし、レスポンスのurlを見ると、以下にある AWS 署名バージョン 4 がクエリ文字列としてすでに追加されていました。
MQTT over WebSocket プロトコル - AWS IoT

また、AWS署名バージョン4が追加済の場合に、 paho-mqtt を使って AWS IoTのMQTTエンドポイントと接続している例が、以下に記載されていました。
https://github.com/eclipse/paho.mqtt.python/issues/277#issuecomment-372019123

実際に試してみたところ、たしかにAWS IoTの証明書まわりは不要でした。

 
そこで、subscription関数に、

  1. paho-mqtt を使って、MQTTエンドポイントに接続
  2. 接続できたら、レスポンスにあった topic をsubscribeする
  3. topicからメッセージが送られてきたら、メッセージ内容を出力して、接続を切断(disconnect)する

という実装を追加してみました。

def execute_subscription_api(gql_client):
    ...
    def on_connect(client, userdata, flags, respons_code):
        print('connected')
        # 接続できたのでsubscribeする
        client.subscribe(topic)

    def on_message(client, userdata, msg):
        # メッセージを表示する
        print(f'{msg.topic} {str(msg.payload)}')

        # メッセージを受信したので、今回は切断してみる
        # これがないと、再びメッセージを待ち続ける
        client.disconnect()

    # Subscribeするのに必要な情報を取得する
    client_id = response['extensions']['subscription']['mqttConnections'][0]['client']
    topic = response['extensions']['subscription']['mqttConnections'][0]['topics'][0]

    # URLはparseして、扱いやすくする
    url = response['extensions']['subscription']['mqttConnections'][0]['url']
    urlparts = urlparse(url)

    # ヘッダーとして、netloc(ネットワーク上の位置)を設定
    headers = {
        'Host': '{0:s}'.format(urlparts.netloc),
    }

    # 送信時、ClientIDを指定した上でWebSocketで送信しないと、通信できないので注意
    mqtt_client = MQTTClient(client_id=client_id, transport='websockets')

    # 接続時のコールバックメソッドを登録する
    mqtt_client.on_connect = on_connect

    # データ受信時のコールバックメソッドを登録する
    mqtt_client.on_message = on_message

    # ヘッダやパスを指定する
    mqtt_client.ws_set_options(path=f'{urlparts.path}?{urlparts.query}',
                               headers=headers)

    # TLSを有効にする
    mqtt_client.tls_set()

    # wssで接続するため、443ポートに投げる
    mqtt_client.connect(urlparts.netloc, port=443)

    # 受信するのを待つ
    mqtt_client.loop_forever()

 
ポイントは、MQTTのクライアントを生成する際

MQTTClient(client_id=client_id, transport='websockets')

と、

  • client_idに、レスポンスの client の値を指定
  • transportとして、 websockets を指定

の2つとなります。

 
次に、スクリプトを実行してみると、コンソールに connected が表示されたままになりました。うまくいっているようです。

 

DynamoDBでイベントが発生した時に、通知を受け取る

最後に、AppSyncのコンソールから以下のデータを1件登録してみます。

{
  "createappsynctodoinput": {
    "title": "new",
    "content": "new content"
  }
}

 
すると、コンソールが進み、以下のログを出して終了しました*2

path/to/onCreateAppSyncToDo/ b'{"data":{"onCreateAppSyncToDo":
    {"title":"new",
     "content":"new content",
     "__typename":"AppSyncToDo"}}}'

onCreate系のsubscriptionができているようです。

 
ちなみに、一番最初で見たとおり、DynamoDBのストリームは無効化してあります。しかし、AppSyncではDynamoDBの変更を検知し、クライアント側に通知が来ました。これは更新系・削除系でも同じでした。

 

onUpdate系のsubscription

同様にして、onUpdate系を試してみます。subscriptionはこんな感じです。

update_subscription = """
    subscription {
        onUpdateAppSyncToDo {
            title
            content
        }
    }
"""
execute_subscription_api(c, update_subscription)

 
MQTTで接続後、AppSyncコンソールのQueriesを更新系のmutationに差し替えて実行します。

mutation updateAppSyncToDo($updateappsynctodoinput: UpdateAppSyncToDoInput!) {
  updateAppSyncToDo(input: $updateappsynctodoinput) {
    title
    content
  }
}

QUERY VARIABLESも変更します。

{
  "updateappsynctodoinput": {
    "title": "new",
    "content": "update"
  }
}

 
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、更新系のsubscriptionも受信できました。

path/to/onUpdateAppSyncToDo/ b'{"data":{"onUpdateAppSyncToDo":
    {"title":"new",
     "content":"update",
     "__typename":"AppSyncToDo"}}}'

 

onDelete系のsubscription

onDelete系も試してみます。Python上では以下のsubscriptionを作成します。

delete_subscription = """
    subscription {
        onDeleteAppSyncToDo {
            title
            content
        }
    }
"""
execute_subscription_api(c, delete_subscription)

MQTTで接続後、AppSyncコンソールのQueriesを削除系のmutationに差し替えて実行します。

mutation deleteAppSyncToDo($deleteappsynctodoinput: DeleteAppSyncToDoInput!) {
  deleteAppSyncToDo(input: $deleteappsynctodoinput) {
    title
    content
  }
}

QUERY VARIABLESも変更します。

{
  "deleteappsynctodoinput": {
    "title": "new"
  }
}

 
AppSync上で上記のmutationを実行します。すると、MQTTを実行していたコンソールに以下が表示され、削除系のsubscriptionも受信できました。

{
  "data": {
    "deleteAppSyncToDo": {
      "title": "new",
      "content": "update"
    }
  }
}

 
以上より、Pythonで、 AWS AppSyncのquery・mutation・subscriptionをすべて試すことができました。

 

ソースコード

GitHubにあげました。 query_mutation_subscription ディレクトリの中が今回のソースコードです。
https://github.com/thinkAmi-sandbox/AWS_AppSync_python_client-sample

 

その他

PythonのGraphQLクライアントのみでsubscription

今回、subscriptionではMQTTクライアントも併用していました。

GraphQLクライアントだけでできればいいなーとは思ったのですが、今のところ対応しているクライアントは無さそうです。

 
もしくは、WebSocketだけのクライアント実装がありました(HTTPは話せない)

 

イベント情報

来週(2019/7/4)、ぎーらぼで AWS Expert Online (AWS AppSync関連) のイベントがあります。
AWS Expert Online at JAWS-UG長野 - connpass

*1:実際には一行ですが、見やすいように改行してあります

*2:表示の都合上、途中で改行しています