Databricks SDK for Python
注:
Databricks では、ジョブやその他の Databricks リソースをソース コードとして作成、開発、デプロイ、テストするために、Databricks アセット バンドルを推奨しています。 「Databricks アセットバンドルとは」を参照してください。
この記事では、 Databricks SDK for Python を使用して Databricks の運用を自動化し、開発を加速する方法について説明します。 この記事は、Read The Docs の Databricks SDK for Python ドキュメント と、GitHub の Databricks SDK for Python リポジトリの コード例 を補足するものです。
注:
用のDatabricksSDK Pythonは ベータ版 ですので、本番運用で使用しても問題ありません。
ベータ期間中は、コードが依存するDatabricks SDK for Pythonの特定のマイナーバージョンへの依存関係をピン留めすることをお勧めします。たとえば、venv
の場合はrequirements.txt
、詩の場合はpyproject.toml
とpoetry.lock
などのファイルで依存関係をピン留めできます。依存関係の固定のPoetry細については、「venv
の「仮想環境とパッケージ」またはPoetryの「依存関係のインストール」を参照してください。
始める前に
Databricks SDK for Pythonは、Databricksノートブック内で、あるいはローカルの開発マシンから使用できます。
Databricksノートブック内からDatabricks SDK for Pythonを使用するには、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」に進んでください。
ローカルの開発マシンからDatabricks SDK for Pythonを使用するには、このセクションの手順を実行します。
Databricks SDK for Pythonの使用を開始する前に、開発マシンが以下の要件を満たしていることを確認してください。
Databricks認証が構成されています。
Python 3.8以降がインストールされていること。Databricksコンピュートリソースを自動化する場合、対象のDatabricksコンピュートリソースにインストールされているPythonのメジャーバージョンとマイナーバージョンを一致させることを推奨します。この記事の例では、Python 3.10がインストールされたDatabricks Runtime 13.3 LTSでクラスターを自動化する方法をとっています。正しいバージョンについては、「Databricks Runtimeリリースノートのバージョンと互換性」を参照してください。
Databricks では、Databricks SDK for Python で使用する Python プロジェクトごとに Python 仮想環境 を作成してアクティブ化することをお勧めします。 Python 仮想環境は、コード プロジェクトが互換性のあるバージョンの Python と Python パッケージ (この場合は Databricks SDK for Python パッケージ) を使用していることを確認するのに役立ちます。 Python仮想環境の詳細については、venv または Poetryを参照してください。
Databricks SDK for Pythonの使用を開始する
このセクションでは、ローカルの開発マシンからDatabricks SDK for Pythonを使用する方法について説明します。Databricksノートブック内からDatabricks SDK for Pythonを使用するには、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」に進んでください。
Databricks認証が設定され、Pythonがすでにインストールされており、Python仮想環境が有効になっている開発マシン上で、Python Package Index (PyPI) から databricks-sdk パッケージ (とその依存関係) を以下のようにインストールします。
pip
で、databricks-sdk
パッケージをインストールします。(システムによっては、こちらと全体で、pip3
をpip
に置き換える必要があるかもしれません。)pip3 install databricks-sdk
poetry add databricks-sdk
Databricks SDK for Pythonのベータ版中に
databricks-sdk
パッケージの特定のバージョンをインストールするには、そのパッケージのリリース履歴を参照してください。例えば、0.1.6
のバージョンをインストールする場合は以下のようになります。pip3 install databricks-sdk==0.1.6
poetry add databricks-sdk==0.1.6
既存のDatabricks SDK for Pythonパッケージのインストールを最新バージョンにアップグレードするには、以下のコマンドを実行します。
pip3 install --upgrade databricks-sdk
poetry add databricks-sdk@latest
Databricks SDK for Pythonパッケージの現在の
Version
およびその他の詳細を表示するには、以下のコマンドを実行します。pip3 show databricks-sdk
poetry show databricks-sdk
Pythonの仮想環境で、Databricks SDK for PythonをインポートするPython コードファイルを作成します。次の例では、以下の内容を含む
main.py
という名前のファイル内に、Databricksワークスペース内のすべてのクラスターが一覧表示されています。from databricks.sdk import WorkspaceClient w = WorkspaceClient() for c in w.clusters.list(): print(c.cluster_name)
python
コマンドを実行して、ファイル名がmain.py
であると仮定してPythonコードファイルを実行します。python3.10 main.py
仮想環境のシェルにいる場合:
python3.10 main.py
仮想環境のシェルにいない場合:
poetry run python3.10 main.py
注:
前述の
w = WorkspaceClient()
の呼び出しで引数を設定しないことにより、Databricks SDK for Pythonはデフォルトのプロセスを使用してDatabricks認証を実行しようとします。このデフォルトの動作をオーバーライドするには、次の認証セクションを参照してください。
Databricksアカウントまたはワークスペースを使用してDatabricks SDK for Pythonを認証する
このセクションでは、ローカルの開発マシンかDatabricks アカウントまたはワークスペースに対してDatabricks SDK for Pythonを認証する方法について説明します。Databricksノートブック内からDatabricks SDK for Pythonを認証するには、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」に進んでください。
Databricks SDK for Python は、 Databricks クライアント統合認証 標準を実装しており、認証に対する統合された一貫性のあるアーキテクチャおよびプログラムによるアプローチです。 このアプローチにより、Databricks による認証の設定と自動化をより一元化し、予測可能にすることができます。 これにより、Databricks 認証を一度構成すると、認証構成をさらに変更することなく、その構成を複数の Databricks ツールや SDK で使用できます。 詳細については、Pythonのより完全なコード例を含む、「クライアント統合認証Databricks」を参照してください。
Databricks SDK for Pythonを使用してDatabricks認証を初期化するために使用できるコーディングのパターンには、以下のような例が挙げられます。
以下のいずれかを実行して、Databricksのデフォルト認証を使用します。
ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、
DATABRICKS_CONFIG_PROFILE
環境変数をカスタム構成プロファイルの名前に設定します。ターゲットのDatabricks認証タイプに必要とされる環境変数を設定します。
次に、たとえば次のようにDatabricksのデフォルト認証で
WorkspaceClient
オブジェクトをインスタンス化します。from databricks.sdk import WorkspaceClient w = WorkspaceClient() # ...
必須フィールドのハードコーディングはサポートされていますが、Databricksパーソナルアクセストークンなどのコード内の機密情報が公開される危険があるため推奨はされません。以下の例では、Databricksトークン認証用にDatabricksホストとアクセストークンの値がハードコーディングされています。
from databricks.sdk import WorkspaceClient w = WorkspaceClient( host = 'https://...', token = '...' ) # ...
Databricks SDK for Pythonのドキュメントの「認証」も参照してください。
DatabricksノートブックからDatabricks SDK for Pythonを使用する
Databricks SDK for Python の機能は、Databricks SDK for Python がインストールされた Databricks クラスターがアタッチされた Databricks ノートブックから呼び出すことができます。 これは、Databricks Databricks Runtime13.3 以上を使用するすべての クラスターにデフォルトによってインストールされます。LTSDatabricks Runtime 12.2 LTS 以下を使用する Databricks クラスターの場合は、最初に Databricks SDK for Python をインストールする必要があります。 「ステップ 1: の をインストールまたはアップグレードDatabricksSDK Pythonする」を参照してください。
DatabricksSDKPython特定のDatabricks Runtime バージョンにインストールされている バージョンの を確認するには、そのバージョンのDatabricks Runtime リリースノートの 「InstalledPython ライブラリ 」セクションを参照してください。
Databricksでは、利用可能な最新バージョンのSDK を PiPy からインストールすることをお勧めしますが、 DatabricksSDKPython0.6.0 以降では、すべての バージョンでデフォルトDatabricks ノートブック認証が使用されるため、少なくとも 0.6.0Databricks Runtime 以降をインストールするか、アップグレードすることをお勧めします。
注:
15.1 は、アップグレード不要のデフォルトの 認証をサポートする for the (0.20.0)Databricks Runtime のバージョンをインストールした最初の です。Databricks RuntimeDatabricksSDKPython
次の表は、Databricks SDK for Python バージョンと Databricks Runtime バージョンのノートブック認証サポートの概要を示しています。
SDK/DBRの |
10.4 LTS |
11.3 LTS |
12.3 LTSの |
13.3 LTS |
14.3 LTSの |
15.1 以上 |
---|---|---|---|---|---|---|
0.1.7 およびそれ以下 |
||||||
0.1.10 |
✓ |
✓ |
✓ |
✓ |
✓ |
|
0.6.0 |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
0.20.0 以降 |
✓ |
✓ |
✓ |
✓ |
✓ |
✓ |
デフォルトのDatabricksノートブック認証は、Databricksがバックグラウンドで自動的に生成する一時的なDatabricks個人用アクセストークンに依存しています。Databricksはノートブックの実行を停止した後、この一時トークンを削除します。
重要
デフォルトのDatabricksノートブック認証はクラスターのドライバーノードでのみ動作し、クラスターのワーカーノードやエグゼキューターノードでは動作しません。
Databricksノートブック認証はDatabricks構成プロファイルでは機能しません。
DatabricksアカウントレベルのAPIs Databricksを呼び出す場合、またはデフォルトDatabricks ノートブック 認証以外の 認証タイプを使用する場合は、次の認証タイプもサポートされています。
認証タイプ |
Databricks SDK for Pythonバージョン |
---|---|
0.19.0以上 |
|
0.19.0以上 |
|
0.14.0以上 |
|
0.14.0以上 |
|
すべてのバージョン |
ステップ1: Databricks SDK for Pythonをインストールまたはアップグレードする
Databricks Pythonノートブックでは、他のPythonライブラリと同様にDatabricks SDK for Pythonを使用できます。接続されているDatabricksクラスタにDatabricks SDK for Pythonライブラリをインストールまたはアップグレードするには、ノートブックセルから
%pip
マジックコマンドを次のように実行します。%pip install databricks-sdk --upgrade
%pip
マジックコマンドを実行した後、インストールもしくはアップグレードしたライブラリをノートブックで使用できるようにするために、Pythonを再起動する必要があります。これを行うには、%pip
のマジックコマンドのあるセルの直後のノートブックセルから以下のコマンドを実行します。dbutils.library.restartPython()
インストールされているDatabricks SDK for Pythonのバージョンを表示するには、ノートブックセルから次のコマンドを実行します。
%pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
ステップ2: コードを実行する
ノートブックのセルで、Databricks SDK for Pythonをインポートして呼び出すPythonコードを作成します。以下の例では、デフォルトのDatabricksノートブック認証を使用して、Databricksワークスペース内のクラスターを一覧表示します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for c in w.clusters.list():
print(c.cluster_name)
このセルを実行すると、Databricksワークスペースで使用可能なすべてのクラスターの名前が一覧表示されます。
別の Databricks 認証の種類を使用するには、「 Databricks 認証方法 」を参照し、対応するリンクをクリックして追加の技術的な詳細を確認してください。
Databricksユーティリティを使用する
Databricksユーティリティ (dbutils) リファレンスは、ローカルの開発マシンで実行されているDatabricks SDK for Pythonコードから、もしくはDatabricksノートブック内から呼び出すことができます。
ローカルの開発マシンからDatabricksユーティリティがアクセスできるものは、
dbutils.fs
、dbutils.secrets
、dbutils.widgets
、およびdbutils.jobs
のコマンドグループのみです。Databricksクラスターに接続されているDatabricksノートブックから、Databricksユーティリティは、
dbutils.fs
、dbutils.secrets
、およびdbutils.widgets
だけでなく、使用可能なすべてのDatabricksユーティリティのコマンドグループにアクセスできます。さらに、dbutils.notebook
コマンドグループは2つのレベルのコマンドのみに制限されます。(例:dbutils.notebook.run
またはdbutils.notebook.exit
)
ローカルの開発マシンまたはDatabricksノートブックからDatabricksユーティリティを呼び出すには、WorkspaceClient
内で dbutils
を使用します。このコード例では、デフォルトのDatabricksノートブック認証を使用してWorkspaceClient
内のdbutils
を呼び出し、ワークスペースのDBFSルートにあるすべてのオブジェクトのパスを一覧表示します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
d = w.dbutils.fs.ls('/')
for f in d:
print(f.path)
または、dbutils
を直接呼び出すこともできます。ただし、使用できるのはデフォルトのDatabricksノートブック認証のみに限定されます。このコード例では、dbutils
を直接呼び出して、ワークスペースのDBFSルートにあるすべてのオブジェクトを一覧表示します。
from databricks.sdk.runtime import *
d = dbutils.fs.ls('/')
for f in d:
print(f.path)
Unity Catalogボリュームにアクセスするには、WorkspaceClient
内でfiles
を使用します。Unity Catalogボリューム内のファイルを管理する」を参照してください。dbutils
を単独で、またはWorkspaceClient
内で使用してボリュームにアクセスすることはできません。
dbutilsの操作も参照してください。
コード例
以下のコード例では、Databricks SDK for Pythonを使用してクラスターの作成と削除、ジョブの実行、アカウントレベルのグループの一覧を行う方法を示します。これらのコード例では、デフォルトのDatabricksノートブック認証を使用します。デフォルトのDatabricksノートブック認証の詳細については、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」を参照してください。ノートブック以外のデフォルト認証の詳細については、「Databricksアカウントまたはワークスペースを使用してDatabricks SDK for Pythonを認証する」を参照してください。
その他のコード例については、GitHubのDatabricks SDK for Pythonリポジトリのexamplesを参照してください。関連項目は次を参照してください。
クラスターを作成する
このコード例では、指定されたDatabricks Runtimeのバージョンとクラスターのノードタイプを用いてクラスターを作成します。このクラスターは1つのワーカーを持ち、クラスターのアイドル状態が15分経過すると自動的に終了します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
print("Attempting to create the cluster. Please wait...")
c = w.clusters.create_and_wait(
cluster_name = 'my-cluster',
spark_version = '12.2.x-scala2.12',
node_type_id = 'n2-highmem-4',
autotermination_minutes = 15,
num_workers = 1
)
print(f"View the cluster at " \
f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")
クラスターを完全に削除する
このコード例では、指定されたクラスターIDを持つクラスターをワークスペースから完全に削除します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')
w.clusters.permanent_delete(cluster_id = c_id)
ジョブを作成する
このコード例では、指定されたクラスター上で特定のノートブックを実行するDatabricksジョブを作成します。コードが実行されると、既存のノートブックのパス、既存のクラスターID、および関連するジョブ設定がターミナルのユーザーから取得されます。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source
w = WorkspaceClient()
job_name = input("Some short name for the job (for example, my-job): ")
description = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key = input("Some key to apply to the job's tasks (for example, my-key): ")
print("Attempting to create the job. Please wait...\n")
j = w.jobs.create(
name = job_name,
tasks = [
Task(
description = description,
existing_cluster_id = existing_cluster_id,
notebook_task = NotebookTask(
base_parameters = dict(""),
notebook_path = notebook_path,
source = Source("WORKSPACE")
),
task_key = task_key
)
]
)
print(f"View the job at {w.config.host}/#job/{j.job_id}\n")
Unity Catalogボリューム内のファイルを管理する
このコード例では、WorkspaceClient
内のfiles
機能に対するさまざまな呼び出しを行い、Unity Catalogのボリュームにアクセスします。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Define volume, folder, and file details.
catalog = 'main'
schema = 'default'
volume = 'my-volume'
volume_path = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file = 'data.csv'
volume_file_path = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path = './data.csv'
# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)
# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
file_bytes = file.read()
binary_data = io.BytesIO(file_bytes)
w.files.upload(volume_file_path, binary_data, overwrite = True)
# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
print(item.path)
# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
print(item.path)
# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))
# Delete a file from a volume.
w.files.delete(volume_file_path)
# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)
アカウントレベルのグループを一覧表示する
このコード例では、Databricksアカウント内で使用可能なすべてのグループの表示名を一覧表示します。
from databricks.sdk import AccountClient
a = AccountClient()
for g in a.groups.list():
print(g.display_name)
テスト
コードをテストするには、pytestのようなPythonのテストフレームワークを使います。Databricks REST APIエンドポイントを呼び出したり、Databricksアカウントやワークスペースの状態を変更することなく、シミュレートされた条件下でコードをテストするには、unittest.mockなどのPythonモッキングライブラリを使用します。
ヒント
Databricks Labs には、Databricks との統合テストを簡略化するための pytest プラグイン と、コードの品質を確保するための pylint プラグイン が用意されています。
次の helpers.py
という名前のファイルの例には、新しいクラスターに関する情報を返す create_cluster
関数が含まれています。
# helpers.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails
def create_cluster(
w: WorkspaceClient,
cluster_name: str,
spark_version: str,
node_type_id: str,
autotermination_minutes: int,
num_workers: int
) -> ClusterDetails:
response = w.clusters.create(
cluster_name = cluster_name,
spark_version = spark_version,
node_type_id = node_type_id,
autotermination_minutes = autotermination_minutes,
num_workers = num_workers
)
return response
次の main.py
という名前のファイルが create_cluster
関数を呼び出すとします。
# main.py
from databricks.sdk import WorkspaceClient
from helpers import *
w = WorkspaceClient()
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = w,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
print(response.cluster_id)
次のtest_helpers.py
という名前のファイルはcreate_cluster
関数が期待された応答を返すかどうかをテストします。このテストでは、対象のワークスペースにクラスターを作成するのではなく、WorkspaceClient
オブジェクトをモックし、モックしたオブジェクトの設定を定義してから、create_cluster
関数にモックしたオブジェクトを渡します。次に、この関数が新しいモックされたクラスタのIDを返すかどうかをチェックします。
# test_helpers.py
from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.
def test_create_cluster():
# Create a mock WorkspaceClient.
mock_workspace_client = create_autospec(WorkspaceClient)
# Set the mock WorkspaceClient's clusters.create().cluster_id value.
mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'
# Call the actual function but with the mock WorkspaceClient.
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = mock_workspace_client,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
# Assert that the function returned the mocked cluster ID.
assert response.cluster_id == '123abc'
このテストを実行するには、コードプロジェクトのルートからpytest
コマンドを実行します。
$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item
test_helpers.py . [100%]
======================== 1 passed ==========================