Cloud Storage イベントをトリガーにしてMatillionジョブを起動する

 2023.11.30 XIMIX 重永

        

本記事の目的

Cloud Storage イベントをトリガーにして Matillion ジョブを起動することで、手動で Matillion ジョブを起動する必要がなく、データの処理を自動実行することできます。本記事では、Cloud Storage のオブジェクト作成イベントをトリガーにして Matillion ジョブを起動するための設定方法を説明します。

本記事では、Cloud Storage のオブジェクト作成イベントをトリガーにして Matillion ジョブを起動するための設定方法を説明します。

CloudStorageとは

Cloud Storageは、Google Cloud Platform(GCP)が提供する、オブジェクトストレージサービスです。Cloud Storageは、Webアプリケーション、データベース、分析、機械学習などのさまざまな用途に使用できます。

Cloud Storageは、次の特徴を持ちます。

  • スケーラブル:Cloud Storageは、データの量に合わせてスケールできます。
  • 高可用性:Cloud Storageは、99.9%以上の可用性を保証しています。
  • セキュリティ:Cloud Storageは、データのセキュリティを強化するために、さまざまなセキュリティ機能を提供しています。
  • コスト効率:Cloud Storageは、使用量に応じて課金されるため、コストを削減できます。

Cloud Storageは、データの保存や共有に最適なサービスです。

Matillionとは

Matillionはクラウドデータウェアハウス向けに構築されたデータインテグレーションツールで、Amazon Redshift、Google BigQuery(以下BQ)、Snowflake、Azure Synapseなどのクラウドデータベースプラットフォーム用に特別に構築されています。

様々なオンプレミスやSaaSのデータをクラウド上へロードするフローを簡単に作成することが可能で、分析等に向けて、散在しているデータを統合し一元管理したいという方におすすめのツールとなっています。

前提条件

本記事で説明する設定をするには、以下の設定が事前にされていることを前提とします

アーキテクチャ図

1-2

以下の流れで処理が動きます。

① CloudStorageにオブジェクトがアップロードされると、Pub/Subでメッセージがパブリッシュされる

② ①でパブリッシュされたメッセージでCloudFunctionsの関数が起動される

③ ②のCloudFunctions関数でMatillionジョブ起動用のメッセージをPub/Subでパブリッシュする

④ ③でパブリッシュされたメッセージにより、Matillionジョブが起動される

設定

ここからは、CloudShellまたはMatillionの画面を使用して、リソースの構築・設定を行います。

各リソース名

CloudShellを開き、各リソース名を設定します。

$ REGION={任意のリージョン名(例:asia-northeast1)}
$ PROJECT={GoogleCloudプロジェクト名}
$ TOPIC_NAME_GCS_TRIGGER={任意のGCSイベント通知Pub/Subトピック名}
$ TOPIC_NAME_MTLN_TRIGGER={任意のMatillion起動Pub/Subトピック名}
$ SUBSCRIPTION_NAME_MTLN_TRIGGER={任意のMatillion起動Pub/Subサブスクリプション名}
$ BUCKET_NAME={任意のCloudStorageバケット名}
$ FUNCTION_NAME={任意の関数名}
$ MTLN_GROUP={任意のMatillionグループ名}
$ MTLN_PROJECT={任意のMatillionプロジェクト名}
$ MTLN_VERSION={任意のMatillionバージョン名}
$ MTLN_ENVIRONMENT={任意のMatillion環境名}
$ MTLN_JOB={任意のMatillionジョブ名}

上記コマンドの設定例は以下の通りです。

例:
$ REGION=asia-northeast1
$ PROJECT={GoogleCloudプロジェクト名}
$ TOPIC_NAME_GCS_NOTIFICATION=topic-gcs-notification
$ TOPIC_NAME_MTLN_TRIGGER=topic-mtln-trigger
$ SUBSCRIPTION_NAME_MTLN_TRIGGER=sub-mtln-trigger
$ BUCKET_NAME=$PROJECT-data
$ FUNCTION_NAME=launch-mtln-job
$ MTLN_GROUP=test
$ MTLN_PROJECT=test
$ MTLN_VERSION=default
$ MTLN_ENVIRONMENT=test
$ MTLN_JOB=test-job

Pub/Sub

Cloud Storage Pub/Sub通知用トピックを作成します。

$ gcloud pubsub topics create $TOPIC_NAME_GCS_NOTIFICATION --message-storage-policy-allowed-regions=$REGION

Matillionジョブ起動用トピック・サブスクリプションを作成します。

$ gcloud pubsub topics create $TOPIC_NAME_MTLN_TRIGGER --message-storage-policy-allowed-regions=$REGION

$ gcloud pubsub subscriptions create $SUBSCRIPTION_NAME_MTLN_TRIGGER --topic=$TOPIC_NAME_MTLN_TRIGGER --enable-exactly-once-delivery --enable-message-ordering


Matillion

プロジェクト作成

① Matillionにログインし、プロジェクトの選択画面を表示し、「Create Project」をクリックする

2

② Project Group に、 各リソース名 で設定した MTLN_GROUP の値を設定する

③ Project Name に、 各リソース名 で設定した MTLN_PROJECT の値を設定する

④ Next ボタンを押下する

3-1

⑤ Environment Name に、 各リソース名 で設定した MTLN_ENVIRONMENTの値を設定する

⑥ Default Project に、使用するGoogleCloudのプロジェクト名を設定する

⑦ Default Datasetに、使用するBigQueryのデータセット名を設定する

⑧ Finish ボタンを押下する

4

プロジェクトの作成が完了しました。

ジョブ作成

① Matillion画面の左ペインの Version名 「default」 を右クリックする

② 「Add Orchestration Job」をクリックし、「test-job」を作成する

6

③ 以下のコンポーネントをドラッグ&ドロップして、test-job のコンポーネント図に配置する

  • Python Script
  • End Success
  • End Failure

④ 各コンポーネント間をコネクタで接続する

7

⑧ Python Scriptをクリックして、スクリプトを入力する

今回は、起動したMatillionジョブ情報と受信したGCSイベント情報をログ出力させるため、以下のコードを入力しました。

print('-----mtln job infomation-----')
print(f'project_group_name: {project_group_name}')
print(f'project_group_id: {project_group_id}')
print(f'project_name: {project_name}')
print(f'project_id: {project_id}')
print(f'version_name: {version_name}')
print(f'version_id: {version_id}')
print(f'environment_name: {environment_name}')
print(f'environment_default_schema: {environment_default_schema}')
print(f'environment_database: {environment_database}')
print(f'environment_endpoint: {environment_endpoint}')
print(f'environment_id: {environment_id}')
print(f'environment_port: {environment_port}')
print(f'environment_username: {environment_username}')
print(f'job_name: {job_name}')
print(f'job_id: {job_id}')
print(f'component_name: {component_name}')
print(f'component_id: {component_id}')
print(f'run_history_id: {run_history_id}')
print(f'task_id: {task_id}')

print('-----gcs object infomation-----')
print(f'BUCKET_ID: {BUCKET_ID}')
print(f'OBJECT_ID: {OBJECT_ID}')
print(f'GENERATION: {GENERATION}')
print(f'METAGENERATION: {METAGENERATION}')
print(f'TIME_CREATED: {TIME_CREATED}')
print(f'UPDATED: {UPDATED}')

8

⑥ Manage Variables > Manage Environment Variables をクリックする

⑦ 環境変数を以下の図の通りに入力する

⑧ OK ボタンをクリックする

9

ジョブの作成が完了しました。

Pub/Sub設定

① Project > Manage Pub/Sub Configuration を押下する

11

② Enable Pub/Sub をチェックする

③ Project にMatillionジョブ起動用トピック・サブスクリプションが存在するGoogleCloudプロジェクト名を入力する

④ Subscription で Matillionジョブ起動用サブスクリプションを選択する

⑤ Test ボタンを押下して、「success」になることを確認する

※ 権限エラーになった場合は、Matillion用GCEインスタンスのサービスアカウントの権限が不足している可能性があります。

⑥ OK ボタンを押下する

12

Pub/Sub通知設定が完了しました。

 

Cloud Storage

Cloud Shellで、オブジェクトアップロード先となる、CloudStorageのバケットを作成し、Pub/Sub通知構成を設定します。

$ gsutil mb gs://$BUCKET_NAME

$ gcloud storage buckets notifications create gs://$BUCKET_NAME --topic=$TOPIC_NAME_GCS_NOTIFICATION --event-types=OBJECT_FINALIZE

Cloud Functions

pythonソースコード

Cloud Shellで、任意のディレクトリに以下のpythonソースコードを配置します。

requirements.txt

functions-framework==3.*
google-cloud-pubsub

main.py

import functions_framework
import base64
import json
import os
from google.cloud import pubsub_v1

# 環境変数
project_id           = os.environ.get('PROJECT', 'Specified environment variable is not set.')
topic_id             = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
mtln_group           = os.environ.get('MTLN_GROUP', 'Specified environment variable is not set.')
mtln_project         = os.environ.get('MTLN_PROJECT', 'Specified environment variable is not set.')
mtln_version         = os.environ.get('MTLN_VERSION', 'Specified environment variable is not set.')
mtln_environment     = os.environ.get('MTLN_ENVIRONMENT', 'Specified environment variable is not set.')
mtln_job             = os.environ.get('MTLN_JOB', 'Specified environment variable is not set.')

# GCPクライアント
publisher      = pubsub_v1.PublisherClient()
topic_path     = publisher.topic_path(project_id, topic_id) 


@functions_framework.cloud_event
def main(cloud_event) -> None:

  print('launch-etl: start')
    print(f'cloud_event: {cloud_event}')

  # イベント情報の受け取り
  data = base64.b64decode(cloud_event.data['message']['data']).decode()
  print(f'data: {data}')
  data_json = json.loads(data)
  bucket         = data_json['bucket']
  name           = data_json['name']
  time_created   = data_json['timeCreated']
  updated        = data_json['updated']
  generation     = data_json['generation']
  metageneration = data_json['metageneration']

  # Pub/Sub メッセージの生成
  message = {
      'group'       : mtln_group,
      'project'     : mtln_project,
      'version'     : mtln_version,
      'environment' : mtln_environment,
      'job'         : mtln_job,
      'variables':  {
          'BUCKET_ID'      : bucket,
          'OBJECT_ID'      : name,
          'GENERATION'     : generation,
          'METAGENERATION' : metageneration,
          'TIME_CREATED'   : time_created,
          'UPDATED'        : updated,
      }
  }
  message_json = json.dumps(message)
  message_bytes = message_json.encode('utf-8')
  print(f'launch-etl: message: {message_json}')

  # メッセージのパブリッシュ
    publish_future = publisher.publish(topic_path, message_bytes)

  print(f'launch-etl: message published: {publish_future.result()}')
    print('launch-etl: finish')
関数のデプロイ

Cloud Shellで、関数をデプロイします。

$ gcloud functions deploy $FUNCTION_NAME  \
--gen2 \
--runtime python311  \
--region $REGION  \
--source .  \
--entry-point main  \
--trigger-topic $TOPIC_NAME_GCS_NOTIFICATION  \
--set-env-vars PROJECT=$PROJECT,TOPIC_ID=$TOPIC_NAME_MTLN_TRIGGER,MTLN_GROUP=$MTLN_GROUP,MTLN_PROJECT=$MTLN_PROJECT,MTLN_VERSION=$MTLN_VERSION,MTLN_ENVIRONMENT=$MTLN_ENVIRONMENT,MTLN_JOB=$MTLN_JOB  \
--ingress-settings "internal-only"  \
--egress-settings "all"  \
--serve-all-traffic-latest-revision-

動作確認

試しに、ファイルをCloudStorageにアップロードしてみます。

$ gsutil cp test.txt gs://$BUCKET_NAME

Matillionの画面を見ると、右下ペインの Tasks に実行されたジョブのタスクが表示されます。

Python Scriptの実行結果を確認します。

① Tasks > 実行されたTask > 詳細表示 ボタンを押下する

13

② ジョブ名 > Python Script > Message の詳細表示ボタンをクリックする

③ 出力結果を確認する

14-1

CloudStorageオブジェクト作成イベントにより、Matillionジョブが起動されることを確認しました。

クリーンアップ

Matillionの画面で、Project > Delete Project を選択し、作成したプロジェクトを削除します。

CloudShellで、以下のコマンドを実行し、Google Cloudの以下のリソースを削除します。

$ gcloud functions delete $FUNCTION_NAME --region $REGION
$ gcloud storage rm --recursive gs://$BUCKET_NAME
$ gcloud pubsub topics delete $TOPIC_NAME_GCS_NOTIFICATION
$ gcloud pubsub topics delete $TOPIC_NAME_MTLN_TRIGGER

最後に

Matillionジョブを使用することで、BigQueryへのファイル読み込みやデータ加工、データマートの作成が簡単に行えます。

また、MatillionのShared Jobsという機能を使うことで、処理の共通化も簡単となり、開発工数の削減が期待できます。

是非、Matillion ETL for BigQuery の導入をご検討ください。

Google Cloud、Google Workspace に関するご相談はXIMIXへ!

Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です

執筆者紹介

Hikaru Shigenaga
XIMIX 重永
2019年よりGoogleCloudで開発を始める。 主に、データ分析基盤構築案件のPM/PLを担当。 主にGKE・CloudFunctions・BigQuery・CloudStorage・Matillion(ETLツール)を扱う。 Looker・Tableauの勉強中。

Cloud Storage イベントをトリガーにしてMatillionジョブを起動する

BACK TO LIST