Databricks SQL Connector for Python

Databricks SQL Connector for Python は、Python コードを使用して Databricks クラスターおよび Databricks SQLウェアハウスで SQL コマンドを実行できるようにする Python ライブラリです。Databricks SQL Connector for Python は、 pyodbc などの同様の Python ライブラリよりもセットアップと使用が簡単です。 このライブラリは PEP 249 – Python データベース API 仕様 v2.0 に準拠しています。

Databricks SQL Connector for Python には、Databricks 用の SQLAlchemy ダイアレクトも含まれています。 「 Databricks で SQLAlchemy を使用する」を参照してください。

要件

  • Python >=3.8 および <=3.11 を実行している開発マシン。

  • Databricks では、Python に含まれる venv によって提供されるものなど、Python 仮想環境を使用することをお勧めします。 仮想環境は、正しいバージョンの Python と Databricks SQL Connector for Python を一緒に使用するのに役立ちます。 仮想環境の設定と使用は、この記事の範囲外です。 詳細については、「 仮想環境の作成」を参照してください。

  • 既存のクラスターまたはSQL ウェアハウス

はじめに

  • pip install databricks-sql-connector または python -m pip install databricks-sql-connectorを実行して、開発用コンピューターに Databricks SQL Connector for Python ライブラリをインストールします。

  • 使用するクラスターまたは SQLウェアハウスに関する次の情報を収集します。

    • SQLウェアハウスのサーバーホスト名。 これは、SQLウェアハウスの「 接続の詳細 」タブの 「サーバー・ホスト名 」の値から取得できます。

    • SQLウェアハウスの HTTP パス。 これは、SQLウェアハウスの「 接続の詳細 」タブの 「HTTPパス 」値から取得できます。

認証

Databricks SQL Connector for Python では、次の Databricks 認証の種類がサポートされています。

Databricks SQL Connector for Python は、次の Databricks 認証タイプをまだサポートしていません。

Databricks個人用アクセストークン認証

Databricks 個人用アクセストークン認証で Databricks SQL Connector for Python を使用するには、まず次のように Databricks 個人 用アクセストークンを作成する必要があります。

  1. Databricks ワークスペースで、上部のバーにある Databricks ユーザー名をクリックし、ドロップダウンから[設定]を選択します。

  2. [ 開発者] をクリックします。

  3. [アクセストークン] の横にある [管理] をクリックします。

  4. [ 新しいトークンの生成] をクリックします。

  5. (任意)今後このトークンを識別するのに役立つコメントを入力し、トークンのデフォルトの有効期間である90日を変更します。有効期間のないトークンを作成するには(非推奨)、[有効期間 (日) ] ボックスを空白のままにしてください。

  6. [生成] をクリックします。

  7. 表示されたトークンを安全な場所にコピーし、[完了] をクリックします。

コピーしたトークンは、必ず安全な場所に保存してください。 コピーしたトークンを他のユーザーと共有しないでください。 コピーしたトークンを紛失した場合、まったく同じトークンを再生成することはできません。 代わりに、この手順を繰り返して新しいトークンを作成する必要があります。 コピーしたトークンを紛失した場合、またはトークンが侵害されたと思われる場合は、アクセストークン ページでトークンの横にあるごみ箱 (取り消し) アイコンをクリックして、ワークスペースからそのトークンをすぐに削除することを強くお勧めします。

ワークスペースでトークンを作成または使用できない場合は、ワークスペース管理者がトークンを無効にしているか、トークンを作成または使用する権限を付与していない可能性があります。 ワークスペース管理者または次のトピックを参照してください。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットは、次の環境変数が設定されていることを前提としています。

  • DATABRICKS_SERVER_HOSTNAMEをクラスターまたは SQLウェアハウスの [Server Hostname ] の値に設定します。

  • DATABRICKS_HTTP_PATHで、クラスターまたは SQLウェアハウスの HTTP パス 値に設定します。

  • DATABRICKS_TOKENを Databricks personal アクセストークンに設定します。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuthマシン間(M2M)認証

Databricks SQL Connector for Python バージョン 2.7.0 以降では、OAuth マシン間 (M2M) 認証がサポートされています。 Databricks SDK for Python 0.18.0 以降もインストールする必要があります (たとえばpip install databricks-sdkまたはpython -m pip install databricks-sdkを実行します)。

OAuth M2M 認証で Databricks SQL Connector for Python を使用するには、次の操作を行う必要があります。

  1. Databricks ワークスペースに Databricks サービスプリンシパルを作成し、そのサービスプリンシパルの OAuth シークレットを作成します。

    サービスプリンシパルとそのOAuth シークレットを作成するには、「 Databricksを使用してサービスプリンシパルで へのアクセスを認証するOAuth (OAuth M2M)」 を参照してください。サービスプリンシパルの UUID または Application ID の値と、サービスプリンシパルの シークレットの SecretOAuth 値をメモします。

  2. そのサービスプリンシパルにクラスターまたはウェアハウスへのアクセス権を付与します。

    サービスプリンシパルにクラスターまたはウェアハウスへのアクセスを許可するには、 「コンピュート権限」または「SQL ウェアハウスの管理」を参照してください。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットは、次の環境変数が設定されていることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME をクラスターまたは SQLウェアハウスの [Server Hostname ] の値に設定します。

  • DATABRICKS_HTTP_PATHで、クラスターまたは SQLウェアハウスの HTTP パス 値に設定します。

  • DATABRICKS_CLIENT_IDは、サービスプリンシパルの UUID または アプリケーション ID の値に設定されます。

  • DATABRICKS_CLIENT_SECRETで、サービスプリンシパルの OAuth シークレットの Secret 値に設定します。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os

server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
  config = Config(
    host          = f"https://{server_hostname}",
    client_id     = os.getenv("DATABRICKS_CLIENT_ID"),
    client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
  return oauth_service_principal(config)

with sql.connect(server_hostname      = server_hostname,
                 http_path            = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
# ...

OAuthユーザー対マシン(U2M)認証

Databricks SQL コネクタ for Python バージョン 3.0.3 以降では、 OAuth ユーザー対マシン (U2M) 認証がサポートされています。 Databricks SDK for Python 0.19.0 以降もインストールする必要があります (たとえばpip install databricks-sdkまたはpython -m pip install databricks-sdkを実行します)。

OAuth U2M 認証を使用して Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 OAuth U2M 認証では、リアルタイムの人間のサインインと同意を使用して、ターゲット Databricks ユーザー アカウントを認証します。 このスニペットは、次の環境変数が設定されていることを前提としています。

  • DATABRICKS_SERVER_HOSTNAMEをクラスターまたは SQL ウェアハウスのサーバー ホスト名の値に設定します。

  • DATABRICKS_HTTP_PATHをクラスターまたは SQL ウェアハウスのHTTP パス値に設定します。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
# ...

次のコード例は、Databricks SQL コネクタ for Python を使用して、データのクエリと挿入、メタデータのクエリー、カーソルと接続の管理、およびログの構成を行う方法を示しています。

次のコード例は、認証に Databricks 個人用アクセストークンを使用する方法を示しています。 代わりに、他の使用可能な Databricks 認証の種類を使用するには、「 認証」を参照してください。

これらのコード例では、これらの環境変数から server_hostnamehttp_path、および access_token 接続変数の値を取得します。

  • DATABRICKS_SERVER_HOSTNAMEこれは、要件の サーバ ホスト名 の値を表します。

  • DATABRICKS_HTTP_PATHこれは、要件からの HTTP パス 値を表します。

  • DATABRICKS_TOKENこれは、要件からのアクセストークンを表します。

これらの接続変数の値を取得するには、他の方法を使用できます。 環境変数の使用は、多くのアプローチの 1 つにすぎません。

データのクエリー

次のコード例は、Databricks SQL Connector for Python を呼び出して、クラスターまたは SQLウェアハウスで基本的な SQL コマンドを実行する方法を示しています。 このコマンドは、samples カタログのnyctaxiスキーマのtripsテーブルから最初の 2 行を返します。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    result = cursor.fetchall()

    for row in result:
      print(row)

データの挿入

次の例は、少量のデータ (数千行) を挿入する方法を示しています。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])

    cursor.execute(f"INSERT INTO squares VALUES {values}")

    cursor.execute("SELECT * FROM squares LIMIT 10")

    result = cursor.fetchall()

    for row in result:
      print(row)

大量のデータの場合は、最初にデータをクラウドストレージにアップロードしてから、 COPY INTO コマンドを実行する必要があります。

クエリ メタデータ

メタデータを取得するための専用のメソッドがあります。 次の例では、サンプル テーブルの列に関するメタデータを取得します。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())

カーソルと接続を管理する

使用しなくなった接続とカーソルを閉じるのがベスト・プラクティスです。 これにより、Databricks クラスターと Databricks SQL ウェアハウスのリソースが解放されます。

コンテキストマネージャ(前の例で使用した with 構文)を使用してリソースを管理するか、明示的に closeを呼び出すことができます。

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

Unity Catalogボリューム内のファイルを管理する

Databricks SQL コネクタを使用すると、次の例に示すように、Unity Catalogボリュームにローカル ファイルを書き込んだり、ボリュームからファイルをダウンロードしたり、ボリュームからファイルを削除したりできます。

from databricks import sql
import os

# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname            = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path                  = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token               = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:

  with connection.cursor() as cursor:

    # Write a local file to the specified path in a volume.
    # Specify OVERWRITE to overwrite any existing file in that path.
    cursor.execute(
      "PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
    )

    # Download a file from the specified path in a volume.
    cursor.execute(
      "GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
    )

    # Delete a file from the specified path in a volume.
    cursor.execute(
      "REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
    )

ログの構成

Databricks SQL コネクタは、Python の 標準ログ モジュールを使用します。 ログ レベルは次のように構成できます。

from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level    = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
   logging.debug(row)

cursor.close()
connection.close()

テスティング

コードをテストするには、 pytestなどの Python テスト フレームワークを使用します。 Databricks REST API エンドポイントを呼び出さずに、または Databricks アカウントやワークスペースの状態を変更せずに、シミュレートされた条件下でコードをテストするには、 unittest.mockなどの Python モック ライブラリを使用できます。

たとえば、 Databricks個人用アクセストークンを使用してDatabricksワークスペースへの接続を返す get_connection_personal_access_token 関数と、接続を使用して samples カタログの nyctaxi スキーマの trips テーブルから指定された数のデータ行を取得する select_nyctaxi_trips 関数を含む helpers.py という名前の次のファイルがあるとします。

# helpers.py

from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor

def get_connection_personal_access_token(
  server_hostname: str,
  http_path: str,
  access_token: str
) -> Connection:
  return sql.connect(
    server_hostname = server_hostname,
    http_path = http_path,
    access_token = access_token
  )

def select_nyctaxi_trips(
  connection: Connection,
  num_rows: int
) -> List[Row]:
  cursor: Cursor = connection.cursor()
  cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  result: List[Row] = cursor.fetchall()
  return result

また、get_connection_personal_access_token 関数と select_nyctaxi_trips 関数を呼び出す main.py という名前の次のファイルがあるとします。

# main.py

from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips

connection: Connection = get_connection_personal_access_token(
  server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
  http_path = os.getenv("DATABRICKS_HTTP_PATH"),
  access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
  connection = connection,
  num_rows = 2
)

for row in rows:
  print(row)

次の test_helpers.py という名前のファイルは、 select_nyctaxi_trips 関数が予期される応答を返すかどうかをテストします。 このテストは、ターゲットワークスペースへの実際の接続を作成するのではなく、 Connectionオブジェクトをモックします。 また、このテストでは、実際のデータに含まれるスキーマと値に準拠する一部のデータをモックします。 このテストでは、モックされた接続を介してモックされたデータが返され、モックされたデータ行の 1 つの値が期待値と一致するかどうかがチェックされます。

# test_helpers.py

import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec

@pytest.fixture
def mock_data() -> List[Row]:
  return [
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
      tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
      trip_distance = 4.94,
      fare_amount = 19.0,
      pickup_zip = 10282,
      dropoff_zip = 10171
    ),
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
      tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
      trip_distance = 0.28,
      fare_amount = 3.5,
      pickup_zip = 10110,
      dropoff_zip = 10110
    )
  ]

def test_select_nyctaxi_trips(mock_data: List[Row]):
  # Create a mock Connection.
  mock_connection = create_autospec(Connection)

  # Set the mock Connection's cursor().fetchall() to the mock data.
  mock_connection.cursor().fetchall.return_value = mock_data

  # Call the real function with the mock Connection.
  response: List[Row] = select_nyctaxi_trips(
    connection = mock_connection,
    num_rows = 2)

  # Check the value of one of the mocked data row's columns.
  assert response[1].fare_amount == 3.5

select_nyctaxi_trips 関数には SELECT ステートメントが含まれているため、trips テーブルの状態は変更されないため、この例ではモック作成は絶対に必要というわけではありません。ただし、モックを使用すると、ワークスペースとの実際の接続が確立されるのを待たずに、テストをすばやく実行できます。 また、モックを使用すると、 INSERT INTOUPDATEDELETE FROMなど、テーブルの状態を変更する可能性のある関数に対して、シミュレートされたテストを複数回実行できます。

API リファレンス

パッケージ

databricks-sql-connector

使い: pip install databricks-sql-connector

Python パッケージインデックス (PyPI) の databricks-sql-connector も参照してください。

モジュール

databricks.sql

使い: from databricks import sql

クラス

選択されたクラスには、次のものが含まれます。

クラス

Connection

Databricksコンピュートリソースに関するセッション。

Cursor

データ・レコードをトラバースするためのメカニズム。

Row

SQL クエリ結果内のデータの行。

Connection クラス

Connectionオブジェクトを作成するには、次の引数を使用してdatabricks.sql.connectメソッドを呼び出します。

パラメーター

server_hostname

種類: str

クラスターのサーバーのホスト名。 サーバーのホスト名を取得するには、この記事で前述した手順を参照してください。

このパラメーターは必須です。

例: 1234567890123456.7.gcp.databricks.com

http_path

種類: str

クラスターの HTTP パス。 HTTP パスを取得するには、この記事で前述した手順を参照してください。

このパラメーターは必須です。

sql/protocolv1/o/1234567890123456/1234-567890-test123 クラスターの場合。 /sql/1.0/warehouses/a1b234c567d8e9fa : SQL ウェアハウス。

access_token

種類: str

クラスターのワークスペースの Databricks 個人用アクセストークン。 トークンを作成するには、この記事で前述した手順を参照してください。

このパラメーターは必須です。

例: dapi...<the-remaining-portion-of-your-token>

session_configuration

種類: dict[str, Any]

Spark セッション構成パラメーターのディクショナリ。 構成の設定は、 SET key=val SQL コマンドを使用することと同じです。 SQL コマンド SET -v を実行して、使用可能な構成の完全な一覧を取得します。

デフォルトは Noneです。

このパラメーターはオプションです。

例: {"spark.sql.variable.substitute": True}

http_headers

種類: List[Tuple[str, str]]]

クライアントが行うすべての RPC 要求の HTTP ヘッダーに設定する追加の (キーと値のペア。 一般的な使用法では、追加の HTTP ヘッダーは設定されません。 デフォルトは Noneです。

このパラメーターはオプションです。

バージョン 2.0 以降

catalog

種類: str

接続に使用する初期カタログ。 デフォルトから None (この場合、デフォルトカタログ、通常は hive_metastore が使用されます)。

このパラメーターはオプションです。

バージョン 2.0 以降

schema

種類: str

接続に使用する初期スキーマ。 デフォルトから None (この場合、デフォルトスキーマ default が使用されます)。

このパラメーターはオプションです。

バージョン 2.0 以降

use_cloud_fetch

種類: bool

True を使用して、フェッチ要求をクラウドオブジェクトストアに直接送信して、データのチャンクをダウンロードします。 False (デフォルト) を使用して、フェッチ要求を Databricks に直接送信します。

use_cloud_fetchTrue に設定されているが、ネットワーク アクセスがブロックされている場合、フェッチ要求は失敗します。

バージョン 2.8 以降

選択された Connection 方法には、次のものがあります。

メソッド

close

データベースへの接続を閉じ、サーバー上のすべての関連リソースを解放します。 この接続への追加の呼び出しは、 Errorをスローします。

パラメーターはありません。

戻り値はありません。

cursor

データベース内のレコードのトラバーサルを可能にする新しい Cursor オブジェクトを返します。

パラメーターはありません。

Cursor クラス

Cursor オブジェクトを作成するには、Connection クラスの cursor メソッドを呼び出します。

選択された Cursor 属性には、次のものがあります。

属性

arraysize

fetchmanyメソッドとともに使用し、内部バッファサイズを指定します。これは、一度にサーバーから実際にフェッチされる行数でもあります。デフォルト値は10000です。 幅の狭い結果 (各行に多くのデータが含まれていない結果) の場合は、パフォーマンスを向上させるためにこの値を増やす必要があります。

読み取り/書き込みアクセス。

description

tuple オブジェクトの Python list が含まれています。これらの tuple オブジェクトにはそれぞれ 7 つの値が含まれ、各 tuple オブジェクトの最初の 2 つの項目には、次のように 1 つの結果列を説明する情報が含まれています。

  • name: 列の名前。

  • type_code: 列の型を表す文字列。 たとえば、整数列の型コードは intになります。

各 7 項目 tuple オブジェクトの残りの 5 項目は実装されておらず、その値は定義されていません。 通常、これらは 4 つの None 値とそれに続く 1 つの True 値として返されます。

読み取り専用アクセス。

選択された Cursor 方法には、次のものがあります。

メソッド

cancel

カーソルが開始したデータベース クエリまたはコマンドの実行を中断します。 サーバー上の関連リソースを解放するには、 cancelメソッドを呼び出した後にcloseメソッドを呼び出します。

パラメーターはありません。

戻り値はありません。

close

カーソルを閉じ、サーバー上の関連リソースを解放します。 既に閉じているカーソルを閉じると、エラーがスローされる可能性があります。

パラメーターはありません。

戻り値はありません。

execute

データベースを準備し、クエリーまたはコマンドを実行します。

戻り値はありません。

パラメーター:

operation

種類: str

準備して実行するクエリーまたはコマンド。

このパラメーターは必須です。

parameters パラメーターを使用しない例:

cursor.execute(
 'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2'
)

parameters パラメーターを使用した例:

cursor.execute(
 'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2',
 { 'pickup_zip': '10019' }
)

parameters

原稿種別: 辞書

operation パラメーターと共に使用するパラメーターのシーケンス。

このパラメーターはオプションです。 デフォルトは Noneです。

executemany

データベースを準備し、 seq_of_parameters 引数のすべてのパラメーター シーケンスを使用してクエリまたはコマンドを実行します。 最終的な結果セットのみが保持されます。

戻り値はありません。

パラメーター:

operation

種類: str

準備して実行するクエリーまたはコマンド。

このパラメーターは必須です。

seq_of_parameters

タイプ: list dict

operation パラメーターで使用するパラメーター値の多くのセットのシーケンス。

このパラメーターは必須です。

catalogs

カタログに関するメタデータ クエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。

結果セットの重要なフィールドには、次のものがあります。

  • フィールド名: TABLE_CAT。 タイプ: str. カタログの名前。

パラメーターはありません。

戻り値はありません。

バージョン 1.0 以降

schemas

スキーマに関するメタデータ クエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。

結果セットの重要なフィールドには、次のものがあります。

  • フィールド名: TABLE_SCHEM。 タイプ: str. スキーマの名前。

  • フィールド名: TABLE_CATALOG。 タイプ: str. スキーマが属するカタログ。

戻り値はありません。

バージョン 1.0 以降

パラメーター:

catalog_name

種類: str

情報を取得するカタログ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

schema_name

種類: str

情報を取得するスキーマ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

tables

テーブルとビューに関するメタデータ クエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。

結果セットの重要なフィールドには、次のものがあります。

  • フィールド名: TABLE_CAT。 タイプ: str. 表が属するカタログ。

  • フィールド名: TABLE_SCHEM。 タイプ: str. テーブルが属するスキーマ。

  • フィールド名: TABLE_NAME。 タイプ: str. テーブルの名前。

  • フィールド名: TABLE_TYPE. 「 str. 関係の種類。たとえば、 VIEWまたはTABLE (Databricks Runtime 10.4 LTS 以降に適用されます。それより前のバージョンの Databricks Runtime では空の文字列が返されます)。

戻り値はありません。

バージョン 1.0 以降

パラメーター

catalog_name

種類: str

情報を取得するカタログ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

schema_name

種類: str

情報を取得するスキーマ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

table_name

種類: str

情報を取得するテーブル名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

table_types

種類: List[str]

照合するテーブルタイプのリスト ( TABLEVIEWなど)。

このパラメーターはオプションです。

columns

列に関するメタデータ クエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。

結果セットの重要なフィールドには、次のものがあります。

  • フィールド名: TABLE_CAT。 タイプ: str. 列が属するカタログ。

  • フィールド名: TABLE_SCHEM。 タイプ: str. 列が属するスキーマ。

  • フィールド名: TABLE_NAME。 タイプ: str. 列が属するテーブルの名前。

  • フィールド名: COLUMN_NAME。 タイプ: str. 列の名前。

戻り値はありません。

バージョン 1.0 以降

パラメーター:

catalog_name

種類: str

情報を取得するカタログ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

schema_name

種類: str

情報を取得するスキーマ名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

table_name

種類: str

情報を取得するテーブル名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

column_name

種類: str

情報を取得する列名。 % 文字はワイルドカードとして解釈されます。

このパラメーターはオプションです。

fetchall

クエリーのすべて (または残りのすべて) 行を取得します。

パラメーターはありません。

クエリーのすべての (または残りのすべての) 行を、 Row オブジェクトの Python list として返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、またはexecute呼び出しがまだ行われていない場合は、Errorをスローします。

fetchmany

クエリーの次の行を取得します。

クエリの次の行の最大size (またはsizeが指定されていない場合はarraysize属性) をRowオブジェクトの Python listとして返します。

フェッチする行数が size 行未満の場合は、残りのすべての行が返されます。

execute メソッドの前回の呼び出しでデータが返されなかった場合、またはexecute呼び出しがまだ行われていない場合は、Errorをスローします。

パラメーター:

size

種類: int

取得する次の行の数。

このパラメーターはオプションです。 指定しない場合は、 arraysize 属性の値が使用されます。

例: cursor.fetchmany(10)

fetchone

データセットの次の行を取得します。

パラメーターはありません。

データセットの次の行を Python tuple オブジェクトとして 1 つのシーケンスとして返し、使用可能なデータがなくなった場合は None を返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、またはexecute呼び出しがまだ行われていない場合は、Errorをスローします。

fetchall_arrow

クエリーのすべて (または残りのすべて) 行を PyArrow Table オブジェクトとして取得します。 非常に大量のデータを返すクエリーは、メモリ消費を減らすために代わりに fetchmany_arrow を使用する必要があります。

パラメーターはありません。

クエリーのすべて (または残りのすべて) 行を PyArrow テーブルとして返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、またはexecute呼び出しがまだ行われていない場合は、Errorをスローします。

バージョン 2.0 以降

fetchmany_arrow

クエリーの次の行を PyArrow Table オブジェクトとして取得します。

クエリの次の行のsize引数 (またはsizeが指定されていない場合はarraysize属性) までを Python PyArrow Tableオブジェクトとして返します。

execute メソッドの前回の呼び出しでデータが返されなかった場合、またはexecute呼び出しがまだ行われていない場合は、Errorをスローします。

バージョン 2.0 以降

パラメーター:

size

種類: int

取得する次の行の数。

このパラメーターはオプションです。 指定しない場合は、 arraysize 属性の値が使用されます。

例: cursor.fetchmany_arrow(10)

Row クラス

行クラスは、個々の結果行を表すタプルのようなデータ構造です。 行に "my_column"という名前の列が含まれている場合は、 row.my_columnを介して row"my_column" フィールドにアクセスできます。数値インデックスを使用して、 row[0]などのフィールドにアクセスすることもできます。 列名が属性メソッド名として許可されていない場合 (たとえば、数字で始まる場合)、フィールドには row["1_my_column"]としてアクセスできます。

バージョン 1.0 以降

選択された Row 方法は次のとおりです。

asDict

フィールド名でインデックス付けされた行の辞書表現を返します。 重複するフィールド名がある場合、重複するフィールドの 1 つ (ただし、1 つだけ) が辞書に返されます。 どの重複フィールドが返されるかは定義されていません。

パラメーターはありません。

フィールドの dict を返します。

型変換

次の表は、Apache Spark SQL データ型を同等の Python データ型にマップします。

Apache Spark SQL データ型

Python データ型

array

numpy.ndarray

bigint

int

binary

bytearray

boolean

bool

date

datetime.date

decimal

decimal.Decimal

double

float

int

int

map

str

null

NoneType

smallint

int

string

str

struct

str

timestamp

datetime.datetime

tinyint

int

トラブルシューティング

tokenAuthWrapperInvalidAccessToken: Invalid access token メッセージ

問題: コードを実行すると、 Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access tokenのようなメッセージが表示されます。

考えられる原因: access_token に渡された値が有効な Databricks 個人用アクセストークンではありません。

推奨される修正: access_token に渡された値が正しいことを確認し、再試行してください。

gaierror(8, 'nodename nor servname provided, or not known') メッセージ

問題: コードを実行すると、 Error during request to server: gaierror(8, 'nodename nor servname provided, or not known')のようなメッセージが表示されます。

考えられる原因: server_hostname に渡された値が正しくホスト名ではありません。

推奨される修正: server_hostname に渡された値が正しいことを確認し、再試行してください。

サーバーのホスト名の検索の詳細については、「 Databricks コンピュート リソースの接続の詳細を取得する」を参照してください。

IpAclError メッセージ

問題: コードを実行すると、Databricks ノートブックでコネクタを使用しようとすると、 Error during request to server: IpAclValidation メッセージが表示されます。

考えられる原因: Databricks ワークスペースの IP 許可リストが有効になっている可能性があります。 IP 許可リストでは、Spark クラスターからコントロール プレーンへの接続は Default では許可されません。

推奨される修正方法: コンピュート プレーン サブネットを IP 許可リストに追加するよう管理者に依頼します。

関連リソース

詳細については、以下を参照してください。