Auto Loaderでスキーマ推論と進化を設定する

読み込まれたデータのスキーマを自動的に検出するようにAuto Loaderを構成すると、データスキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブルスキーマを進化させることができます。これにより、時間の経過とともにスキーマの変更を手動で追跡して適用する必要がなくなります。

Auto Loaderは、JSON blobカラムの予期しないデータ(例えば、データ型が異なるなど)を「レスキュー」することもでき、後で半構造化データアクセスAPIを使用してアクセスすることもできます。

スキーマの推論と進化については、以下の形式がサポートされています:

ファイル形式

サポートされているバージョン

JSON

すべてのバージョン

CSV

すべてのバージョン

XML

Databricks Runtime 14.3 LTS 以降

Avro

Databricks Runtime 10.4 LTS 以降

Parquet

Databricks Runtime 11.3 LTS 以降

ORC

サポートされていません

Text

該当なし(固定スキーマ)

Binaryfile

該当なし(固定スキーマ)

スキーマの推論と進化の構文

オプション cloudFiles.schemaLocation のターゲット・ディレクトリーを指定すると、スキーマの推論と進化が可能になります。 checkpointLocationに指定したディレクトリと同じディレクトリを使用することもできます。Delta Live Tables を使用する場合、Databricks はスキーマの場所とその他のチェックポイント情報を自動的に管理します。

注:

ターゲットテーブルに読み込まれているソースデータの場所が複数ある場合、各Auto Loader取り込みワークロードに個別のストリーミングチェックポイントが必要です。

以下の例ではcloudFiles.formatの代わりにparquetを使用しています。他のファイルソースには、csvavro、またはjsonを使用します。読み取りおよび書き込みに関するその他の設定はすべて各形式のデフォルトの動作と同じままです。

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Auto Loaderスキーマ推論の仕組み

最初にデータを読み取るときにスキーマを推測するために、Auto Loaderは、検出した最初の50 GBまたは1000ファイル(最初に制限を超えた方)をサンプリングします。Auto Loaderは、入力データに対するスキーマの変更を経時的に追跡するために、設定されたcloudFiles.schemaLocationのディレクトリ_schemasにスキーマ情報を保存します。

注:

使用されるサンプルのサイズを変更するには、SQL構成を設定します。

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(バイト数を表す文字列。例:10gb

そして

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

デフォルトでは、 Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型をエンコードしない形式 (JSON、 CSV、XML) の場合、 Auto Loader はすべての列を文字列として推論します ( JSON ファイル内の入れ子になったフィールドを含む)。 型付きスキーマ (Parquet および Avro) の形式の場合、 Auto Loader はファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作を次の表にまとめます。

ファイル形式

デフォルトの推論データ型

JSON

String

CSV

String

XML

String

Avro

Avroスキーマでエンコードされた型

Parquet

Parquetスキーマでエンコードされた型

Apache Spark DataFrameReader は、スキーマ推論に異なる動作を使用し、サンプル データに基づいて JSON、CSV、および XML ソースの列のデータ型を選択します。 この動作を Auto Loaderで有効にするには、オプション cloudFiles.inferColumnTypestrue に設定します。

注:

CSVデータのスキーマを推論する場合、Auto Loaderファイルにヘッダーが含まれていると想定します。CSV ファイルにヘッダーが含まれていない場合は、 .option("header", "false")のオプションを指定します。 さらに、 Auto Loader はサンプル内のすべてのファイルのスキーマをマージして、グローバル スキーマを作成します。 その後、Auto Loaderヘッダーに従って各ファイルを読み取り、CSVを正しく解析できます。

注:

2 つの Parquet ファイルで列のデータ型が異なる場合、 Auto Loader は最も幅の広い型を選択します。 schemaHints を使用して、この選択をオーバーライドできます。スキーマ ヒントを指定すると、 Auto Loader は列を指定された型にキャストしませんが、列を指定された型として読み取るように Parquet リーダーに指示します。 不一致の場合、列は レスキューされたデータ列でレスキューされます。

Auto Loaderスキーマ進化の仕組み

Auto Loaderは、データを処理するときに新しい列の追加を検出します。Auto Loaderが新しい列を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームがこのエラーをスローする前に、Auto Loaderはデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることでスキーマの場所を最新のスキーマで更新します。既存の列のデータ型は変更されません。

DatabricksAuto LoaderではDatabricks ジョブ を使用してストリームを構成して、このようなスキーマの変更後に自動的に再起動することをお勧めします。

Auto Loaderは、以下のスキーマ進化モードをサポートします。これは、オプションcloudFiles.schemaEvolutionModeで設定します。

Mode

新しい列の読み取り時の動作

addNewColumns (デフォルト)

ストリームが失敗します。新しい列がスキーマに追加されます。既存の列ではデータ型が進化しません。

rescue

スキーマは進化せず、スキーマの変更によってストリームが失敗することはありません。 すべての新しい列は、 レスキューされたデータ列に記録されます

failOnNewColumns

ストリームが失敗します。提供されたスキーマが更新されるか、問題のあるデータファイルが削除されない限り、ストリームは再起動されません。

none

スキーマは進化せず、新しい列は無視され、rescuedDataColumnオプションが設定されていない限りデータはレスキューされません。スキーマの変更によってストリームが失敗することはありません。

注:

addNewColumns スキーマが指定されていない場合は mode がデフォルトですが、スキーマが指定されている場合は mode がデフォルト noneaddNewColumns ストリームのスキーマが指定されている場合は許可されませんが、スキーマ をスキーマヒントとして提供する場合は機能します。

Auto Loaderでのパーティションの動作

データがHiveスタイルのパーティショニングでレイアウトされている場合、Auto Loaderはデータの基礎となるディレクトリ構造からパーティション列を推測しようとします。たとえば、ファイルパスbase_path/event=click/date=2021-04-01/f0.jsonでは、パーティション列としてdateeventが推論されます。基礎となるディレクトリ構造に競合するHiveパーティションが含まれているか、Hiveスタイルのパーティショニングが含まれていない場合、パーティション列は無視されます。

バイナリファイル(binaryFile)およびtextファイル形式には固定のデータスキーマがありますが、パーティション列推論がサポートされています。Databricksでは、これらのファイル形式にcloudFiles.schemaLocationを設定することをお勧めしています。これにより、潜在的なエラーや情報損失が回避され、Auto Loaderが起動するたびにパーティション列が推論されることがなくなります。

スキーマの進化ではパーティション列は考慮されません。base_path/event=click/date=2021-04-01/f0.jsonのような初期ディレクトリ構造があり、その後base_path/event=click/date=2021-04-01/hour=01/f1.jsonとして新しいファイルの受信を開始した場合、Auto Loaderは時間列を無視します。新しいパーティション列の情報を取得するには、cloudFiles.partitionColumnsevent,date,hourに設定します。

注:

オプションcloudFiles.partitionColumnsでは、列名をコンマ区切りリストで指定します。ディレクトリ構造内にkey=valueペアとして存在する列のみが解析されます。

レスキューされたデータ列とは

Auto Loaderがスキーマを推論すると、レスキューされたデータ列が_rescued_dataとしてスキーマに自動的に追加されます。オプションrescuedDataColumnを設定することで、列の名前を変更したり、スキーマを指定する場合に列を含めたりすることができます。

レスキューされたデータ列は、スキーマに一致しない列を削除せずレスキューします。レスキューされたデータ列には、以下の理由で解析されなかったデータが含まれます。

  • 列がスキーマにない

  • 型が一致しない

  • 大文字小文字が一致しない

レスキューされたデータ列には、レスキューされた列とレコードのソースファイルパスを含むJSONが含まれます。

注:

CSVパーサーは、レコードの解析時にPERMISSIVEDROPMALFORMED、およびFAILFASTの3つのモードで対応します。rescuedDataColumnと組み合わせて使用すると、データ型の不一致によってDROPMALFORMEDモードでレコードが削除されたり、FAILFASTモードでエラーがスローされたりすることはありません。破損したレコードのみが削除されるか、不完全または不正な形式のJSONまたはCSVなどのエラーがスローされます。JSONまたはCSVを解析するときにbadRecordsPathを使用すると、rescuedDataColumnの使用時にデータ型の不一致が不良レコードと見なされなくなります。不完全で不正な形式のJSONまたはCSVレコードのみがbadRecordsPathに保存されます。

大文字と小文字を区別する動作を変更する

大文字と小文字の区別が有効になっていない限り、列 、 abcAbc、および ABC は、スキーマ推論の目的上、同じ列と見なされます。 選択されるケースは任意であり、サンプリングされたデータによって異なります。 スキーマ ヒントを使用して、使用する大文字と小文字を強制できます。選択が行われ、スキーマが推論されると、選択されなかった大文字と小文字のバリエーションはスキーマと一致し Auto Loader 考慮されません。

レスキューされたデータ列が有効になっている場合、スキーマのケース以外の名前のフィールドが_rescued_data列に読み込まれます。この動作を変更するには、オプション readerCaseSensitive を false に設定し、その場合、 Auto Loader は大文字と小文字を区別しない方法でデータを読み取ります。

スキーマヒントを使用してスキーマ推論をオーバーライドする

スキーマヒントを使用すると、推論されたスキーマに対して、自分が知っていて期待しているスキーマ情報を適用できます。列が特定のデータ型であることが分かっている場合や、より一般的なデータ型(例えば、integerの代わりにdoubleなど)を選択したい場合は、SQLスキーマ指定構文を使用して、列のデータ型に関する任意の数のヒントを以下のような文字列として提供することができます:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

サポートされているデータ型のリストについては、 データ型 に関するドキュメントを参照してください。

ストリームの先頭に列が存在しない場合は、スキーマヒントを使用して、その列を推論されたスキーマに追加することもできます。

以下は、スキーマヒントの動作を確認するための推論されたスキーマの例です。

推論されたスキーマ:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

以下のスキーマヒントを指定することで

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

以下が得られます

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注:

配列およびマップスキーマヒントのサポートは、Databricks Runtime 9.1 LTS以降で利用できます。

以下は、スキーマヒントを使用して動作を確認するための、データ型が複雑な推論されたスキーマの例です。

推論されたスキーマ:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

以下のスキーマヒントを指定することで

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

以下が得られます

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注:

スキーマヒントは、Auto Loaderにスキーマを指定しない場合にのみ使用されます。cloudFiles.inferColumnTypesが有効か無効かに関係なく、スキーマヒントを使用できます。