Databricks SDK for Python

注:

Databricks では、ジョブやその他の Databricks リソースをソース コードとして作成、開発、デプロイ、テストするために、Databricks アセット バンドルを推奨しています。 「Databricks アセットバンドルとは」を参照してください。

この記事では、 Databricks SDK for Python を使用して Databricks の運用を自動化し、開発を加速する方法について説明します。 この記事は、Read The Docs の Databricks SDK for Python ドキュメント と、GitHub の Databricks SDK for Python リポジトリの コード例 を補足するものです。

注:

用のDatabricksSDK Pythonは ベータ版 ですので、本番運用で使用しても問題ありません。

ベータ期間中は、コードが依存するDatabricks SDK for Pythonの特定のマイナーバージョンへの依存関係をピン留めすることをお勧めします。たとえば、venvの場合はrequirements.txt、詩の場合はpyproject.tomlpoetry.lockなどのファイルで依存関係をピン留めできます。依存関係の固定のPoetry細については、「venvの「仮想環境とパッケージ」またはPoetryの「依存関係のインストール」を参照してください。

始める前に

Databricks SDK for Pythonは、Databricksノートブック内で、あるいはローカルの開発マシンから使用できます。

Databricks SDK for Pythonの使用を開始する前に、開発マシンが以下の要件を満たしていることを確認してください。

  • Databricks認証が構成されています。

  • Python 3.8以降がインストールされていること。Databricksコンピュートリソースを自動化する場合、対象のDatabricksコンピュートリソースにインストールされているPythonのメジャーバージョンとマイナーバージョンを一致させることを推奨します。この記事の例では、Python 3.10がインストールされたDatabricks Runtime 13.3 LTSでクラスターを自動化する方法をとっています。正しいバージョンについては、「Databricks Runtimeリリースノートのバージョンと互換性」を参照してください。

  • Databricks では、Databricks SDK for Python で使用する Python プロジェクトごとに Python 仮想環境 を作成してアクティブ化することをお勧めします。 Python 仮想環境は、コード プロジェクトが互換性のあるバージョンの Python と Python パッケージ (この場合は Databricks SDK for Python パッケージ) を使用していることを確認するのに役立ちます。 Python仮想環境の詳細については、venv または Poetryを参照してください。

Databricks SDK for Pythonの使用を開始する

このセクションでは、ローカルの開発マシンからDatabricks SDK for Pythonを使用する方法について説明します。Databricksノートブック内からDatabricks SDK for Pythonを使用するには、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」に進んでください。

  1. Databricks認証が設定され、Pythonがすでにインストールされており、Python仮想環境が有効になっている開発マシン上で、Python Package Index (PyPI) から databricks-sdk パッケージ (とその依存関係) を以下のようにインストールします。

    pipで、databricks-sdkパッケージをインストールします。(システムによっては、こちらと全体で、pip3pipに置き換える必要があるかもしれません。)

    pip3 install databricks-sdk
    
    poetry add databricks-sdk
    

    Databricks SDK for Pythonのベータ版中にdatabricks-sdkパッケージの特定のバージョンをインストールするには、そのパッケージのリリース履歴を参照してください。例えば、0.1.6のバージョンをインストールする場合は以下のようになります。

    pip3 install databricks-sdk==0.1.6
    
    poetry add databricks-sdk==0.1.6
    

    既存のDatabricks SDK for Pythonパッケージのインストールを最新バージョンにアップグレードするには、以下のコマンドを実行します。

    pip3 install --upgrade databricks-sdk
    
    poetry add databricks-sdk@latest
    

    Databricks SDK for Pythonパッケージの現在のVersionおよびその他の詳細を表示するには、以下のコマンドを実行します。

    pip3 show databricks-sdk
    
    poetry show databricks-sdk
    
  2. Pythonの仮想環境で、Databricks SDK for PythonをインポートするPython コードファイルを作成します。次の例では、以下の内容を含む main.py という名前のファイル内に、Databricksワークスペース内のすべてのクラスターが一覧表示されています。

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    for c in w.clusters.list():
      print(c.cluster_name)
    
  3. python コマンドを実行して、ファイル名が main.py であると仮定してPythonコードファイルを実行します。

    python3.10 main.py
    

    仮想環境のシェルにいる場合:

    python3.10 main.py
    

    仮想環境のシェルにいない場合:

    poetry run python3.10 main.py
    

    注:

    前述の w = WorkspaceClient()の呼び出しで引数を設定しないことにより、Databricks SDK for Pythonはデフォルトのプロセスを使用してDatabricks認証を実行しようとします。このデフォルトの動作をオーバーライドするには、次の認証セクションを参照してください。

Databricksアカウントまたはワークスペースを使用してDatabricks SDK for Pythonを認証する

このセクションでは、ローカルの開発マシンかDatabricks アカウントまたはワークスペースに対してDatabricks SDK for Pythonを認証する方法について説明します。Databricksノートブック内からDatabricks SDK for Pythonを認証するには、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」に進んでください。

Databricks SDK for Python は、 Databricks クライアント統合認証 標準を実装しており、認証に対する統合された一貫性のあるアーキテクチャおよびプログラムによるアプローチです。 このアプローチにより、Databricks による認証の設定と自動化をより一元化し、予測可能にすることができます。 これにより、Databricks 認証を一度構成すると、認証構成をさらに変更することなく、その構成を複数の Databricks ツールや SDK で使用できます。 詳細については、Pythonのより完全なコード例を含む、「クライアント統合認証Databricks」を参照してください。

Databricks SDK for Pythonを使用してDatabricks認証を初期化するために使用できるコーディングのパターンには、以下のような例が挙げられます。

  • 以下のいずれかを実行して、Databricksのデフォルト認証を使用します。

    • ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、 DATABRICKS_CONFIG_PROFILE 環境変数をカスタム構成プロファイルの名前に設定します。

    • ターゲットのDatabricks認証タイプに必要とされる環境変数を設定します。

    次に、たとえば次のようにDatabricksのデフォルト認証でWorkspaceClientオブジェクトをインスタンス化します。

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    # ...
    
  • 必須フィールドのハードコーディングはサポートされていますが、Databricksパーソナルアクセストークンなどのコード内の機密情報が公開される危険があるため推奨はされません。以下の例では、Databricksトークン認証用にDatabricksホストとアクセストークンの値がハードコーディングされています。

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient(
      host  = 'https://...',
      token = '...'
    )
    # ...
    

Databricks SDK for Pythonのドキュメントの「認証」も参照してください。

DatabricksノートブックからDatabricks SDK for Pythonを使用する

Databricks SDK for Python の機能は、Databricks SDK for Python がインストールされた Databricks クラスターがアタッチされた Databricks ノートブックから呼び出すことができます。 これは、Databricks Databricks Runtime13.3 以上を使用するすべての クラスターにデフォルトによってインストールされます。LTSDatabricks Runtime 12.2 LTS 以下を使用する Databricks クラスターの場合は、最初に Databricks SDK for Python をインストールする必要があります。 「ステップ 1: の をインストールまたはアップグレードDatabricksSDK Pythonする」を参照してください。

DatabricksSDKPython特定のDatabricks Runtime バージョンにインストールされている バージョンの を確認するには、そのバージョンのDatabricks Runtime リリースノートの 「InstalledPython ライブラリ 」セクションを参照してください。

Databricksでは、利用可能な最新バージョンのSDK を PiPy からインストールすることをお勧めしますが、 DatabricksSDKPython0.6.0 以降では、すべての バージョンでデフォルトDatabricks ノートブック認証が使用されるため、少なくとも 0.6.0Databricks Runtime 以降をインストールするか、アップグレードすることをお勧めします。

注:

15.1 は、アップグレード不要のデフォルトの 認証をサポートする for the (0.20.0)Databricks Runtime のバージョンをインストールした最初の です。Databricks RuntimeDatabricksSDKPython

次の表は、Databricks SDK for Python バージョンと Databricks Runtime バージョンのノートブック認証サポートの概要を示しています。

SDK/DBRの

10.4 LTS

11.3 LTS

12.3 LTSの

13.3 LTS

14.3 LTSの

15.1 以上

0.1.7 およびそれ以下

0.1.10

0.6.0

0.20.0 以降

デフォルトのDatabricksノートブック認証は、Databricksがバックグラウンドで自動的に生成する一時的なDatabricks個人用アクセストークンに依存しています。Databricksはノートブックの実行を停止した後、この一時トークンを削除します。

重要

  • デフォルトのDatabricksノートブック認証はクラスターのドライバーノードでのみ動作し、クラスターのワーカーノードやエグゼキューターノードでは動作しません。

  • Databricksノートブック認証はDatabricks構成プロファイルでは機能しません。

DatabricksアカウントレベルのAPIs Databricksを呼び出す場合、またはデフォルトDatabricks ノートブック 認証以外の 認証タイプを使用する場合は、次の認証タイプもサポートされています。

認証タイプ

Databricks SDK for Pythonバージョン

OAuthマシン間 (M2M) 認証

0.19.0以上

OAuthユーザー対マシン (U2M) 認証

0.19.0以上

Google Cloudの認証情報認証

0.14.0以上

Google Cloud ID認証

0.14.0以上

Databricks個人用アクセストークン認証

すべてのバージョン

ステップ1: Databricks SDK for Pythonをインストールまたはアップグレードする

  1. Databricks Pythonノートブックでは、他のPythonライブラリと同様にDatabricks SDK for Pythonを使用できます。接続されているDatabricksクラスタにDatabricks SDK for Pythonライブラリをインストールまたはアップグレードするには、ノートブックセルから%pipマジックコマンドを次のように実行します。

    %pip install databricks-sdk --upgrade
    
  2. %pipマジックコマンドを実行した後、インストールもしくはアップグレードしたライブラリをノートブックで使用できるようにするために、Pythonを再起動する必要があります。これを行うには、%pipのマジックコマンドのあるセルの直後のノートブックセルから以下のコマンドを実行します。

    dbutils.library.restartPython()
    
  3. インストールされているDatabricks SDK for Pythonのバージョンを表示するには、ノートブックセルから次のコマンドを実行します。

    %pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
    

ステップ2: コードを実行する

ノートブックのセルで、Databricks SDK for Pythonをインポートして呼び出すPythonコードを作成します。以下の例では、デフォルトのDatabricksノートブック認証を使用して、Databricksワークスペース内のクラスターを一覧表示します。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for c in w.clusters.list():
  print(c.cluster_name)

このセルを実行すると、Databricksワークスペースで使用可能なすべてのクラスターの名前が一覧表示されます。

別の Databricks 認証の種類を使用するには、「 Databricks 認証方法 」を参照し、対応するリンクをクリックして追加の技術的な詳細を確認してください。

Databricksユーティリティを使用する

Databricksユーティリティ (dbutils) リファレンスは、ローカルの開発マシンで実行されているDatabricks SDK for Pythonコードから、もしくはDatabricksノートブック内から呼び出すことができます。

  • ローカルの開発マシンからDatabricksユーティリティがアクセスできるものは、dbutils.fsdbutils.secretsdbutils.widgets、およびdbutils.jobsのコマンドグループのみです。

  • Databricksクラスターに接続されているDatabricksノートブックから、Databricksユーティリティは、dbutils.fsdbutils.secrets 、および dbutils.widgets だけでなく、使用可能なすべてのDatabricksユーティリティのコマンドグループにアクセスできます。さらに、 dbutils.notebook コマンドグループは2つのレベルのコマンドのみに制限されます。(例:dbutils.notebook.run または dbutils.notebook.exit

ローカルの開発マシンまたはDatabricksノートブックからDatabricksユーティリティを呼び出すには、WorkspaceClient内で dbutilsを使用します。このコード例では、デフォルトのDatabricksノートブック認証を使用してWorkspaceClient内のdbutilsを呼び出し、ワークスペースのDBFSルートにあるすべてのオブジェクトのパスを一覧表示します。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
d = w.dbutils.fs.ls('/')

for f in d:
  print(f.path)

または、dbutilsを直接呼び出すこともできます。ただし、使用できるのはデフォルトのDatabricksノートブック認証のみに限定されます。このコード例では、dbutilsを直接呼び出して、ワークスペースのDBFSルートにあるすべてのオブジェクトを一覧表示します。

from databricks.sdk.runtime import *

d = dbutils.fs.ls('/')

for f in d:
  print(f.path)

Unity Catalogボリュームにアクセスするには、WorkspaceClient内でfilesを使用します。Unity Catalogボリューム内のファイルを管理する」を参照してください。dbutilsを単独で、またはWorkspaceClient内で使用してボリュームにアクセスすることはできません。

dbutilsの操作も参照してください。

コード例

以下のコード例では、Databricks SDK for Pythonを使用してクラスターの作成と削除、ジョブの実行、アカウントレベルのグループの一覧を行う方法を示します。これらのコード例では、デフォルトのDatabricksノートブック認証を使用します。デフォルトのDatabricksノートブック認証の詳細については、「DatabricksノートブックからDatabricks SDK for Pythonを使用する」を参照してください。ノートブック以外のデフォルト認証の詳細については、「Databricksアカウントまたはワークスペースを使用してDatabricks SDK for Pythonを認証する」を参照してください。

その他のコード例については、GitHubのDatabricks SDK for Pythonリポジトリのexamplesを参照してください。関連項目は次を参照してください。

クラスターを作成する

このコード例では、指定されたDatabricks Runtimeのバージョンとクラスターのノードタイプを用いてクラスターを作成します。このクラスターは1つのワーカーを持ち、クラスターのアイドル状態が15分経過すると自動的に終了します。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

print("Attempting to create the cluster. Please wait...")

c = w.clusters.create_and_wait(
  cluster_name             = 'my-cluster',
  spark_version            = '12.2.x-scala2.12',
  node_type_id             = 'n2-highmem-4',
  autotermination_minutes = 15,
  num_workers              = 1
)

print(f"View the cluster at " \
      f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")

クラスターを完全に削除する

このコード例では、指定されたクラスターIDを持つクラスターをワークスペースから完全に削除します。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')

w.clusters.permanent_delete(cluster_id = c_id)

ジョブを作成する

このコード例では、指定されたクラスター上で特定のノートブックを実行するDatabricksジョブを作成します。コードが実行されると、既存のノートブックのパス、既存のクラスターID、および関連するジョブ設定がターミナルのユーザーから取得されます。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job_name            = input("Some short name for the job (for example, my-job): ")
description         = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path       = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key            = input("Some key to apply to the job's tasks (for example, my-key): ")

print("Attempting to create the job. Please wait...\n")

j = w.jobs.create(
  name = job_name,
  tasks = [
    Task(
      description = description,
      existing_cluster_id = existing_cluster_id,
      notebook_task = NotebookTask(
        base_parameters = dict(""),
        notebook_path = notebook_path,
        source = Source("WORKSPACE")
      ),
      task_key = task_key
    )
  ]
)

print(f"View the job at {w.config.host}/#job/{j.job_id}\n")

Unity Catalogボリューム内のファイルを管理する

このコード例では、WorkspaceClient内のfiles機能に対するさまざまな呼び出しを行い、Unity Catalogのボリュームにアクセスします。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Define volume, folder, and file details.
catalog            = 'main'
schema             = 'default'
volume             = 'my-volume'
volume_path        = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder      = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file        = 'data.csv'
volume_file_path   = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path   = './data.csv'

# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)

# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
  file_bytes = file.read()
  binary_data = io.BytesIO(file_bytes)
  w.files.upload(volume_file_path, binary_data, overwrite = True)

# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
  print(item.path)

# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
  print(item.path)

# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))

# Delete a file from a volume.
w.files.delete(volume_file_path)

# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)

アカウントレベルのグループを一覧表示する

このコード例では、Databricksアカウント内で使用可能なすべてのグループの表示名を一覧表示します。

from databricks.sdk import AccountClient

a = AccountClient()

for g in a.groups.list():
  print(g.display_name)

テスト

コードをテストするには、pytestのようなPythonのテストフレームワークを使います。Databricks REST APIエンドポイントを呼び出したり、Databricksアカウントやワークスペースの状態を変更することなく、シミュレートされた条件下でコードをテストするには、unittest.mockなどのPythonモッキングライブラリを使用します。

ヒント

Databricks Labs には、Databricks との統合テストを簡略化するための pytest プラグイン と、コードの品質を確保するための pylint プラグイン が用意されています。

次の helpers.py という名前のファイルの例には、新しいクラスターに関する情報を返す create_cluster 関数が含まれています。

# helpers.py

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails

def create_cluster(
  w: WorkspaceClient,
  cluster_name:            str,
  spark_version:           str,
  node_type_id:            str,
  autotermination_minutes: int,
  num_workers:             int
) -> ClusterDetails:
  response = w.clusters.create(
    cluster_name            = cluster_name,
    spark_version           = spark_version,
    node_type_id            = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers             = num_workers
  )
  return response

次の main.py という名前のファイルが create_cluster 関数を呼び出すとします。

# main.py

from databricks.sdk import WorkspaceClient
from helpers import *

w = WorkspaceClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
  w = w,
  cluster_name            = 'Test Cluster',
  spark_version           = '<spark-version>',
  node_type_id            = '<node-type-id>',
  autotermination_minutes = 15,
  num_workers             = 1
)

print(response.cluster_id)

次のtest_helpers.pyという名前のファイルはcreate_cluster関数が期待された応答を返すかどうかをテストします。このテストでは、対象のワークスペースにクラスターを作成するのではなく、WorkspaceClientオブジェクトをモックし、モックしたオブジェクトの設定を定義してから、create_cluster関数にモックしたオブジェクトを渡します。次に、この関数が新しいモックされたクラスタのIDを返すかどうかをチェックします。

# test_helpers.py

from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.

def test_create_cluster():
  # Create a mock WorkspaceClient.
  mock_workspace_client = create_autospec(WorkspaceClient)

  # Set the mock WorkspaceClient's clusters.create().cluster_id value.
  mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'

  # Call the actual function but with the mock WorkspaceClient.
  # Replace <spark-version> with the target Spark version string.
  # Replace <node-type-id> with the target node type string.
  response = create_cluster(
    w = mock_workspace_client,
    cluster_name            = 'Test Cluster',
    spark_version           = '<spark-version>',
    node_type_id            = '<node-type-id>',
    autotermination_minutes = 15,
    num_workers             = 1
  )

  # Assert that the function returned the mocked cluster ID.
  assert response.cluster_id == '123abc'

このテストを実行するには、コードプロジェクトのルートからpytestコマンドを実行します。

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item

test_helpers.py . [100%]
======================== 1 passed ==========================