2023年1月6日

【Google Cloud】Workflows使ってみた~Part2~


Content

みなさま、こんにちは。Y.Yです。
今回は、Google Cloud の Workflows に関する記事のPart2です!

今回も様々な処理を試していきます。
Workflows をこれから使ってみたい方や、最近使い始めた方の参考になれば幸いです。
(Part1では、Workflows に関する基本的な内容や BigQuery との連携について記載しました。
まだご覧になっていない方は、こちらも是非ご覧ください!)

Cloud Storageとの連携

ここでは、Cloud Storageに格納したオブジェクトへの処理を試していきます。

最初に、変数に値を設定します。

main:
    params: [input]
    steps:
    - 設定値割当:
        assign:
            - sourceBucket: バケット名を入力
            - destinationBucket: ${sourceBucket}
            - sourceObject: sample-file.txt
            - destinationObject: ${"copied-" + sourceObject}

(1)ファイルコピー

バケットに格納したファイルを、指定した先にコピーします。
(今回は同じバケットにファイル名を変更してコピーします。)

    - copy(api):
        call: googleapis.storage.v1.objects.copy
        args:
            destinationBucket: ${destinationBucket}  #コピー先バケット
            destinationObject: ${destinationObject}  #コピー先オブジェクト
            sourceBucket: ${sourceBucket}            #コピー元バケット
            sourceObject: ${sourceObject}            #コピー元オブジェクト
            body:

argsの中身は、コピー先のバケット名とオブジェクト名、コピー元のバケット名とオブジェクト名を指定しています。
今回は、最初のステップで設定した変数をそれぞれ指定しました。
※上のコードでは必要最小限のフィールドのみ指定しています。
 その他の設定フィールドについては、公式ドキュメントをご確認ください。(以降の処理も同様)

ワークフローを実行してみると、正常にコピーオブジェクトが作成されました。
【実行前】

【実行後】

(2)ファイル削除

上でコピーしたファイルを削除します。

    - delete(api):
        call: googleapis.storage.v1.objects.delete
        args:
            bucket: ${destinationBucket}
            object: ${destinationObject}

argsでは、削除対象のオブジェクトのバケット名とオブジェクト名を指定します。

ワークフローを実行すると、先ほどコピーされたオブジェクトが削除されました。

(3)ファイル内のテキストを取得

Cloud Storageに格納した下記のファイルからテキストを取得し、ログに出力します。

実行するには、下記のコードを記述します。

    - get(API):
        call: googleapis.storage.v1.objects.get
        args:
            bucket: ${sourceBucket}
            object: ${sourceObject}
            query:
              alt: media
        result: getResultApi
    - ログ出力:
        call: sys.log
        args:
            text: ${getResultApi}
            severity: INFO

argsには、対象オブジェクトの格納バケット名とオブジェクト名を指定します。
オブジェクトデータを取得するには、queryに「alt: media」と指定します。
(オブジェクトメタデータを取得する場合は、「alt: json」と指定します。)
resultには、戻り値を格納する変数を定義します。

実行すると、以下のようにログに出力されます。

(4)おまけ

(1)~(3)の処理を、下記のように HTTPリクエストを送信する形で書き直すこともできます。

main:
    steps:
    - 設定値割当:
        assign:
            - sourceBucket: バケット名を入力                    #コピー元バケット
            - destinationBucket: ${sourceBucket}              #コピー先バケット
            - sourceObject: sample-file.txt                   #コピー元オブジェクト
            - destinationObject: ${"copied-" + sourceObject}  #コピー先オブジェクト
    - copy(http):
        call: http.post
        args:
            url: '${"https://storage.googleapis.com/storage/v1/b/"+sourceBucket+"/o/"+sourceObject+"/copyTo/b/"+destinationBucket+"/o/"+destinationObject}'
            auth:
              type: OAuth2
    - delete(http):
        call: http.delete
        args:
            url: '${"https://storage.googleapis.com/storage/v1/b/"+destinationBucket+"/o/"+destinationObject}'
            auth:
              type: OAuth2
    - get(HTTP):
        call: http.get
        args:
          url: '${"https://storage.googleapis.com/download/storage/v1/b/"+sourceBucket+"/o/"+sourceObject}'
          auth:
            type: OAuth2
          query:
            alt: media
        result: getResultHttp
    - ログ出力:
        call: sys.log
        args:
            text: ${getResultHttp["body"]}
            severity: INFO

別のワークフローを起動する

次に、ワークフローから別のワークフローを呼び出す処理を試してみます。

まず、呼び出す側のワークフローを書きます。

main:
    steps:
    - 設定値割当:
        assign:
            # Project
            - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
            # Workflow
            - workflowLocation: asia-northeast1    #起動するワークフローのロケーション
            - workflowName: triggered-workflow     #起動するワークフロー名
    - Workflow起動ログ:
        call: sys.log
        args:
          text: '${"ワークフローを起動します。フロー名:" + workflowName}'
          severity: INFO
    - Workflow起動処理:
        call: http.post
        args:
            url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + projectId + "/locations/" + workflowLocation + "/workflows/" + workflowName + "/executions"}
            body:
                argument: "{\"parameter1\": \"trigger-test1\",\"parameter2\": \"trigger-test2\"}"
            auth:
                type: OAuth2

bodyのargumentに、引数として parameter1 と parameter2 を設定しています。
これらの値は、呼び出される側のワークフローでランタイム引数に格納されます。

次に、呼び出される側のワークフローを書きます。

main:
    params: [input]
    steps:
    - トリガー元からの引数をログに出力:
        call: sys.log
        args:
          text: '${"引数1:" + input.parameter1 + ", 引数2:" + input.parameter2}'
          severity: INFO

ランタイム引数に格納された値をログに出力する処理です。

呼び出し元のワークフローを実行してみます。

呼び出し先のワークフローを確認してみると、起動されていることを確認できました。

Cloud Funtionsの関数を呼び出す

最後に、Cloud Functionsの関数を呼び出す処理を試してみます。
関数に引数を渡し、戻り値を受け取ります。

まず、以下の処理(Pythonコード)をCloud Functionsにデプロイします。
2つの引数のうち、大きい方の値とメッセージを返すだけのシンプルな処理です。

from flask import jsonify

def main(request):
    param_dict = request.get_json()
    parameter1 = param_dict["parameter1"]
    parameter2 = param_dict["parameter2"]

    if parameter1 > parameter2:
      bigger_value = parameter1
    else:
      bigger_value = parameter2

    msg = "関数の呼び出しに成功しました。"
    return jsonify({'message': msg, 'biggerValue': bigger_value}) 

次に、下記のコードのワークフローを作成します。

main:
    params: [input]
    steps:
    - 設定値割当:
        assign:
            # Project
            - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
            # Cloud Functions
            - functionName: workflow-test
            - functionLocation: asia-northeast1
            - value1: ${input.value1}
            - value2: ${input.value2}
    - function実行:
        call: http.post
        args:
            url: ${"https://" + functionLocation + "-" + projectId + ".cloudfunctions.net/" + functionName}
            body:
                parameter1: ${value1}
                parameter2: ${value2}
            auth:
                type: OIDC
        result: functionResult
    - ログ出力(メッセージ):
        call: sys.log
        args:
          text: ${functionResult.body.message}
          severity: INFO
    - ログ出力(大きい値):
        call: sys.log
        args:
          text: ${functionResult.body.biggerValue}
          severity: INFO

ランタイム引数に設定する2つの値をCloud Functions関数に渡し、戻り値をログに出力する処理です。
httpリクエストのbodyで、Cloud Functions関数に渡す引数(parameter1, parameter2)を設定しています。

ランタイム引数を設定して、ワークフローを実行してみます。

Cloud Functions関数の呼び出しに成功し、受け取った値(メッセージと大きい方の値)がログに出力されました。

まとめ

今回は、Cloud Storage内のオブジェクトへの処理、他のワークフローの呼び出し、Cloud Functions関数の呼び出しについて書いてみました!
本記事で紹介した内容以外にも、様々な処理を呼び出すことができます。
是非みなさんもWorkflowsを使ってみてくださいね!

当社、システムサポートは、Google Cloudの導入・移行・運営支援を行っています。
Google Cloud に関してのご用命の際は「クラウド導入支援サービス for Google Cloud」へご連絡ください。

2023年1月6日 【Google Cloud】Workflows使ってみた~Part2~

Category Google Cloud

ご意見・ご相談・料金のお見積もりなど、
お気軽にお問い合わせください。

お問い合わせはこちら