記事公開日
最終更新日
【AWS】S3バッチオペレーションでLambdaを呼び出してみた

はじめに
こんにちは。DXソリューション営業本部の金森です。
本記事では、S3のバッチオペレーションを使用してLambda関数を呼び出し、異なるAWSアカウント間でS3バケットのデータをコピーする方法についてご説明したいと思います。
S3バッチオペレーションについて
S3バッチオペレーションはAmazon S3のデータ管理機能です。Amazon S3マネジメントコンソールからの数回クリックにより、または単一のAPIリクエストによって数十億個のものオブジェクトを大規模に管理できます。この機能を使用して、オブジェクトメタデータとプロパティの変更や、その他のストレージ管理タスク(オブジェクトのバケット間でのコピーまたはレプリケート、オブジェクトタグセットの置き換え、アクセスコントロールの変更、S3 Glacierからのアーカイブオブジェクトの復元など)を実行できます。
引用元:Amazon S3 バッチオペレーション
今回の構成図とやってみたこと
構成図
やってみたこと
・S3バッチオペレーションを使用してLambda関数を呼び出し、S3バケットにあるデータを別アカウントのS3バケットにコピーする
・今回S3バケットにあるコピー対象の2つのファイル
設定の流れ
①マニフェストファイルの作成(AWSアカウントA)
②Lambdaの設定(AWSアカウントA)
③コピー先S3バケットの設定(AWSアカウントB)
④S3バッチオペレーションの設定(AWSアカウントA)
マニフェストファイルの作成(AWSアカウントA)
S3バッチオペレーションは、マニフェストファイルに含まれる全てのオブジェクトに対して指定された操作を実行します。
・今回は手動でCSV形式のマニフェストファイルを以下のように作成します。
-A列にはS3バケット名、B列にはオブジェクトキー(コピー対象のファイル)を入力
-2行目に架空のオブジェクトキーを入力 ※エラー処理の確認をするため
・マニフェストファイル作成後はコピー元S3バケットにアップロードしておく
Lambdaの設定(AWSアカウントA)
Lambda用のIAMロールを作成
①IAMポリシーの設定
-コピー元S3バケットから読み込む権限、コピー先S3バケットへ書き込む権限を与える
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": "arn:aws:s3:::s3batchoperations-source/*"
},
{
"Sid": "Statement1",
"Effect": "Allow",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::s3batchoperations-target/*"
}
]
}
②信頼ポリシーの設定
-LambdaにIAMロールを引き受ける権限を与える
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Lambda関数のS3バッチオペレーションに対してのリクエストとレスポンス
・リクエストJSON:Lambda関数がS3バッチオペレーションに対して処理すべきオブジェクトの情報を取得するためにリクエストを送る
・リクエストJSONの中身の例
{
"invocationSchemaVersion": "1.0",
"invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
"job": {
"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"
},
"tasks": [
{
"taskId": "dGFza2lkZ29lc2hlcmUK",
"s3Key": "sample_2.csv",
"s3VersionId": "1",
"s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1"
}
]
}
・レスポンスJSON:Lambda関数がS3バッチオペレーションに対してLambda関数での処理結果を報告する
※S3バッチオペレーションはこの結果を元にジョブ全体のステータスを判断する
・レスポンスJSONの中身の例
{
"invocationSchemaVersion": "1.0",
"treatMissingKeysAs" : "PermanentFailure",
"invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
"results": [
{
"taskId": "dGFza2lkZ29lc2hlcmUK",
"resultCode": "Succeeded",
"resultString": "Copy successful"
}
]
}
※マニフェストファイルのリスト1行ごとにLambdaがリクエストJSONから情報を取得し、レスポンスJSON(Lambda関数での処理結果)をS3バッチオペレーションに報告する
Lambda関数の作成
①一から作成を選択し、ランタイムをPython3.12に、作成したIAMロールを設定し関数の作成を押す
②以下のようにPythonスクリプトを入力する
import boto3
from botocore.exceptions import ClientError
s3 = boto3.client("s3")
def lambda_handler(event, context):
invocation_id = event["invocationId"]
invocation_schema_version = event["invocationSchemaVersion"]
task = event["tasks"][0]
task_id = task["taskId"]
results = []
result_code = None
result_string = None
try:
source_obj_key = task["s3Key"]
source_bucket_name = task["s3BucketArn"].split(":")[-1]
destination_bucket_name = "s3batchoperations-target"
destination_bucket_key = source_obj_key
copy_source = {
'Bucket': source_bucket_name,
'Key': source_obj_key,
}
s3.copy(
copy_source,
destination_bucket_name,
destination_bucket_key,
)
result_code = "Succeeded"
result_string = "Copy successful"
except ClientError as error:
if error.response["Error"]["Code"] == "AccessDenied":
result_code = "PermanentFailure"
result_string = str(error)
elif error.response["Error"]["Code"] == "403" and error.response["Error"]["Message"] == "Forbidden":
result_code = "Succeeded"
result_string = "Object not found"
else:
result_code = "PermanentFailure"
result_string = str(error)
except Exception as error:
result_code = "PermanentFailure"
result_string = str(error)
finally:
results.append(
{
'taskId': task_id,
'resultCode': result_code,
'resultString': result_string
}
)
return{
'invocationSchemaVersion': invocation_schema_version,
'treatMissingKeysAs': 'PermanentFailure',
'invocationId': invocation_id,
'results': results
}
Pythonスクリプトの解説
AWS SDK for PythonでAWSのリソースにアクセスするためにboto3ライブラリをインポート
また、AWSのサービスにリクエストを送ってエラーが発生した時に対処するためにClientErrorをインポート
import boto3
from botocore.exceptions import ClientError
S3クライアントオブジェクトを作成
s3 = boto3.client("s3")
Lambda関数を定義 ※Lambdaがイベントを受け取った際に自動的に呼び出される。
def lambda_handler(event, context):
Lambda関数が呼び出された時に受け取るeventオブジェクトから必要な情報を取得
※先ほど説明したリクエストJSONの中身から情報を取得している
invocation_id = event["invocationId"]
invocation_schema_version = event["invocationSchemaVersion"]
task = event["tasks"][0]
task_id = task["taskId"]
結果を格納するための変数resultsをリストとして初期化、同じくresult_codeとresult_stringは後に更新するために初期化する
results = []
result_code = None
result_string = None
tryブロック内でS3バケット間のオブジェクトをコピーする処理を実行
※エラーが発生した場合は後続のexceptブロックでの処理が実行される
try:
source_obj_key = task["s3Key"] #s3Keyという値にオブジェクトキーが格納されている
source_bucket_name = task["s3BucketArn"].split(":")[-1] #S3バケットのARNからバケット名部分だけを取り出す
destination_bucket_name = "s3batchoperations-target"
destination_bucket_key = source_obj_key #コピー元のオブジェクトをそのままコピー先バケットにコピーする
copy_source = {
'Bucket': source_bucket_name,
'Key': source_obj_key,
}
#S3のcopyメソッドを使用してオブジェクトをコピーする
s3.copy(
copy_source,
destination_bucket_name,
destination_bucket_key,
)
#コピー処理が正常に完了したことを示すための結果コードとメッセージを設定する
result_code = "Succeeded"
result_string = "Copy successful"
このexceptブロックでは、AWS API呼び出しでエラーが発生した場合に、特定のエラーコードに基づいて処理を実行
except ClientError as error: #発生したClientErrorをerrorという変数に格納し、エラーメッセージやエラーコードなどの情報を取得する
#コピー先のS3バケットポリシー、Lambda用のIAMロールに権限が無くエラーが発生した場合の処理
if error.response["Error"]["Code"] == "AccessDenied":
result_code = "PermanentFailure"
result_string = str(error)
#マニフェストファイルに存在しないオブジェクトキーが入力されていてエラーが発生した場合の処理
elif error.response["Error"]["Code"] == "403" and error.response["Error"]["Message"] == "Forbidden":
result_code = "Succeeded"
result_string = "Object not found"
#その他のClientErrorが発生した場合の処理
else:
result_code = "PermanentFailure"
result_string = str(error)
※マニフェストファイルに架空のオブジェクトキーが入力されていてエラーが発生した場合に、故意にS3バッチジョブを成功したかのように示す理由は、今回はS3バッチオペレーションのジョブ自体が完了したら成功と表示させるためで、マニフェストファイルとS3バケットに存在するデータの矛盾に関しては成功として処理している
このexceptブロックでは、AWS API呼び出し以外での予期しないエラーが発生した場合の処理を実行する
Exceptionを使うことでほぼ全ての種類のエラーを把握することができる
except Exception as error:
result_code = "PermanentFailure"
result_string = str(error)
finallyブロックでは、上記で実行される処理結果をresultsリストに追加する
finally:
results.append(
{
'taskId': task_id,
'resultCode': result_code,
'resultString': result_string
}
)
Lambda関数の実行結果を呼び出し元であるS3バッチオペレーションにJSON形式で返す処理を実行している
return{
'invocationSchemaVersion': invocation_schema_version,
'treatMissingKeysAs': 'PermanentFailure',
'invocationId': invocation_id,
'results': results
}
コピー先S3バケットの設定(AWSアカウントB)
・バケットポリシーを設定
-Lambda用のIAMロールにコピー先S3バケットに書き込む権限を与える
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Statement1",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789:role/s3batchoperations-lambda-role"
},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::s3batchoperations-target/*"
}
]
}
S3バッチオペレーションの設定(AWSアカウントA)
S3バッチオペレーション用のIAMロールを作成
①IAMポリシーの設定
-Lambda関数の呼び出し、マニフェストファイルを読み取る権限を与える
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "BatchOperationsLambdaPolicy",
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:ap-northeast-1:123456789:function:s3batchoperations-lambda"
]
},
{
"Sid": "BatchOperationss3Policy",
"Effect": "Allow",
"Action": [
"s3:GetObjectVersion",
"s3:PutObject",
"s3:GetBucketVersioning",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::s3batchoperations-source/*"
]
}
]
}
②信頼ポリシーの設定
-S3バッチオペレーションにIAMロールを引き受ける権限を与える
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "batchoperations.s3.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
バッチジョブの作成
①S3コンソールのナビゲーションペインからバッチオペレーションを選択し、ジョブの作成を押す
②以下のようにCSVを選択し、作成したマニフェストファイルをマニフェストオブジェクトに設定する
③オペレーションでLambda関数を呼び出すを選択し、作成したLambda関数を設定、そして呼び出しスキーマのバージョンを1.0に設定する。
④作成したS3バッチオペレーション用のIAMロールの設定と、バッチジョブが完了した時に完了レポートを生成するように設定する。
⑤最後にジョブの作成を押せば設定が完了
S3バッチオペレーションの実行(AWSアカウントA)
①ジョブの設定が完了後、ステータスが実行のための確認待ち状態のジョブIDを選択する
②ジョブの実行をする
③ジョブが完了するとステータスが完了済みと表示され結果を確認できる
④コピー先のS3バケット(AWSアカウントB)にも、今回コピー対象のファイルが存在することが確認できる
⑤完了レポートが生成されているのも確認ができる
⑥ダウンロードして確認してみると、1行目にはS3バケットに存在しないオブジェクトキーを指定したのでObject not foundとメッセージが表示されている
※1行目に架空のオブジェクトキーの結果が入力されている理由は、マニフェストファイルに入力されている順番ではなくランダムに処理されるため
まとめ
今回はAmazon S3バッチオペレーションを使用してLambda関数を呼び出し、S3バケットにあるデータを別アカウントのS3バケットにコピーする方法について紹介しました。Lambda関数と組み合わせることで大量のデータを効率的に処理することができます。
最後に「このサービスについて知りたい」「AWS環境の構築、移行」などのリクエストがございましたら、弊社お問合せフォームまでお気軽にご連絡ください! のちほど当ブログにてご紹介させていただくか、複雑な内容に関するお問い合わせの内容の場合には直接営業からご連絡を差し上げます。
※Amazon Web Services、”Powered by Amazon Web Services”ロゴ、およびブログで使用されるその他のAWS商標は、米国その他の諸国における、Amazon.com, Inc.またはその関連会社の商標です。
※Pythonは、Python Software Foundationの登録商標です。