Databricks Connect for Python のテスト

注:

この記事では、Databricks Runtime 13.0 以降の Databricks Connect について説明します。

この記事では、Databricks Runtime 13.0 以降の Databricks Connect でpytestを使用してテストを実行する方法について説明します。 Databricks Connectの詳細については、 Databricks Connect for Pythonを参照してください。

この情報は、Databricks Connect for Python が既にインストールされていることを前提としています。 「Databricks Connect for Python のインストール」を参照してください。

リモート Databricks ワークスペース内のクラスターへの接続を必要としないローカル コードでpytestを実行できます。 たとえば、 pytestを使用して、ローカル メモリ内の PySpark DataFrameオブジェクトを受け入れて返す関数をテストできます。 pytestを使い始めてローカルで実行するには、 pytestドキュメントの「はじめに」を参照してください。

たとえば、SparkSession インスタンスを返す get_spark 関数と、samples カタログのnyctaxi スキーマ内のtripsテーブルを表すDataFrameを返す get_nyctaxi_trips 関数を含む nyctaxi_functions.py という名前の次のファイルがあるとします。

nyctaxi_functions.py

from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession

def get_spark() -> SparkSession:
  spark = DatabricksSession.builder.getOrCreate()
  return spark

def get_nyctaxi_trips() -> DataFrame:
  spark = get_spark()
  df = spark.read.table("samples.nyctaxi.trips")
  return df

そして、これらのget_spark関数とget_nyctaxi_trips関数を呼び出すmain.pyという名前の次のファイルがあるとします。

main.py

from nyctaxi_functions import *

df = get_nyctaxi_trips()
df.show(5)

次の test_nyctaxi_functions.py という名前のファイルは、 get_spark 関数が SparkSession インスタンスを返すかどうか、および get_nyctaxi_trips 関数が少なくとも 1 行のデータを含む DataFrame を返すかどうかをテストします。

test_nyctaxi_functions.py

import pyspark.sql.connect.session
from nyctaxi_functions import *

def test_get_spark():
  spark = get_spark()
  assert isinstance(spark, pyspark.sql.connect.session.SparkSession)

def test_get_nyctaxi_trips():
  df = get_nyctaxi_trips()
  assert df.count() > 0

これらのテストを実行するには、コード プロジェクトのルートからpytestコマンドを実行します。これにより、次のようなテスト結果が生成されます。

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.11.7, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 2 items

test_nyctaxi_functions.py .. [100%]
======================== 2 passed ==========================