Databricks SQL Connector for Python
The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses. The Databricks SQL Connector for Python is easier to set up and use than similar Python libraries such as pyodbc. This library follows PEP 249 – Python Database API Specification v2.0.
Note
The Databricks SQL Connector for Python also includes a SQLAlchemy dialect for Databricks. See Use SQLAlchemy with Databricks.
Requirements
A development machine running Python >=3.8 and <=3.11.
Databricks recommends that you use Python virtual environments, such as those provided by venv, which is included with Python. Virtual environments help to ensure that you are using the correct versions of Python and the Databricks SQL Connector for Python together. Setting up and using virtual environments is outside the scope of this article. For more information, see Creating virtual environments.
An existing cluster or SQL warehouse.
Get started
Install the Databricks SQL Connector for Python library on your development machine by running pip install databricks-sql-connector or python -m pip install databricks-sql-connector.
Gather the following information for the cluster or SQL warehouse that you want to use:
The server hostname of the cluster. You can get this from the Server Hostname value in the Advanced Options > JDBC/ODBC tab for your cluster.
The HTTP path of the cluster. You can get this from the HTTP Path value in the Advanced Options > JDBC/ODBC tab for your cluster.
The server hostname of the SQL warehouse. You can get this from the Server Hostname value in the Connection Details tab for your SQL warehouse.
The HTTP path of the SQL warehouse. You can get this from the HTTP Path value in the Connection Details tab for your SQL warehouse.
Authentication
The Databricks SQL Connector for Python supports the following Databricks authentication types:
Databricks personal access token authentication
OAuth machine-to-machine (M2M) authentication
OAuth user-to-machine (U2M) authentication
Other Databricks authentication types are not yet supported.
Databricks personal access token authentication
To use the Databricks SQL Connector for Python with Databricks personal access token authentication, you must first create a Databricks personal access token. To do so, follow the steps in Databricks personal access tokens for workspace users.
To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:
DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your cluster or SQL warehouse.
DATABRICKS_HTTP_PATH, set to the HTTP Path value for your cluster or SQL warehouse.
DATABRICKS_TOKEN, set to the Databricks personal access token.
To set environment variables, see your operating system’s documentation.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth machine-to-machine (M2M) authentication
Databricks SQL Connector for Python versions 2.7.0 and above support OAuth machine-to-machine (M2M) authentication. You must also install the Databricks SDK for Python 0.18.0 or above (for example, by running pip install databricks-sdk or python -m pip install databricks-sdk).
To use the Databricks SQL Connector for Python with OAuth M2M authentication, you must do the following:
Create a Databricks service principal in your Databricks workspace, and create an OAuth secret for that service principal.
To create the service principal and its OAuth secret, see Authenticate access to Databricks with a service principal using OAuth (OAuth M2M). Make a note of the service principal’s UUID or Application ID value, and the Secret value for the service principal’s OAuth secret.
Give that service principal access to your cluster or warehouse.
To give the service principal access to your cluster or warehouse, see Compute permissions or Manage a SQL warehouse.
To authenticate the Databricks SQL Connector for Python, use the following code snippet. This snippet assumes that you have set the following environment variables:
DATABRICKS_SERVER_HOSTNAME, set to the Server Hostname value for your cluster or SQL warehouse.
DATABRICKS_HTTP_PATH, set to the HTTP Path value for your cluster or SQL warehouse.
DATABRICKS_CLIENT_ID, set to the service principal’s UUID or Application ID value.
DATABRICKS_CLIENT_SECRET, set to the Secret value for the service principal’s OAuth secret.
To set environment variables, see your operating system’s documentation.
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)
with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...
OAuth user-to-machine (U2M) authentication
Databricks SQL Connector for Python versions 3.0.3 and above support OAuth user-to-machine (U2M) authentication. You must also install the Databricks SDK for Python 0.19.0 or above (for example, by running pip install databricks-sdk or python -m pip install databricks-sdk).
To authenticate the Databricks SQL Connector for Python with OAuth U2M authentication, use the following code snippet. OAuth U2M authentication uses real-time human sign-in and consent to authenticate the target Databricks user account. This snippet assumes that you have set the following environment variables:
Set DATABRICKS_SERVER_HOSTNAME to the Server Hostname value for your cluster or SQL warehouse.
Set DATABRICKS_HTTP_PATH to the HTTP Path value for your cluster or SQL warehouse.
To set environment variables, see your operating system’s documentation.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...
Examples
The following code examples demonstrate how to use the Databricks SQL Connector for Python to query and insert data, query metadata, manage cursors and connections, and configure logging.
Note
The following code examples demonstrate how to use a Databricks personal access token for authentication. To use other available Databricks authentication types instead, see Authentication.
These code examples retrieve their server_hostname, http_path, and access_token connection variable values from these environment variables:
DATABRICKS_SERVER_HOSTNAME, which represents the Server Hostname value from the requirements.
DATABRICKS_HTTP_PATH, which represents the HTTP Path value from the requirements.
DATABRICKS_TOKEN, which represents your access token from the requirements.
You can use other approaches to retrieving these connection variable values. Using environment variables is just one approach among many.
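For example, the following minimal sketch reads the same values from a local JSON file instead. The file name config.json and its keys are illustrative and not part of the connector.

import json
from databricks import sql

# Load connection details from a hypothetical local configuration file.
with open("config.json") as f:
    config = json.load(f)

with sql.connect(server_hostname = config["server_hostname"],
                 http_path = config["http_path"],
                 access_token = config["access_token"]) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT 1")
        print(cursor.fetchall())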
Query data
The following code example demonstrates how to call the Databricks SQL Connector for Python to run a basic SQL command on a cluster or SQL warehouse. This command returns the first two rows from the trips table in the samples catalog’s nyctaxi schema.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()
for row in result:
print(row)
Insert data
The following example demonstrates how to insert small amounts of data (thousands of rows):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])
cursor.execute(f"INSERT INTO squares VALUES {values}")
cursor.execute("SELECT * FROM squares LIMIT 10")
result = cursor.fetchall()
for row in result:
print(row)
For large amounts of data, you should first upload the data to cloud storage and then execute the COPY INTO command.
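The following sketch shows that pattern. It assumes the data files have already been uploaded to a cloud storage location that your workspace can access; the storage URI below is a placeholder.

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("CREATE TABLE IF NOT EXISTS squares (x INT, x_squared INT)")
        # COPY INTO loads files directly from cloud storage into the table.
        # Replace the URI with the location where you uploaded your files.
        cursor.execute("""
            COPY INTO squares
            FROM 'abfss://my-container@my-storage-account.dfs.core.windows.net/data/squares/'
            FILEFORMAT = CSV
            FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
        """)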
Query metadata
There are dedicated methods for retrieving metadata. The following example retrieves metadata about columns in a sample table:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())
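The Cursor class also provides catalogs, schemas, and tables methods that work the same way. For example, the following sketch lists the tables in the samples catalog’s nyctaxi schema:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        # Request table metadata, then fetch the results like any other query.
        cursor.tables(catalog_name="samples", schema_name="nyctaxi")
        print(cursor.fetchall())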
Manage cursors and connections
It is a best practice to close any connections and cursors that are no longer in use. This frees resources on Databricks clusters and Databricks SQL warehouses.
You can use a context manager (the with syntax used in previous examples) to manage the resources, or explicitly call close:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
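If you manage these resources explicitly, a try/finally variant of the same pattern ensures that they are closed even if a query fails:

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token = os.getenv("DATABRICKS_TOKEN"))
try:
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT * from range(10)")
        print(cursor.fetchall())
    finally:
        cursor.close()       # always release the cursor's server-side resources
finally:
    connection.close()       # always release the connection's server-side resources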
Manage files in Unity Catalog volumes
The Databricks SQL Connector enables you to write local files to Unity Catalog volumes, download files from volumes, and delete files from volumes, as shown in the following example:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allowed_local_path argument to the path of the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allowed_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:
with connection.cursor() as cursor:
# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)
# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)
# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)
Configure logging
The Databricks SQL Connector uses Python’s standard logging module. You can configure the logging level as shown in the following example:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
Testing
To test your code, use Python test frameworks such as pytest. To test your code under simulated conditions without calling Databricks REST API endpoints or changing the state of your Databricks accounts or workspaces, you can use Python mocking libraries such as unittest.mock.
For example, given the following file named helpers.py containing a get_connection_personal_access_token function that uses a Databricks personal access token to return a connection to a Databricks workspace, and a select_nyctaxi_trips function that uses the connection to get the specified number of data rows from the trips table in the samples catalog’s nyctaxi schema:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)
def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result
And given the following file named main.py that calls the get_connection_personal_access_token and select_nyctaxi_trips functions:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)
for row in rows:
print(row)
The following file named test_helpers.py tests whether the select_nyctaxi_trips function returns the expected response. Rather than creating a real connection to the target workspace, this test mocks a Connection object. The test also mocks some data that conforms to the schema and values that are in the real data. The test returns the mocked data through the mocked connection and then checks whether one of the mocked data rows’ values matches the expected value.
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]
def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)
# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data
# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)
# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5
Because the select_nyctaxi_trips function contains a SELECT statement and therefore does not change the state of the trips table, mocking is not absolutely required in this example. However, mocking enables you to quickly run your tests without waiting for an actual connection to be made with the workspace. Also, mocking enables you to run simulated tests multiple times for functions that might change a table’s state, such as INSERT INTO, UPDATE, and DELETE FROM.
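As a sketch of that idea, the following hypothetical test mocks the connection for an insert_square helper (not part of the files above) and asserts on the statement it would send, without touching a real table:

# test_insert.py (hypothetical example, not part of the files above)
from unittest.mock import create_autospec
from databricks.sql.client import Connection

def insert_square(connection: Connection, x: int) -> None:
    # Hypothetical helper that changes table state.
    cursor = connection.cursor()
    cursor.execute(f"INSERT INTO squares VALUES ({x}, {x * x})")

def test_insert_square():
    # Mock the Connection so that no real workspace is contacted.
    mock_connection = create_autospec(Connection)
    insert_square(mock_connection, 3)
    # Verify the exact statement that would have been sent.
    mock_connection.cursor().execute.assert_called_with("INSERT INTO squares VALUES (3, 9)")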
API reference
Package
databricks-sql-connector
Usage: pip install databricks-sql-connector
See also databricks-sql-connector in the Python Package Index (PyPI).
Classes
Selected classes include the following:
| Class | Description |
|---|---|
| Connection | A session on a Databricks compute resource. |
| Cursor | A mechanism for traversing over data records. |
| Row | A row of data in a SQL query result. |
Connection class
To create a Connection object, call the databricks.sql.connect method with the following parameters:
| Parameter | Description |
|---|---|
| server_hostname | Type: str. The server hostname for the cluster or SQL warehouse. To get the server hostname, see the instructions earlier in this article. This parameter is required. |
| http_path | Type: str. The HTTP path of the cluster or SQL warehouse. To get the HTTP path, see the instructions earlier in this article. This parameter is required. |
| access_token | Type: str. Your Databricks personal access token for the workspace for the cluster or SQL warehouse. To create a token, see the instructions earlier in this article. This parameter is required. |
| session_configuration | Type: dict. A dictionary of Spark session configuration parameters. Setting a configuration is equivalent to using the SET key=val SQL command. This parameter is optional. |
| http_headers | Type: List[Tuple[str, str]]. Additional (key, value) pairs to set in HTTP headers on every RPC request the client makes. Typical usage will not set any extra HTTP headers. This parameter is optional. Since version 2.0. |
| catalog | Type: str. Initial catalog to use for the connection. This parameter is optional. Since version 2.0. |
| schema | Type: str. Initial schema to use for the connection. This parameter is optional. Since version 2.0. |
| use_cloud_fetch | Type: bool. Whether to send fetch requests directly to the cloud object store to download chunks of data. This parameter is optional. Since version 2.8. |
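As a sketch of how some of the optional parameters fit together (the catalog, schema, and session configuration values here are illustrative):

from databricks import sql
import os

connection = sql.connect(
    server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path = os.getenv("DATABRICKS_HTTP_PATH"),
    access_token = os.getenv("DATABRICKS_TOKEN"),
    catalog = "samples",   # initial catalog for the session
    schema = "nyctaxi",    # initial schema for the session
    session_configuration = {"spark.sql.session.timeZone": "UTC"}  # example Spark session setting
)
connection.close()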
Selected Connection methods include the following:

| Method | Description |
|---|---|
| close | Closes the connection to the database and releases all associated resources on the server. Any additional calls to this connection will throw an Error. No parameters. No return value. |
| cursor | Returns a new Cursor object that enables traversal over the records in a database. No parameters. |
Cursor class
To create a Cursor object, call the Connection class’s cursor method.
Selected Cursor attributes include the following:
| Attribute | Description |
|---|---|
| arraysize | Used with the fetchmany method; specifies the number of rows that are fetched from the server at a time. Read-write access. |
| description | Contains a Python list of tuples describing the result set’s columns. The first 2 items of each 7-item tuple describe a single result column (its name and type). The remaining 5 items of each 7-item tuple are not implemented, and their values are not defined. Read-only access. |
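For instance, the following small sketch prints the result set’s column names from description after running a query, reusing the environment variables from the earlier examples:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 1")
        # The first item of each description tuple is the column name.
        print([column[0] for column in cursor.description])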
Selected Cursor methods include the following:

| Method | Description |
|---|---|
| cancel | Interrupts the running of any database query or command that the cursor has started. To release the associated resources on the server, call the close method after calling cancel. No parameters. No return value. |
| close | Closes the cursor and releases the associated resources on the server. Closing an already closed cursor might throw an error. No parameters. No return value. |
| execute | Prepares and then runs a database query or command. No return value. Parameters: operation (required), the query or command to prepare and then run; parameters (optional dictionary), a sequence of parameters to use with the operation parameter. Example without the parameters parameter: cursor.execute('SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2'). Example with the parameters parameter: cursor.execute('SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip=%(pickup_zip)s LIMIT 2', { 'pickup_zip': '10019' }). |
| executemany | Prepares and then runs a database query or command using all parameter sequences in the seq_of_parameters argument. No return value. Parameters: operation (required), the query or command to prepare and then run; seq_of_parameters (required), a sequence of many sets of parameter values to use with the operation parameter. |
| catalogs | Execute a metadata query about the catalogs. Actual results should then be fetched using fetchmany or fetchall. Important fields in the result set include TABLE_CAT. No parameters. No return value. Since version 1.0. |
| schemas | Execute a metadata query about the schemas. Actual results should then be fetched using fetchmany or fetchall. Important fields in the result set include TABLE_SCHEM and TABLE_CATALOG. No return value. Since version 1.0. Parameters: catalog_name (optional), a catalog name to retrieve information about; schema_name (optional), a schema name to retrieve information about. The % character can be used as a wildcard in either parameter. |
| tables | Execute a metadata query about tables and views. Actual results should then be fetched using fetchmany or fetchall. Important fields in the result set include TABLE_CAT, TABLE_SCHEM, TABLE_NAME, and TABLE_TYPE. No return value. Since version 1.0. Parameters: catalog_name, schema_name, and table_name (all optional), names to retrieve information about, in which the % character can be used as a wildcard; table_types (optional), a list of table types to match, for example TABLE or VIEW. |
| columns | Execute a metadata query about the columns. Actual results should then be fetched using fetchmany or fetchall. Important fields in the result set include TABLE_CAT, TABLE_SCHEM, TABLE_NAME, and COLUMN_NAME. No return value. Since version 1.0. Parameters: catalog_name, schema_name, table_name, and column_name (all optional), names to retrieve information about, in which the % character can be used as a wildcard. |
| fetchall | Gets all (or all remaining) rows of a query. No parameters. Returns all (or all remaining) rows of the query as a Python list. Throws an Error if the previous call to the execute method did not return any data or no execute call has yet been made. |
| fetchmany | Gets the next rows of a query. Returns up to size (or the arraysize attribute if size is not specified) of the next rows of the query as a Python list. If there are fewer than size rows left to be fetched, all remaining rows are returned. Throws an Error if the previous call to the execute method did not return any data or no execute call has yet been made. Parameters: size (optional), the number of next rows to get; if not specified, the value of the arraysize attribute is used. |
| fetchone | Gets the next row of the dataset. No parameters. Returns the next row of the dataset as a single sequence, or None if there is no more data available. Throws an Error if the previous call to the execute method did not return any data or no execute call has yet been made. |
| fetchall_arrow | Gets all (or all remaining) rows of a query, as a PyArrow Table object. No parameters. Returns all (or all remaining) rows of the query as a PyArrow table. Throws an Error if the previous call to the execute method did not return any data or no execute call has yet been made. Since version 2.0. |
| fetchmany_arrow | Gets the next rows of a query as a PyArrow Table object. Returns up to size (or the arraysize attribute if size is not specified) of the next rows of the query as a PyArrow table. Throws an Error if the previous call to the execute method did not return any data or no execute call has yet been made. Since version 2.0. Parameters: size (optional), the number of next rows to get; if not specified, the value of the arraysize attribute is used. |
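For example, the following small sketch pages through a larger result set with fetchmany instead of loading everything with fetchall:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 1000")
        while True:
            batch = cursor.fetchmany(100)  # get the next 100 rows
            if not batch:
                break
            print(f"Fetched {len(batch)} rows")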
Row class
The Row class is a tuple-like data structure that represents an individual result row. If the row contains a column with the name "my_column", you can access the "my_column" field of row via row.my_column. You can also use numeric indices to access fields, for example row[0]. If the column name is not allowed as an attribute name (for example, it begins with a digit), then you can access the field as row["1_my_column"].
Since version 1.0
Selected Row methods include the following:

| Method | Description |
|---|---|
| asDict | Returns a dictionary representation of the row, which is indexed by field names. If there are duplicate field names, one of the duplicate fields (but only one) will be returned in the dictionary. Which duplicate field is returned is not defined. No parameters. Returns a dict of fields. |
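For example, the following brief sketch shows both access styles and asDict, using a column that exists in the samples.nyctaxi.trips table:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 1")
        row = cursor.fetchone()
        print(row.fare_amount)  # access a field by attribute name
        print(row[0])           # access a field by numeric index
        print(row.asDict())     # convert the row to a dictionary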
Type conversions
The following table maps Apache Spark SQL data types to their Python data type equivalents.
| Apache Spark SQL data type | Python data type |
|---|---|
| array | numpy.ndarray |
| bigint | int |
| binary | bytearray |
| boolean | bool |
| date | datetime.date |
| decimal | decimal.Decimal |
| double | float |
| int | int |
| map | str |
| null | NoneType |
| smallint | int |
| string | str |
| struct | str |
| timestamp | datetime.datetime |
| tinyint | int |
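A quick way to see these conversions in practice is to inspect the Python types of a fetched row’s values, as in the following sketch that reuses the earlier query:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 1")
        row = cursor.fetchone()
        # Print each column name alongside the Python type of its value.
        for name, value in row.asDict().items():
            print(name, type(value).__name__)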
Troubleshooting
tokenAuthWrapperInvalidAccessToken: Invalid access token message
Issue: When you run your code, you see a message similar to Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token.
Possible cause: The value passed to access_token is not a valid Databricks personal access token.
Recommended fix: Check that the value passed to access_token is correct and try again.
gaierror(8, 'nodename nor servname provided, or not known') message
Issue: When you run your code, you see a message similar to Error during request to server: gaierror(8, 'nodename nor servname provided, or not known').
Possible cause: The value passed to server_hostname is not the correct host name.
Recommended fix: Check that the value passed to server_hostname is correct and try again.
For more information on finding the server hostname, see Get connection details for a Databricks compute resource.
IpAclError message
Issue: When you run your code, you see the message Error during request to server: IpAclValidation when you try to use the connector on a Databricks notebook.
Possible cause: You may have IP allow listing enabled for the Databricks workspace. With IP allow listing, connections from Spark clusters back to the control plane are not allowed by default.
Recommended fix: Ask your administrator to add the compute plane subnet to the IP allow list.
Additional resources
For more information, see:
The Databricks SQL Connector for Python repository on GitHub
Built-in Types (for bool, bytearray, float, int, and str) on the Python website
datetime (for datetime.date and datetime.datetime) on the Python website
decimal (for decimal.Decimal) on the Python website
Built-in Constants (for NoneType) on the Python website