1. 主要ページへ移動
  2. メニューへ移動
  3. ページ下へ移動

QES ブログ

記事公開日

最終更新日

【AWS】AWS Glueのワークフローを使いS3上のデータを加工後にRedshiftへ取り込む

  • このエントリーをはてなブックマークに追加

はじめに

こんにちは。DXソリューション営業本部の金森です。
本記事では、AWS Glueのワークフローを使用してS3にあるCSVファイルをデータ変換し、
RedshiftにロードするETLジョブの構築方法についてご説明したいと思います。

AWS Glue ワークフローについて

Glueのエンティティ間(トリガー、クローラ、ジョブ)の依存関係を構築することでETLワークロードのオーケストレーションを可能にする。そのワークフローのさまざまなノードのステータスをコンソールで視覚的に追跡でき、進行状況のモニタリングと問題のトラブルシューティングが簡単になります。また、ワークフローのエンティティ間のパラメータを共有できる。
引用元:AWS GlueでETLワークロードのオーケストレーションを実行するワークフローを提供開始

 

今回行いたいこと

以下2つのジョブをAWS Glueのワークフローを用いて実行する

・1つ目のジョブ:CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、
 文字コードをUTF-8、改行コードをLFに変換しS3に再アップロード

・2つ目のジョブ:S3に再アップロードされたCSVファイルを抽出し、Redshiftクラスターへロードする

※今回のインフラ設定については以下の記事を参照しています。
【AWS】RedshiftとAWS Glue接続時のインフラ設定

今回S3から取り込むCSVファイルの中身

今回のワークフロー画面



1つ目のジョブ設定(kanamori-test1)

処理内容

CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、文字コードをUTF-8、
改行コードをLFに変換しS3に再アップロードする

Script editorで設定

・Visual ETLでScript editorを選択し、EngineをPython shell、OptionsでStart freshを選択する。

・以下のPythonスクリプトを入力する


import boto3

input_bucket = 'aws-kensyo-kanamori'
input_key = 'glue-test/sample_kabu_shiftjis_crlf.csv'
output_bucket = 'aws-kensyo-kanamori'
output_key = 'glue-test/output/sample_kabu_utf8_lf.csv'

def convert_encoding(input_bucket, input_key, output_bucket, output_key):
s3 = boto3.resource('s3')

keyobj = s3.Object(input_bucket, input_key)
body = keyobj.get()['Body'].read()

sjis_format = body.decode('shift_jis')
formatline = sjis_format.replace('\r\n', '\n')

outputobj = s3.Object(output_bucket, output_key)
outputobj.put(Body=formatline)

convert_encoding(input_bucket, input_key, output_bucket, output_key)

Pythonスクリプトの解説

AWS SDK for PythonでAWSのリソースにアクセスするためのboto3ライブラリをインポート

import boto3


変換前のCSVファイルが格納されているS3バケットとそのファイルパス、変換後のCSVファイルをアップロードするS3バケットとそのファイルパスを指定

input_bucket = 'aws-kensyo-kanamori'
input_key = 'glue-test/sample_kabu_shiftjis_crlf.csv'
output_bucket = 'aws-kensyo-kanamori'
output_key = 'glue-test/output/sample_kabu_utf8_lf.csv'


convert_encodingという関数を定義している。
この関数は次の4つの引数を取る:input_bucket, input_key, output_bucket, output_key

def convert_encoding(input_bucket, input_key, output_bucket, output_key):


S3のリソースオブジェクトを作成。これは高レベルAPIでより簡単に操作可能。

s3 = boto3.resource('s3')


指定されたバケットとキーからS3オブジェクトを取得。

keyobj = s3.Object(input_bucket, input_key)


S3オブジェクトの内容をバイト形式で読み込む。

body = keyobj.get()['Body'].read()


読み込んだバイトデータをShift-JISエンコーディングからデコードして文字列に変換しUTF-8にする。

sjis_format = body.decode('shift_jis')


CRLF(Windows形式)をLF(Unix形式)に置換する。

formatline = sjis_format.replace('\r\n', '\n')


変換後のデータをアップロードするためのS3オブジェクトを取得する。

outputobj = s3.Object(output_bucket, output_key)


変換後のデータを指定したバケットとキーにアップロードする。

outputobj.put(Body=formatline)


定義したconvert_encoding関数を呼び出して実行

convert_encoding(input_bucket, input_key, output_bucket, output_key)

 

2つ目のジョブ設定(kanamori-test2)

処理内容

S3に再アップロードされたCSVファイルを抽出し、Redshiftクラスターへロードする

Visual ETLのGUI操作で設定

・SourcesタブからS3を選択し、以下の内容で設定をする
 -s3 source type:s3 location
 -s3 URL:1つ目のジョブで変換後のデータが出力されるS3フォルダのパス
 ※Recursiveにチェックを入れ再帰的に読み込むようにする

 -Data format:CSVで区切り文字をコンマ(,)にする

・ダミーデータをS3にアップロードして保存できる状態にする
 ※空のフォルダだとジョブの設定が保存できないため

・TargetsタブからRedshiftを選択し、glue_testの名前のテーブルを作成してデータをロードさせるように設定する

・ジョブの設定後にダミーデータをS3から削除する

ワークフロー設定

①ワークフローの作成
 ナビゲーションペインからWorkflows(orchestration)を選択し、ワークフローの追加をする

②開始トリガーを追加
 Add triggerを押す

手動でワークフローを実行するので、Trigger typeをOn demandに設定する

③1つ目のジョブを追加
 Add nodeを押し、1つ目のジョブを設定する

④1つ目のジョブが終わった後に2つ目のジョブを開始するトリガーを設定
 1つ目のジョブ(kanamori-test1)を押した後に、Add triggerを押す

Trigger typeをEventに、Trigger logicをStart after ALL watched eventに設定する

⑤2つ目のジョブを追加
 画面の右側にあるAdd nodeを押し、2つ目のジョブを設定する
これでワークフローの設定が完了する。

ワークフローの実行

ワークフローを実行するボタンを押し、HistoryタブからStatusがrunningになっているRun IDを選択。その後にview run detailを押すとワークフローの状態を確認することができる。

StatusがSucceededになればワークフローが完了し、Redshiftテーブルの作成後にデータがロードされていることが確認できる。

※データをローカルに保存し、改行コードがLF(Unix形式)になっていることも確認できました。

まとめ

今回はAWS Glueのワークフローを使用して、CSVファイル(文字コード=Shift-JIS、改行コード=CRLF)をS3から抽出し、文字コードをUTF-8、改行コードをLFに変換後、RedshiftにロードするETLジョブの構築方法について紹介しました。この手法を応用することで、他の形式のデータ変換が可能になります。皆様の参考になれば幸いです。

最後に「このサービスについて知りたい」「AWS環境の構築、移行」などのリクエストがございましたら、弊社
お問合せフォームまでお気軽にご連絡ください! のちほど当ブログにてご紹介させていただくか、複雑な内容に関するお問い合わせの内容の場合には直接営業からご連絡を差し上げます。

※Amazon Web Services、”Powered by Amazon Web Services”ロゴ、およびブログで使用されるその他のAWS商標は、米国その他の諸国における、Amazon.com, Inc.またはその関連会社の商標です。

※Pythonは、Python Software Foundationの登録商標です。

  • このエントリーをはてなブックマークに追加

お問い合わせ

Contact

ご質問やご相談、サービスに関する詳細など、何でもお気軽にご連絡ください。下記のお問い合わせフォームよりお気軽に送信ください。

お問い合わせ

資料ダウンロード

Download

当社のサービスに関する詳細情報を掲載した資料を、下記のページよりダウンロードいただけます。より深く理解していただける内容となっております。ぜひご活用ください。

資料ダウンロード