What is a PySpark DataSource?

Preview

This feature is in Public Preview in Databricks Runtime 15.2 and above.

A PySpark DataSource is created by the Python (PySpark) DataSource API, which enables reading from custom data sources and writing to custom data sinks in Apache Spark using Python. You can use PySpark DataSources to define custom connections to data systems and implement additional functionality, to build out reusable data sources.

DataSource class

The PySpark DataSource is a base class that provides methods to create data readers and writers. In addition to defining name and schema, either DataSource.reader or DataSource.writer must be implemented by any subclass to make the data source either readable or writable, or both. After implementing this interface, register it, then load or save your data source using the following syntax:

# Register the data source
spark.dataSource.register(<DataSourceClass>)

# Read from a custom data source
spark.read.format(<datasource-name>).load()

# Write to a custom data source
df.write.format(<datasource-name>).save()

Create a PySpark DataSource for batch query

To demonstrate PySpark DataSource reader capabilities, create a data source that generates example data, generated using the faker Python package. For more information about faker, see the Faker documentation.

Step 1: Install dependencies

Depending on your particular custom data source scenario, you may need to install one or more dependencies. In this example, install the faker package using the following command:

%pip install faker

Step 2: Define the DataSource

Next, define your new PySpark DataSource as a subclass of DataSource, with a name, schema, and reader. The reader() method must be defined to read from a data source in a batch query.


from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

Step 3: Implement the reader for a batch query

Next, implement the reader logic to generate example data. Use the installed faker library to populate each field in the schema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the read method
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

Step 4: Register and use the example data source

To use the data source, register it. By default the FakeDataSource has three rows, and the default schema includes these string fields: name, date, zipcode, state. The following example registers, loads, and outputs an example data source with the defaults:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Only string fields are supported, but you can specify a schema with any fields that correspond to faker package providers’ fields to generate random data for testing and development. The following example loads a data source with name and company fields:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

To load a data source with a custom number of rows, specify the numRows option. The following example specifies 5 rows:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Troubleshooting

If the output is the following error, your compute does not support PySpark DataSources. You must use Databricks Runtime 15.2 or above.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000