Databricksノートブックを別のノートブックから実行する

重要

ノートブックのオーケストレーションには、Databricks ジョブを使用します。 コード モジュール化のシナリオでは、ワークスペース ファイルを使用します。 この記事で説明する手法は、動的なパラメーター セットでノートブックをループする場合や、ワークスペース ファイルにアクセスできない場合など、Databricks ジョブを使用してユース ケースを実装できない場合にのみ使用してください。 詳細については、 Databricksジョブ共有コード」を参照してください。

%rundbutils.notebook.run()の比較

%run コマンドを使用すると、ノートブック内に別のノートブックを含めることができます。%run を使用して、たとえば、サポート関数を別のノートブックに配置することで、コードをモジュール化できます。また、これを使用して、分析のステップを実装するノートブックを連結することもできます。 %runを使用すると、呼び出されたノートブックがすぐに実行され、そこで定義されている関数と変数が呼び出し元のノートブックで使用できるようになります。

dbutils.notebookAPI は、パラメーターをノートブックに渡したり、ノートブックから値を取得したりできるため、%run の補完になります。これにより、依存関係を持つ複雑なワークフローやパイプラインを構築できます。たとえば、ディレクトリ内のファイルの一覧を取得し、その名前を別のノートブックに渡すことができます。これは%runでは不可能なことです。また、戻り値に基づく条件付きワークフローを作成したり、相対パスを使用して他のノートブックを呼び出したりすることもできます。

%runとは異なり、dbutils.notebook.run()メソッドでは、新しいジョブを開始してノートブックを実行します。

これらのメソッドは、すべてのdbutilsAPIと同様、PythonとScalaでのみ使用できます。ただし、dbutils.notebook.run()を使用してRノートブックを呼び出すことは可能です。

%runを使ってノートブックをインポートする

この例では、最初のノートブックは関数reverseを定義しています。この関数は、%runマジックを使用してshared-code-notebookを実行した後に2番目のノートブックで使用できます。

共有コードノートブック
ノートブックのインポート例

これらのノートブックは両方ともワークスペース内の同じディレクトリにあるため、./shared-code-notebookのプレフィックスである./を使用して、現在実行中のノートブックを基準にしてパスを解決する必要があることを示せます。ノートブックは、%run ./dir/notebookなどのディレクトリで整理することも、%run /Users/username@organization.com/directory/notebookなどの絶対パスを使用することもできます。

dbutils.notebook API

dbutils.notebookAPIで使用できるメソッドは、runexitです。パラメーターと戻り値はどちらも文字列でなければなりません。

run(path: String,  timeout_seconds: int, arguments: Map): String

ノートブックを実行し、その終了値を返します。このメソッドは、すぐに実行される一時的なジョブを開始するものです。

timeout_secondsパラメーターは、実行のタイムアウトを制御します(0はタイムアウトなしを意味します)。runの呼び出しは、指定された時間内に終了しない場合の例外をスローします。Databricksが10分以上停止した場合、timeout_secondsに関係なくノートブックの実行は失敗となります。

argumentsパラメーターは、ターゲットノートブックのウィジェット値を設定します。具体的には、実行中のノートブックにAというウィジェットがあり、run()呼び出しの引数パラメーターの一部として("A": "B")というキーと値のペアを渡した場合、ウィジェットAの値を取得すると"B"が返されます。ウィジェットの作成と操作については、「Databricksのウィジェット」を参照してください。

  • argumentsパラメーターは、ラテン文字(ASCII文字セット)のみを受け入れます。非ASCII文字を使用すると、エラーが返されます。

  • dbutils.notebookAPIを使用して作成されたジョブは、30日以内に完了する必要があります。

run の使用

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run の例

以下のように、workflowsという名前のノートブックに、ウィジェットの値を出力する fooという名前のウィジェットがあるとします。

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

dbutils.notebook.run("workflows", 60, {"foo": "bar"})を実行すると、次の結果が生成されます。

ウィジェット付きのノートブック

ウィジェットには、デフォルトではなく、dbutils.notebook.run(), "bar"を使って渡された値があります。

exit(value: String): void 値を持つノートブックを終了します。runメソッドを使用してノートブックを呼び出す場合、これが返される値です。

dbutils.notebook.exit("returnValue")

ジョブでdbutils.notebook.exitを呼び出すと、ノートブックは正常に完了します。ジョブを失敗させる場合は、例外をスローします。

次の例では、引数をDataImportNotebookに渡し、DataImportNotebookからの結果に基づいて別のノートブック(DataCleaningNotebookまたはErrorHandlingNotebook)を実行します。

if-elseの例

コードが実行されると、実行中のノートブックへのリンクを含むテーブルが表示されます。

実行中のノートブックへのリンク

実行の詳細を表示するには、表内の開始時刻リンクをクリックします。 実行が完了したら、 「終了時間」リンクをクリックして実行の詳細を表示することもできます。

一次的ノートブック実行の結果

構造化データを渡す

このセクションでは、ノートブック間で構造化データを渡す方法を説明します。

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

エラーに対応する

このセクションでは、エラーに対応する方法について説明します。

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

複数のノートブックを同時に実行する

スレッド(ScalaPython)やフューチャー(ScalaPython)などの標準のScalaやPythonコンストラクトを使用することで、複数のノートブックを同時に実行できます。サンプルノートブックでは、これらの構成要素の使用方法を示します。

  1. 次の4つのノートブックをダウンロードします。これらはScalaで記述されています。

  2. ノートブックをワークスペース内の1つのフォルダーにインポートします。

  3. ノートブックの[同時実行]を実行します。

ノートブックを同時実行する

ノートブックを新しいタブで開く

並列ノートブックで実行する

ノートブックを新しいタブで開く

ノートブックのテスト

ノートブックを新しいタブで開く

ノートブックのテスト-2

ノートブックを新しいタブで開く