今年の7月にAWS CDK (Cloud Development Kit) がGAとなりました。
AWS クラウド開発キット (CDK) – TypeScript と Python 用がご利用可能に | Amazon Web Services ブログ
APIリファレンスも公開されているため、これでPythonを使ってAWSのリソースを作成することができるようになりました。
API Reference · AWS CDK
ただ、ドキュメントではTypeScriptの書き方がメインであり、Pythonでどう書くのかイマイチ分かりませんでした。
そこで、AWS CDK + Pythonで、ネストした AWS StepFunctions のワークフローを作ってみることにしました。
なお、記事の末尾にもあるように、ソースコードは公開しています。この記事はそのソースコードの解説です。
目次
環境
今回作成するStepFunctionsのワークフローです。
メインのStateMachineを実行します。サブのStateMachineを3パラレルで起動します。
サブの方は、各タスクにエラーハンドリングが付いています。
各StateMachineの機能です。
- メイン
- LambdaからS3へファイルを保存
- サブのStateMachineを3パラレルで起動
- サブ
環境構築
Getting Startedに従い、環境を構築します。
Getting Started With the AWS CDK - AWS Cloud Development Kit (AWS CDK)
AWS CDKのCLIをインストールします。
$ npm install -g aws-cdk
$ cdk --version
1.9.0 (build 30f158a)
$ mkdir step_functions
$ cd step_functions/
cdk init
でPythonを使ったCDK環境を作成します。
$ cdk init --language python
Applying project template app for python
Executing Creating virtualenv...
# ちなみに、ディレクトリの中に何かファイルがあるとエラー
$ cdk init --language python
`cdk init` cannot be run in a non-empty directory!
Pythonの仮想環境が準備されるため、activate後にインストールします。
$ source .env/bin/activate
$ pip install -r requirements.txt
次にAWS CDKで使うリソースに対するモジュールを追加でインストールします。
どのモジュールが必要なのかは、APIリファレンスのトップに記載されています。*1
まずは、StepFunctionまわりで必要なモジュールをインストールします。
$ pip install aws_cdk.aws_stepfunctions
$ pip install aws_cdk.aws_stepfunctions_tasks
次にLambdaまわり。
$ pip install aws_cdk.aws_lambda
S3バケットにファイルを保存するため、S3のモジュールも必要です。
$ pip install aws_cdk.aws_s3
あとは、S3バケットにファイルを保存するために、IAMロールも必要になります。
$ pip install aws_cdk.aws_iam
作成順について
AWS CDKでは、下位のリソースから順に作成します。
今回は
- S3
- IAM Managed Policy
- IAM Role
- Lambda
- サブのStatemMachine (Step Functions)
- メインのStatemMachine (Step Functions)
の順で作成します。
Pythonでは、 core.Stack
を継承したクラスの __init__()
にて、作成したいリソースのオブジェクトを生成します。
class StepFunctionsStack(core.Stack):
def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
self.bucket = self.create_s3_bucket()
self.managed_policy = self.create_managed_policy()
...
ドキュメントに従い、S3バケットを新規作成します。
class Bucket (construct) · AWS CDK
ドキュメントはTypeScript形式で書かれています。
new Bucket(scope: Construct, id: string, props?: Bucket<wbr>Props)
Pythonに読み替えた時の書き方です。
def create_s3_bucket(self):
return Bucket(
self,
'S3 Bucket',
bucket_name=f'sfn-bucket-by-aws-cdk',
)
Pythonで読み替える時のポイントは以下です。
- scopeは
self
で読み替え
- 引数名は、snake_caseで読み替え
props
にて指定可能なキーと値は、ドキュメントの Construct Props
に記載
ちなみに、もし既存のS3バケットを利用したい場合は
- fromBucketArn()
- fromBucketAttributes()
- fromBucketName()
などの静的メソッドを使います。
これにより、既存のS3バケットのオブジェクトが取得できるので、他のリソースでの指定が可能になります。
IAM Managed Policy (管理ポリシー)の作成
今回は、S3バケットにアクセスする管理ポリシーを作成します*2。
class ManagedPolicy (construct) · AWS CDK
Managed Policyを作成するには、
- PolicyStatement
- ManagedPolicy
の順でオブジェクトを作成します。
PolicyStatementの作成
ドキュメントに従い作成します。
class PolicyStatement · AWS CDK
なお、resourcesでは、上記で作成したBucketのARNを参照する必要があります。
ドキュメントにあるように、Bucketオブジェクトのプロパティ bucket_arn
でARNを参照します。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#properties
def create_managed_policy(self):
statement = PolicyStatement(
effect=Effect.ALLOW,
actions=[
"s3:PutObject",
],
resources=[
f'{self.bucket.bucket_arn}/*',
]
)
ManagedPolicyの作成
こちらもドキュメント通りです。
class ManagedPolicy (construct) · AWS CDK
return ManagedPolicy(
self,
'Managed Policy',
managed_policy_name='sfn_lambda_policy',
statements=[statement],
)
IAMロールを作成
次に、LambdaからS3バケットへアクセスするためのIAMロールを作成します。
まずは、 ServicePrincipal
オブジェクトでLambdaを指定します。
class ServicePrincipal · AWS CDK
def create_role(self):
service_principal = ServicePrincipal('lambda.amazonaws.com')
次に、 Role
オブジェクトを作成します。
class Role (construct) · AWS CDK
ポイントは以下です。
assumed_by
に、ServicePrincipalオブジェクトを指定
managed_policies
に、作成したManaged Policyを指定
return Role(
self,
'Role',
assumed_by=service_principal,
role_name='sfn_lambda_role',
managed_policies=[self.managed_policy],
)
Lambdaの作成
今回は4つのLambdaを作成します。
Lambda名 |
用途 |
sfn_first_lambda |
S3にファイルを保存 |
sfn_second_lambda |
サブStateMachineの1番目のLambda |
sfn_third_lambda |
サブStateMachineの2番目のLambda |
sfn_error_lambda |
sfn_second_lambda・sfn_third_lambdaでエラーが起きた時に実行されるLambda |
なお、CDKで使うLambda本体は、どこかのディレクトリに入れておけばOKです。
CDKのFunctionオブジェクトを生成する際に、そのディレクトリパスを指定することで、CDKがzip化・アップロードまで面倒を見てくれます。
S3にファイルを保存するLambdaを作成
Lambda本体
lambda_function/first/lambda_function.py
として作成します。
ポイントは以下です。
- boto3でS3へアップロードする
- Lambdaのランタイムには
boto3
が含まれている
- NumPyを使って値を取得する
- 環境変数
BUCKET_NAME
で、保存先のS3バケット名を受け取る
- Lambdaのパラメータとして
message
が渡されてくる
- StateMachineのInputより渡されることを想定
- 戻り地は、
body
と message
を持つdict
import os
import boto3
from numpy.random import rand
def lambda_handler(event, context):
body = f'{event["message"]} \n value: {rand()}'
client = boto3.client('s3')
client.put_object(
Bucket=os.environ['BUCKET_NAME'],
Key='sfn_first.txt',
Body=body,
)
return {
'body': body,
'message': event['message'],
}
CDKでLambdaリソース(Function)を作成
まずは、Lambda本体のあるファイルパスを使って、AssetCodeオブジェクトを生成します。
class AssetCode · AWS CDK
function_path = str(self.lambda_path_base.joinpath('first'))
code = AssetCode(function_path)
次にNumPyを用意します。
Lambdaでは、NumPyのようによく使われるモジュールは Lambda Layers
として用意されていますので、今回もこちらを利用します。
新機能 – AWS Lambda :あらゆるプログラム言語への対応と一般的なコンポーネントの共有 | Amazon Web Services ブログ
LayerVersionオブジェクトにて、既存のLambda Layersを扱えます。
class LayerVersion (construct) · AWS CDK
今回はARNを指定してNumPyのLayerを取得するため、静的メソッド from_layer_version_arn()
を使います。
scipy_layer = LayerVersion.from_layer_version_arn(
self, f'sfn_scipy_layer_for_first', AWS_SCIPY_ARN)
なお、Lambda LayersのARNについては、AWSアカウントごとに異なるようです。
そのため、一度、Lambda Consoleにて、該当LayerのARNを確認する必要があります。
最後に、FunctionオブジェクトでLambdaリソースを生成します。
class Function (construct) · AWS CDK
return Function(
self,
f'id_first',
code=code,
handler='lambda_function.lambda_handler',
runtime=Runtime.PYTHON_3_7,
environment={'BUCKET_NAME': self.bucket.bucket_name},
function_name='sfn_first_lambda',
layers=[scipy_layer],
memory_size=128,
role=self.role,
timeout=core.Duration.seconds(10),
)
ちなみに、Layerを自分で作成したい場合は以下のようにします。
Codeオブジェクトのリファレンスは以下です。
class Code · AWS CDK
LayerVersion(
self,
'layer_id',
code=Code.from_asset('your_zip_filepath'),
compatible_runtimes=[Runtime.PYTHON_3_7],
layer_version_name='layer_version_name',
)
残りのLambda
2番目のLambda本体
エラーハンドリングしたいため、パラレル番号が偶数の場合はエラーとしています。
また、 result_path
で結果をわかりやすく表示したいため、戻り値も絞っています。
def lambda_handler(event, context):
if event['parallel_no'] % 2 == 0:
raise Exception('偶数です')
return {
'message': event['message'],
'const_value': event['const_value']
}
3番目のLambda本体
こちらもエラーハンドリングしたいため、パラレル番号が1の場合はエラーとしています。
また、最後は文字列を返すようにしました。
def lambda_handler(event, context):
if event['parallel_no'] == 1:
raise Exception('強制的にエラーとします')
return 'only 3rd message.'
エラーハンドリングのLambda本体
タスクでエラーが発生した場合は
{
"resource": "arn:aws:lambda:region:id:function:sfn_error_lambda",
"input": {
"Error": "Exception",
"Cause": "{\"errorMessage\": \"\\u5076\\u6570\\u3067\\u3059\",
\"errorType\": \"Exception\",
\"stackTrace\": [\" File \\\"/var/task/lambda_function.py\\\", line 5,
in lambda_handler\\n raise Exception('\\u5076\\u6570\\u3067\\u3059')
\\n\"]}"
},
"timeoutInSeconds": null
}
という値が渡されてきます。
この中の Cause
はJSON文字列のため、Lambdaで見えるようにして返します。
def lambda_handler(event, context):
return {
'error_message': json.loads(event['Cause']),
}
Functionオブジェクト
ほぼ同じ内容なので一つのメソッドで生成しています。
def create_other_lambda(self, function_name):
function_path = str(self.lambda_path_base.joinpath(function_name))
return Function(
self,
f'id_{function_name}',
code=AssetCode(function_path),
handler='lambda_function.lambda_handler',
runtime=Runtime.PYTHON_3_7,
function_name=f'sfn_{function_name}_lambda',
memory_size=128,
timeout=core.Duration.seconds(10),
)
self.second_lambda = self.create_other_lambda('second')
self.third_lambda = self.create_other_lambda('third')
self.error_lambda = self.create_other_lambda('error')
サブのStateMachine (Step Functions) 作成
ここまででパラレル実行する、サブのStateMachineのリソースが用意できました。
それらを組み合わせて、サブのStateMachineを作成していきます。
StateMachineでは、タスクという単位で処理を定義します。
タスク - AWS Step Functions
サブのStateMachineでは3つのタスクを用意します。
今回、InputPath・OutputPath・ResultPathを試そうと考えました。
そこで、こんな感じの設定にしました。
タスク名 |
Lambda |
InputPath |
ResultPath |
OutputPath |
Second Task |
sfn_second_lambda |
$['first_result', 'parallel_no', 'message', 'context_name', 'const_value] |
$.second_result' |
['second_result', 'parallel_no'] |
Third Task |
sfn_third_lambda |
|
$ (Lambdaの戻り値で上書き) |
|
Error Task |
sfn_error_lambda |
|
|
|
Second Taskでは、
- InputPath
- 前のタスクの結果から
'first_result', 'parallel_no', 'message', 'context_name', 'const_value
だけ受け取って処理する
- OutPath
- Lambdaの戻り値を、入力値に
second_result
という項目を追加する
- ResultPath
- 次のタスクには
'second_result', 'parallel_no'
のみ渡す
とします。
また、Third Taskでは、
- ResultPath
- 入力値をすべて捨て、Lambdaの戻り値だけを出力する
とします。
まず、複数タスクで使うため、1つのエラータスクを作成します。
Lambdaを起動するためにはInvokeFunctionクラスを使います。引数としてLambdaオブジェクトを設定します。
class InvokeFunction · AWS CDK
error_task = Task(
self,
'Error Task',
task=InvokeFunction(self.error_lambda),
)
次に、Second Taskを生成します。
InputPath・OutputPath・ResultPathに対応する引数があるため、それぞれ指定します。
second_task = Task(
self,
'Second Task',
task=InvokeFunction(self.second_lambda),
input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value]",
result_path='$.second_result',
output_path="$['second_result', 'parallel_no']"
)
Second Taskの中でエラーが発生した場合のハンドリングをするため、 add_catch
でエラーが発生した時のタスクを追加します。
エラー処理 - AWS Step Functions
second_task.add_catch(error_task, errors=['States.ALL'])
Third Taskを作成します。こちらもエラーハンドリングを追加します。
third_task = Task(
self,
'Third Task',
task=InvokeFunction(self.third_lambda),
result_path='$',
)
third_task.add_catch(error_task, errors=['States.ALL'])
次に、Second Taskの後にThird Taskを実行できるよう、 next()
にて設定します。
definition = second_task.next(third_task)
最後に、StateMachineを作成します。
return StateMachine(
self,
'Sub StateMachine',
definition=definition,
state_machine_name='sfn_sub_state_machine',
)
メインのStateMachine (Step Functions) の作成
最後のリソースとして、メインのStateMachineを作成します。
まずは First Task を作成します。ここは先程と同じです。
first_task = Task(
self,
'S3 Lambda Task',
task=InvokeFunction(self.first_lambda, payload={'message': 'Hello world'}),
comment='Main StateMachine',
)
次のタスクがパラレル実行するサブのStateMachineです。
最初にParallelオブジェクトを生成します。
class Parallel (construct) · AWS CDK
parallel_task = Parallel(
self,
'Parallel Task',
)
次に、パラレル実行の設定を行います。流れは以下となります。
StartExecution
オブジェクトで、使用するStateMachineやInputなどを指定します。
class StartExecution · AWS CDK
StartExecution
を Task
に渡します。
Task
を Parallel.branch()
に渡します。
for i in range(1, 4):
sub_task = StartExecution(
self.sub_state_machine,
input={
'parallel_no': i,
'first_result.$': '$',
'message.$': '$.message',
'context_name.$': '$$.State.Name',
'const_value': 'ham',
'ignore_value': 'ignore',
},
)
invoke_sub_task = Task(
self,
f'Sub Task {i}',
task=sub_task,
)
parallel_task.branch(invoke_sub_task)
以上でStepFunctionsが完成しました。
$と$$について
上記例では、
'message.$': '$.message'
'context_name.$': '$$.State.Name'
としている部分がありました。
$
や $$
については、それぞれ以下の意味となります。
$
について
パスを使用して値を選択するキーと値のペアの場合、キーの名前は .$ で終わる必要があります。
InputPath およびパラメータ - AWS Step Functions
$$
について
コンテキストオブジェクトにアクセスするには、パスを使用して状態の入力を選択したときと同様に、.$ を末尾に追加したパラメータ名をまず指定します。次に、入力の代わりにコンテキストオブジェクトデータにアクセスするには、$$. をパスの先頭に追加します。
コンテキストオブジェクト - AWS Step Functions
実行結果
ではStepFunctionsのConsoleより実行してみます。
メインのStateMachine
S3 Lambda TaskのLambdaFunctionScheduled
{
"resource": "arn:aws:lambda:region:account_id:function:sfn_first_lambda",
"input": {
"message": "Hello world"
},
"timeoutInSeconds": null
}
S3 Lambda TaskのTaskStateExited
{
"name": "S3 Lambda Task",
"output": {
"body": "Hello world \n value: 0.035671270119142284",
"message": "Hello world"
}
}
サブのStateMachine
3パターンあるため、それぞれ記載します。
- すべて成功
- Second Taskでエラー
- Third Taskでエラー
すべて成功
Second TaskのLambdaFunctionScheduledでは、設定した
input_path="$['first_result', 'parallel_no', 'message', 'context_name', 'const_value']",
の通りのinputとなっています。
{
"resource": "arn:aws:lambda:region:account_id:function:sfn_second_lambda",
"input": {
"first_result": {
"body": "Hello world \n value: 0.035671270119142284",
"message": "Hello world"
},
"parallel_no": 3,
"message": "Hello world",
"context_name": "Sub Task 3",
"const_value": "ham"
},
"timeoutInSeconds": null
}
Second TaskのTaskStateExitedでも、
result_path='$.second_result',
output_path="$['second_result', 'parallel_no']"
の通りのoutputです。
{
"name": "Second Task",
"output": {
"second_result": {
"message": "Hello world",
"const_value": "ham"
},
"parallel_no": 3
}
}
Third TaskのLambdaFunctionScheduledはこんな感じ。Secondのoutputを引き継いでいます。
{
"resource": "arn:aws:lambda:region:account_id:function:sfn_third_lambda",
"input": {
"second_result": {
"message": "Hello world",
"const_value": "ham"
},
"parallel_no": 3
},
"timeoutInSeconds": null
}
Third TaskのTaskStateExited
result_path='$',
通り、outputはLambdaの戻り値の文字列だけになっています。
{
"name": "Third Task",
"output": "only 3rd message."
}
Second Taskでエラー
Second TaskのLambdaFunctionScheduledは以下の通り(抜粋)。
{
"input": {
"parallel_no": 2,
"context_name": "Sub Task 2",
},
}
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。
{
"name": "Error Task",
"output": {
"error_message": {
"errorMessage": "偶数です",
"errorType": "Exception",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n raise Exception('偶数です')\n"
]
}
}
}
Third Taskでエラー
Third TaskのLambdaFunctionScheduledは以下の通り(抜粋)。
{
"parallel_no": 1
},
}
Error TaskのTaskStateExitedを確認します。日本語がそのまま表示されています。
{
"output": {
"error_message": {
"errorMessage": "強制的にエラーとします",
"errorType": "Exception",
"stackTrace": [
" File \"/var/task/lambda_function.py\", line 3, in lambda_handler\n raise Exception('強制的にエラーとします')\n"
]
}
}
}
削除
cdk destroy
にて削除できます。
$ cdk destroy
Are you sure you want to delete: step-functions (y/n)? y
...
✅ step-functions: destroyed
ただ、上記の方法で作成したS3は、removalpolicyの値により削除されません。
https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-s3.Bucket.html#removalpolicy
そのため、手動で削除するか、作成する時に以下のような設定を行います。
AWS CloudFormationでStackを削除したときにリソースを消さない設定 | DevelopersIO
動的並列処理について
先日、StepFunctionsでの動的並列処理がサポートされました。
AWS Step Functions がワークフローでの動的並列処理のサポートを追加
CDKでは、以下のissueがcloseとなれば利用できそうです。
GitHubに上げました。 step_functions
ディレクトリが今回のファイルです。
https://github.com/thinkAmi-sandbox/AWS_CDK-sample