BigQuery Storage Write APIを利用してBigQueryにインサートする

 2023.12.09 2023.12.10

 

はじめに

CI部の北脇です。
BigQueryへのデータのロード方法についてご存じでしょうか?
単純なinsert処理だけでなく、Storage Write APIを利用したインサート方法も存在します。
この方法を採用すると、一般的なInsertAllでのデータ追加と比べて、料金が割安となるという優れたメリットがあります。
この記事では、そのようなStorage Write APIを活用したデータのインサート方法に焦点を当てて詳しく解説します。

BigQuery Storage Write APIとは

BigQueryにデータを取り込む方法の一つになります。
方式としては大きくストリーミング方式とバッチ方式の2つの手法が選べます。
更に、この方法では、データの書き込みと同時にcommit(確定)をするトランザクション処理が可能です。commitが何らかの理由で失敗した場合でも、操作を安全に再試行することが可能です。

また、BigQueryへのInsertと比べてコスト効率が良いことも要点です。
無償枠では、1か月あたり最大2TiBのデータを取り込むことができます。

より詳細な情報は、以下のGoogle Cloudの公式ドキュメントをご参照ください。
参考リンク:
https://cloud.google.com/bigquery/docs/write-api?hl=ja

ストリーミング方式とバッチ方式について

ここでは、ストリーミング方式とバッチ方式という2つのBigQueryへのデータロード方法について解説します。

ストリーミング方式

これは従来のInsertAllでのインサートに近い方法です。
この方式ではデータは1行ずつインサートされ、インサートされたデータは即座に利用可能になります。
ただし、同じレコードがBigQuery上に存在しても重複チェックは実施せず、そのままデータがインサートされます。

詳細は以下のGoogle公式ドキュメンテーションをご参照ください。
参考リンク:https://cloud.google.com/bigquery/docs/write-api?hl=ja#default_stream

バッチ方式

いくつかのやり方がありますが今回は「保留中のタイプ」と呼ばれる方式について説明します。

この方式では、インサート対象のレコードを一時的に保留し、commitコマンドが実行されると保留していた全てのレコードが一括でインサートされます。
commitが実行されるまでは何もインサートされません。
もし途中でエラーが発生した場合、保留中だったレコードはインサートされないままリリースされた場合は破棄されます。

インサート先テーブル

今回は下記usersテーブルにバッチ方式の保留中タイプでインサートする例をご紹介します。

実装例

次に、具体的な実装例をご紹介します。今回使用するのは、nodeJSです。

まず、必要なライブラリをインポートします。今回は、「@google-cloud/bigquery」と「@google-cloud/bigquery-storage」という二つのライブラリを追加、利用しています。

import { BigQuery } from "@google-cloud/bigquery";
import { adapt, managedwriter } from "@google-cloud/bigquery-storage";

次に、具体的な処理について見ていきましょう。
最初の3行では、変数としてデータセット名、テーブル名、プロジェクトIDを特定のテーブルに対して指定します。
その後のstorageWriteApiという関数が実際処理する関数となります。

const datasetId = {データセット名};
const tableName = {テーブル名};
const projectId = {プロジェクトID};
const bq = new BigQuery({ projectId: projectId });

async function storageWriteApi(jsonData: any) {
  const dataset = bq.dataset(datasetId);
  const table = await dataset.table(tableName);
  const [metadata] = await table.getMetadata();
  const { schema } = metadata;

  const { WriterClient, JSONWriter } = managedwriter;
  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({ projectId: projectId });
  try {
    const storageSchema =
      adapt.convertBigQuerySchemaToStorageTableSchema(schema);
    const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
      storageSchema,
      "root"
    );

    const connection = await writeClient.createStreamConnection({
      streamType,
      destinationTable,
    });

    const streamId = connection.getStreamId();
    const writer: managedwriter.JSONWriter = new JSONWriter({
      connection,
      protoDescriptor,
    });
    const pendingWrites = [];
    let offset = 0;
    let pw = writer.appendRows(jsonData, offset);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map((pw) => pw.getResult())
    );
    console.log("Write results:", results);

    const rowCount = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });

    console.log(response);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}

少しずつ解説していきます。
まず、データセットとテーブルを指定し、テーブルのメタデータを取得します。

const dataset = bq.dataset(datasetId);
const table = await dataset.table(tableName);
const [metadata] = await table.getMetadata();
const { schema } = metadata;

次に、managedwriterからWriterClientとJSONWriterを取り出し、その後の処理に使う各種データをセットします。
ここでは対象となるストリームタイプの設定や、Storage Write APIで利用するクライアントの設定を行います。
ストリームタイプとして今回は保留中の「PendingStream」を選択しています。
const { WriterClient, JSONWriter } = managedwriter;
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({ projectId: projectId });

以上で、対象のテーブルの情報や各カラムの構成など、テーブルの定義情報を取得しました。
これがテーブル定義情報の取得から、Storage Write APIを利用するための初期設定になります。

次に、Storage Write APIを利用するためのコネクション設定と、データを一時的に保持するためのwriter変数を設定します。

const storageSchema = adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
  "root"
);

const connection = await writeClient.createStreamConnection({
  streamType,
  destinationTable,
});

const streamId = connection.getStreamId();
const writer: managedwriter.JSONWriter = new JSONWriter({
  connection,
  protoDescriptor,
});

以上のコードにより、Storage Write APIへの接続と、書き込むデータの一時保管を行うための変数を設定することができました。

最後に、データのストリーム書き込みとその終了、そして最終的なデータのcommit(確定)を行います。
まず、インサート対象のレコードを一時的にストリームへ書き込む操作を以下のコードで行います。

const pendingWrites = [];
let offset = 0;
let pw = writer.appendRows(jsonData, offset);
pendingWrites.push(pw);

const results = await Promise.all(
pendingWrites.map((pw) => pw.getResult())
);
console.log("Write results:", results);

次に、ストリームへの書き込みを終了します。
これはconnection.finalize()を呼び出すことで行います。
この操作の後に再度appendRowsでストリームを追加しようとするとエラーが出ます。
なお、この段階ではまだデータはcommitしていないため、実際のテーブルへはインサートされていません。

const rowCount = await connection.finalize();
console.log(`Row count: ${rowCount}`);

最後に、writeClient.batchCommitWriteStreamを実行することでデータがcommit(確定)され、テーブルへのインサートが行われます。
const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});

ここまでで、プログラムの具体的な解説を終えます。
それでは、具体的なデータを使って実際にインサートの操作を行ってみましょう。
前述の関数に引数として渡すデータは、定義したテーブルの各カラムに対応したデータ形式で指定します。

const userData = [
  { id: "id1", name: "aa", age: 11 },
  { id: "id2", name: "bb", age: 22 },
];

storageWriteApi(userData);


レコードが指定した通り新しく追加されていることを確認できました。

まとめ

以上、BigQueryのStorage Write APIを利用したインサート方法について解説しました。
通常のInsertと比べて実装が複雑に感じるかもしれません。
しかし、このStorage Write APIの特徴と魅力は、1カ月あたり最大2TiBまで無料で利用可能な点にあります。

もし小規模なインサート処理が必要であれば、通常のinsertで問題ありません。
しかし、ビッグデータの取り扱いや大規模なインサート処理が必要になる場合は、こちらのほうがコスト削減に効果的です。
そのような局面に直面した際は、ぜひこの方法をご検討ください。

Google Cloud、Google Workspace に関するご相談はXIMIXへ!

Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です


BigQuery Storage Write APIを利用してBigQueryにインサートする

BACK TO LIST