CI部の北脇です。
BigQueryへのデータのロード方法についてご存じでしょうか?
単純なinsert処理だけでなく、Storage Write APIを利用したインサート方法も存在します。
この方法を採用すると、一般的なInsertAllでのデータ追加と比べて、料金が割安となるという優れたメリットがあります。
この記事では、そのようなStorage Write APIを活用したデータのインサート方法に焦点を当てて詳しく解説します。
BigQueryにデータを取り込む方法の一つになります。
方式としては大きくストリーミング方式とバッチ方式の2つの手法が選べます。
更に、この方法では、データの書き込みと同時にcommit(確定)をするトランザクション処理が可能です。commitが何らかの理由で失敗した場合でも、操作を安全に再試行することが可能です。
また、BigQueryへのInsertと比べてコスト効率が良いことも要点です。
無償枠では、1か月あたり最大2TiBのデータを取り込むことができます。
より詳細な情報は、以下のGoogle Cloudの公式ドキュメントをご参照ください。
参考リンク:https://cloud.google.com/bigquery/docs/write-api?hl=ja
ここでは、ストリーミング方式とバッチ方式という2つのBigQueryへのデータロード方法について解説します。
これは従来のInsertAllでのインサートに近い方法です。
この方式ではデータは1行ずつインサートされ、インサートされたデータは即座に利用可能になります。
ただし、同じレコードがBigQuery上に存在しても重複チェックは実施せず、そのままデータがインサートされます。
詳細は以下のGoogle公式ドキュメンテーションをご参照ください。
参考リンク:https://cloud.google.com/bigquery/docs/write-api?hl=ja#default_stream
いくつかのやり方がありますが今回は「保留中のタイプ」と呼ばれる方式について説明します。
この方式では、インサート対象のレコードを一時的に保留し、commitコマンドが実行されると保留していた全てのレコードが一括でインサートされます。
commitが実行されるまでは何もインサートされません。
もし途中でエラーが発生した場合、保留中だったレコードはインサートされないままリリースされた場合は破棄されます。
今回は下記usersテーブルにバッチ方式の保留中タイプでインサートする例をご紹介します。
次に、具体的な実装例をご紹介します。今回使用するのは、nodeJSです。
まず、必要なライブラリをインポートします。今回は、「@google-cloud/bigquery」と「@google-cloud/bigquery-storage」という二つのライブラリを追加、利用しています。
import { BigQuery } from "@google-cloud/bigquery";
import { adapt, managedwriter } from "@google-cloud/bigquery-storage";
次に、具体的な処理について見ていきましょう。
最初の3行では、変数としてデータセット名、テーブル名、プロジェクトIDを特定のテーブルに対して指定します。
その後のstorageWriteApiという関数が実際処理する関数となります。
const datasetId = {データセット名};
const tableName = {テーブル名};
const projectId = {プロジェクトID};
const bq = new BigQuery({ projectId: projectId });
async function storageWriteApi(jsonData: any) {
const dataset = bq.dataset(datasetId);
const table = await dataset.table(tableName);
const [metadata] = await table.getMetadata();
const { schema } = metadata;
const { WriterClient, JSONWriter } = managedwriter;
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({ projectId: projectId });
try {
const storageSchema =
adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
"root"
);
const connection = await writeClient.createStreamConnection({
streamType,
destinationTable,
});
const streamId = connection.getStreamId();
const writer: managedwriter.JSONWriter = new JSONWriter({
connection,
protoDescriptor,
});
const pendingWrites = [];
let offset = 0;
let pw = writer.appendRows(jsonData, offset);
pendingWrites.push(pw);
const results = await Promise.all(
pendingWrites.map((pw) => pw.getResult())
);
console.log("Write results:", results);
const rowCount = await connection.finalize();
console.log(`Row count: ${rowCount}`);
const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});
console.log(response);
} catch (err) {
console.log(err);
} finally {
writeClient.close();
}
}
少しずつ解説していきます。
まず、データセットとテーブルを指定し、テーブルのメタデータを取得します。
const dataset = bq.dataset(datasetId);
const table = await dataset.table(tableName);
const [metadata] = await table.getMetadata();
const { schema } = metadata;
次に、managedwriterからWriterClientとJSONWriterを取り出し、その後の処理に使う各種データをセットします。
ここでは対象となるストリームタイプの設定や、Storage Write APIで利用するクライアントの設定を行います。
ストリームタイプとして今回は保留中の「PendingStream」を選択しています。const { WriterClient, JSONWriter } = managedwriter;
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}`;
const streamType = managedwriter.PendingStream;
const writeClient = new WriterClient({ projectId: projectId });
以上で、対象のテーブルの情報や各カラムの構成など、テーブルの定義情報を取得しました。
これがテーブル定義情報の取得から、Storage Write APIを利用するための初期設定になります。
次に、Storage Write APIを利用するためのコネクション設定と、データを一時的に保持するためのwriter変数を設定します。
const storageSchema = adapt.convertBigQuerySchemaToStorageTableSchema(schema);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
storageSchema,
"root"
);
const connection = await writeClient.createStreamConnection({
streamType,
destinationTable,
});
const streamId = connection.getStreamId();
const writer: managedwriter.JSONWriter = new JSONWriter({
connection,
protoDescriptor,
});
以上のコードにより、Storage Write APIへの接続と、書き込むデータの一時保管を行うための変数を設定することができました。
最後に、データのストリーム書き込みとその終了、そして最終的なデータのcommit(確定)を行います。
まず、インサート対象のレコードを一時的にストリームへ書き込む操作を以下のコードで行います。
const pendingWrites = [];
let offset = 0;
let pw = writer.appendRows(jsonData, offset);
pendingWrites.push(pw);
const results = await Promise.all(
pendingWrites.map((pw) => pw.getResult())
);
console.log("Write results:", results);
次に、ストリームへの書き込みを終了します。
これはconnection.finalize()を呼び出すことで行います。
この操作の後に再度appendRowsでストリームを追加しようとするとエラーが出ます。
なお、この段階ではまだデータはcommitしていないため、実際のテーブルへはインサートされていません。
const rowCount = await connection.finalize();
console.log(`Row count: ${rowCount}`);
最後に、writeClient.batchCommitWriteStreamを実行することでデータがcommit(確定)され、テーブルへのインサートが行われます。const response = await writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
});
ここまでで、プログラムの具体的な解説を終えます。
それでは、具体的なデータを使って実際にインサートの操作を行ってみましょう。
前述の関数に引数として渡すデータは、定義したテーブルの各カラムに対応したデータ形式で指定します。
const userData = [
{ id: "id1", name: "aa", age: 11 },
{ id: "id2", name: "bb", age: 22 },
];
storageWriteApi(userData);
レコードが指定した通り新しく追加されていることを確認できました。
以上、BigQueryのStorage Write APIを利用したインサート方法について解説しました。
通常のInsertと比べて実装が複雑に感じるかもしれません。
しかし、このStorage Write APIの特徴と魅力は、1カ月あたり最大2TiBまで無料で利用可能な点にあります。
もし小規模なインサート処理が必要であれば、通常のinsertで問題ありません。
しかし、ビッグデータの取り扱いや大規模なインサート処理が必要になる場合は、こちらのほうがコスト削減に効果的です。
そのような局面に直面した際は、ぜひこの方法をご検討ください。
Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です