Databricksでの分離レベルと書き込みの競合

テーブルの分離レベルは、並列操作によって行われた変更からトランザクションを分離する必要がある度合いを定義します。 Databricks での書き込みの競合は、分離レベルによって異なります。

Delta Lake は、読み取りと書き込みの間の ACIDトランザクション保証を提供します。 これは、次のことを意味します。

  • 複数のクラスターにまたがる複数のライターが、テーブル パーティションを同時に変更できます。 ライターにはテーブルの一貫性のあるスナップショット ビューが表示され、書き込みはシリアル順で行われます。

    • 読者には、ジョブ中にテーブルが変更された場合でも、Databricks ジョブが開始されたテーブルの一貫性のあるスナップショット ビューが引き続き表示されます。

Databricksの ACID 保証とは何ですか?」を参照してください。

Databricks 、デフォルトですべてのテーブルにデルタレイクを使用します。 この記事では、Databricks での Delta Lake の動作について説明します。

重要

メタデータの変更により、すべての書き込み操作が失敗します。 これらの操作には、テーブルプロトコル、テーブルプロパティ、またはデータスキーマの変更が含まれます。

テーブルのメタデータを変更するコミットが発生すると、ストリーミング読み取りは失敗します。 ストリームを継続したい場合は、ストリームを再起動する必要があります。 推奨される方法については、 「構造化ストリーミングの本番運用に関する考慮事項」を参照してください。

メタデータを変更するクエリの例を次に示します。

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

行レベルの同時実行との書き込みの競合

行レベルのコンカレンシーは、行レベルでの変更を検出し、並列書き込みが同じデータ ファイル内の異なる行を更新または削除するときに発生する競合を自動的に解決することで、並列書き込み操作間の競合を減らします。

行レベルの同時実行は、Databricks Runtime 14.2 以降で一般的に利用可能です。 行レベルの同時実行性は、次の条件でデフォルトでサポートされます。

  • 削除が有効になっており、パーティション化されていないテーブル。

  • 削除を無効にしていない限り、リキッドクラスタリングのあるテーブル。

パーティションのあるテーブルは行レベルの同時実行をサポートしていませんが、削除が有効になっている場合でも、 OPTIMIZEと他のすべての書き込み操作の間の競合を回避できます。 行レベルの同時実行性の制限を参照してください。

他の Databricks Runtime バージョンについては、 「行レベルの同時実行プレビュー動作 (レガシー)」を参照してください。

MERGE INTO 行レベルの同時実行性をサポートするには、Databricks Runtime 14.2 の Photon が必要です。 Databricks Runtime 14.3 LTS 以降では、Photon は必要ありません。

次の表は、ロー・レベルの同時実行性が有効な場合、各 独立性レベルで 競合する可能性がある書き込み操作のペアを示しています。

ID 列を持つテーブルは、並列トランザクションをサポートしていません。 Delta Lakeでの ID 列の使用を参照してください。

インサート (1)

更新、削除、マージ先

最適化

挿入する

競合できません

更新、削除、マージ先

WriteSerializable で競合することはできません。 同じ行を変更するときに Serializable で競合する可能性があります。 行レベルの同時実行性の制限を参照してください。

同じ行を変更するときに競合する可能性があります。 行レベルの同時実行性の制限を参照してください。

最適化

競合できません

ZORDER BY を使用すると競合する可能性があります。それ以外では競合できません。

ZORDER BY を使用すると競合する可能性があります。それ以外では競合できません。

重要

(1) 上記の表のすべての INSERT 操作は、コミットする前に同じテーブルからデータを読み取らない追加操作について説明しています。 同じテーブルを読み取るサブクエリを含むINSERT 操作は、 MERGEと同じ同時実行性をサポートします。

REORG 操作の分離セマンティクスは、削除ベクトルに記録された変更を反映するようにデータ ファイルを書き換える場合の OPTIMIZE と同じです。 REORG を使用してアップグレードを適用すると、テーブルプロトコルが変更され、進行中のすべてのオペレーションと競合します。

行レベルの同時実行性のない書き込みの競合

次の表では、各 独立性レベルで競合する書き込み操作のペアについて説明します。

テーブルは、パーティションが定義されている場合、または削除ベクトルが有効になっていない場合、行レベルの同時実行をサポートしません。 Databricks Runtime 14.2 以降は、行レベルのコンカレンシーに必要です。

ID 列を持つテーブルは、並列トランザクションをサポートしていません。 Delta Lakeでの ID 列の使用を参照してください。

インサート (1)

更新、削除、マージ先

最適化

挿入する

競合できません

更新、削除、マージ先

WriteSerializable で競合することはできません。 Serializable で競合する可能性があります。 「パーティションとの競合の回避」を参照してください。

Serializable と WriteSerializable で競合する可能性があります。 「パーティションとの競合の回避」を参照してください。

最適化

競合できません

ZORDER BYが使用されない限り、削除ベクトルが有効になっているテーブル内で競合することはできません。 そうしないと競合する可能性があります。

ZORDER BYが使用されない限り、削除ベクトルが有効になっているテーブル内で競合することはできません。 そうしないと競合する可能性があります。

重要

(1) 上記の表のすべての INSERT 操作は、コミットする前に同じテーブルからデータを読み取らない追加操作について説明しています。 同じテーブルを読み取るサブクエリを含むINSERT 操作は、 MERGEと同じ同時実行性をサポートします。

REORG 操作の分離セマンティクスは、削除ベクトルに記録された変更を反映するようにデータ ファイルを書き換える場合の OPTIMIZE と同じです。 REORG を使用してアップグレードを適用すると、テーブルプロトコルが変更され、進行中のすべてのオペレーションと競合します。

行レベルの同時実行に関する制限事項

行レベルの同時実行には、いくつかの制限が適用されます。 次の操作では、競合の解決は、Databricks での書き込み競合の通常のコンカレンシーに従います。 「行レベルの同時実行性を使用しない書き込みの競合」を参照してください。

  • 次のような複雑な条件句を持つコマンド

    • 構造体、配列、マップなどの複雑なデータ型の条件。

    • 非決定論的な式とサブクエリを使用する条件。

    • 相関サブクエリを含む条件。

  • Databricks Runtime 14.2 では、 MERGEコマンドは、ソース テーブルに一致する行をフィルター処理するために、ターゲット テーブルで明示的な述語を使用する必要があります。 マージ解決の場合、フィルターは並列操作のフィルター条件に基づいて競合する可能性のある行のみをスキャンします。

行レベルの競合検出により、合計実行時間が長くなる可能性があります。 多くの並列トランザクションの場合、ライターは競合の解決よりも待機時間を優先し、競合が発生する可能性があります。

削除ベクトルのすべての制限も適用されます。 「制限事項」を参照してください。

Delta Lake はテーブルを読まずにいつコミットしますか?

Delta Lake INSERT または追加操作は、次の条件が満たされている場合、コミットする前にテーブルの状態を読み取りません。

  1. ロジックは、SQL ロジックまたは追加モードを使用して INSERT 表されます。

  2. ロジックには、書き込み操作の対象となるテーブルを参照するサブクエリや条件は含まれていません。

他のコミットと同様に、Delta Lake はトランザクションログのメタデータを使用してコミット時にテーブルのバージョンを検証して解決しますが、テーブルのバージョンは実際には読み取られません。

多くの一般的なパターンでは、 MERGE 操作を使用して、テーブル条件に基づいてデータを挿入します。 INSERT ステートメントを使用してこのロジックを書き直すこともできますが、条件式がターゲット表の列を参照する場合、これらのステートメントには MERGEと同じ並行性の制限があります。

書き込み直列可能分離レベルと直列化可能分離レベル

テーブルの独立性レベルは、並列トランザクションによって行われた変更からトランザクションを分離する必要がある程度を定義します。 Databricks のDelta Lake では、シリアル化可能と書き込みシリアル化可能という 2 つの分離レベルがサポートされています。

  • シリアル化可能: 最も強力な分離レベル。 これにより、コミットされた書き込み操作とすべての読み取りが シリアル化可能になります。 操作は、表に示されているものと同じ結果を生成する一度に 1 つずつ実行する一連のシーケンスが存在する限り許可されます。 書き込み操作の場合、シリアルシーケンスはテーブルの履歴に表示されるシーケンスとまったく同じです。

  • WriteSerializable (デフォルト): Serializable よりも弱い分離レベルです。 これにより、書き込み操作 (つまり、読み取りではない) がシリアル化可能であることだけが保証されます。 ただし、これは スナップショット アイソレーションよりも強力です。 WriteSerializable は、ほとんどの一般的な操作でデータの一貫性と可用性のバランスが取れているため、既定の分離レベルです。

    このモードでは、Delta テーブルの内容は、テーブル履歴に表示される一連の操作から予想される内容と異なる場合があります。 これは、このモードでは、並列書き込みの特定のペア (たとえば、操作 X と Y) を続行して、Y が X の後にコミットされたことが履歴に示されている場合でも、結果が X の前に実行された (つまり、それらの間でシリアル化可能) になるためです。 この並べ替えを禁止するには、 テーブル分離レベルを Serializable に設定して、これらのトランザクションを失敗させます。

読み取り操作では、常にスナップショット分離が使用されます。 書き込み分離レベルは、履歴に従って「存在しなかった」テーブルのスナップショットをリーダーが表示できるかどうかを決定します。

シリアル化可能レベルでは、リーダーには常に履歴に準拠するテーブルのみが表示されます。 WriteSerializable レベルでは、リーダーは Delta ログに存在しないテーブルを参照できます。

たとえば、実行時間の長い削除である txn1 と、txn1 によって削除されたデータを挿入する txn2 について考えてみます。 TXN2 と TXN1 が完了し、履歴にこの順序で記録されます。 履歴によると、txn2に挿入されたデータはテーブルに存在しないはずです。 シリアル化可能なレベルの場合、リーダーはtxn2によって挿入されたデータを見ることはありません。 ただし、WriteSerializable レベルの場合、リーダーはある時点で txn2 によって挿入されたデータを見ることができます。

各独立性レベルで競合する可能性のある操作の種類と、考えられるエラーの詳細については、 パーティショニングおよび不整合コマンド条件を使用した競合の回避を参照してください。

独立性レベルを設定する

独立性レベルを設定するには、 ALTER TABLE コマンドを使用します。

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

ここで、 <level-name>Serializable または WriteSerializableです。

たとえば、独立性レベルをデフォルトの WriteSerializable から Serializableに変更するには、次のコマンドを実行します。

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

パーティショニングと分離されたコマンド条件を使用した競合の回避

"競合する可能性があります" とマークされているすべての場合において、2 つの操作が競合するかどうかは、同じファイル セットを操作するかどうかによって異なります。 2 つのファイル セットの不整合化は、操作の条件で使用される列と同じ列でテーブルをパーティション分割することで解消できます。 たとえば、テーブルが日付でパーティション分割されていない場合、2 つのコマンド UPDATE table WHERE date > '2010-01-01' ...DELETE table WHERE date < '2010-01-01' は、どちらも同じファイル セットを変更しようとする可能性があるため、競合します。 テーブルを date でパーティション分割すると、競合を回避できます。 したがって、コマンドで一般的に使用される条件に従ってテーブルをパーティション分割すると、競合を大幅に減らすことができます。 ただし、カーディナリティの高い列でテーブルをパーティション分割すると、サブディレクトリの数が多いため、他のパフォーマンスの問題が発生する可能性があります。

競合の例外

トランザクションの競合が発生すると、次のいずれかの例外が発生します。

同時追加例外

この例外は、並列操作によって、操作が読み取るのと同じパーティション (またはパーティション分割されていないテーブル内の任意の場所) にファイルが追加される場合に発生します。 ファイルの追加は、 INSERTDELETEUPDATE、または MERGE 操作によって発生する可能性があります。

デフォルトの 独立性レベル WriteSerializableでは、 ブラインド INSERT 操作 (つまり、データを読み取らずにデータを盲目的に追加する操作) によって追加されたファイルは、同じパーティション (またはパーティション分割されていないテーブル内の任意の場所) にアクセスしても、どの操作とも競合しません。 独立性レベルが Serializableに設定されている場合、ブラインド アペンドが競合する可能性があります。

この例外は、並列 DELETEUPDATE、または MERGE 操作中にスローされることがよくあります。 並列操作は異なるパーティション ディレクトリを物理的に更新している可能性がありますが、一方が他方が並列で更新するのと同じパーティションを読み取るため、競合が発生する可能性があります。 これを回避するには、操作条件で分離を明示的にします。 次の例を考えてみましょう。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

上記のコードを異なる日付または国に対して同時に実行するとします。 各ジョブはターゲット Delta テーブル上の独立したパーティションで動作しているため、競合は発生しません。 ただし、条件は十分に明示的ではなく、テーブル全体をスキャンでき、他のパーティションを更新する並列操作と競合する可能性があります。 代わりに、次の例に示すように、ステートメントを書き直して、マージ条件に特定の日付と国を追加できます。

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

この操作は、異なる日付と国で同時に実行しても安全です。

同時削除読み取り例外

この例外は、並列操作によって、操作によって読み取られたファイルが削除された場合に発生します。 一般的な原因は、ファイルを書き換える DELETEUPDATE、または MERGE 操作です。

ConcurrentDeleteDeleteException

この例外は、並列操作によって削除されたファイルも削除した場合に発生します。 これは、2 つの並列圧縮操作で同じファイルを書き換えることが原因である可能性があります。

MetadataChangedException

この例外は、並列トランザクションが Delta テーブルのメタデータを更新するときに発生します。 一般的な原因は、テーブルのスキーマを更新する Delta テーブルへの ALTER TABLE 操作または書き込みです。

同時トランザクション例外

同じチェックポイントの場所を使用するストリーミング クエリーが同時に複数回開始され、同時に Delta テーブルへの書き込みを試行した場合。 2 つのストリーミング クエリーで同じチェックポイントの場所を使用し、同時に実行しないでください。

プロトコル変更例外

この例外は、次の場合に発生する可能性があります。

  • Delta テーブルが新しいプロトコル バージョンにアップグレードされたとき。 将来の操作を成功させるには、 Databricks Runtimeをアップグレードする必要がある場合があります。

  • 複数のライターが同時にテーブルを作成または置換する場合。

  • 複数のライターが同時に空のパスに書き込んでいる場合。

詳細については 、「Databricks による Delta Lake 機能の互換性の管理方法」 を参照してください。

行レベルのコンカレンシー プレビュー動作 (レガシー)

このセクションでは、Databricks Runtime 14.1 以前の行レベルのコンカレンシーのプレビュー動作について説明します。 行レベルの同時実行には、常に削除ベクトルが必要です。

Databricks Runtime 13.3 LTS 以降では、リキッドクラスタリングが有効になっているテーブルでは、行レベルのコンカレンシーが自動的に有効になります。

Databricks Runtime 14.0 および 14.1 では、クラスターまたは SparkSession に対して次の構成を設定することで、削除ベクトルを持つテーブルに対して行レベルのコンカレンシーを有効にすることができます。

spark.databricks.delta.rowLevelConcurrencyPreview = true

Databricks Runtime 14.1 以前では、Photon 以外のコンピュートでは、 DELETE 操作の行レベルのコンカレンシーのみがサポートされます。