PySpark データソースとは何ですか?

プレビュー

この機能は、Databricks Runtime 15.2 以降でパブリック プレビューされています。

PySparkDataSource は 、Python (PySpark ) DataSourceAPI によって作成され、Apache Spark Pythonを使用して のカスタム データソースからの読み取りとカスタム データ シンクへの書き込みが可能になります。PySpark DataSources を使用すると、データ システムへのカスタム接続を定義し、追加機能を実装して、再利用可能なデータソースを構築できます。

DataSource クラス

PySpark DataSourceは、データ リーダーとライターを作成するためのメソッドを提供する基本クラスです。 nameschemaを定義することに加えて、データソースを読み取り可能、書き込み可能、またはその両方にするには、サブクラスでDataSource.readerまたはDataSource.writerいずれかを実装する必要があります。 このインターフェースを実装したら、それを登録し、次の構文を使用してデータソースを読み込むか保存します。

# Register the data source
spark.dataSource.register(<DataSourceClass>)

# Read from a custom data source
spark.read.format(<datasource-name>).load()

# Write to a custom data source
df.write.format(<datasource-name>).save()

バッチクエリ用の PySpark データソースを作成する

PySpark DataSource リーダー機能を実証するには、faker Pythonパッケージを使用して生成されたサンプル データを生成するデータ ソースを作成します。 fakerの詳細については、 Faker のドキュメントを参照してください。

ステップ1: 依存関係をインストールする

特定のカスタム データソース シナリオに応じて、1 つ以上の依存関係をインストールする必要がある場合があります。 この例では、次のコマンドを使用してfakerパッケージをインストールします。

%pip install faker

ステップ2: データソースを定義する

次に、名前、スキーマ、リーダーを指定して、新しい PySpark DataSource をDataSourceのサブクラスとして定義します。 バッチ クエリのデータ ソースから読み取るには、 reader()メソッドを定義する必要があります。


from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

ステップ3: バッチクエリのリーダーを実装する

次に、リーダーロジックを実装してサンプルデータを生成します。 インストールされたfakerライブラリを使用して、スキーマの各フィールドにデータを入力します。

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the read method
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

ステップ4: 登録してサンプルデータソースを使用する

データソースを使用するには、登録してください。 デフォルトでは、 FakeDataSourceには 3 つの行があり、デフォルトのスキーマには次のstringフィールドが含まれます: namedatezipcodestate 。 次の例では、デフォルトを使用してサンプル データソースを登録し、ロードして出力します。

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

サポートされているフィールドは string つだけですが、 faker パッケージプロバイダーのフィールドに対応する任意のフィールドを含むスキーマを指定して、テストおよび開発用のランダムデータを生成できます。 次の例では、 nameフィールドとcompanyフィールドを持つデータソースを読み込みます。

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

カスタムの行数でデータソースをロードするには、 numRowsオプションを指定します。 次の例では、5 行を指定しています。

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

トラブルシューティング

出力が次のエラーの場合、コンピュートはPySpark DataSources をサポートしていません。 Databricks Runtime 15.2 以上を使用する必要があります。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000