Databricks ジョブでの JAR の使用

Javaアーカイブまたは JAR ファイル形式は、一般的なZIPファイル形式に基づいており、多くのJavaまたはScalaファイルを1つに集約するために使用されます。 JAR タスクを使用すると、Databricks ジョブに Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次のことを行います。

  • サンプル・アプリケーションを定義する JAR プロジェクトを作成します。

  • サンプル ファイルを JAR にバンドルします。

  • JAR を実行するジョブを作成します。

  • ジョブを実行し、結果を表示します。

始める前に

この例を完了するには、次のものが必要です。

  • Java JAR の場合は、Java Development Kit (JDK) を参照してください。

  • Scala JAR の場合は、JDK と sbt.

ステップ 1: サンプルのためのローカルディレクトリを作成する

コード例と生成された成果物を保持するローカル ディレクトリを作成します (例: databricks_jar_test)。

ステップ 2: JARを作成する

Java または Scala を使用して JAR を作成するには、次の手順を実行します。

Java JARを作成する

  1. databricks_jar_test フォルダーから、次の内容の PrintArgs.java という名前のファイルを作成します。

    import java.util.Arrays;
    
    public class PrintArgs {
      public static void main(String[] args) {
        System.out.println(Arrays.toString(args));
      }
    }
    
  2. PrintArgs.java ファイルをコンパイルすると、次のファイル PrintArgs.classが作成されます。

    javac PrintArgs.java
    
  3. (オプション)コンパイルされたプログラムを実行します。

    java PrintArgs Hello World!
    
    # [Hello, World!]
    
  4. PrintArgs.java ファイルおよび PrintArgs.class ファイルと同じフォルダーに、 META-INFという名前のフォルダーを作成します。

  5. META-INF フォルダーに、次の内容の MANIFEST.MF という名前のファイルを作成します。このファイルの最後に改行を追加してください。

    Main-Class: PrintArgs
    
  6. databricks_jar_test フォルダのルートから、 PrintArgs.jarという名前の JAR を作成します。

    jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
    
  7. (オプション) テストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。

    java -jar PrintArgs.jar Hello World!
    
    # [Hello, World!]
    

    no main manifest attribute, in PrintArgs.jarエラーが発生した場合は、必ず MANIFEST.MF ファイルの末尾に改行を追加してから、JARの作成と実行を再試行してください。

  8. PrintArgs.jarボリュームにアップロードします。Unity Catalogボリュームへのファイルのアップロード」を参照してください。

Scala JARを作成する

  1. databricks_jar_test フォルダーから、次の内容の build.sbt という名前の空のファイルを作成します。

    ThisBuild / scalaVersion := "2.12.14"
    ThisBuild / organization := "com.example"
    
    lazy val PrintArgs = (project in file("."))
      .settings(
        name := "PrintArgs"
      )
    
  2. databricks_jar_test フォルダーから、フォルダー構造 src/main/scala/exampleを作成します。

  3. example フォルダーに、次の内容の PrintArgs.scala という名前のファイルを作成します。

    package example
    
    object PrintArgs {
      def main(args: Array[String]): Unit = {
        println(args.mkString(", "))
      }
    }
    
  4. プログラムをコンパイルします。

    sbt compile
    
  5. (オプション)コンパイルされたプログラムを実行します。

    sbt "run Hello World\!"
    
    # Hello, World!
    
  6. databricks_jar_test/project フォルダーに、次の内容の assembly.sbt という名前のファイルを作成します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
    
  7. databricks_jar_testフォルダーのルートからassemblyコマンドを実行すると、 targetフォルダーの下に JAR が生成されます。

    sbt assembly
    
  8. (オプション) テストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。

    java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
    
    # Hello, World!
    
  9. PrintArgs-assembly-0.1.0-SNAPSHOT.jarボリュームにアップロードします。Unity Catalogボリュームへのファイルのアップロード」を参照してください。

ステップ3.JAR を実行する Databricks ジョブを作成する

  1. Databricks ランディングページに移動し、次のいずれかの操作を行います。

    • サイドバーで「 ジョブアイコン ワークフロー」 をクリックし[ジョブの作成] ボタン をクリックします。

    • サイドバーで「 新しいアイコン 新規 」をクリックし、メニューから「 ジョブ 」を選択します。

  2. [タスク] タブに表示されるタスク ダイアログ ボックスで、[ジョブの名前を追加する] をジョブ名 (JAR exampleなど) に置き換えます。

  3. [ タスク名] に、タスクの名前を入力します (Java の場合は java_jar_task 、Scala の場合は scala_jar_task など)。

  4. [ タイプ] で [JAR] を選択します。

  5. この例では、 メインクラスに Javaには PrintArgs 、Scalaには example.PrintArgs を入力します。

  6. クラスターの場合は、互換性のあるクラスターを選択します。 「Java および Scala ライブラリのサポート」を参照してください。

  7. 「依存ライブラリー」で、「 + 追加」をクリックします。

  8. [依存ライブラリの追加]ダイアログで、 [ボリューム]を選択した状態で、前のステップでJAR (PrintArgs.jar または PrintArgs-assembly-0.1.0-SNAPSHOT.jar) をアップロードした場所を[ボリューム ファイル パス]に入力するか、フィルターまたは参照を使用してJARを見つけます。 それを選択します。

  9. [追加] をクリックします。

  10. この例では、パラメーター["Hello", "World!"]と入力します。

  11. [追加] をクリックします。

ステップ 4: ジョブを実行し、ジョブ実行の詳細を表示する

クリックして [今すぐ実行] ボタン ワークフローを実行します。 実行 の詳細 を表示するには、 [ トリガーされた実行] ポップアップで [実行の表示] をクリックするか、 ジョブの実行 ビューで 実 の [ 開始時 刻 ] 列のリンクをクリックします。

実行が完了すると、タスクに渡された引数を含む 出力が出力 パネルに表示されます。

JAR ジョブの出力サイズ制限

stdout に出力されるログ出力などのジョブ出力には、20 MB のサイズ制限が適用されます。 出力の合計サイズが大きい場合、実行は取り消され、失敗としてマークされます。

この制限が発生しないようにするには、 spark.databricks.driver.disableScalaOutput Spark 構成を trueに設定することで、stdout がドライバーから Databricks に返されないようにすることができます。 デフォルトでは、フラグ値は falseです。 このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。 フラグが有効になっている場合、Spark はジョブの実行結果をクライアントに返しません。 このフラグは、クラスターのログ・ファイルに書き込まれるデータには影響しません。 Databricks では、ノートブックの結果が無効になるため、JAR ジョブのジョブ クラスターに対してのみこのフラグを設定することをお勧めします。

推奨: 共有 SparkContextを使用する

Databricks はマネージド サービスであるため、Apache Spark ジョブが正しく実行されるようにするには、コードの変更が必要になる場合があります。 JAR ジョブ・プログラムは、共有 SparkContext API を使用して SparkContextを取得する必要があります。 Databricks は SparkContextを初期化するため、 new SparkContext() を呼び出すプログラムは失敗します。 SparkContextを取得するには、Databricks によって作成された共有 SparkContext のみを使用します。

val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

共有 SparkContextを使用するときに避けるべきいくつかの方法もあります。

  • SparkContext.stop()に電話しないでください。

  • Main プログラムの最後に System.exit(0)sc.stop() に電話しないでください。これにより、未定義の動作が発生する可能性があります。

推奨事項: ジョブのクリーンアップにtry-finallyブロックを使用する

次の 2 つの部分で構成される JAR について考えてみます。

  • jobBody() ジョブの主要部分が含まれています。

  • jobCleanup() これは、その関数が成功したか例外を返したかにかかわらず、 jobBody()後に実行する必要があります。

たとえば、 jobBody() によってテーブルが作成され、 jobCleanup() それらのテーブルが削除されます。

クリーンアップメソッドが確実に呼び出される安全な方法は、コードに try-finally ブロックを配置することです。

try {
  jobBody()
} finally {
  jobCleanup()
}

sys.addShutdownHook(jobCleanup) または次のコードを使用してクリーンアップしようと しないでください

val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

Spark コンテナーの有効期間が Databricks で管理される方法のため、シャットダウン フックは確実に実行されません。

JAR ジョブ・パラメーターの構成

JAR ジョブへのパラメーターは、JSON 文字列配列で渡します。 Jobs API の Create a new ジョブ オペレーション (POST /jobs/create) に渡されるリクエスト本文の spark_jar_task オブジェクトを参照してください。これらのパラメーターにアクセスするには、main 関数に渡されたString配列を調べます。

ライブラリの依存関係を管理する

Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブで競合するライブラリが追加された場合は、Spark ドライバー ライブラリの依存関係が優先されます。

ドライバー ライブラリの依存関係の完全な一覧を取得するには、同じ Spark バージョンで構成されたクラスター (または調べるドライバーを含むクラスター) に接続されているノートブックで次のコマンドを実行します。

%sh
ls /databricks/jars

JAR のライブラリの依存関係を定義する場合、Databricks では、Spark と Hadoop を provided 依存関係として一覧表示することをお勧めします。 Maven では、Spark と Hadoop を提供された依存関係として追加します。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>

sbtでは、提供された依存関係として Spark と Hadoop を追加します。

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"

ヒント

実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。

次のステップ

Databricks ジョブの作成と実行の詳細については、「 Databricks ジョブの作成と実行」を参照してください。