本記事の目的
Cloud Storage イベントをトリガーにして Matillion ジョブを起動することで、手動で Matillion ジョブを起動する必要がなく、データの処理を自動実行することできます。本記事では、Cloud Storage のオブジェクト作成イベントをトリガーにして Matillion ジョブを起動するための設定方法を説明します。
本記事では、Cloud Storage のオブジェクト作成イベントをトリガーにして Matillion ジョブを起動するための設定方法を説明します。
CloudStorageとは
Cloud Storageは、Google Cloud Platform(GCP)が提供する、オブジェクトストレージサービスです。Cloud Storageは、Webアプリケーション、データベース、分析、機械学習などのさまざまな用途に使用できます。
Cloud Storageは、次の特徴を持ちます。
- スケーラブル:Cloud Storageは、データの量に合わせてスケールできます。
- 高可用性:Cloud Storageは、99.9%以上の可用性を保証しています。
- セキュリティ:Cloud Storageは、データのセキュリティを強化するために、さまざまなセキュリティ機能を提供しています。
- コスト効率:Cloud Storageは、使用量に応じて課金されるため、コストを削減できます。
Cloud Storageは、データの保存や共有に最適なサービスです。
Matillionとは
Matillionはクラウドデータウェアハウス向けに構築されたデータインテグレーションツールで、Amazon Redshift、Google BigQuery(以下BQ)、Snowflake、Azure Synapseなどのクラウドデータベースプラットフォーム用に特別に構築されています。
様々なオンプレミスやSaaSのデータをクラウド上へロードするフローを簡単に作成することが可能で、分析等に向けて、散在しているデータを統合し一元管理したいという方におすすめのツールとなっています。
前提条件
本記事で説明する設定をするには、以下の設定が事前にされていることを前提とします
- Matillionインスタンスが構築されていること(参考記事:Matillion ETL for BigQueryで構築からデータ連携まで簡単に実装!|テクニカルブログ)
- gcloudコマンド、gsutilコマンドがインストールされていること
アーキテクチャ図
以下の流れで処理が動きます。
① CloudStorageにオブジェクトがアップロードされると、Pub/Subでメッセージがパブリッシュされる
② ①でパブリッシュされたメッセージでCloudFunctionsの関数が起動される
③ ②のCloudFunctions関数でMatillionジョブ起動用のメッセージをPub/Subでパブリッシュする
④ ③でパブリッシュされたメッセージにより、Matillionジョブが起動される
設定
ここからは、CloudShellまたはMatillionの画面を使用して、リソースの構築・設定を行います。
各リソース名
CloudShellを開き、各リソース名を設定します。
$ REGION={任意のリージョン名(例:asia-northeast1)}
$ PROJECT={GoogleCloudプロジェクト名}
$ TOPIC_NAME_GCS_TRIGGER={任意のGCSイベント通知Pub/Subトピック名}
$ TOPIC_NAME_MTLN_TRIGGER={任意のMatillion起動Pub/Subトピック名}
$ SUBSCRIPTION_NAME_MTLN_TRIGGER={任意のMatillion起動Pub/Subサブスクリプション名}
$ BUCKET_NAME={任意のCloudStorageバケット名}
$ FUNCTION_NAME={任意の関数名}
$ MTLN_GROUP={任意のMatillionグループ名}
$ MTLN_PROJECT={任意のMatillionプロジェクト名}
$ MTLN_VERSION={任意のMatillionバージョン名}
$ MTLN_ENVIRONMENT={任意のMatillion環境名}
$ MTLN_JOB={任意のMatillionジョブ名}
上記コマンドの設定例は以下の通りです。
例:
$ REGION=asia-northeast1
$ PROJECT={GoogleCloudプロジェクト名}
$ TOPIC_NAME_GCS_NOTIFICATION=topic-gcs-notification
$ TOPIC_NAME_MTLN_TRIGGER=topic-mtln-trigger
$ SUBSCRIPTION_NAME_MTLN_TRIGGER=sub-mtln-trigger
$ BUCKET_NAME=$PROJECT-data
$ FUNCTION_NAME=launch-mtln-job
$ MTLN_GROUP=test
$ MTLN_PROJECT=test
$ MTLN_VERSION=default
$ MTLN_ENVIRONMENT=test
$ MTLN_JOB=test-job
Pub/Sub
Cloud Storage Pub/Sub通知用トピックを作成します。
$ gcloud pubsub topics create $TOPIC_NAME_GCS_NOTIFICATION --message-storage-policy-allowed-regions=$REGION
Matillionジョブ起動用トピック・サブスクリプションを作成します。
$ gcloud pubsub topics create $TOPIC_NAME_MTLN_TRIGGER --message-storage-policy-allowed-regions=$REGION
$ gcloud pubsub subscriptions create $SUBSCRIPTION_NAME_MTLN_TRIGGER --topic=$TOPIC_NAME_MTLN_TRIGGER --enable-exactly-once-delivery --enable-message-ordering
Matillion
プロジェクト作成
① Matillionにログインし、プロジェクトの選択画面を表示し、「Create Project」をクリックする② Project Group に、 各リソース名 で設定した MTLN_GROUP の値を設定する
③ Project Name に、 各リソース名 で設定した MTLN_PROJECT の値を設定する
④ Next ボタンを押下する
⑤ Environment Name に、 各リソース名 で設定した MTLN_ENVIRONMENTの値を設定する
⑥ Default Project に、使用するGoogleCloudのプロジェクト名を設定する
⑦ Default Datasetに、使用するBigQueryのデータセット名を設定する
⑧ Finish ボタンを押下する
プロジェクトの作成が完了しました。
ジョブ作成
① Matillion画面の左ペインの Version名 「default」 を右クリックする
② 「Add Orchestration Job」をクリックし、「test-job」を作成する
③ 以下のコンポーネントをドラッグ&ドロップして、test-job のコンポーネント図に配置する
- Python Script
- End Success
- End Failure
④ 各コンポーネント間をコネクタで接続する
⑧ Python Scriptをクリックして、スクリプトを入力する
今回は、起動したMatillionジョブ情報と受信したGCSイベント情報をログ出力させるため、以下のコードを入力しました。
print('-----mtln job infomation-----')
print(f'project_group_name: {project_group_name}')
print(f'project_group_id: {project_group_id}')
print(f'project_name: {project_name}')
print(f'project_id: {project_id}')
print(f'version_name: {version_name}')
print(f'version_id: {version_id}')
print(f'environment_name: {environment_name}')
print(f'environment_default_schema: {environment_default_schema}')
print(f'environment_database: {environment_database}')
print(f'environment_endpoint: {environment_endpoint}')
print(f'environment_id: {environment_id}')
print(f'environment_port: {environment_port}')
print(f'environment_username: {environment_username}')
print(f'job_name: {job_name}')
print(f'job_id: {job_id}')
print(f'component_name: {component_name}')
print(f'component_id: {component_id}')
print(f'run_history_id: {run_history_id}')
print(f'task_id: {task_id}')
print('-----gcs object infomation-----')
print(f'BUCKET_ID: {BUCKET_ID}')
print(f'OBJECT_ID: {OBJECT_ID}')
print(f'GENERATION: {GENERATION}')
print(f'METAGENERATION: {METAGENERATION}')
print(f'TIME_CREATED: {TIME_CREATED}')
print(f'UPDATED: {UPDATED}')
⑥ Manage Variables > Manage Environment Variables をクリックする
⑦ 環境変数を以下の図の通りに入力する
⑧ OK ボタンをクリックする
ジョブの作成が完了しました。
Pub/Sub設定
① Project > Manage Pub/Sub Configuration を押下する
② Enable Pub/Sub をチェックする
③ Project にMatillionジョブ起動用トピック・サブスクリプションが存在するGoogleCloudプロジェクト名を入力する
④ Subscription で Matillionジョブ起動用サブスクリプションを選択する
⑤ Test ボタンを押下して、「success」になることを確認する
※ 権限エラーになった場合は、Matillion用GCEインスタンスのサービスアカウントの権限が不足している可能性があります。
⑥ OK ボタンを押下する
Pub/Sub通知設定が完了しました。
Cloud Storage
Cloud Shellで、オブジェクトアップロード先となる、CloudStorageのバケットを作成し、Pub/Sub通知構成を設定します。
$ gsutil mb gs://$BUCKET_NAME
$ gcloud storage buckets notifications create gs://$BUCKET_NAME --topic=$TOPIC_NAME_GCS_NOTIFICATION --event-types=OBJECT_FINALIZE
Cloud Functions
pythonソースコード
Cloud Shellで、任意のディレクトリに以下のpythonソースコードを配置します。
requirements.txt
functions-framework==3.*
google-cloud-pubsub
main.py
import functions_framework
import base64
import json
import os
from google.cloud import pubsub_v1
# 環境変数
project_id = os.environ.get('PROJECT', 'Specified environment variable is not set.')
topic_id = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
mtln_group = os.environ.get('MTLN_GROUP', 'Specified environment variable is not set.')
mtln_project = os.environ.get('MTLN_PROJECT', 'Specified environment variable is not set.')
mtln_version = os.environ.get('MTLN_VERSION', 'Specified environment variable is not set.')
mtln_environment = os.environ.get('MTLN_ENVIRONMENT', 'Specified environment variable is not set.')
mtln_job = os.environ.get('MTLN_JOB', 'Specified environment variable is not set.')
# GCPクライアント
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
@functions_framework.cloud_event
def main(cloud_event) -> None:
print('launch-etl: start')
print(f'cloud_event: {cloud_event}')
# イベント情報の受け取り
data = base64.b64decode(cloud_event.data['message']['data']).decode()
print(f'data: {data}')
data_json = json.loads(data)
bucket = data_json['bucket']
name = data_json['name']
time_created = data_json['timeCreated']
updated = data_json['updated']
generation = data_json['generation']
metageneration = data_json['metageneration']
# Pub/Sub メッセージの生成
message = {
'group' : mtln_group,
'project' : mtln_project,
'version' : mtln_version,
'environment' : mtln_environment,
'job' : mtln_job,
'variables': {
'BUCKET_ID' : bucket,
'OBJECT_ID' : name,
'GENERATION' : generation,
'METAGENERATION' : metageneration,
'TIME_CREATED' : time_created,
'UPDATED' : updated,
}
}
message_json = json.dumps(message)
message_bytes = message_json.encode('utf-8')
print(f'launch-etl: message: {message_json}')
# メッセージのパブリッシュ
publish_future = publisher.publish(topic_path, message_bytes)
print(f'launch-etl: message published: {publish_future.result()}')
print('launch-etl: finish')
関数のデプロイ
import base64
import json
import os
from google.cloud import pubsub_v1
# 環境変数
project_id = os.environ.get('PROJECT', 'Specified environment variable is not set.')
topic_id = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
mtln_group = os.environ.get('MTLN_GROUP', 'Specified environment variable is not set.')
mtln_project = os.environ.get('MTLN_PROJECT', 'Specified environment variable is not set.')
mtln_version = os.environ.get('MTLN_VERSION', 'Specified environment variable is not set.')
mtln_environment = os.environ.get('MTLN_ENVIRONMENT', 'Specified environment variable is not set.')
mtln_job = os.environ.get('MTLN_JOB', 'Specified environment variable is not set.')
# GCPクライアント
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
@functions_framework.cloud_event
def main(cloud_event) -> None:
print('launch-etl: start')
print(f'cloud_event: {cloud_event}')
# イベント情報の受け取り
data = base64.b64decode(cloud_event.data['message']['data']).decode()
print(f'data: {data}')
data_json = json.loads(data)
bucket = data_json['bucket']
name = data_json['name']
time_created = data_json['timeCreated']
updated = data_json['updated']
generation = data_json['generation']
metageneration = data_json['metageneration']
# Pub/Sub メッセージの生成
message = {
'group' : mtln_group,
'project' : mtln_project,
'version' : mtln_version,
'environment' : mtln_environment,
'job' : mtln_job,
'variables': {
'BUCKET_ID' : bucket,
'OBJECT_ID' : name,
'GENERATION' : generation,
'METAGENERATION' : metageneration,
'TIME_CREATED' : time_created,
'UPDATED' : updated,
}
}
message_json = json.dumps(message)
message_bytes = message_json.encode('utf-8')
print(f'launch-etl: message: {message_json}')
# メッセージのパブリッシュ
publish_future = publisher.publish(topic_path, message_bytes)
print(f'launch-etl: message published: {publish_future.result()}')
print('launch-etl: finish')
Cloud Shellで、関数をデプロイします。
$ gcloud functions deploy $FUNCTION_NAME \
--gen2 \
--runtime python311 \
--region $REGION \
--source . \
--entry-point main \
--trigger-topic $TOPIC_NAME_GCS_NOTIFICATION \
--set-env-vars PROJECT=$PROJECT,TOPIC_ID=$TOPIC_NAME_MTLN_TRIGGER,MTLN_GROUP=$MTLN_GROUP,MTLN_PROJECT=$MTLN_PROJECT,MTLN_VERSION=$MTLN_VERSION,MTLN_ENVIRONMENT=$MTLN_ENVIRONMENT,MTLN_JOB=$MTLN_JOB \
--ingress-settings "internal-only" \
--egress-settings "all" \
--serve-all-traffic-latest-revision-
動作確認
試しに、ファイルをCloudStorageにアップロードしてみます。
$ gsutil cp test.txt gs://$BUCKET_NAME
Matillionの画面を見ると、右下ペインの Tasks に実行されたジョブのタスクが表示されます。
Python Scriptの実行結果を確認します。
① Tasks > 実行されたTask > 詳細表示 ボタンを押下する
② ジョブ名 > Python Script > Message の詳細表示ボタンをクリックする
③ 出力結果を確認する
CloudStorageオブジェクト作成イベントにより、Matillionジョブが起動されることを確認しました。
クリーンアップ
Matillionの画面で、Project > Delete Project を選択し、作成したプロジェクトを削除します。
CloudShellで、以下のコマンドを実行し、Google Cloudの以下のリソースを削除します。
$ gcloud functions delete $FUNCTION_NAME --region $REGION
$ gcloud storage rm --recursive gs://$BUCKET_NAME
$ gcloud pubsub topics delete $TOPIC_NAME_GCS_NOTIFICATION
$ gcloud pubsub topics delete $TOPIC_NAME_MTLN_TRIGGER
最後に
Matillionジョブを使用することで、BigQueryへのファイル読み込みやデータ加工、データマートの作成が簡単に行えます。
また、MatillionのShared Jobsという機能を使うことで、処理の共通化も簡単となり、開発工数の削減が期待できます。
是非、Matillion ETL for BigQuery の導入をご検討ください。
Google Cloud、Google Workspace に関するご相談はXIMIXへ!
Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です
執筆者紹介
- カテゴリ:
- クラウド
- キーワード:
- Google Cloud