2023年2月16日

【Google Cloud】Workflowsをイベントトリガーで動かしてみた(Eventarc API)


Content

みなさま、こんにちは。Y.Yです。

今回はGoogle CloudのWorkflowsに関する記事です。
Eventarc APIを用いたWorkflowsのイベントトリガー機能を試してみたいと思います!
Workflowsを使用している方、イベントトリガー機能の使用を考えている方の参考になれば幸いです。

※Workflowsの基本的な内容や処理の紹介は、
【Google Cloud】Workflows使ってみた~Part1~
【Google Cloud】Workflows使ってみた~Part2~
を是非ご覧ください!

Cloud Storage オブジェクトのファイナライズトリガー

まずは、Cloud Storageのバケットへのオブジェクトのファイナライズ(作成/上書き)をトリガーに起動する処理を試してみます!

ファイルがアップロードされた際に、アップロードファイル内のデータをBigQueryに取り込む処理を作ります。

設定

ワークフローの作成画面でワークフロー名などの必要な情報を入力し、「新しいトリガーの追加」でEventarcを選択します。

Eventarc APIがまだ有効になっていない場合は下の画面が表示されます。
APIを有効にして設定に進みます。

①トリガー名を入力し、トリガーのタイプは「自社」、イベントプロバイダは「Cloud Storage」を指定します。
②イベントには、「google.cloud.storage.object.v1.finalized」を指定し、トリガー対象のバケットを指定します。
③設定が完了したら、「トリガーを保存」をクリックします。

※ファイナライズ以外にも様々なイベントをトリガーに指定できます。

ワークフローのコードには以下の内容を入力します。

main:
  params: [event]
  steps:
    - イベント発生ログ:
        call: sys.log
        args:
            text: '${"ファイルがアップロードされました。ファイル名:" + event.data.name}'
            severity: INFO
    - ロード処理:
        try:
            steps:
                - 処理開始ログ:
                    call: sys.log
                    args:
                        text: "ロード処理を実行します。データセット:test_workflow, テーブル:test_tbl"
                        severity: INFO
                - ロード:
                    call: googleapis.bigquery.v2.jobs.insert
                    args:
                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                        body:
                            configuration:
                                load:
                                    sourceUris: '${"gs://" + event.bucket + "/" + event.data.name}'
                                    fieldDelimiter: \t
                                    destinationTable:
                                        projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                                        datasetId: test_workflow
                                        tableId: test_tbl
                                    createDisposition: CREATE_NEVER
                                    writeDisposition: WRITE_APPEND
                                    skipLeadingRows: 1
        except:
            as: e
            steps:
                - エラーログ:
                    call: sys.log
                    args:
                        text: "テーブルへのロード処理でエラーが発生しました。"
                        severity: ERROR
                - エラー内容ログ出力:
                    call: sys.log
                    args:
                        text: ${e}
                        severity: ERROR
                - raiseError:
                    raise: "BigQuery Job Error"

ワークフローのランタイム引数(上のコードのevent)には、下記のデータがイベント情報として渡されます。
(バケット名は [バケット名] に変換しています)

{
  "bucket": "[バケット名]",
  "data": {
    "bucket": "[バケット名]",
    "contentType": "text/plain",
    "crc32c": "dPjWtg==",
    "etag": "CLXmv/Dx+vsCEAE=",
    "generation": "1671082084922165",
    "id": "[バケット名]/test_tbl.txt/1671082084922165",
    "kind": "storage#object",
    "md5Hash": "3mEijkySugTVJ2XJkqBxDw==",
    "mediaLink": "https://storage.googleapis.com/download/storage/v1/b/[バケット名]/o/test_tbl.txt?generation=1671082084922165&alt=media",
    "metageneration": "1",
    "name": "test_tbl.txt",
    "selfLink": "https://www.googleapis.com/storage/v1/b/[バケット名]/o/test_tbl.txt",
    "size": "73",
    "storageClass": "STANDARD",
    "timeCreated": "2022-12-15T05:28:04.930Z",
    "timeStorageClassUpdated": "2022-12-15T05:28:04.930Z",
    "updated": "2022-12-15T05:28:04.930Z"
  },
  "datacontenttype": "application/json",
  "id": "6603729032775695",
  "source": "//storage.googleapis.com/projects/_/buckets/[バケット名]",
  "specversion": "1.0",
  "subject": "objects/test_tbl.txt",
  "time": "2022-12-15T05:28:04.930634Z",
  "type": "google.cloud.storage.object.v1.finalized"
}

動作検証

ワークフローを作成し、実際に動かしてみます。

取り込みファイルと取込先のテーブルは以下です。
【取り込みファイル】

【取り込み先テーブル】

バケットにファイルをアップロードします。

作成したワークフローを確認してみると、正常に起動されていることが確認できました。

【ログ】

【取り込み先テーブル】

 

BigQueryのジョブ完了トリガー

次に、BigQueryのジョブ完了をトリガーに起動する処理を作成してみます。

設定・作成

まず、以下のコードのワークフローを作成します。
(今回は先にワークフローを作り、コマンドでトリガーを作ります)

main:
  params: [event]
  steps:
    - イベント情報を変数に設定:
        assign:
          - query: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query}                            #実行したクエリ
          - jobId: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId}                                           #ジョブID
          - jobLocation: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobName.location}                                  #ジョブロケーション
          - user: ${event.data.protoPayload.authenticationInfo.principalEmail}                                                          #ジョブ実行ユーザ
          - createTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.createTime}                           #ジョブ作成日時
          - startTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.startTime}                             #ジョブ開始日時
          - endTime: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.endTime}                                 #ジョブ終了日時
          - state: ${event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state}                                         #ジョブ実行状態
          - errorCode: ${default(map.get(event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error,"code"), "-")}       #エラーコード(デフォルト値:-)
          - errorMessage: ${default(map.get(event.data.protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error,"message"), "-")} #エラーメッセージ(デフォルト値:-)
    - ジョブ情報ログ:
        call: sys.log
        args:
          text: '${"ジョブID:" + jobId + " ロケーション:" + jobLocation + "実行状態:" + state + " 開始日時:" + startTime + " 終了日時:" + endTime}'
          severity: INFO
    - クエリ:
        call: sys.log
        args:
          text: '${"実行されたクエリ:" + query}'
          severity: INFO
    - エラー確認:
        switch:
          - condition: ${errorMessage == "-"}
            next: end
          - condition: true
            next: エラーログ
    - エラーログ:
        call: sys.log
        args:
          text: '${"ジョブでエラーが発生しました。コード:" + errorCode + " メッセージ:" + errorMessage}'
          severity: ERROR

一部のジョブ情報をログに出力し、エラーが発生していたらエラー情報をログに出力する処理です。
実行時にランタイム引数に渡されるイベント情報は数が多いため省略します。
今回は一部の情報のみ変数に格納しました。

ワークフローを作ったら、トリガーを作成します。
Eventarcトリガーで使用するサービスアカウントに、IAMの「Eventarcイベント受信者」ロールを付与します。

以下のコマンドをCloud Shellターミナルで実行し、ワークフローにトリガーを作成します。

gcloud eventarc triggers create {Eventarcトリガー名] \
  --location={Eventarcトリガーのロケーション} \
  --destination-workflow={トリガー対象ワークフロー名} \
  --destination-workflow-location={トリガー対象ワークフローのロケーション} \
  --event-filters="type=google.cloud.audit.log.v1.written" \
  --event-filters="serviceName=bigquery.googleapis.com" \
  --event-filters="methodName=jobservice.jobcompleted" \
  --service-account={Eventarcトリガーで使用するサービスアカウント}

ワークフローを確認するとトリガーが作成されていることが分かります。

動作確認

トリガーが作成できたため、実際にBigQueryでジョブを実行し、ワークフローを起動させてみます。
まずは、BigQuery で正常なクエリを実行します。

SQL実行後にワークフローを確認すると、正常に起動されており、以下のログが出力されていることを確認できました。

次に、エラー(データ型不正)の起きるクエリを実行してみます。

ワークフローを確認すると、今回も正常に起動されており、以下のログが出力されていました。

まとめ

今回は、Cloud StorageへのファイルのファイナライズとBigQueryへのジョブ実行をトリガーに起動する処理を試してみました。
Cloud Functionsのように、イベント発生時に自動で起動してくれるのはかなり便利ですね。
みなさんも是非、Workflowsのイベントトリガー機能を試してみてくださいね!

ご覧いただきありがとうございました。

当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。
お問い合わせは以下よりお願いいたします。


Google Cloud導入についてのお問い合わせはこちら

2023年2月16日 【Google Cloud】Workflowsをイベントトリガーで動かしてみた(Eventarc API)

Category Google Cloud

ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。

お問い合わせはこちら