記事公開日
最終更新日
【AWS】AWS Glueのワークフローを使いS3上のデータを加工後にRedshiftへ取り込む

はじめに
こんにちは。DXソリューション営業本部の金森です。
本記事では、AWS Glueのワークフローを使用してS3にあるCSVファイルをデータ変換し、
RedshiftにロードするETLジョブの構築方法についてご説明したいと思います。
AWS Glue ワークフローについて
Glueのエンティティ間(トリガー、クローラ、ジョブ)の依存関係を構築することでETLワークロードのオーケストレーションを可能にする。そのワークフローのさまざまなノードのステータスをコンソールで視覚的に追跡でき、進行状況のモニタリングと問題のトラブルシューティングが簡単になります。また、ワークフローのエンティティ間のパラメータを共有できる。
引用元:AWS GlueでETLワークロードのオーケストレーションを実行するワークフローを提供開始
今回行いたいこと
以下2つのジョブをAWS Glueのワークフローを用いて実行する
・1つ目のジョブ:CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、
文字コードをUTF-8、改行コードをLFに変換しS3に再アップロード
・2つ目のジョブ:S3に再アップロードされたCSVファイルを抽出し、Redshiftクラスターへロードする
※今回のインフラ設定については以下の記事を参照しています。
【AWS】RedshiftとAWS Glue接続時のインフラ設定
今回S3から取り込むCSVファイルの中身
今回のワークフロー画面
1つ目のジョブ設定(kanamori-test1)
処理内容
CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、文字コードをUTF-8、
改行コードをLFに変換しS3に再アップロードする
Script editorで設定
・Visual ETLでScript editorを選択し、EngineをPython shell、OptionsでStart freshを選択する。
・以下のPythonスクリプトを入力する
import boto3
input_bucket = 'aws-kensyo-kanamori'
input_key = 'glue-test/sample_kabu_shiftjis_crlf.csv'
output_bucket = 'aws-kensyo-kanamori'
output_key = 'glue-test/output/sample_kabu_utf8_lf.csv'
def convert_encoding(input_bucket, input_key, output_bucket, output_key):
s3 = boto3.resource('s3')
keyobj = s3.Object(input_bucket, input_key)
body = keyobj.get()['Body'].read()
sjis_format = body.decode('shift_jis')
formatline = sjis_format.replace('\r\n', '\n')
outputobj = s3.Object(output_bucket, output_key)
outputobj.put(Body=formatline)
convert_encoding(input_bucket, input_key, output_bucket, output_key)
Pythonスクリプトの解説
AWS SDK for PythonでAWSのリソースにアクセスするためのboto3ライブラリをインポート
import boto3
変換前のCSVファイルが格納されているS3バケットとそのファイルパス、変換後のCSVファイルをアップロードするS3バケットとそのファイルパスを指定
input_bucket = 'aws-kensyo-kanamori'
input_key = 'glue-test/sample_kabu_shiftjis_crlf.csv'
output_bucket = 'aws-kensyo-kanamori'
output_key = 'glue-test/output/sample_kabu_utf8_lf.csv'
convert_encodingという関数を定義している。
この関数は次の4つの引数を取る:input_bucket, input_key, output_bucket, output_key
def convert_encoding(input_bucket, input_key, output_bucket, output_key):
S3のリソースオブジェクトを作成。これは高レベルAPIでより簡単に操作可能。
s3 = boto3.resource('s3')
指定されたバケットとキーからS3オブジェクトを取得。
keyobj = s3.Object(input_bucket, input_key)
S3オブジェクトの内容をバイト形式で読み込む。
body = keyobj.get()['Body'].read()
読み込んだバイトデータをShift-JISエンコーディングからデコードして文字列に変換しUTF-8にする。
sjis_format = body.decode('shift_jis')
CRLF(Windows形式)をLF(Unix形式)に置換する。
formatline = sjis_format.replace('\r\n', '\n')
変換後のデータをアップロードするためのS3オブジェクトを取得する。
outputobj = s3.Object(output_bucket, output_key)
変換後のデータを指定したバケットとキーにアップロードする。
outputobj.put(Body=formatline)
定義したconvert_encoding関数を呼び出して実行
convert_encoding(input_bucket, input_key, output_bucket, output_key)
2つ目のジョブ設定(kanamori-test2)
処理内容
S3に再アップロードされたCSVファイルを抽出し、Redshiftクラスターへロードする
Visual ETLのGUI操作で設定
・SourcesタブからS3を選択し、以下の内容で設定をする
-s3 source type:s3 location
-s3 URL:1つ目のジョブで変換後のデータが出力されるS3フォルダのパス
※Recursiveにチェックを入れ再帰的に読み込むようにする
-Data format:CSVで区切り文字をコンマ(,)にする
・ダミーデータをS3にアップロードして保存できる状態にする
※空のフォルダだとジョブの設定が保存できないため
・TargetsタブからRedshiftを選択し、glue_testの名前のテーブルを作成してデータをロードさせるように設定する
・ジョブの設定後にダミーデータをS3から削除する
ワークフロー設定
①ワークフローの作成
ナビゲーションペインからWorkflows(orchestration)を選択し、ワークフローの追加をする
②開始トリガーを追加
Add triggerを押す
手動でワークフローを実行するので、Trigger typeをOn demandに設定する
③1つ目のジョブを追加
Add nodeを押し、1つ目のジョブを設定する
④1つ目のジョブが終わった後に2つ目のジョブを開始するトリガーを設定
1つ目のジョブ(kanamori-test1)を押した後に、Add triggerを押す
Trigger typeをEventに、Trigger logicをStart after ALL watched eventに設定する
⑤2つ目のジョブを追加
画面の右側にあるAdd nodeを押し、2つ目のジョブを設定する
これでワークフローの設定が完了する。
ワークフローの実行
ワークフローを実行するボタンを押し、HistoryタブからStatusがrunningになっているRun IDを選択。その後にview run detailを押すとワークフローの状態を確認することができる。
StatusがSucceededになればワークフローが完了し、Redshiftテーブルの作成後にデータがロードされていることが確認できる。
※データをローカルに保存し、改行コードがLF(Unix形式)になっていることも確認できました。
まとめ
今回はAWS Glueのワークフローを使用して、CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、文字コードをUTF-8、改行コードをLFに変換後、RedshiftにロードするETLジョブの構築方法について紹介しました。この手法を応用することで、他の形式のデータ変換が可能になります。皆様の参考になれば幸いです。
最後に「このサービスについて知りたい」「AWS環境の構築、移行」などのリクエストがございましたら、弊社お問合せフォームまでお気軽にご連絡ください! のちほど当ブログにてご紹介させていただくか、複雑な内容に関するお問い合わせの内容の場合には直接営業からご連絡を差し上げます。
※Amazon Web Services、”Powered by Amazon Web Services”ロゴ、およびブログで使用されるその他のAWS商標は、米国その他の諸国における、Amazon.com, Inc.またはその関連会社の商標です。
※Pythonは、Python Software Foundationの登録商標です。