ベストプラクティス:Delta Lake

この記事では、Delta Lakeを使用する際のベストプラクティスについて説明します。

Databricksの最適化については、「Databricksにおける最適化の推奨事項」を参照してください。

同じ場所にあるテーブルを削除して再作成する場合は、常に CREATE OR REPLACE TABLE ステートメントを使用する必要があります。 「 Delta テーブルの削除または置換」を参照してください。

最適化されたデータスキップのためにリキッドクラスタリングを使用する

Databricks では、データ スキップのデータ レイアウトを最適化するために、パーティショニング、 Z-Order 、またはその他のデータ編成戦略ではなく、リキッドクラスタリングを使用することをお勧めします。 「 Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

コンパクトファイル

Databricks では、 OPTIMIZEコマンドを頻繁に実行して小さなファイルを圧縮することをお勧めします。

このオペレーションでは、古いファイルは削除されません。これらを削除するには、VACUUMコマンドを実行する。

テーブルのコンテンツまたはスキーマを置き換える

場合によっては、Deltaテーブルを置き換える必要があるかもしれません。例:

  • テーブル内のデータが間違っていることに気づき、内容を置き換えたいと考えています。

  • テーブル全体を書き換えて、互換性のないスキーマ変更(列タイプの変更など)を実行したいと考えています。

Deltaテーブルのディレクトリ全体を削除して、同じパスに新しいテーブルを作成することもできますが、次の理由からお勧めできません

  • ディレクトリの削除は効率的ではありません。非常に大きなファイルが含まれるディレクトリの削除には、数時間、場合によっては数日かかる場合があります。

  • 削除されたファイルのすべてのコンテンツが失われます。間違ったテーブルを削除すると、回復が難しくなります。

  • ディレクトリの削除はアトミックではありません。テーブルを削除している間、テーブルを読み取る同時クエリーが失敗したり、テーブルの一部が表示されたりする可能性があります。

テーブル スキーマを変更する必要がない場合は、Delta テーブルからデータ を削除して 新しいデータを挿入するか、テーブル を更新 して誤った値を修正できます。

テーブルスキーマを変更する場合は、テーブル全体をアトミックに置き換えることができます。例:

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table
REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table
dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

このアプローチには複数の利点があります:

  • テーブルの上書きは、ディレクトリを再帰的にリストアップしたり、ファイルを削除したりする必要がないため、はるかに高速です。

  • 古いバージョンのテーブルがまだ存在しています。間違ったテーブルを削除した場合でも、タイムトラベルを使用して古いデータを簡単に取得できます。「Delta Lakeのテーブル履歴を取り扱う」を参照してください。

  • これはアトミックなオペレーションです。同時実行クエリーは、テーブルを削除している間もテーブルを読み取ることができます。

  • Delta Lake ACIDトランザクション保証のため、テーブルの上書きが失敗した場合、テーブルは以前の状態になります。

さらに、テーブルを上書きした後、ストレージコストを節約するために古いファイルを削除する場合は、 VACUUM を使用してそれらを削除できます。 ファイルの削除に最適化されており、通常はディレクトリ全体を削除するよりも高速です。

スパークキャッシュ

Databricks では、次の理由により、Spark キャッシュの使用をお勧めしません。

  • キャッシュされたDataFrameの上に追加された追加のフィルターによって発生する可能性のあるデータスキップにより、データが失われます。

  • キャッシュされるデータは、別の識別子を使用してテーブルにアクセスすると更新されない可能性があります。

Apache SparkのDelta LakeとParquetの違い

Delta Lakeは以下のオペレーションを自動的に処理します。これらのオペレーションを手動で実行しないでください:

  • REFRESH TABLE:Deltaテーブルは常に最新の情報を返すので、変更後に手動でREFRESH TABLEを呼び出す必要はありません。

  • パーティションの追加と削除:Delta Lakeは、テーブルにあるパーティションのセットを自動的に追跡し、データが追加または削除されるとリストを更新します。そのため、ALTER TABLE [ADD|DROP] PARTITIONまたはMSCKを実行する必要はありません。

  • 単一のパーティションをロードする:パーティションを直接読み取る必要はありません。たとえば、spark.read.format("parquet").load("/data/date=2017-01-01")を実行する必要はありません。代わりに、データをスキップするには、spark.read.table("<table-name>").where("date = '2017-01-01'")などのWHERE句を使用します。

  • データファイルを手動で変更しないでください:Delta Lakeはトランザクションログを使用して、テーブルへの変更をアトミックにコミットします。データの損失やテーブルの破損につながる可能性があるため、Deltaテーブル内のParquetデータファイルを直接変更、追加、削除しないでください。

Delta Lakeマージのパフォーマンスを向上させる

マージにかかる時間を短縮するには、次の方法を使用します。

  • 一致の検索スペースを縮小します:デフォルトでは、mergeオペレーションはソーステーブルのマッチを見つけるためにDeltaテーブル全体を検索します。mergeを高速化する一つの方法は、マッチ条件に既知の制約を追加して探索空間を縮小することです。たとえば、countrydateでパーティション化されたテーブルがあり、mergeを使用して最終日と特定の国の情報を更新します。以下の条件を追加すると、関連するパーティションでのみマッチを検索するため、クエリーがより高速になります:

    events.date = current_date() AND events.country = 'USA'
    

    さらに、このクエリーは他の同時オペレーションと競合する可能性も低くなります。詳細については、「Databricksの分離レベルと書き込みコンフリクト」を参照してください。

  • コンパクトなファイル:データが多数の小さなファイルに保存されている場合、マッチを検索するためにデータを読み込むのに時間がかかることがあります。小さなファイルを大きなファイルに圧縮して、読み取りスループットを向上させることができます。詳しくは、「Delta LakeでOPTIMIZEによりデータファイルを圧縮する」をご覧ください。

  • 書き込み用のシャッフルパーティションを制御するmergeオペレーションはデータを複数回シャッフルして、更新されたデータを計算して書き込みます。シャッフルに使用されるタスクの数は、Sparkセッション構成spark.sql.shuffle.partitionsによって制御されます。このパラメーターを設定すると、並列処理が制御されるだけでなく、出力ファイルの数も決まります。値を増やすと並列処理が増加しますが、より多くの小さなデータファイルが生成されます。

  • 最適化された書き込みを有効にする: パーティション分割されたテーブルの場合、 merge はシャッフル パーティションの数よりもはるかに多くの小さなファイルを生成する可能性があります。 これは、すべてのシャッフルタスクが複数のパーティションに複数のファイルを書き込む可能性があり、パフォーマンスのボトルネックになる可能性があるためです。 最適化された書き込みを有効にすることで、ファイルの数を減らすことができます。 「 Databricks での Delta Lake 用に最適化された書き込み」を参照してください。

  • テーブル内のファイル サイズを調整する: Databricks は、Delta テーブルにファイルを書き換える merge 操作が頻繁にあるかどうかを自動的に検出し、将来のファイルの書き換えを見越して、書き換えられたファイルのサイズを小さくすることを選択できます。 詳細については、 ファイル サイズのチューニング に関するセクションを参照してください。

  • 低シャッフル マージ: 低シャッフル マージ は、ほとんどの一般的なワークロードのパフォーマンスを向上させる最適化された MERGE の実装を提供します。 さらに、変更されていないデータに対する Z-Ordering など、既存のデータ レイアウトの最適化が保持されます。

データの最新性を管理する

各クエリーの開始時に、Deltaテーブルはテーブルの最新バージョンに自動更新されます。このプロセスは、コマンドステータスがUpdating the Delta table's stateと報告するときにノートブックで観察できます。ただし、テーブルで履歴分析を実行する場合、特にストリーミングデータが頻繁に取り込まれるテーブルの場合は、必ずしも最新のデータが必要なわけではありません。このような場合、Deltaテーブルの古いスナップショットに対してクエリーを実行できます。このアプローチにより、クエリーから結果を取得する際の待ち時間を短縮できます。

古いデータの許容度を構成するには、Spark セッション構成 spark.databricks.delta.stalenessLimit1h15m (それぞれ 1 時間または 15 分) などの時間文字列値で設定します。 この構成はセッション固有であり、テーブルにアクセスする他のクライアントには影響しません。 テーブルの状態が古さ制限内で更新された場合、テーブルに対するクエリーは、最新のテーブルの更新を待たずに結果を返します。 この設定によってテーブルの更新が妨げられることはなく、古いデータが返されると、更新処理がバックグラウンドで行われます。 最後のテーブル更新が古さの制限よりも古い場合、クエリーはテーブル状態の更新が完了するまで結果を返しません。

低遅延クエリーのための強化されたチェックポイント

Delta Lakeは、最適化された頻度でDeltaテーブルの集約状態としてチェックポイントを書き込みます。これらのチェックポイントは、テーブルの最新の状態を計算する開始点として機能します。チェックポイントがなければ、Delta Lakeはテーブルの状態を計算するために、トランザクションログへのコミットを表すJSONファイルの大規模なコレクション(「Delta」ファイル)を読み取る必要があります。さらに、Delta Lakeがデータスキップを実行するために使用する列レベルの統計は、チェックポイントに保存されます。

重要

Delta Lakeチェックポイントは、Structured Streamingチェックポイントとは異なります。

列レベルの統計は、構造体および JSON として保存されます (下位互換性のため)。 構造体形式を使用すると、次の理由から Delta Lake の読み取りが大幅に高速になります。

  • Delta Lakeは、列レベルの統計を取得するために高価なJSON解析を実行しません。

  • Parquet列のプルーニング機能により、列の統計を読み取るために必要なI/Oが大幅に削減されます。

構造体形式を使用すると、Delta Lake読み取りオペレーションのオーバーヘッドを数秒から数十ミリ秒に削減する一連の最適化が可能になり、短いクエリーの待ち時間が大幅に短縮されます。

チェックポイント での列レベルの統計の管理

テーブルプロパティdelta.checkpoint.writeStatsAsJsonおよびdelta.checkpoint.writeStatsAsStructを使用して、チェックポイントに統計を書き込む方法を管理します。両方のテーブルプロパティがfalseの場合、Delta Lakeはデータスキップを実行できません

  • バッチ書き込みでは、JSON形式と構造体形式の両方で統計情報を書き込みます。delta.checkpoint.writeStatsAsJsontrueです.

  • delta.checkpoint.writeStatsAsStruct デフォルトでは未定義です。

  • リーダーは、利用可能な場合はstruct列を使用し、利用できない場合はJSON列の使用に戻ります。

重要

強化されたチェックポイントによって、オープンソースのDelta Lakeリーダーとの互換性が損なわれることはありません。ただし、delta.checkpoint.writeStatsAsJsonfalseに設定すると、独自のDelta Lakeリーダーに影響が出る可能性があります。パフォーマンスへの影響の詳細については、ベンダーにお問い合わせください。

構造化ストリーミングクエリーの拡張チェックポイントを有効にする

Structured Streamingワークロードに低レイテンシー要件(1分未満のレイテンシー)がない場合は、次のSQLコマンドを実行して拡張チェックポイントを有効にすることができます:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

次のテーブルプロパティを設定することで、チェックポイントの書き込み遅延を改善することもできます:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

アプリケーションでデータスキップが役に立たない場合は、両方のプロパティをfalseに設定できます。その場合、統計は収集も書き込まれません。Databricksはこの構成を推奨していません。