Databricks Connect for Scala のコード例
注
この記事では、Databricks Connect for Databricks Runtime 13.3 LTS 以降について説明します。
この記事では、Databricks Connect for Scala を使用するコード例を示します。 Databricks Connect を使用すると、一般的な IDE、ノートブック サーバー、およびカスタム アプリケーションを Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。この記事の Python バージョンについては、「 Databricks Connect for Python のコード例」を参照してください。
注
Databricks Connectの使用を開始する前に、Databricks Connect クライアントをセットアップする必要があります。
Databricks には、Databricks Connect の使用方法を示す追加のサンプル アプリケーションがいくつか用意されています。 具体的には、GitHub の Databricks Connect リポジトリのサンプル アプリケーション を参照してください。
次の簡単なコード例を使用して、エクスペリメントを Databricks Connectで行うこともできます。 これらの例では、 Databricks Connect クライアントのセットアップにデフォルト認証を使用していることを前提としています。
この簡単なコード例では、指定したテーブルをクエリーし、指定したテーブルの最初の 5 行を表示します。 別のテーブルを使用するには、コールを spark.read.table
に調整します。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
val df = spark.read.table("samples.nyctaxi.trips")
df.limit(5).show()
}
}
この長いコード例では、次の処理を行います。
メモリ内 DataFrameを作成します。
default
スキーマ内にzzz_demo_temps_table
という名前のテーブルを作成します。この名前のテーブルが既に存在する場合は、最初にテーブルが削除されます。 別のスキーマまたはテーブルを使用するには、呼び出しをspark.sql
、temps.write.saveAsTable
、またはその両方に調整します。DataFrameの内容をテーブルに保存します。
テーブルの内容に対して
SELECT
クエリーを実行します。クエリーの結果を表示します。
テーブルを削除します。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate
object Main {
def main(args: Array[String]): Unit = {
val spark = DatabricksSession.builder().getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(
Seq(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
)
)
val data = Seq(
( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
)
val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame 's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01.Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
// +------------+-----------+---------+--------+
// | AirportCode| Date|TempHighF|TempLowF|
// +------------+-----------+---------+--------+
// | PDX | 2021-04-03| 64 | 45 |
// | PDX | 2021-04-02| 61 | 41 |
// | SEA | 2021-04-03| 57 | 43 |
// | SEA | 2021-04-02| 54 | 39 |
// +------------+-----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
注
次の例では、Databricks Connect のDatabricksSession
クラスが使用できない場合にSparkSession
クラスを使用する方法を説明します。
この例では、指定されたテーブルをクエリし、最初の 5 行を返します。 この例では、認証にSPARK_REMOTE
環境変数を使用します。
import org.apache.spark.sql.{DataFrame, SparkSession}
object Main {
def main(args: Array[String]): Unit = {
getTaxis(getSpark()).show(5)
}
private def getSpark(): SparkSession = {
SparkSession.builder().getOrCreate()
}
private def getTaxis(spark: SparkSession): DataFrame = {
spark.read.table("samples.nyctaxi.trips")
}
}