Testing the Databricks ODBC Driver
This article describes how to test code that uses the Databricks ODBC Driver.
To test code that uses the Databricks ODBC Driver along with DSNs or DSN-less connection strings, you can use popular test frameworks for programming languages that support ODBC. For instance, the following example Python code uses pyodbc, pytest, and unittest.mock to automate and test the Databricks ODBC Driver using a DSN. This example code is based on the example code in Connect Python and pyodbc to Databricks.
The following example code file named helpers.py contains several functions that automate the Databricks ODBC Driver using a DSN:
The connect_to_dsn function uses a DSN to open a connection through a Databricks compute resource.
The get_cursor_from_connection function uses the connection to obtain a cursor, which enables fetch operations on the data through the compute resource.
The select_from_nyctaxi_trips function uses the cursor to select the specified number of data rows from the trips table in the samples catalog’s nyctaxi schema.
The print_rows function prints the data rows’ content to the screen.
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
    connstring: str,
    autocommit: bool
) -> Connection:
    connection = connect(
        connstring,
        autocommit = autocommit
    )
    return connection

def get_cursor_from_connection(
    connection: Connection
) -> Cursor:
    cursor = connection.cursor()
    return cursor

def select_from_nyctaxi_trips(
    cursor: Cursor,
    num_rows: int
) -> Cursor:
    select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
    return select_cursor

def print_rows(cursor: Cursor):
    for row in cursor.fetchall():
        print(row)
The following example code file named main.py calls the functions in the helpers.py file:
# main.py

from helpers import *

connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
)

cursor = get_cursor_from_connection(
    connection = connection
)

select_cursor = select_from_nyctaxi_trips(
    cursor = cursor,
    num_rows = 2
)

print_rows(
    cursor = select_cursor
)
The following example code file named test_helpers.py uses pytest to test the functions in the helpers.py file. Instead of spending the time and cost of actual compute resources to call the functions in the helpers.py file, the example code uses unittest.mock to simulate these calls. These simulated calls typically complete in just a few seconds, which lets you check your code's logic quickly without changing the state of your existing Databricks accounts or workspaces.
# test_helpers.py
from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime
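# Patch connect where helpers.py looks it up. Patching helpers.connect_to_dsn
# has no effect here, because "from helpers import *" has already bound the
# real function in this module.
@patch("helpers.connect")
def test_connect_to_dsn(mock_connect):
    mock_connect.return_value.getinfo.return_value = "Spark SQL"
    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )
    assert connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"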
# The mocked connect returns a mock connection, whose cursor() call
# returns a mock cursor with the rowcount set below.
@patch("helpers.connect")
def test_get_cursor_from_connection(mock_connect):
    mock_cursor = mock_connect.return_value.cursor
    mock_cursor.return_value.rowcount = -1
    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )
    cursor = get_cursor_from_connection(
        connection = connection
    )
    assert cursor.rowcount == -1
# The mock cursor's execute() call returns another mock, which stands in
# for the cursor that holds the query results.
@patch("helpers.connect")
def test_select_from_nyctaxi_trips(mock_connect):
    mock_cursor = mock_connect.return_value.cursor
    mock_execute = mock_cursor.return_value.execute
    mock_execute.return_value.arraysize = 1
    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )
    cursor = get_cursor_from_connection(
        connection = connection
    )
    select_cursor = select_from_nyctaxi_trips(
        cursor = cursor,
        num_rows = 2
    )
    assert select_cursor.arraysize == 1
# fetchall() on the mocked result cursor returns two canned rows, and
# capsys captures what print_rows writes to standard output.
@patch("helpers.connect")
def test_print_rows(mock_connect, capsys):
    mock_cursor = mock_connect.return_value.cursor
    mock_execute = mock_cursor.return_value.execute
    mock_execute.return_value.fetchall.return_value = [
        (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
        (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
    ]
    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )
    cursor = get_cursor_from_connection(
        connection = connection
    )
    select_cursor = select_from_nyctaxi_trips(
        cursor = cursor,
        num_rows = 2
    )
    print_rows(
        cursor = select_cursor
    )
    captured = capsys.readouterr()
    assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                           "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"
Because the select_from_nyctaxi_trips function contains a SELECT statement and therefore does not change the state of the trips table, mocking is not absolutely required in this example. However, mocking enables you to quickly run your tests without waiting for an actual connection to be made with the compute resource. Also, mocking enables you to run simulated tests multiple times for functions that run statements that might change a table’s state, such as INSERT INTO, UPDATE, and DELETE FROM.
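As a minimal sketch of that last point, the following example tests a function that runs an INSERT INTO statement against a mocked cursor. The insert_trip function and the main.default.trips_copy table name are hypothetical and are not part of the helpers.py file. The sketch assumes the cursor accepts "?" parameter markers followed by the parameter values, as pyodbc cursors do. Because the cursor is a MagicMock, no statement reaches a compute resource, so the test can run any number of times without changing any table.

```python
# test_insert_sketch.py (hypothetical file name)

from unittest.mock import MagicMock

# Hypothetical target table, used only for illustration.
TARGET_TABLE = "main.default.trips_copy"

def insert_trip(cursor, trip_distance, fare_amount):
    # Run a parameterized INSERT INTO statement through the given cursor.
    cursor.execute(
        f"INSERT INTO {TARGET_TABLE} (trip_distance, fare_amount) VALUES (?, ?)",
        trip_distance,
        fare_amount
    )

def test_insert_trip_runs_without_a_real_table():
    # A bare MagicMock stands in for the cursor, so nothing is sent
    # to a compute resource.
    mock_cursor = MagicMock()

    insert_trip(mock_cursor, 4.94, 19.0)

    # Check the statement text and the parameter values that the
    # function passed to the cursor.
    mock_cursor.execute.assert_called_once_with(
        "INSERT INTO main.default.trips_copy (trip_distance, fare_amount) VALUES (?, ?)",
        4.94,
        19.0
    )
```

Asserting on the arguments passed to execute checks the statement that the function builds, which a test that only reads back preset mock values does not.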