ブログ | XIMIX

GCSにあるCSVをBigQueryにインサートする

作成者: XIMIX 北脇|2023.12.07

 

はじめに

CI部の北脇です。

今回は、Google Cloud Storage(GCS)に保存されているCSVファイルをBigQueryにインサートする方法を紹介します。
この方法は、GCS上に保存されているCSVファイルを、BigQueryで簡単に利用したい場合に便利です。
また、CSV内の一部のカラムのみインサートしたりデータをSQLで加工した後にインサートしたい場合に便利です。

具体的な手順としては、まずCSVファイルをGCS上に配置します。
その後、このCSVファイルをBigQueryの外部テーブルとして読み込み、BigQueryにインサートします。
以下、具体的な手順を解説していきます。

GCSにファイル配置

まずはじめに、BigQuery上でインサート対象となるテーブルを作成します。

次に、Google Cloud Storage(GCS)上に新たなバケットを作成します。
今回は、東京をリージョンとして設定しました。

作成したバケットに、インサート対象となるCSVファイルを配置します。

以上で事前準備は完了です。
次に、GCSからBigQueryへのインサートを行う具体的な実装例を示します。

実装例

実際のプログラムを示します。

import { BigQuery, Query } from "@google-cloud/bigquery";

const tableName = "test.gcs_test";
const tableNameTemp = "{GCSの外部テーブル名}";
const projectId = "{プロジェクトID}";
const schema = [
  { name: "id", type: "STRING" },
  { name: "name", type: "STRING" },
  { name: "age", type: "INTEGER" },
  { name: "created_at", type: "DATETIME" },
];
const bigquery = new BigQuery({ projectId: projectId });

async function loadFromGcsToBigQuery() {
  const externalDataConfig: any = {
    sourceFormat: "CSV",
  sourceUris: ["gs://{csvを配置したGCS上のパス}"],
    csvOptions: { skipLeadingRows: 1 },
    schema: {
      fields: schema,
    },
  };
  //GCSにあるCSVからロード
  const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id, 
name, 
age, 
created_at
from ${tableNameTemp};`;

  const options: Query = {
    query,
    tableDefinitions: { [tableNameTemp]: externalDataConfig },
  };

  // Run the query as a job
  try {
    const [job] = await bigquery.createQueryJob(options);
    await job.getQueryResults();
  } catch (e) {
    console.log(e);
  }
}

//実行
loadFromGcsToBigQuery();

まず、GCSの外部テーブル名やインサート先のテーブル名、インサート先テーブルの各カラム名を定義します。
const tableName = "test.gcs_test";
const tableNameTemp = "{GCSの外部テーブル名}";
const projectId = "{プロジェクトID}";
const schema = [
  { name: "id", type: "STRING" },
  { name: "name", type: "STRING" },
  { name: "age", type: "INTEGER" },
  { name: "created_at", type: "DATETIME" },
];

実際の処理はloadFromGcsToBigQuery関数で行っています。
ここで、GCSの初期設定を以下のように行います。

const externalDataConfig: any = {
    sourceFormat: "CSV",
  sourceUris: ["gs://{csvを配置したGCS上のパス}"],
    csvOptions: { skipLeadingRows: 1 },
    schema: {
      fields: schema,
    },
};

この設定には、GCSのパスやCSVファイルの最初の行をスキップする指定が含まれています。
次に、以下のようにクエリを作成します。

//GCSにあるCSVからロード
  const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id, 
name, 
age, 
created_at
from ${tableNameTemp};`;

  const options: Query = {
    query,
    tableDefinitions: { [tableNameTemp]: externalDataConfig },
  };

  // Run the query as a job
  try {
    const [job] = await bigquery.createQueryJob(options);
    await job.getQueryResults();
  } catch (e) {
    console.log(e);
  }

クエリにはGCSの外部テーブル名としてtableNameTempを指定し、GCS内のCSVを一時的な外部テーブルとして扱います。
最終的に、createQueryJobで上記のクエリを実行し、指定したテーブルにデータをインサートします。

実行結果が下記になります。

CSVで定義したレコードがインサートされていることがわかりました。

特定のレコードのみ値を編集する

プログラムのquery変数を下記のように変更してみます。
const query = `insert into \`${tableName}\`(id, name, age, created_at)
select
id, 
name, 
if(created_at = '2023-01-02', age - 1, age), 
created_at
from ${tableNameTemp};`;

created_atが"2023-01-02"の場合ageの値を-1するように修正しました。
このように外部テーブルとして利用できるためSQLのなかで特定のレコードのみ値を編集することが可能となります。

赤枠が該当レコードです。
created_atが2023-01-02のレコードに対してageが-1された結果がインサートされました!

Google Cloud、Google Workspace に関するご相談はXIMIXへ!

Google Cloud、Google Workspaceに関する お問い合わせはこちら
XIMIX(サイミクス)は商標登録出願中です