Delta Lakeでデータを選択的に上書きする

DatabricksはDelta Lake機能を利用して、選択的上書きのための2つの異なるオプションをサポートします:

  • replaceWhereオプションは、指定された述語に一致するすべてのレコードをアトミックに置き換えます。

  • 動的パーティションの上書きを使用して、テーブルのパーティション分割方法に基づいてデータのディレクトリを置き換えることができます。

ほとんどの操作では、DatabricksはreplaceWhereを使用して上書きするデータを指定することを推奨します。

重要

データが誤って上書きされた場合は、 復元 を使用して変更を元に戻すことができます。

replaceWhereによる任意の選択的上書き

任意の式に一致するデータのみを選択的に上書きできます。

SQL には Databricks Runtime 12.2 LTS 以上が必要です。

次のコマンドは、start_dateでパーティショニングされたターゲットテーブルの1月のイベントをreplace_dataのデータにアトミックに置き換える:

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)
replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

このサンプル コードは、 replace_dataにデータを書き込み、すべての行が述語と一致することを検証し、 overwrite セマンティクスを使用してアトミック置換を実行します。 操作のいずれかの値が制約の範囲外にある場合、この操作は default によるエラーで失敗します。

この動作を変更して、述語範囲内の値 overwrite 、指定した範囲外のレコード insert ことができます。 これを行うには、次のいずれかの設定を使用して spark.databricks.delta.replaceWhere.constraintCheck.enabled を false に設定し、制約チェックを無効にします。

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

従来の動作

従来のデフォルトの動作では、 replaceWhereパーティション列の述語に一致するデータのみを上書きしていました。 このレガシー モデルでは、次のコマンドは、 dateでパーティション化されたターゲット テーブル内の 1 月をdfのデータにアトミックに置き換えます。

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)
df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

以前の動作にフォールバックする場合は、 spark.databricks.delta.replaceWhere.dataColumns.enabled フラグを無効にすることができます。

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

動的パーティションの上書き

プレビュー

この機能はパブリックプレビュー段階にあります。

Databricks Runtime 11.3 LTS 以降では、パーティション テーブルの動的パーティション上書きモードがサポートされています。 複数のパーティションを持つテーブルの場合、Databricks Runtime 11.3 LTS 以下では、すべてのパーティション列が同じデータ型である場合にのみ、動的パーティション上書きがサポートされます。

動的パーティション上書きモードの場合、操作により、書き込みが新しいデータをコミットする各論理パーティション内のすべての既存データが上書きされます。書き込みにデータが含まれていない既存の論理パーティションは変更されません。このモードは、データが上書きモード(SQLのINSERT OVERWRITEまたはdf.write.mode("overwrite")を使用したDataFrame書き込み)で書き込まれている場合にのみ適用されます。

Sparkセッション構成spark.sql.sources.partitionOverwriteModedynamicに設定して、動的パーティション上書きモードを構成します。DataFrameWriterオプションpartitionOverwriteModedynamicに設定してこれを有効にすることもできます。存在する場合、クエリー固有のオプションはセッション構成で定義されたモードを上書きします。partitionOverwriteModeのデフォルトはstaticです。

重要

動的パーティションの上書きで書き込まれたデータが、予想されるパーティションのみに影響することを検証します。間違ったパーティションに1行が含まれていると、パーティション全体が意図せず上書きされてしまう可能性があります。

次の例は、動的パーティションのオーバーライトを使用する例です:

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)
df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

  • 動的パーティション上書きはパーティションテーブルのオプションreplaceWhereと競合します。

    • Sparkセッション構成で動的パーティションの上書きが有効になっており、replaceWhereDataFrameWriterオプションとして提供されている場合、Delta LakeはreplaceWhere式に従ってデータを上書きします(クエリー固有のオプションはセッション構成をオーバーライドします)。

    • DataFrameWriterオプションで動的パーティションの上書きとreplaceWhereの両方が有効になっている場合、エラーが発生します。

  • 動的パーティションの上書きを使用する場合、overwriteSchematrueとして指定することはできません。