Databricks ODBC ドライバーのテスト

この記事では、Databricks ODBC ドライバーを使用するコードをテストする方法について説明します。

Databricks ODBC ドライバーを DSN または DSN なしの接続文字列とともに使用するコードをテストするには、ODBC をサポートするプログラミング言語用の一般的なテスト フレームワークを使用できます。 たとえば、次の Python コード例では、 pyodbcpytestunittest.mockを使用して、DSN を使用して Databricks ODBC ドライバーを自動化およびテストします。 このサンプル コードは、Connect Python and pyodbc to Databricksのサンプル コードに基づいています。

次のhelpers.pyという名前のサンプル コード ファイルには、DSN を使用して Databricks ドライバーを自動化するいくつかの関数が含まれています。

  • connect_to_dsn 関数は DSN を使用して、 Databricksコンピュートリソースを介して接続を開きます。

  • get_cursor_from_connection関数は接続を使用してカーソルを取得し、コンピュートリソースを通じてデータのフェッチ操作を可能にします。

  • select_from_nyctaxi_trips 関数は、カーソルを使用して、samples カタログのnyctaxiスキーマ内のtripsテーブルから指定された数のデータ行を選択します。

  • print_rows 関数は、データ行の内容を画面に出力します。

# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

次の main.py file という名前のコード ファイルの例では、 helpers.py ファイル内の関数を呼び出します。

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

次の test_helpers.py という名前のコード ファイル例では、 pytest を使用して helpers.py ファイル内の関数をテストします。 実際のコンピュートリソースの時間とコストを使用してhelpers.pyファイル内の関数を呼び出す代わりに、次のサンプル コードではunittest.mockを使用してこれらの呼び出しをシミュレートします。 これらのシミュレートされた呼び出しは通常、わずか数秒で完了するため、既存の Databricks アカウントまたはワークスペースの状態を変更せずに、コードの品質に対する信頼を高めることができます。

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

select_from_nyctaxi_trips 関数には SELECT ステートメントが含まれているため、trips テーブルの状態は変更されないため、この例ではモック作成は絶対に必要というわけではありません。ただし、モックを使用すると、コンピュートリソースとの実際の接続が確立されるのを待たずに、テストをすばやく実行できます。 また、モックを使用すると、 INSERT INTOUPDATEDELETE FROMなど、テーブルの状態を変更する可能性のある関数に対して、シミュレートされたテストを複数回実行できます。