不正なレコードとファイルを処理する

Databricks には、不良レコードを含むファイルを処理するためのオプションが多数用意されています。 不良データの例を次に示します。

  • 不完全または破損したレコード:主にJSONやCSVなどのテキストベースのファイル形式で観察されます。 たとえば、右中かっこのない JSON レコードや、CSV ファイルのヘッダーまたは最初のレコードほど多くの列がない CSV レコードなどです。

  • データ型の不一致: 列の値が指定または推論されたデータ型を持たない場合。

  • 不正なフィールド名: ファイルまたはレコードで指定された列名の大文字と小文字が、指定されたスキーマまたは推論されたスキーマと異なる場合、すべてのファイル形式で発生する可能性があります。

  • 破損したファイル: ファイルを読み取ることができない場合、Avro、Parquet、ORC などのバイナリ ファイルの種類のメタデータまたはデータの破損が原因である可能性があります。 まれに、基盤となるストレージ・システムの長期にわたる一時的な障害が原因である可能性があります。

  • 見つからないファイル: クエリー分析時に検出され、処理時に存在しなくなったファイル。

badRecordsPathを使用する

badRecordsPathを設定すると、指定したパスは、データの読み込み中に発生した不良レコードまたはファイルの例外を記録します。

破損したレコードとファイルに加えて、削除されたファイル、ネットワーク接続例外、IO例外などを示すエラーは無視され、 badRecordsPathに記録されます。

ファイルベースのデータソースで badRecordsPath オプションを使用するには、いくつかの重要な制限があります。

  • これは非トランザクションであり、一貫性のない結果につながる可能性があります。

  • 一時的なエラーはエラーとして扱われます。

入力ファイルが見つかりません

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .format("parquet").load("/input/parquetFile")

// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")

df.show()

上記の例では、 df.show() が入力ファイルを見つけることができないため、Spark はエラーを記録するために JSON 形式の例外ファイルを作成します。 たとえば、 /tmp/badRecordsPath/20170724T101153/bad_files/xyz は例外ファイルのパスです。 このファイルは、指定された badRecordsPath ディレクトリ /tmp/badRecordsPathの下にあります。 20170724T101153 この DataFrameReaderの作成時間です. bad_files は例外の種類です。 xyz は、不正なファイルのパスと例外/理由メッセージを含むJSONレコードを含むファイルです。

入力ファイルに不正なレコードが含まれています

// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.format("text").save("/tmp/input/jsonFile")

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .schema("a int, b int")
  .format("json")
  .load("/tmp/input/jsonFile")

df.show()

この例では、 DataFrame には最初の解析可能なレコード ({"a": 1, "b": 2}) のみが含まれています。 2 番目の不良レコード ({bad-record) は、 /tmp/badRecordsPath/20170724T114715/bad_records/xyzにある JSON ファイルである例外ファイルに記録されます。 例外ファイルには、不良レコード、レコードを含むファイルのパス、および例外/理由メッセージが含まれています。 例外ファイルを見つけたら、JSON リーダーを使用してそれらを処理できます。