O que é um DataSource do PySpark?

Visualização

Esse recurso está em Public Preview em Databricks Runtime 15.2 e acima.

Um DataSource PySpark é criado pelo DataSourcePython (PySpark) API, que permite a leitura de fontes de dados personalizadas e a gravação em coletores de dados personalizados em Apache Spark usando Python. O senhor pode usar o PySpark DataSources para definir conexões personalizadas com sistemas de dados e implementar funcionalidades adicionais, para criar fontes de dados reutilizáveis.

Classe DataSource

O PySpark DataSource é uma classe base que fornece métodos para criar leitores e gravadores de dados. Além de definir name e schema, DataSource.reader ou DataSource.writer devem ser implementados por qualquer subclasse para tornar a fonte de dados legível ou gravável, ou ambos. Depois de implementar essa interface, registre-a e, em seguida, carregue ou salve sua fonte de dados usando a seguinte sintaxe:

# Register the data source
spark.dataSource.register(<DataSourceClass>)

# Read from a custom data source
spark.read.format(<datasource-name>).load()

# Write to a custom data source
df.write.format(<datasource-name>).save()

Criar um PySpark DataSource para a consulta de lotes

Para demonstrar os recursos de leitura do PySpark DataSource, crie uma fonte de dados que gere dados de exemplo, gerados usando o pacote faker Python. Para obter mais informações sobre faker, consulte a documentação do Faker.

o passo 1: Instalar dependências

Dependendo do cenário específico da fonte de dados personalizada, talvez seja necessário instalar uma ou mais dependências. Neste exemplo, instale o pacote faker usando o seguinte comando:

%pip install faker

o passo 2: Definir o DataSource

Em seguida, defina seu novo PySpark DataSource como uma subclasse de DataSource, com um nome, um esquema e um leitor. O método reader() deve ser definido para ler de uma fonte de dados em uma consulta de lotes.


from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

o passo 3: Implementar o leitor para uma consulta de lotes

Em seguida, implemente a lógica do leitor para gerar dados de exemplo. Use a biblioteca faker instalada para preencher cada campo do esquema.

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the read method
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

o passo 4: registro e uso do exemplo fonte de dados

Para usar a fonte de dados, registre-a. Em default, o FakeDataSource tem três linhas, e o esquema default inclui esses campos string: name, date, zipcode, state. O exemplo a seguir registra, carrega e gera um exemplo de fonte de dados com o padrão:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

Somente os campos string são suportados, mas o senhor pode especificar um esquema com quaisquer campos que correspondam aos campos dos provedores de pacotes faker para gerar dados aleatórios para teste e desenvolvimento. O exemplo a seguir carrega uma fonte de dados com os campos name e company:

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

Para carregar uma fonte de dados com um número personalizado de linhas, especifique a opção numRows. O exemplo a seguir especifica 5 linhas:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

Solução de problemas

Se o resultado for o seguinte erro, o site compute não é compatível com PySpark DataSources. O senhor deve usar o site Databricks Runtime 15.2 ou o acima.

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000