今年の7月にAWS CDK (Cloud Development Kit) がGAとなりました。
AWS クラウド開発キット (CDK) – TypeScript と Python 用がご利用可能に | Amazon Web Services ブログ
APIリファレンスも公開されているため、これでPythonを使ってAWSのリソースを作成することができるようになりました。
API Reference · AWS CDK
ただ、ドキュメントではTypeScriptの書き方がメインであり、Pythonでどう書くのかイマイチ分かりませんでした。
そこで、AWS CDK + Pythonで、ネストした AWS StepFunctions のワークフローを作ってみることにしました。
なお、記事の末尾にもあるように、ソースコードは公開しています。この記事はそのソースコードの解説です。
目次
- 環境
- 環境構築
- 作成順について
- S3バケットの作成
- IAM Managed Policy (管理ポリシー)の作成
- IAMロールを作成
- Lambdaの作成
- サブのStateMachine (Step Functions) 作成
- メインのStateMachine (Step Functions) の作成
- 実行結果
- 削除
- 動的並列処理について
- ソースコード
環境
今回作成するStepFunctionsのワークフローです。
メインのStateMachineを実行します。サブのStateMachineを3パラレルで起動します。
サブの方は、各タスクにエラーハンドリングが付いています。
各StateMachineの機能です。
- メイン
- LambdaからS3へファイルを保存
- サブのStateMachineを3パラレルで起動
- サブ
- Lambdaを実行
環境構築
Getting Startedに従い、環境を構築します。
Getting Started With the AWS CDK - AWS Cloud Development Kit (AWS CDK)
$ npm install -g aws-cdk $ cdk --version 1.9.0 (build 30f158a) $ mkdir step_functions $ cd step_functions/
cdk init
でPythonを使ったCDK環境を作成します。
$ cdk init --language python Applying project template app for python Executing Creating virtualenv... # ちなみに、ディレクトリの中に何かファイルがあるとエラー $ cdk init --language python `cdk init` cannot be run in a non-empty directory!
Pythonの仮想環境が準備されるため、activate後にインストールします。
$ source .env/bin/activate $ pip install -r requirements.txt
次にAWS CDKで使うリソースに対するモジュールを追加でインストールします。
どのモジュールが必要なのかは、APIリファレンスのトップに記載されています。*1
まずは、StepFunctionまわりで必要なモジュールをインストールします。
$ pip install aws_cdk.aws_stepfunctions $ pip install aws_cdk.aws_stepfunctions_tasks
次にLambdaまわり。
$ pip install aws_cdk.aws_lambda
S3バケットにファイルを保存するため、S3のモジュールも必要です。
$ pip install aws_cdk.aws_s3
あとは、S3バケットにファイルを保存するために、IAMロールも必要になります。
$ pip install aws_cdk.aws_iam
作成順について
AWS CDKでは、下位のリソースから順に作成します。
今回は
- S3
- IAM Managed Policy
- IAM Role
- Lambda
- サブのStatemMachine (Step Functions)
- メインのStatemMachine (Step Functions)
の順で作成します。
Pythonでは、 core.Stack
を継承したクラスの __init__()
にて、作成したいリソースのオブジェクトを生成します。
class StepFunctionsStack(core.Stack): def __init__(self, scope: core.Construct, id: str, **kwargs) -> None: # S3バケットを生成 self.bucket = self.create_s3_bucket() # 管理ポリシーを生成 self.managed_policy = self.create_managed_policy() ...
S3バケットの作成
ドキュメントに従い、S3バケットを新規作成します。
class Bucket (construct) · AWS CDK
ドキュメントはTypeScript形式で書かれています。
new Bucket(scope: Construct, id: string, props?: Bucket<wbr>Props)
Pythonに読み替えた時の書き方です。
def create_s3_bucket(self): return Bucket( self, # scope 'S3 Bucket', # id bucket_name=f'sfn-bucket-by-aws-cdk', # propsのbucketName )
Pythonで読み替える時のポイントは以下です。
- scopeは
self
で読み替え - 引数名は、snake_caseで読み替え
props
にて指定可能なキーと値は、ドキュメントのConstruct Props
に記載
ちなみに、もし既存のS3バケットを利用したい場合は
- fromBucketArn()
- fromBucketAttributes()
- fromBucketName()
などの静的メソッドを使います。
これにより、既存のS3バケットのオブジェクトが取得できるので、他のリソースでの指定が可能になります。
IAM Managed Policy (管理ポリシー)の作成
今回は、S3バケットにアクセスする管理ポリシーを作成します*2。
class ManagedPolicy (construct) · AWS CDK
Managed Policyを作成するには、
- PolicyStatement
- ManagedPolicy
の順でオブジェクトを作成します。
PolicyStatementの作成
ドキュメントに従い作成します。
class PolicyStatement · AWS CDK
なお、resourcesでは、上記で作成したBucketのARNを参照する必要があります。
ドキュメントにあるように、Bucketオブジェクトのプロパティ bucket_arn
でARNを参照します。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#properties
def create_managed_policy(self): statement = PolicyStatement( effect=Effect.ALLOW, actions=[ "s3:PutObject", ], resources=[ f'{self.bucket.bucket_arn}/*', ] )
ManagedPolicyの作成
こちらもドキュメント通りです。
class ManagedPolicy (construct) · AWS CDK
return ManagedPolicy( self, 'Managed Policy', managed_policy_name='sfn_lambda_policy', statements=[statement], )
IAMロールを作成
次に、LambdaからS3バケットへアクセスするためのIAMロールを作成します。
まずは、 ServicePrincipal
オブジェクトでLambdaを指定します。
class ServicePrincipal · AWS CDK
def create_role(self): service_principal = ServicePrincipal('lambda.amazonaws.com')
次に、 Role
オブジェクトを作成します。
class Role (construct) · AWS CDK
ポイントは以下です。
assumed_by
に、ServicePrincipalオブジェクトを指定managed_policies
に、作成したManaged Policyを指定
return Role( self, 'Role', assumed_by=service_principal, role_name='sfn_lambda_role', managed_policies=[self.managed_policy], )
Lambdaの作成
今回は4つのLambdaを作成します。
Lambda名 | 用途 |
---|---|
sfn_first_lambda | S3にファイルを保存 |
sfn_second_lambda | サブStateMachineの1番目のLambda |
sfn_third_lambda | サブStateMachineの2番目のLambda |
sfn_error_lambda | sfn_second_lambda・sfn_third_lambdaでエラーが起きた時に実行されるLambda |
なお、CDKで使うLambda本体は、どこかのディレクトリに入れておけばOKです。
CDKのFunctionオブジェクトを生成する際に、そのディレクトリパスを指定することで、CDKがzip化・アップロードまで面倒を見てくれます。
S3にファイルを保存するLambdaを作成
Lambda本体
lambda_function/first/lambda_function.py
として作成します。
ポイントは以下です。
- boto3でS3へアップロードする
- Lambdaのランタイムには
boto3
が含まれている
- Lambdaのランタイムには
- NumPyを使って値を取得する
- 環境変数
BUCKET_NAME
で、保存先のS3バケット名を受け取る - Lambdaのパラメータとして
message
が渡されてくる- StateMachineのInputより渡されることを想定
- 戻り地は、
body
とmessage
を持つdict
import os import boto3 from numpy.random import rand def lambda_handler(event, context): body = f'{event["message"]} \n value: {rand()}' client = boto3.client('s3') client.put_object( Bucket=os.environ['BUCKET_NAME'], Key='sfn_first.txt', Body=body, ) return { 'body': body, 'message': event['message'], }
CDKでLambdaリソース(Function)を作成
まずは、Lambda本体のあるファイルパスを使って、AssetCodeオブジェクトを生成します。
class AssetCode · AWS CDK
function_path = str(self.lambda_path_base.joinpath('first')) code = AssetCode(function_path)
次にNumPyを用意します。
Lambdaでは、NumPyのようによく使われるモジュールは Lambda Layers
として用意されていますので、今回もこちらを利用します。
新機能 – AWS Lambda :あらゆるプログラム言語への対応と一般的なコンポーネントの共有 | Amazon Web Services ブログ
LayerVersionオブジェクトにて、既存のLambda Layersを扱えます。
class LayerVersion (construct) · AWS CDK
今回はARNを指定してNumPyのLayerを取得するため、静的メソッド from_layer_version_arn()
を使います。
scipy_layer = LayerVersion.from_layer_version_arn(
self, f'sfn_scipy_layer_for_first', AWS_SCIPY_ARN)
なお、Lambda LayersのARNについては、AWSアカウントごとに異なるようです。
そのため、一度、Lambda Consoleにて、該当LayerのARNを確認する必要があります。
最後に、FunctionオブジェクトでLambdaリソースを生成します。
class Function (construct) · AWS CDK
return Function( self, f'id_first', # Lambda本体のソースコードがあるディレクトリを指定 code=code, # Lambda本体のハンドラ名を指定 handler='lambda_function.lambda_handler', # ランタイムの指定 runtime=Runtime.PYTHON_3_7, # 環境変数の設定 environment={'BUCKET_NAME': self.bucket.bucket_name}, function_name='sfn_first_lambda', layers=[scipy_layer], memory_size=128, role=self.role, timeout=core.Duration.seconds(10), )
ちなみに、Layerを自分で作成したい場合は以下のようにします。
Codeオブジェクトのリファレンスは以下です。
class Code · AWS CDK
LayerVersion( self, 'layer_id', code=Code.from_asset('your_zip_filepath'), compatible_runtimes=[Runtime.PYTHON_3_7], layer_version_name='layer_version_name', )
残りのLambda
2番目のLambda本体
エラーハンドリングしたいため、パラレル番号が偶数の場合はエラーとしています。
また、 result_path
で結果をわかりやすく表示したいため、戻り値も絞っています。
def lambda_handler(event, context): if event['parallel_no'] % 2 == 0: raise Exception('偶数です') return { 'message': event['message'], 'const_value': event['const_value'] }
3番目のLambda本体
こちらもエラーハンドリングしたいため、パラレル番号が1の場合はエラーとしています。
また、最後は文字列を返すようにしました。
def lambda_handler(event, context): if event['parallel_no'] == 1: raise Exception('強制的にエラーとします') return 'only 3rd message.'
エラーハンドリングのLambda本体
タスクでエラーが発生した場合は
{ "resource": "arn:aws:lambda:region:id:function:sfn_error_lambda", "input": { "Error": "Exception", "Cause": "{\"errorMessage\": \"\\u5076\\u6570\\u3067\\u3059\", \"errorType\": \"Exception\", \"stackTrace\": [\" File \\\"/var/task/lambda_function.py\\\", line 5, in lambda_handler\\n raise Exception('\\u5076\\u6570\\u3067\\u3059') \\n\"]}" }, "timeoutInSeconds": null }
という値が渡されてきます。
この中の Cause
はJSON文字列のため、Lambdaで見えるようにして返します。
def lambda_handler(event, context): return { # JSONをPythonオブジェクト化することで、文字化けを直す 'error_message': json.loads(event['Cause']), }
Functionオブジェクト
ほぼ同じ内容なので一つのメソッドで生成しています。
def create_other_lambda(self, function_name): function_path = str(self.lambda_path_base.joinpath(function_name)) return Function( self, f'id_{function_name}', code=AssetCode(function_path), handler='lambda_function.lambda_handler', runtime=Runtime.PYTHON_3_7, function_name=f'sfn_{function_name}_lambda', memory_size=128, timeout=core.Duration.seconds(10), ) # 使う時 self.second_lambda = self.create_other_lambda('second') self.third_lambda = self.create_other_lambda('third') self.error_lambda = self.create_other_lambda('error')
サブのStateMachine (Step Functions) 作成
概要 (InputPath・OutputPath・ResultPathを試す)
ここまででパラレル実行する、サブのStateMachineのリソースが用意できました。
それらを組み合わせて、サブのStateMachineを作成していきます。
StateMachineでは、タスクという単位で処理を定義します。
タスク - AWS Step Functions
サブのStateMachineでは3つのタスクを用意します。
今回、InputPath・OutputPath・ResultPathを試そうと考えました。
そこで、こんな感じの設定にしました。
タスク名 | Lambda | InputPath | ResultPath | OutputPath |
---|---|---|---|---|
Second Task | sfn_second_lambda | $['first_result', 'parallel_no', 'message', 'context_name', 'const_value] | $.second_result' | ['second_result', 'parallel_no'] |
Third Task | sfn_third_lambda | $ (Lambdaの戻り値で上書き) | ||
Error Task | sfn_error_lambda |
Second Taskでは、
- InputPath
- 前のタスクの結果から
'first_result', 'parallel_no', 'message', 'context_name', 'const_value
だけ受け取って処理する
- 前のタスクの結果から
- OutPath
- Lambdaの戻り値を、入力値に
second_result
という項目を追加する
- Lambdaの戻り値を、入力値に
- ResultPath
- 次のタスクには
'second_result', 'parallel_no'
のみ渡す
- 次のタスクには
とします。
また、Third Taskでは、
- ResultPath
- 入力値をすべて捨て、Lambdaの戻り値だけを出力する
とします。
実際のソースコード
まず、複数タスクで使うため、1つのエラータスクを作成します。
Lambdaを起動するためにはInvokeFunctionクラスを使います。引数としてLambdaオブジェクトを設定します。
class InvokeFunction · AWS CDK
error_task = Task(
self,
'Error Task',
task=InvokeFunction(self.error_lambda),
)
次に、Second Taskを生成します。
InputPath・OutputPath・ResultPathに対応する引数があるため、それぞれ指定します。
second_task = Task( self, 'Second Task', task=InvokeFunction(self.second_lambda), # 渡されてきた項目を絞ってLambdaに渡す input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value]", # 結果は second_result という項目に入れる result_path='$.second_result', # 次のタスクに渡す項目は絞る output_path="$['second_result', 'parallel_no']" )
Second Taskの中でエラーが発生した場合のハンドリングをするため、 add_catch
でエラーが発生した時のタスクを追加します。
エラー処理 - AWS Step Functions
second_task.add_catch(error_task, errors=['States.ALL'])
Third Taskを作成します。こちらもエラーハンドリングを追加します。
third_task = Task( self, 'Third Task', task=InvokeFunction(self.third_lambda), # third_lambdaの結果だけに差し替え result_path='$', ) # こちらもエラーハンドリングを追加 third_task.add_catch(error_task, errors=['States.ALL'])
次に、Second Taskの後にThird Taskを実行できるよう、 next()
にて設定します。
definition = second_task.next(third_task)
最後に、StateMachineを作成します。
return StateMachine( self, 'Sub StateMachine', definition=definition, state_machine_name='sfn_sub_state_machine', )
メインのStateMachine (Step Functions) の作成
最後のリソースとして、メインのStateMachineを作成します。
まずは First Task を作成します。ここは先程と同じです。
first_task = Task( self, 'S3 Lambda Task', task=InvokeFunction(self.first_lambda, payload={'message': 'Hello world'}), comment='Main StateMachine', )
次のタスクがパラレル実行するサブのStateMachineです。
最初にParallelオブジェクトを生成します。
class Parallel (construct) · AWS CDK
parallel_task = Parallel(
self,
'Parallel Task',
)
次に、パラレル実行の設定を行います。流れは以下となります。
StartExecution
オブジェクトで、使用するStateMachineやInputなどを指定します。
class StartExecution · AWS CDKStartExecution
をTask
に渡します。Task
をParallel.branch()
に渡します。
for i in range(1, 4): # 1. sub_task = StartExecution( self.sub_state_machine, input={ 'parallel_no': i, 'first_result.$': '$', # first_taskのレスポンスにある、messageをセット 'message.$': '$.message', # コンテキストオブジェクトの名前をセット 'context_name.$': '$$.State.Name', # 固定値を2つ追加(ただ、Taskのinputでignore_valueは無視) 'const_value': 'ham', 'ignore_value': 'ignore', }, ) # 2. invoke_sub_task = Task( self, f'Sub Task {i}', task=sub_task, ) # 3. parallel_task.branch(invoke_sub_task)
以上でStepFunctionsが完成しました。
$と$$について
上記例では、
'message.$': '$.message'
'context_name.$': '$$.State.Name'
としている部分がありました。
$
や $$
については、それぞれ以下の意味となります。
$
について
パスを使用して値を選択するキーと値のペアの場合、キーの名前は .$ で終わる必要があります。 InputPath およびパラメータ - AWS Step Functions
$$
について
コンテキストオブジェクトにアクセスするには、パスを使用して状態の入力を選択したときと同様に、.$ を末尾に追加したパラメータ名をまず指定します。次に、入力の代わりにコンテキストオブジェクトデータにアクセスするには、$$. をパスの先頭に追加します。
実行結果
ではStepFunctionsのConsoleより実行してみます。
メインのStateMachine
S3 Lambda TaskのLambdaFunctionScheduled
{ "resource": "arn:aws:lambda:region:account_id:function:sfn_first_lambda", "input": { "message": "Hello world" }, "timeoutInSeconds": null }
S3 Lambda TaskのTaskStateExited
{ "name": "S3 Lambda Task", "output": { "body": "Hello world \n value: 0.035671270119142284", "message": "Hello world" } }
サブのStateMachine
3パターンあるため、それぞれ記載します。
- すべて成功
- Second Taskでエラー
- Third Taskでエラー
すべて成功
Second TaskのLambdaFunctionScheduledでは、設定した
input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value']",
の通りのinputとなっています。
{ "resource": "arn:aws:lambda:region:account_id:function:sfn_second_lambda", "input": { "first_result": { "body": "Hello world \n value: 0.035671270119142284", "message": "Hello world" }, "parallel_no": 3, "message": "Hello world", "context_name": "Sub Task 3", "const_value": "ham" }, "timeoutInSeconds": null }
Second TaskのTaskStateExitedでも、
result_path='$.second_result', output_path="$['second_result', 'parallel_no']"
の通りのoutputです。
{ "name": "Second Task", "output": { "second_result": { "message": "Hello world", "const_value": "ham" }, "parallel_no": 3 } }
Third TaskのLambdaFunctionScheduledはこんな感じ。Secondのoutputを引き継いでいます。
{ "resource": "arn:aws:lambda:region:account_id:function:sfn_third_lambda", "input": { "second_result": { "message": "Hello world", "const_value": "ham" }, "parallel_no": 3 }, "timeoutInSeconds": null }
Third TaskのTaskStateExited
result_path='$',
通り、outputはLambdaの戻り値の文字列だけになっています。
{ "name": "Third Task", "output": "only 3rd message." }
Second Taskでエラー
Second TaskのLambdaFunctionScheduledは以下の通り(抜粋)。
{ "input": { "parallel_no": 2, "context_name": "Sub Task 2", }, }
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。
{ "name": "Error Task", "output": { "error_message": { "errorMessage": "偶数です", "errorType": "Exception", "stackTrace": [ " File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n raise Exception('偶数です')\n" ] } } }
Third Taskでエラー
Third TaskのLambdaFunctionScheduledは以下の通り(抜粋)。
{ "parallel_no": 1 }, }
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。
{ "output": { "error_message": { "errorMessage": "強制的にエラーとします", "errorType": "Exception", "stackTrace": [ " File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n raise Exception('強制的にエラーとします')\n" ] } } }
削除
cdk destroy
にて削除できます。
$ cdk destroy Are you sure you want to delete: step-functions (y/n)? y ... ✅ step-functions: destroyed
ただ、上記の方法で作成したS3は、removalpolicyの値により削除されません。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#removalpolicy
そのため、手動で削除するか、作成する時に以下のような設定を行います。
AWS CloudFormationでStackを削除したときにリソースを消さない設定 | DevelopersIO
動的並列処理について
先日、StepFunctionsでの動的並列処理がサポートされました。
AWS Step Functions がワークフローでの動的並列処理のサポートを追加
CDKでは、以下のissueがcloseとなれば利用できそうです。
- Support for Dynamic StepFunction Dynamic Batching with Map and Iterator · Issue #4137 · aws/aws-cdk
- feat: support for map state, add test for parallel state. by melalawi · Pull Request #4145 · aws/aws-cdk
ソースコード
GitHubに上げました。 step_functions
ディレクトリが今回のファイルです。
https://github.com/thinkAmi-sandbox/AWS_CDK-sample
*1:例えばLambdaの場合は、 aws_cdk.aws_lambda が必要です:https://docs.aws.amazon.com/cdk/api/latest/docs/aws-lambda-readme.html
*2:もし、ポリシーを使う場合はこちら:https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-iam.Policy.html