タスクの値を使用してタスク間で情報を渡す

October 04, 2024

タスク値は、Databricksジョブ内のタスク間で任意の値を渡すことができる Databricks ユーティリティ taskValues サブユーティリティを参照します。taskValues サブユーティリティ (dbutils.ジョブ.taskValues)を参照してください。

1 つのタスクで dbutils.jobs.taskValues.set() を使用してキーと値のペアを指定し、タスク名とキーを使用して後続のタスクで値を参照できます。

注:

サブユーティリティ dbutils.jobs.taskValues.set()dbutils.jobs.taskValues.get() は、Python ノートブックでのみ使用できます。 タスク値は、パラメーターをサポートするすべてのタスクの動的値参照を使用して参照できます。 参照タスク値を参照してください。

タスクの値を設定する

Python ノートブックでタスク値を設定するには、サブユーティリティ dbutils.jobs.taskValues.set()を使用します。

タスク値のキーは文字列である必要があります。 ノートブックに複数のタスク値が定義されている場合は、各キーが一意である必要があります。

タスクの値をキーに手動またはプログラムで割り当てることができます。 有効な JSON として表現できる値のみが許可されます。 値の JSON 表現のサイズは 48 KiB を超えることはできません。

たとえば、次の例では、キー の静的文字列を設定します fave_food

Python
dbutils.jobs.taskValues.set(key = "fave_food", value = "beans")

次の例では、ノートブック タスク パラメーターを使用して、特定の注文番号のすべてのレコードを照会し、現在の注文状態とレコードの合計数を返します。

Python
from pyspark.sql.functions import col

order_num = dbutils.widgets.get("order_num")

query = (spark.read.table("orders")
  .orderBy(col("updated"), ascending=False)
  .select(col("order_status"))
  .where(col("order_num") == order_num)
)

dbutils.jobs.taskValues.set(key = "record_count", value = query.count())
dbutils.jobs.taskValues.set(key = "order_status", value = query.take(1)[0][0])

このパターンを使用して値のリストを渡し、それらを使用して各タスクなどのダウンストリーム ロジックを調整できます。 「 パラメーター化された Databricks ジョブ タスクをループで実行する」を参照してください。

次の例では、製品 ID の個別の値を Python リストに抽出し、これをタスク値として設定します。

Python
prod_list = list(spark.read.table("products").select("prod_id").distinct().toPandas()["prod_id"])

dbutils.jobs.taskValues.set(key = "prod_list", value = prod_list)

参照タスクの値

Databricks では、動的値参照パターン {{tasks.<task_name>.values.<value_name>}}を使用して構成されたタスク パラメーターとしてタスク値を参照することをお勧めします。

たとえば、product_inventoryという名前のタスクのキー prod_list を持つタスク値を参照するには、構文 {{tasks.product_inventory.values.prod_list}}を使用します。

「タスク パラメーターの構成」および「動的値参照とは」を参照してください。

dbutils.jobs.taskValues.get を使用する

構文 dbutils.jobs.taskValues.get() では、アップストリーム タスク名を指定する必要があります。 この構文は、複数のダウンストリーム タスクでタスク値を使用できるため、タスク名が変更された場合に多数の更新が必要になるため、お勧めしません。

この構文を使用して、オプションで default 値と debugValueを指定できます。 キーが見つからない場合は、デフォルト値が使用されます。 この debugValue では、ノートブックをタスクとしてスケジュールする前に、ノートブックでの手動コード開発およびテスト中に使用する静的な値を設定できます。

次の例では、タスク名 に設定されたキー order_status の値を取得します order_lookup。 値 Delivered は、ノートブックを対話形式で実行している場合にのみ返されます。

Python
order_status = dbutils.jobs.taskValues.get(taskKey = "order_lookup", key = "order_status", debugValue = "Delivered")

注:

Databricks では、キーの欠落やタスクの名前の誤りによる予期されるエラーメッセージのトラブルシューティングや防止が困難な場合があるため、デフォルト値の設定はお勧めしません。

タスク値の表示

各実行のタスク値の戻り値は、タスク実行の詳細[出力] パネルに表示されます。「タスク実行履歴の表示」を参照してください。