AWS AppSyncのPipeline Resolverを使って、複数のDynamoDBの値をマージして返すAPIを作成してみた

前回・前々回と、単一のデータソースから値を取得するAWS AppSync APIを作成しました。

 
ただ、単一ではなく、「複数のDynamoDBから値を取得し、その結果をマージして返す」ようなAPIが作れないかが気になりました。

調べてみたところ、通常のResolverの代わりにPipeline Resolverを使えば良さそうでした。
チュートリアル: パイプラインリゾルバー - AWS AppSync

 
公式ドキュメントによると、今回のAPIに必要なものは

  • Data source
  • Schema
  • Function
  • Pipeline Resolver
    • Before mapping template
    • After mapping template

でした。

そこで、公式チュートリアルよりももう少し単純な例を使って、理解を深めてみることにしました。

 
目次

 

環境

  • Python 3.7.3
    • graphqlclient 0.2.4
  • AWS AppSync
    • データソース:Amazon DynamoDB x 2
      • 両DynamoDBとも作成済
    • API Key 認証
    • データ取得(Query)のみ、APIとして実装

 

DynamoDB

事前に用意した2つのDynamoDBは、以下の通りです*1

Blog table

id (key) title author_id
1 ham 100
2 spam 200

 
 
Author table

Blog tableの author_id に紐づく id を持つテーブルです。

id (key) name
100 foo
200 bar

 

APIの仕様

Blog.id == "1" となるようなQueryを投げると

Blog.id Blog.title Author.id Author.name
1 ham 100 foo

が返るものとします。

 

AWS AppSync APIのひな形作成

Build from scratch より作成します。

API nameは Pipeline Resolver API とします。

 

Data sourceの作成

Data Sources のCreate Data sourceより作成します。今回は2つ作成します。

なお、作成する際 、両方ともAutomatically generate GraphQLon にすると、2つのData source分がマージされた Schema が作成されます。便利。

ただ、今回はSchemaのResolverではなく、FunctionでDatasourceから値を取得することから、自分でSchemaを作成することになります。そのため、 off でも良いです。

また、ロールは新規作成としますが、必要な権限を持ったロールがある場合はそちらを利用しても良さそうです。

 

BlogテーブルのData source
項目
Data source name Blog_DataSource
Data source type Amazon DynamoDB table
Region DynamoDB Blog のあるRegion
Table name Blog
Create or use an existing role New role

 

AuthorテーブルのData source

Data source name・Table nameを変えるくらいで、あとは同じです。

項目
Data source name Author_DataSource
Data source type Amazon DynamoDB table
Region DynamoDB Author のあるRegion
Table name Author
Create or use an existing role New role

 

Data source Schemaの作成

今回は、

  • リクエストを受け付けるQueryの型: getBlogWithAuthor
  • 戻り値の型: BlogWithAuthor

のみ定義します。

前述のとおり、データの取得はFunctionで行うため、schema { query: Query } の定義は不要です。

type BlogWithAuthor {
    id: String!
    title: String
    author_id: String
    author_name: String
}

type Query {
    getBlogWithAuthor(id: String!): BlogWithAuthor
}

 

Functionの作成

Functionを作成してからでないとPipeline Resolverで使えなかったため、まずはFunctionから作成します。

今回、Functionは2つ用意します。

  • Blogテーブルからデータを取得
    • Function名: GetBlog
  • Blogテーブルの author_id と等しい id を持つAuthor tableデータを取得し、Blog table とAuthor tableをマージ
    • Function名: GetBlogWithAuthor

 

Functionを作るには

  • Data source name
  • Function name
  • request mapping template
  • response mapping template

が必要ですので、それぞれ作成していきます。

 
なお、mapping template系の書き方については、以下が参考になりました。ありがとうございました。
AWS AppSyncのPipeline Resolverで複数データリソースを扱う場合のVTLの書き方 | Takumon Blog

 

Function: GetBlog

Data sourceとFunction nameです。

項目
Data source name Blog_DataSource
Function name GetBlog

 

request mapping template

右にあるテンプレート GetItem を使うことで、Blog tableから指定した id のデータを取得しています。

{
    "operation": "GetItem",
    "key": {
        "id": $util.dynamodb.toDynamoDBJson($ctx.args.id),
    }
}

 

response mapping template

デフォルトのままです。

## Raise a GraphQL field error in case of a datasource invocation error
#if($ctx.error)
    $util.error($ctx.error.message, $ctx.error.type)
#end
## Pass back the result from DynamoDB. **
$util.toJson($ctx.result)

 

Function: GetBlogWithAuthor

Data sourceとFunction nameです。

項目
Data source name Author_DataSource
Function name GetBlogWithAuthor

 

request mapping template

GetBlogと似ていますが、1点異なるのは、toDynamoDBJsonの引数 $ctx.prev.result.author_id です。

Function: GetBlogWithAuthorは、GetBlogの後に動作させますが、その際、GetBlogで取得した author_id を使ってAuthorテーブルからデータを取得します。

Pipeline Resolverでは、 $ctx.prev.result で、そのFunctionの直前に動作したFunctionの結果を参照できます(今回だと、GetBlog Functionの結果)。

{
    "operation": "GetItem",
    "key": {
        "id": $util.dynamodb.toDynamoDBJson($ctx.prev.result.author_id),
    }
}

 

response mapping template

GetBlogとは異なり、結果をマージする処理を追加しています。

今回は、GetBlogで取得したBlogデータに、Authorテーブルから取得したnameをマージして返します。

なお、マージする際のキー author_name は、Schemaの type BlogWithAuthor で定義した author_name と一致させます。

type BlogWithAuthor {
    ...
    author_name: String
}

 
以上より、response mapping templateはこんな感じになります。

## Raise a GraphQL field error in case of a datasource invocation error
#if($ctx.error)
    $util.error($ctx.error.message, $ctx.error.type)
#end
## Pass back the result from DynamoDB. **

## GetBlogの結果に、author_nameとして今回取得したAuthorテーブルのnameの値をマージ
$util.qr($ctx.prev.result.put("author_name", $ctx.result.name))

## マージしたデータを返す
$util.toJson($ctx.prev.result)

 

Pipeline Resolverの作成

Schemaページの右側にあるResolverのうち、Query: getBlogWithAuthorの Attach をクリックします。

Data source nameの下に Convert to pipeline resolver があるため、何も入力しない状態でそのリンクをクリックします。

すると

  • Before mapping template
  • Function
  • After mapping template

を定義できるようになります。

 

Functionの配置

以下の順番でFunctionを配置します。

  1. Before mapping template
  2. GetBlog
  3. GetBlogWithAuthor
  4. After mapping template

 

Before mapping templateの定義

Queryから呼び出される、Before mapping template を定義します。

## Query: getBlogWithAuthorから渡された `id` を `$result` 変数に入れる
#set($result = { "id": $ctx.args.id })

## JSON化して、次のFunction: GetBlog へと渡す
$util.toJson($result)

参考:Queryの内容

type Query {
    getBlogWithAuthor(id: String!): BlogWithAuthor
}

 

After mapping templateの定義

Function: GetBlogWithAuthorから呼び出される、After mapping template を定義します。

$util.toJson($ctx.result)

こちらはFunctionの値をそのままJSON化するだけです。

 

AppSync APIクライアントの作成

今回もPythonで作ります。

  • データがある場合 (Blog.id == "1")
  • データが無い場合 (Blog.id == "x")

をそれぞれ取得するようなQueryを実行します。

from graphqlclient import GraphQLClient

from secret import API_KEY, API_URL


def execute_query_api(gql_client):
    # pipeline resolverを使ったqueryを呼ぶ:データがある場合
    query = """
        query {
          getBlogWithAuthor(id: "1") {
            id
            title
            author_id
            author_name
          }
        }
    """
    result = gql_client.execute(query)
    print(result)

    # pipeline resolverを使ったqueryを呼ぶ:データがない場合
    query = """
        query {
          getBlogWithAuthor(id: "x") {
            id
            title
            author_id
            author_name
          }
        }
    """
    result = gql_client.execute(query)
    print(result)


if __name__ == '__main__':
    c = GraphQLClient(API_URL)
    c.inject_token(API_KEY, 'X-Api-Key')
    execute_query_api(c)

 

実行結果

データがある場合の結果です。期待した結果となりました。

{"data":{"getBlogWithAuthor":{"id":"1","title":"ham","author_id":"100","author_name":"foo"}}}

 
一方、データがない場合はこのようになります。

{
"data":{"getBlogWithAuthor":null},
"errors":[{"path":["getBlogWithAuthor"],
           "data":null,
           "errorType":"DynamoDB:AmazonDynamoDBException",
           "errorInfo":null,
           "locations":[{"line":3,"column":11,"sourceName":null}],
           "message":"The provided key element does not match the schema
                      (Service: AmazonDynamoDBv2; Status Code: 400;
                       Error Code: ValidationException;
                       Request ID: xxx)"}]}

 

ソースコード

Githubに上げました。 pipeline_resolver ディレクトリの中が、今回のソースコードとなります。
https://github.com/thinkAmi-sandbox/AWS_AppSync_python_client-sample

*1:あまり良いDynamoDBの構造ではないかもしれませんが、とりあえず...