一般的なデータ読み込みパターン

Auto Loader 、多くの一般的なデータ取り込みタスクを簡略化します。 このクイック リファレンスでは、いくつかの一般的なパターンの例を示します。

glob パターン を使用したディレクトリまたはファイルのフィルタリング

glob パターンは、パスに指定されているディレクトリとファイルのフィルタリングに使用できます。

パターン

説明

?

任意の 1 文字に一致

*

0 個以上の文字に一致します。

[abc]

文字セット {a,b,c} の 1 文字に一致します。

[a-z]

文字範囲の 1 文字に一致します {a...z}.

[^a]

文字セットまたは範囲 {a} 以外の 1 つの文字と一致します。 ^ 文字は、左角かっこ (かっこ) のすぐ右側に記述する必要があることに注意してください。

{ab,cd}

文字列セット {ab, cd} の文字列と一致します。

{ab,c{de, fh}}

文字列セット {ab, cde, cfh} の文字列と一致します。

プレフィックス パターンを指定する path を使用します。

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

サフィックス パターンを明示的に指定するには、オプション pathGlobFilter を使用する必要があります。 path はプレフィックス フィルターのみを提供します。

たとえば、異なるサフィックスを持つファイルを含むディレクトリ内の png つのファイルのみを解析する場合は、次のようにします。

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Auto Loader の既定のグロビング動作は、他の Spark ファイル ソースの既定の動作とは異なります。読み取りに .option("cloudFiles.useStrictGlobber", "true") を追加して、ファイル ソースに対する既定の Spark 動作に一致するグロビングを使用します。 グロビングの詳細については、次の表を参照してください。

パターン

ファイルパス

デフォルトのglobber

厳格なglobber

/a/b

/a/b/c/file.txt

はい

はい

/a/b

/a/b_dir/c/file.txt

いいえ

いいえ

/a/b

/a/b.txt

いいえ

いいえ

/a/b/

/a/b.txt

いいえ

いいえ

/a/*/c/

/a/b/c/file.txt

はい

はい

/a/*/c/

/a/b/c/d/file.txt

はい

はい

/a/*/c/

/a/b/x/y/c/file.txt

はい

いいえ

/a/*/c

/a/b/c_file.txt

はい

いいえ

/a/*/c/

/a/b/c_file.txt

はい

いいえ

/a/*/c/

/a/*/cookie/file.txt

はい

いいえ

/a/b*

/a/b.txt

はい

はい

/a/b*

/a/b/file.txt

はい

はい

/a/{0.txt,1.txt}

/a/0.txt

はい

はい

/a/*/{0.txt,1.txt}

/a/0.txt

いいえ

いいえ

/a/b/[cde-h]/i/

/a/b/c/i/file.txt

はい

はい

簡単な ETL を有効にする

データを失うことなくデータを Delta Lake に取り込む簡単な方法は、次のパターンを使用し、 Auto Loaderでスキーマ推論を有効にすることです。 Databricks では、ソース データのスキーマが変更されたときにストリームを自動的に再起動するために、Databricks ジョブで次のコードを実行することをお勧めします。 デフォルトでは、スキーマは文字列型として推論され、解析エラー(すべてが文字列として残っている場合は何もないはずです)は _rescued_dataに移動し、新しい列はストリームに失敗し、スキーマを進化させます。

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

適切に構造化されたデータ のデータ損失を防止

スキーマはわかっているが、予期しないデータを受信するたびに知りたい場合は、Databricks で rescuedDataColumnを使用することをお勧めします。

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

スキーマと一致しない新しいフィールドが導入された場合にストリームの処理を停止する場合は、以下を追加できます。

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

柔軟な半構造化データパイプライン を実現

提供する情報に新しい列を導入するベンダーからデータを受け取る場合、ベンダーがいつそれを行うかを正確に認識していないか、データパイプラインを更新するための帯域幅がない可能性があります。 スキーマ進化を利用してストリームを再起動し、 Auto Loader 推論されたスキーマを自動的に更新できるようになりました。 また、ベンダーが提供している可能性のある「スキーマレス」フィールドの一部に schemaHints を活用することもできます。

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

ネストされた JSON データの 変換

Auto Loader は最上位の JSON 列を文字列として推論するため、さらに変換を必要とする入れ子になった JSON オブジェクトを残すことができます。半構造化データ アクセス APIs を使用して、複雑な JSON コンテンツをさらに変換できます。

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

ネストされた JSON データを 推測する

入れ子になったデータがある場合は、 cloudFiles.inferColumnTypes オプションを使用して、データやその他の列の入れ子になった構造を推測できます。

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

ヘッダー なしでCSVファイルを読み込む

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

ヘッダー 付きの CSV ファイルにスキーマを適用する

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

機械学習用のDelta Lakeに画像またはバイナリ データを取り込む

データが Delta Lake に格納されたら、データに対して分散推論を実行できます。 「 pandas UDF を使用した分散推論の実行」を参照してください。

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT用のAuto Loader構文

Delta Live Tables では、Auto Loader向けに若干変更されたPython 構文が提供され、Auto Loaderの SQL サポートが追加されています。

以下の例では、Auto Loaderを使用してCSVファイルとJSONファイルからデータセットを作成しています。

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

サポートされている 形式オプション は、 Auto Loaderで使用できます。 map() 関数を使用すると、cloud_files() メソッドにオプションを渡すことができます。オプションはキーと値のペアで、キーと値は文字列です。 次に、SQL で Auto Loader を操作するための構文について説明します。

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

以下の例では、ヘッダー付きのタブ区切りCSVファイルからデータを読み込んでいます。

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

schemaを使用して、フォーマットを手動で指定することができます。スキーマ推論をサポートしていないフォーマットの場合は、schemaを指定する必要があります。

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Delta Live Tablesは、Auto Loaderを使用してファイルを読み取るときに、スキーマディレクトリとチェックポイントディレクトリを自動的に構成および管理します。ただし、これらのディレクトリのいずれかを手動で構成する場合、完全リフレッシュを実行しても、構成されたディレクトリの内容は影響を受けません。Databricksでは、処理中の予期しない副作用を避けるために、自動構成ディレクトリを使用することをお勧めしています。