Databricks ジョブでの JAR の使用

Java アーカイブ ( JAR) ファイル形式は、一般的な ZIP ファイル形式に基づいており、多くの Java または Scala ファイルを 1 つに集約するために使用されます。 JAR タスクを使用すると、Databricks ジョブに Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次のことを行います。

  • サンプル アプリケーションを定義する JAR プロジェクトを作成します。

  • サンプル ファイルを JAR にバンドルします。

  • JAR を実行するジョブを作成します。

  • ジョブを実行し、結果を表示する。

始める前に

この例を完了するには、次のものが必要です。

  • Java JAR の場合は、Java 開発キット (JDK)。

  • Scala JAR の場合、JDK と sbt。

ステップ 1: サンプルのためのローカルディレクトリを作成する

サンプルコードと生成されたアーティファクトを保持するローカルディレクトリを作成します (例: . databricks_jar_test.

ステップ 2: JARを作成する

Java または Scala を使用して JAR を作成するには、次の手順に従います。

Java JARを作成する

  1. databricks_jar_test フォルダから、次の内容の PrintArgs.java という名前のファイルを作成します。

    import java.util.Arrays;
    
    public class PrintArgs {
      public static void main(String[] args) {
        System.out.println(Arrays.toString(args));
      }
    }
    
  2. PrintArgs.javaファイルをコンパイルすると、次のファイルが作成されますPrintArgs.class

    javac PrintArgs.java
    
  3. (オプション) コンパイルされたプログラムを実行します。

    java PrintArgs Hello World!
    
    # [Hello, World!]
    
  4. PrintArgs.java ファイルや PrintArgs.class ファイルと同じフォルダに、META-INFという名前のフォルダを作成します。

  5. META-INF フォルダーに、次の内容で MANIFEST.MF という名前のファイルを作成します。このファイルの最後に改行を追加してください。

    Main-Class: PrintArgs
    
  6. databricks_jar_testフォルダーのルートから、 PrintArgs.jarという名前の JAR を作成します。

    jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
    
  7. (オプション) テストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。

    java -jar PrintArgs.jar Hello World!
    
    # [Hello, World!]
    

    注:

    エラーno main manifest attribute, in PrintArgs.jarが発生した場合は、必ずMANIFEST.MFファイルの末尾に改行を追加してから、JAR を再度作成して実行してください。

  8. PrintArgs.jarボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。

Scala JARを作成する

  1. databricks_jar_test フォルダから、次の内容を含む build.sbt という名前の空のファイルを作成します。

    ThisBuild / scalaVersion := "2.12.14"
    ThisBuild / organization := "com.example"
    
    lazy val PrintArgs = (project in file("."))
      .settings(
        name := "PrintArgs"
      )
    
  2. databricks_jar_test フォルダから、フォルダ構造 src/main/scala/exampleを作成します。

  3. example フォルダーに、次の内容で PrintArgs.scala という名前のファイルを作成します。

    package example
    
    object PrintArgs {
      def main(args: Array[String]): Unit = {
        println(args.mkString(", "))
      }
    }
    
  4. プログラムをコンパイルします。

    sbt compile
    
  5. (オプション) コンパイルされたプログラムを実行します。

    sbt "run Hello World\!"
    
    # Hello, World!
    
  6. databricks_jar_test/project フォルダーに、次の内容で assembly.sbt という名前のファイルを作成します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
    
  7. databricks_jar_testフォルダのルートからassemblyコマンドを実行すると、 targetフォルダの下に JAR が生成されます。

    sbt assembly
    
  8. (オプション) テストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。

    java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
    
    # Hello, World!
    
  9. PrintArgs-assembly-0.1.0-SNAPSHOT.jarボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。

ステップ3.JAR を実行する Databricks ジョブを作成する

  1. Databricksページに移動し、次のいずれかを実行します。

    • サイドバーで、ワークフローアイコンワークフローとクリック「ジョブを作成」ボタン

    • サイドバーで、新しいアイコン新規をクリックし、メニューからジョブを選択します。

  2. [タスク] タブに表示される [タスク] ダイアログ ボックスで、 [ジョブの名前を追加]をジョブ名 (例: JAR exampleに置き換えます。

  3. [タスク名]にタスクの名前を入力します。たとえば、 Javaの場合は java_jar_task 、 Scalaの場合は scala_jar_task です。

  4. タイプにはJARを選択します。

  5. この例では、メイン クラスに、Java の場合はPrintArgs 、Scala の場合はexample.PrintArgsを入力します。

  6. クラスターについては、互換性のあるクラスターを選択します。 Java および Scala ライブラリのサポートを参照してください。

  7. 依存ライブラリの場合は、 「+ 追加」をクリックします。

  8. [依存ライブラリの追加]ダイアログで、 [ボリューム]を選択した状態で、前のステップでJAR (PrintArgs.jar または PrintArgs-assembly-0.1.0-SNAPSHOT.jar) をアップロードした場所をVolumes File Pathに入力するか、フィルターまたは参照してJAR見つけます。 それを選択します。

  9. [追加] をクリックします。

  10. この例では、 「否」["Hello", "World!"]と入力します。

  11. [追加] をクリックします。

ステップ 4: ジョブを実行し、ジョブ実行の詳細を表示する

クリックして 「今すぐ実行」ボタン ワークフローを実行します。 実行の詳細 を表示するには、「 トリガーされた実行 」ポップアップで「 実行の表示 」をクリックするか、 ジョブ実行 ビューで実行の「 開始時刻 」列のリンクをクリックします。

実行が完了すると、タスクに渡された引数を含む出力が出力パネルに表示されます。

JAR ジョブの出力サイズ制限

stdout に出力されるログ出力などのジョブ出力には、20 MB のサイズ制限が適用されます。 出力の合計サイズが大きい場合、実行はキャンセルされ、失敗としてマークされます。

この制限を回避するには、 spark.databricks.driver.disableScalaOutput Spark 構成をtrueに設定して、stdout がドライバーから Databricks に返されないようにすることができます。 デフォルトでは、フラグの値はfalseです。 このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。 フラグが有効になっている場合、Spark はジョブ実行結果をクライアントに返しません。 このフラグは、クラスターのログ ファイルに書き込まれるデータには影響しません。 Databricks 、ノートブックの結果が無効になるため、このフラグをJARジョブのジョブクラスターに対してのみ設定することをお勧めします。

推奨: 共有 SparkContextを使用する

Databricksはマネージド サービスであるため、 Apache Sparkジョブを正しく実行するには、コードの変更が必要になる場合があります。 JAR ジョブ プログラムは、共有SparkContext API を使用してSparkContextを取得する必要があります。 Databricks はSparkContextを初期化するため、 new SparkContext()を呼び出すプログラムは失敗します。 SparkContextを取得するには、Databricks によって作成された共有SparkContextのみを使用します。

val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

共有 SparkContextを使用する際に避けるべき方法もいくつかあります。

  • SparkContext.stop()に電話しないでください。

  • Mainプログラムの最後にSystem.exit(0)またはsc.stop()に電話しないでください。これにより、未定義の動作が発生する可能性があります。

推奨事項: ジョブのクリーンアップにtry-finallyブロックを使用する

次の 2 つの部分で構成される JAR を考えてみましょう。

  • jobBody() ジョブの主要部分が含まれます。

  • jobCleanup() これは、その関数が成功したか例外を返したかに関係なく、 jobBody()後に実行する必要があります。

たとえば、 jobBody() はテーブルを作成し、 jobCleanup() はそれらのテーブルを削除します。

clean-up メソッドが呼び出されるようにする安全な方法は、コードに try-finally ブロックを配置することです。

try {
  jobBody()
} finally {
  jobCleanup()
}

sys.addShutdownHook(jobCleanup) または次のコードを使用してクリーンアップを試みないでください

val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

Databricks で Spark コンテナーの有効期間が管理される方法のため、シャットダウン フックは確実に実行されません。

JAR ジョブ・パラメーターの構成

文字列配列を使用して、問題を JARジョブに渡します。JSONJobs API の「新しいジョブの作成」操作 ( POST /jobs/create ) に渡されるリクエスト本文のspark_jar_taskオブジェクトを参照してください。 これらの問題にアクセスするには、 main関数に渡されたString配列を検査します。

ライブラリの依存関係を管理する

Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブが競合するライブラリを追加した場合、 Sparkドライバーのライブラリの依存関係が優先されます。

ドライバー ライブラリの依存関係の完全なリストを取得するには、同じ Spark バージョン (または調査するドライバーを含むクラスター) で構成されたクラスターに接続されたノートブックで次のコマンドを実行します。

%sh
ls /databricks/jars

JAR のライブラリ依存関係を定義する場合、Databricks では Spark と Hadoop をprovided依存関係としてリストすることをお勧めします。 Maven で、提供された依存関係として Spark と Hadoop を追加します。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>

sbtで、提供された依存関係として Spark と Hadoop を追加します。

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"

ヒント

実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。

次のステップ

Databricks ジョブの作成と実行の詳細については、「 ワークフローのスケジュールと調整」を参照してください。