CI部の北脇です。
今回は、Google Cloud Storage(GCS)に保存されているCSVファイルをBigQueryにインサートする方法を紹介します。
この方法は、GCS上に保存されているCSVファイルを、BigQueryで簡単に利用したい場合に便利です。
また、CSV内の一部のカラムのみインサートしたりデータをSQLで加工した後にインサートしたい場合に便利です。
具体的な手順としては、まずCSVファイルをGCS上に配置します。
その後、このCSVファイルをBigQueryの外部テーブルとして読み込み、BigQueryにインサートします。
以下、具体的な手順を解説していきます。
まずはじめに、BigQuery上でインサート対象となるテーブルを作成します。
次に、Google Cloud Storage(GCS)上に新たなバケットを作成します。
今回は、東京をリージョンとして設定しました。
作成したバケットに、インサート対象となるCSVファイルを配置します。
以上で事前準備は完了です。
次に、GCSからBigQueryへのインサートを行う具体的な実装例を示します。
実際のプログラムを示します。
import { BigQuery, Query } from "@google-cloud/bigquery";
const tableName = "test.gcs_test";
const tableNameTemp = "{GCSの外部テーブル名}";
const projectId = "{プロジェクトID}";
const schema = [
{ name: "id", type: "STRING" },
{ name: "name", type: "STRING" },
{ name: "age", type: "INTEGER" },
{ name: "created_at", type: "DATETIME" },
];
const bigquery = new BigQuery({ projectId: projectId });
async function loadFromGcsToBigQuery() {
const externalDataConfig: any = {
sourceFormat: "CSV",
sourceUris: ["gs://{csvを配置したGCS上のパス}"],
csvOptions: { skipLeadingRows: 1 },
schema: {
fields: schema,
},
};
//GCSにあるCSVからロード
const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id,
name,
age,
created_at
from ${tableNameTemp};`;
const options: Query = {
query,
tableDefinitions: { [tableNameTemp]: externalDataConfig },
};
// Run the query as a job
try {
const [job] = await bigquery.createQueryJob(options);
await job.getQueryResults();
} catch (e) {
console.log(e);
}
}
//実行
loadFromGcsToBigQuery();
まず、GCSの外部テーブル名やインサート先のテーブル名、インサート先テーブルの各カラム名を定義します。const tableName = "test.gcs_test";
const tableNameTemp = "{GCSの外部テーブル名}";
const projectId = "{プロジェクトID}";
const schema = [
{ name: "id", type: "STRING" },
{ name: "name", type: "STRING" },
{ name: "age", type: "INTEGER" },
{ name: "created_at", type: "DATETIME" },
];
実際の処理はloadFromGcsToBigQuery関数で行っています。
ここで、GCSの初期設定を以下のように行います。const externalDataConfig: any = {
sourceFormat: "CSV",
sourceUris: ["gs://{csvを配置したGCS上のパス}"],
csvOptions: { skipLeadingRows: 1 },
schema: {
fields: schema,
},
};
この設定には、GCSのパスやCSVファイルの最初の行をスキップする指定が含まれています。
次に、以下のようにクエリを作成します。
//GCSにあるCSVからロード
const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id,
name,
age,
created_at
from ${tableNameTemp};`;
const options: Query = {
query,
tableDefinitions: { [tableNameTemp]: externalDataConfig },
};
// Run the query as a job
try {
const [job] = await bigquery.createQueryJob(options);
await job.getQueryResults();
} catch (e) {
console.log(e);
}
クエリにはGCSの外部テーブル名としてtableNameTempを指定し、GCS内のCSVを一時的な外部テーブルとして扱います。
最終的に、createQueryJobで上記のクエリを実行し、指定したテーブルにデータをインサートします。
実行結果が下記になります。
CSVで定義したレコードがインサートされていることがわかりました。
プログラムのquery変数を下記のように変更してみます。const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id,
name,
if(created_at = '2023-01-02', age - 1, age),
created_at
from ${tableNameTemp};`;
created_atが"2023-01-02"の場合ageの値を-1するように修正しました。
このように外部テーブルとして利用できるためSQLのなかで特定のレコードのみ値を編集することが可能となります。
赤枠が該当レコードです。
created_atが2023-01-02のレコードに対してageが-1された結果がインサートされました!
Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です