Databricks ジョブでの JAR の使用
Java アーカイブ ( JAR) ファイル形式は、一般的な ZIP ファイル形式に基づいており、多くの Java または Scala ファイルを 1 つに集約するために使用されます。 JAR タスクを使用すると、Databricks ジョブに Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次のことを行います。
サンプル アプリケーションを定義する JAR プロジェクトを作成します。
サンプル ファイルを JAR にバンドルします。
JAR を実行するジョブを作成します。
ジョブを実行し、結果を表示する。
ステップ 1: サンプルのためのローカルディレクトリを作成する
サンプルコードと生成されたアーティファクトを保持するローカルディレクトリを作成します (例: . databricks_jar_test
.
ステップ 2: JARを作成する
Java または Scala を使用して JAR を作成するには、次の手順に従います。
Java JARを作成する
databricks_jar_test
フォルダから、次の内容のPrintArgs.java
という名前のファイルを作成します。import java.util.Arrays; public class PrintArgs { public static void main(String[] args) { System.out.println(Arrays.toString(args)); } }
PrintArgs.java
ファイルをコンパイルすると、次のファイルが作成されますPrintArgs.class
javac PrintArgs.java
(オプション) コンパイルされたプログラムを実行します。
java PrintArgs Hello World! # [Hello, World!]
PrintArgs.java
ファイルやPrintArgs.class
ファイルと同じフォルダに、META-INF
という名前のフォルダを作成します。META-INF
フォルダーに、次の内容でMANIFEST.MF
という名前のファイルを作成します。このファイルの最後に改行を追加してください。Main-Class: PrintArgs
databricks_jar_test
フォルダーのルートから、PrintArgs.jar
という名前の JAR を作成します。jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
(オプション) テストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。java -jar PrintArgs.jar Hello World! # [Hello, World!]
注:
エラー
no main manifest attribute, in PrintArgs.jar
が発生した場合は、必ずMANIFEST.MF
ファイルの末尾に改行を追加してから、JAR を再度作成して実行してください。PrintArgs.jar
ボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。
Scala JARを作成する
databricks_jar_test
フォルダから、次の内容を含むbuild.sbt
という名前の空のファイルを作成します。ThisBuild / scalaVersion := "2.12.14" ThisBuild / organization := "com.example" lazy val PrintArgs = (project in file(".")) .settings( name := "PrintArgs" )
databricks_jar_test
フォルダから、フォルダ構造src/main/scala/example
を作成します。example
フォルダーに、次の内容でPrintArgs.scala
という名前のファイルを作成します。package example object PrintArgs { def main(args: Array[String]): Unit = { println(args.mkString(", ")) } }
プログラムをコンパイルします。
sbt compile
(オプション) コンパイルされたプログラムを実行します。
sbt "run Hello World\!" # Hello, World!
databricks_jar_test/project
フォルダーに、次の内容でassembly.sbt
という名前のファイルを作成します。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
databricks_jar_test
フォルダのルートからassembly
コマンドを実行すると、target
フォルダの下に JAR が生成されます。sbt assembly
(オプション) テストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World! # Hello, World!
PrintArgs-assembly-0.1.0-SNAPSHOT.jar
ボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。
ステップ3.JAR を実行する Databricks ジョブを作成する
Databricksページに移動し、次のいずれかを実行します。
サイドバーで、ワークフローとクリック。
サイドバーで、新規をクリックし、メニューからジョブを選択します。
[タスク] タブに表示される [タスク] ダイアログ ボックスで、 [ジョブの名前を追加]をジョブ名 (例:
JAR example
に置き換えます。[タスク名]にタスクの名前を入力します。たとえば、 Javaの場合は
java_jar_task
、 Scalaの場合はscala_jar_task
です。タイプにはJARを選択します。
この例では、メイン クラスに、Java の場合は
PrintArgs
、Scala の場合はexample.PrintArgs
を入力します。クラスターについては、互換性のあるクラスターを選択します。 Java および Scala ライブラリのサポートを参照してください。
依存ライブラリの場合は、 「+ 追加」をクリックします。
[依存ライブラリの追加]ダイアログで、 [ボリューム]を選択した状態で、前のステップでJAR (
PrintArgs.jar
またはPrintArgs-assembly-0.1.0-SNAPSHOT.jar
) をアップロードした場所をVolumes File Pathに入力するか、フィルターまたは参照してJAR見つけます。 それを選択します。[追加] をクリックします。
この例では、 「否」に
["Hello", "World!"]
と入力します。[追加] をクリックします。
ステップ 4: ジョブを実行し、ジョブ実行の詳細を表示する
クリックして ワークフローを実行します。 実行の詳細 を表示するには、「 トリガーされた実行 」ポップアップで「 実行の表示 」をクリックするか、 ジョブ実行 ビューで実行の「 開始時刻 」列のリンクをクリックします。
実行が完了すると、タスクに渡された引数を含む出力が出力パネルに表示されます。
JAR ジョブの出力サイズ制限
stdout に出力されるログ出力などのジョブ出力には、20 MB のサイズ制限が適用されます。 出力の合計サイズが大きい場合、実行はキャンセルされ、失敗としてマークされます。
この制限を回避するには、 spark.databricks.driver.disableScalaOutput
Spark 構成をtrue
に設定して、stdout がドライバーから Databricks に返されないようにすることができます。 デフォルトでは、フラグの値はfalse
です。 このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。 フラグが有効になっている場合、Spark はジョブ実行結果をクライアントに返しません。 このフラグは、クラスターのログ ファイルに書き込まれるデータには影響しません。 Databricks 、ノートブックの結果が無効になるため、このフラグをJARジョブのジョブクラスターに対してのみ設定することをお勧めします。
推奨事項: ジョブのクリーンアップにtry-finally
ブロックを使用する
次の 2 つの部分で構成される JAR を考えてみましょう。
jobBody()
ジョブの主要部分が含まれます。jobCleanup()
これは、その関数が成功したか例外を返したかに関係なく、jobBody()
後に実行する必要があります。
たとえば、 jobBody()
はテーブルを作成し、 jobCleanup()
はそれらのテーブルを削除します。
clean-up メソッドが呼び出されるようにする安全な方法は、コードに try-finally
ブロックを配置することです。
try {
jobBody()
} finally {
jobCleanup()
}
sys.addShutdownHook(jobCleanup)
または次のコードを使用してクリーンアップを試みないでください。
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
Databricks で Spark コンテナーの有効期間が管理される方法のため、シャットダウン フックは確実に実行されません。
JAR ジョブ・パラメーターの構成
文字列配列を使用して、問題を JARジョブに渡します。JSONJobs API の「新しいジョブの作成」操作 ( POST /jobs/create
) に渡されるリクエスト本文のspark_jar_task
オブジェクトを参照してください。 これらの問題にアクセスするには、 main
関数に渡されたString
配列を検査します。
ライブラリの依存関係を管理する
Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブが競合するライブラリを追加した場合、 Sparkドライバーのライブラリの依存関係が優先されます。
ドライバー ライブラリの依存関係の完全なリストを取得するには、同じ Spark バージョン (または調査するドライバーを含むクラスター) で構成されたクラスターに接続されたノートブックで次のコマンドを実行します。
%sh
ls /databricks/jars
JAR のライブラリ依存関係を定義する場合、Databricks では Spark と Hadoop をprovided
依存関係としてリストすることをお勧めします。 Maven で、提供された依存関係として Spark と Hadoop を追加します。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>
sbt
で、提供された依存関係として Spark と Hadoop を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
ヒント
実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。
次のステップ
Databricks ジョブの作成と実行の詳細については、「 ワークフローのスケジュールと調整」を参照してください。