AWS CDK + Pythonで、ネストした AWS StepFunctions のワークフローを作ってみた

今年の7月にAWS CDK (Cloud Development Kit) がGAとなりました。
AWS クラウド開発キット (CDK) – TypeScript と Python 用がご利用可能に | Amazon Web Services ブログ

APIリファレンスも公開されているため、これでPythonを使ってAWSのリソースを作成することができるようになりました。
API Reference · AWS CDK

 
ただ、ドキュメントではTypeScriptの書き方がメインであり、Pythonでどう書くのかイマイチ分かりませんでした。

そこで、AWS CDK + Pythonで、ネストした AWS StepFunctions のワークフローを作ってみることにしました。

 
なお、記事の末尾にもあるように、ソースコードは公開しています。この記事はそのソースコードの解説です。

 

目次

 

環境

 
今回作成するStepFunctionsのワークフローです。

メインのStateMachineを実行します。サブのStateMachineを3パラレルで起動します。

f:id:thinkAmi:20190929163040p:plain:w300

 
サブの方は、各タスクにエラーハンドリングが付いています。

f:id:thinkAmi:20190929163143p:plain:w250

 
各StateMachineの機能です。

  • メイン
    • LambdaからS3へファイルを保存
    • サブのStateMachineを3パラレルで起動
  • サブ
    • Lambdaを実行

 

環境構築

Getting Startedに従い、環境を構築します。
Getting Started With the AWS CDK - AWS Cloud Development Kit (AWS CDK)

AWS CDKのCLIをインストールします。

$ npm install -g aws-cdk

$ cdk --version
1.9.0 (build 30f158a)

$ mkdir step_functions

$ cd step_functions/

 

cdk initPythonを使ったCDK環境を作成します。

$ cdk init --language python
Applying project template app for python
Executing Creating virtualenv...


# ちなみに、ディレクトリの中に何かファイルがあるとエラー
$ cdk init --language python
`cdk init` cannot be run in a non-empty directory!

 
Pythonの仮想環境が準備されるため、activate後にインストールします。

$ source .env/bin/activate

$ pip install -r requirements.txt

 
次にAWS CDKで使うリソースに対するモジュールを追加でインストールします。

どのモジュールが必要なのかは、APIリファレンスのトップに記載されています。*1

 
まずは、StepFunctionまわりで必要なモジュールをインストールします。

$ pip install aws_cdk.aws_stepfunctions

$ pip install aws_cdk.aws_stepfunctions_tasks

 
次にLambdaまわり。

$ pip install aws_cdk.aws_lambda

 
S3バケットにファイルを保存するため、S3のモジュールも必要です。

$ pip install aws_cdk.aws_s3

 
あとは、S3バケットにファイルを保存するために、IAMロールも必要になります。

$ pip install aws_cdk.aws_iam

 

作成順について

AWS CDKでは、下位のリソースから順に作成します。

今回は

  1. S3
  2. IAM Managed Policy
  3. IAM Role
  4. Lambda
  5. サブのStatemMachine (Step Functions)
  6. メインのStatemMachine (Step Functions)

の順で作成します。

 
Pythonでは、 core.Stack を継承したクラスの __init__() にて、作成したいリソースのオブジェクトを生成します。

class StepFunctionsStack(core.Stack):
    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        # S3バケットを生成
        self.bucket = self.create_s3_bucket()
        # 管理ポリシーを生成
        self.managed_policy = self.create_managed_policy()
        ...

 

S3バケットの作成

ドキュメントに従い、S3バケットを新規作成します。
class Bucket (construct) · AWS CDK

 
ドキュメントはTypeScript形式で書かれています。

new Bucket(scope: Construct, id: string, props?: Bucket<wbr>Props)

Pythonに読み替えた時の書き方です。

def create_s3_bucket(self):
    return Bucket(
        self,  # scope
        'S3 Bucket',  # id
        bucket_name=f'sfn-bucket-by-aws-cdk',  # propsのbucketName
    )

Pythonで読み替える時のポイントは以下です。

  • scopeは self で読み替え
  • 引数名は、snake_caseで読み替え
  • props にて指定可能なキーと値は、ドキュメントの Construct Props に記載

 
ちなみに、もし既存のS3バケットを利用したい場合は

  • fromBucketArn()
  • fromBucketAttributes()
  • fromBucketName()

などの静的メソッドを使います。

これにより、既存のS3バケットのオブジェクトが取得できるので、他のリソースでの指定が可能になります。

 

IAM Managed Policy (管理ポリシー)の作成

今回は、S3バケットにアクセスする管理ポリシーを作成します*2
class ManagedPolicy (construct) · AWS CDK

 
Managed Policyを作成するには、

  1. PolicyStatement
  2. ManagedPolicy

の順でオブジェクトを作成します。

 

PolicyStatementの作成

ドキュメントに従い作成します。
class PolicyStatement · AWS CDK

なお、resourcesでは、上記で作成したBucketのARNを参照する必要があります。

ドキュメントにあるように、Bucketオブジェクトのプロパティ bucket_arn でARNを参照します。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#properties

def create_managed_policy(self):
    statement = PolicyStatement(
        effect=Effect.ALLOW,
        actions=[
            "s3:PutObject",
        ],
        resources=[
            f'{self.bucket.bucket_arn}/*',
        ]
    )

 

ManagedPolicyの作成

こちらもドキュメント通りです。
class ManagedPolicy (construct) · AWS CDK

return ManagedPolicy(
    self,
    'Managed Policy',
    managed_policy_name='sfn_lambda_policy',
    statements=[statement],
)

 

IAMロールを作成

次に、LambdaからS3バケットへアクセスするためのIAMロールを作成します。

まずは、 ServicePrincipal オブジェクトでLambdaを指定します。
class ServicePrincipal · AWS CDK

def create_role(self):
    service_principal = ServicePrincipal('lambda.amazonaws.com')

 
次に、 Role オブジェクトを作成します。
class Role (construct) · AWS CDK

ポイントは以下です。

  • assumed_by に、ServicePrincipalオブジェクトを指定
  • managed_policies に、作成したManaged Policyを指定
return Role(
    self,
    'Role',
    assumed_by=service_principal,
    role_name='sfn_lambda_role',
    managed_policies=[self.managed_policy],
)

 

Lambdaの作成

今回は4つのLambdaを作成します。

Lambda名 用途
sfn_first_lambda S3にファイルを保存
sfn_second_lambda サブStateMachineの1番目のLambda
sfn_third_lambda サブStateMachineの2番目のLambda
sfn_error_lambda sfn_second_lambda・sfn_third_lambdaでエラーが起きた時に実行されるLambda

 
なお、CDKで使うLambda本体は、どこかのディレクトリに入れておけばOKです。

CDKのFunctionオブジェクトを生成する際に、そのディレクトリパスを指定することで、CDKがzip化・アップロードまで面倒を見てくれます。

 

S3にファイルを保存するLambdaを作成
Lambda本体

lambda_function/first/lambda_function.py として作成します。

ポイントは以下です。

  • boto3でS3へアップロードする
  • NumPyを使って値を取得する
  • 環境変数 BUCKET_NAME で、保存先のS3バケット名を受け取る
  • Lambdaのパラメータとして message が渡されてくる
    • StateMachineのInputより渡されることを想定
  • 戻り地は、 bodymessage を持つdict
import os
import boto3
from numpy.random import rand


def lambda_handler(event, context):
    body = f'{event["message"]} \n value: {rand()}'
    client = boto3.client('s3')
    client.put_object(
        Bucket=os.environ['BUCKET_NAME'],
        Key='sfn_first.txt',
        Body=body,
    )

    return {
        'body': body,
        'message': event['message'],
    }

 

CDKでLambdaリソース(Function)を作成

まずは、Lambda本体のあるファイルパスを使って、AssetCodeオブジェクトを生成します。
class AssetCode · AWS CDK

function_path = str(self.lambda_path_base.joinpath('first'))
code = AssetCode(function_path)

 
次にNumPyを用意します。

Lambdaでは、NumPyのようによく使われるモジュールは Lambda Layers として用意されていますので、今回もこちらを利用します。
新機能 – AWS Lambda :あらゆるプログラム言語への対応と一般的なコンポーネントの共有 | Amazon Web Services ブログ

 
LayerVersionオブジェクトにて、既存のLambda Layersを扱えます。
class LayerVersion (construct) · AWS CDK

今回はARNを指定してNumPyのLayerを取得するため、静的メソッド from_layer_version_arn() を使います。

scipy_layer = LayerVersion.from_layer_version_arn(
    self, f'sfn_scipy_layer_for_first', AWS_SCIPY_ARN)

なお、Lambda LayersのARNについては、AWSアカウントごとに異なるようです。

そのため、一度、Lambda Consoleにて、該当LayerのARNを確認する必要があります。

 
最後に、FunctionオブジェクトでLambdaリソースを生成します。
class Function (construct) · AWS CDK

return Function(
    self,
    f'id_first',
    # Lambda本体のソースコードがあるディレクトリを指定
    code=code,
    # Lambda本体のハンドラ名を指定
    handler='lambda_function.lambda_handler',
    # ランタイムの指定
    runtime=Runtime.PYTHON_3_7,
    # 環境変数の設定
    environment={'BUCKET_NAME': self.bucket.bucket_name},
    function_name='sfn_first_lambda',
    layers=[scipy_layer],
    memory_size=128,
    role=self.role,
    timeout=core.Duration.seconds(10),
)

 
ちなみに、Layerを自分で作成したい場合は以下のようにします。

Codeオブジェクトのリファレンスは以下です。
class Code · AWS CDK

LayerVersion(
    self,
    'layer_id',
    code=Code.from_asset('your_zip_filepath'),
    compatible_runtimes=[Runtime.PYTHON_3_7],
    layer_version_name='layer_version_name',
)

 

残りのLambda
2番目のLambda本体

エラーハンドリングしたいため、パラレル番号が偶数の場合はエラーとしています。

また、 result_path で結果をわかりやすく表示したいため、戻り値も絞っています。

def lambda_handler(event, context):
    if event['parallel_no'] % 2 == 0:
        raise Exception('偶数です')

    return {
        'message': event['message'],
        'const_value': event['const_value']
    }

 

3番目のLambda本体

こちらもエラーハンドリングしたいため、パラレル番号が1の場合はエラーとしています。

また、最後は文字列を返すようにしました。

def lambda_handler(event, context):
    if event['parallel_no'] == 1:
        raise Exception('強制的にエラーとします')

    return 'only 3rd message.'

 

エラーハンドリングのLambda本体

タスクでエラーが発生した場合は

{
  "resource": "arn:aws:lambda:region:id:function:sfn_error_lambda",
  "input": {
    "Error": "Exception",
    "Cause": "{\"errorMessage\": \"\\u5076\\u6570\\u3067\\u3059\",
               \"errorType\": \"Exception\",
               \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 5,
                  in lambda_handler\\n    raise Exception('\\u5076\\u6570\\u3067\\u3059')
              \\n\"]}"
  },
  "timeoutInSeconds": null
}

という値が渡されてきます。

この中の CauseJSON文字列のため、Lambdaで見えるようにして返します。

def lambda_handler(event, context):
    return {
        # JSONをPythonオブジェクト化することで、文字化けを直す
        'error_message': json.loads(event['Cause']),
    }

 

Functionオブジェクト

ほぼ同じ内容なので一つのメソッドで生成しています。

def create_other_lambda(self, function_name):
    function_path = str(self.lambda_path_base.joinpath(function_name))

    return Function(
        self,
        f'id_{function_name}',
        code=AssetCode(function_path),
        handler='lambda_function.lambda_handler',
        runtime=Runtime.PYTHON_3_7,
        function_name=f'sfn_{function_name}_lambda',
        memory_size=128,
        timeout=core.Duration.seconds(10),
    )
    
# 使う時
self.second_lambda = self.create_other_lambda('second')
self.third_lambda = self.create_other_lambda('third')
self.error_lambda = self.create_other_lambda('error')

 

サブのStateMachine (Step Functions) 作成

概要 (InputPath・OutputPath・ResultPathを試す)

ここまででパラレル実行する、サブのStateMachineのリソースが用意できました。

それらを組み合わせて、サブのStateMachineを作成していきます。

 
StateMachineでは、タスクという単位で処理を定義します。
タスク - AWS Step Functions

サブのStateMachineでは3つのタスクを用意します。

今回、InputPath・OutputPath・ResultPathを試そうと考えました。

 
そこで、こんな感じの設定にしました。

タスク名 Lambda InputPath ResultPath OutputPath
Second Task sfn_second_lambda $['first_result', 'parallel_no', 'message', 'context_name', 'const_value] $.second_result' ['second_result', 'parallel_no']
Third Task sfn_third_lambda $ (Lambdaの戻り値で上書き)
Error Task sfn_error_lambda

 
Second Taskでは、

  • InputPath
    • 前のタスクの結果から 'first_result', 'parallel_no', 'message', 'context_name', 'const_value だけ受け取って処理する
  • OutPath
    • Lambdaの戻り値を、入力値に second_result という項目を追加する
  • ResultPath
    • 次のタスクには 'second_result', 'parallel_no' のみ渡す

とします。

 
また、Third Taskでは、

  • ResultPath
    • 入力値をすべて捨て、Lambdaの戻り値だけを出力する

とします。

 

実際のソースコード

まず、複数タスクで使うため、1つのエラータスクを作成します。

Lambdaを起動するためにはInvokeFunctionクラスを使います。引数としてLambdaオブジェクトを設定します。
class InvokeFunction · AWS CDK

error_task = Task(
    self,
    'Error Task',
    task=InvokeFunction(self.error_lambda),
)

 
次に、Second Taskを生成します。

InputPath・OutputPath・ResultPathに対応する引数があるため、それぞれ指定します。

second_task = Task(
    self,
    'Second Task',
    task=InvokeFunction(self.second_lambda),

    # 渡されてきた項目を絞ってLambdaに渡す
    input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value]",

    # 結果は second_result という項目に入れる
    result_path='$.second_result',

    # 次のタスクに渡す項目は絞る
    output_path="$['second_result', 'parallel_no']"
)

 
Second Taskの中でエラーが発生した場合のハンドリングをするため、 add_catch でエラーが発生した時のタスクを追加します。
エラー処理 - AWS Step Functions

second_task.add_catch(error_task, errors=['States.ALL'])

 
Third Taskを作成します。こちらもエラーハンドリングを追加します。

third_task = Task(
    self,
    'Third Task',
    task=InvokeFunction(self.third_lambda),

    # third_lambdaの結果だけに差し替え
    result_path='$',
)
# こちらもエラーハンドリングを追加
third_task.add_catch(error_task, errors=['States.ALL'])

 
次に、Second Taskの後にThird Taskを実行できるよう、 next() にて設定します。

definition = second_task.next(third_task)

 
最後に、StateMachineを作成します。

return StateMachine(
    self,
    'Sub StateMachine',
    definition=definition,
    state_machine_name='sfn_sub_state_machine',
)

 

メインのStateMachine (Step Functions) の作成

最後のリソースとして、メインのStateMachineを作成します。

 
まずは First Task を作成します。ここは先程と同じです。

first_task = Task(
    self,
    'S3 Lambda Task',
    task=InvokeFunction(self.first_lambda, payload={'message': 'Hello world'}),
    comment='Main StateMachine',
)

 
次のタスクがパラレル実行するサブのStateMachineです。

最初にParallelオブジェクトを生成します。
class Parallel (construct) · AWS CDK

parallel_task = Parallel(
    self,
    'Parallel Task',
)

 
次に、パラレル実行の設定を行います。流れは以下となります。

  1. StartExecution オブジェクトで、使用するStateMachineやInputなどを指定します。
    class StartExecution · AWS CDK

  2. StartExecutionTask に渡します。

  3. TaskParallel.branch() に渡します。

for i in range(1, 4):
    # 1.
    sub_task = StartExecution(
        self.sub_state_machine,
        input={
            'parallel_no': i,
            'first_result.$': '$',

            # first_taskのレスポンスにある、messageをセット
            'message.$': '$.message',

            # コンテキストオブジェクトの名前をセット
            'context_name.$': '$$.State.Name',
            # 固定値を2つ追加(ただ、Taskのinputでignore_valueは無視)
            'const_value': 'ham',
            'ignore_value': 'ignore',
        },
    )

    # 2.
    invoke_sub_task = Task(
        self,
        f'Sub Task {i}',
        task=sub_task,
    )

    # 3.
    parallel_task.branch(invoke_sub_task)

 

以上でStepFunctionsが完成しました。

 

$と$$について

上記例では、

  • 'message.$': '$.message'
  • 'context_name.$': '$$.State.Name'

としている部分がありました。

$$$ については、それぞれ以下の意味となります。

$ について

パスを使用して値を選択するキーと値のペアの場合、キーの名前は .$ で終わる必要があります。 InputPath およびパラメータ - AWS Step Functions

 
$$ について

コンテキストオブジェクトにアクセスするには、パスを使用して状態の入力を選択したときと同様に、.$ を末尾に追加したパラメータ名をまず指定します。次に、入力の代わりにコンテキストオブジェクトデータにアクセスするには、$$. をパスの先頭に追加します。

コンテキストオブジェクト - AWS Step Functions

 

実行結果

ではStepFunctionsのConsoleより実行してみます。

 

メインのStateMachine

f:id:thinkAmi:20190929170656p:plain:w300

S3 Lambda TaskのLambdaFunctionScheduled

{
  "resource": "arn:aws:lambda:region:account_id:function:sfn_first_lambda",
  "input": {
    "message": "Hello world"
  },
  "timeoutInSeconds": null
}

S3 Lambda TaskのTaskStateExited

{
  "name": "S3 Lambda Task",
  "output": {
    "body": "Hello world \n value: 0.035671270119142284",
    "message": "Hello world"
  }
}

 

サブのStateMachine

3パターンあるため、それぞれ記載します。

  • すべて成功
  • Second Taskでエラー
  • Third Taskでエラー
すべて成功

f:id:thinkAmi:20190929171108p:plain:w300

Second TaskのLambdaFunctionScheduledでは、設定した

input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value']",

の通りのinputとなっています。

{
  "resource": "arn:aws:lambda:region:account_id:function:sfn_second_lambda",
  "input": {
    "first_result": {
      "body": "Hello world \n value: 0.035671270119142284",
      "message": "Hello world"
    },
    "parallel_no": 3,
    "message": "Hello world",
    "context_name": "Sub Task 3",
    "const_value": "ham"
  },
  "timeoutInSeconds": null
}

 
Second TaskのTaskStateExitedでも、

result_path='$.second_result',
output_path="$['second_result', 'parallel_no']"

の通りのoutputです。

{
  "name": "Second Task",
  "output": {
    "second_result": {
      "message": "Hello world",
      "const_value": "ham"
    },
    "parallel_no": 3
  }
}

 

Third TaskのLambdaFunctionScheduledはこんな感じ。Secondのoutputを引き継いでいます。

{
  "resource": "arn:aws:lambda:region:account_id:function:sfn_third_lambda",
  "input": {
    "second_result": {
      "message": "Hello world",
      "const_value": "ham"
    },
    "parallel_no": 3
  },
  "timeoutInSeconds": null
}

 

Third TaskのTaskStateExited

result_path='$',通り、outputはLambdaの戻り値の文字列だけになっています。

{
  "name": "Third Task",
  "output": "only 3rd message."
}

 

Second Taskでエラー

f:id:thinkAmi:20190929172018p:plain:w300

Second TaskのLambdaFunctionScheduledは以下の通り(抜粋)。

{
  "input": {
    "parallel_no": 2,
    "context_name": "Sub Task 2",
  },
}

 
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。

{
  "name": "Error Task",
  "output": {
    "error_message": {
      "errorMessage": "偶数です",
      "errorType": "Exception",
      "stackTrace": [
        "  File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n    raise Exception('偶数です')\n"
      ]
    }
  }
}

 

Third Taskでエラー

f:id:thinkAmi:20190929172322p:plain:w300

Third TaskのLambdaFunctionScheduledは以下の通り(抜粋)。

{
    "parallel_no": 1
  },
}

 
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。

{
  "output": {
    "error_message": {
      "errorMessage": "強制的にエラーとします",
      "errorType": "Exception",
      "stackTrace": [
        "  File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n    raise Exception('強制的にエラーとします')\n"
      ]
    }
  }
}

 

削除

cdk destroy にて削除できます。

$ cdk destroy
Are you sure you want to delete: step-functions (y/n)? y
...
 ✅  step-functions: destroyed

 

ただ、上記の方法で作成したS3は、removalpolicyの値により削除されません。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#removalpolicy

そのため、手動で削除するか、作成する時に以下のような設定を行います。
AWS CloudFormationでStackを削除したときにリソースを消さない設定 | DevelopersIO

 

動的並列処理について

先日、StepFunctionsでの動的並列処理がサポートされました。
AWS Step Functions がワークフローでの動的並列処理のサポートを追加

 
CDKでは、以下のissueがcloseとなれば利用できそうです。

 

ソースコード

GitHubに上げました。 step_functions ディレクトリが今回のファイルです。
https://github.com/thinkAmi-sandbox/AWS_CDK-sample

*1:例えばLambdaの場合は、 aws_cdk.aws_lambda が必要です:https://docs.aws.amazon.com/cdk/api/latest/docs/aws-lambda-readme.html

*2:もし、ポリシーを使う場合はこちら:https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-iam.Policy.html