Delta Lakeテーブルのスキーマを更新する

Delta Lakeを使用すると、テーブルのスキーマを更新できます。次の種類の変更がサポートされています:

  • 新しい列の追加(任意の位置)

  • 既存の列の並べ替え

  • 既存の列の名前を変更する

これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。

重要

Deltaテーブル スキーマの更新は、すべてのDelta書き込み操作と競合する操作です。

デルタテーブルスキーマを更新すると、そのテーブルから読み取るストリームが終了します。ストリームを継続したい場合は、ストリームを再起動する必要があります。 推奨される方法については、「構造化ストリーミングの制作上の考慮点」を参照してください。

スキーマを明示的に更新して列を追加する

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

デフォルトでは、NULL値の許容はtrueです。

ネストされたフィールドに列を追加するには、次のコマンドを使用します:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。

スキーマを明示的に更新して列のコメントまたは順序を変更する

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

ネストされたフィールドの列を変更するには、次のコマンドを使用します。

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRSTを実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field2
| +-field1

スキーマを明示的に更新して列を置き換える

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

例えば、以下のようなDDLを実行する場合:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

スキーマを明示的に更新して列の名前を変更する

プレビュー

この機能はパブリックプレビュー段階にあります。

この機能は、Databricks Runtime 10.2以降で利用できます。

列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。「Delta Lake列マッピングを使用した列の名前変更と削除」を参照してください。

列の名前を変更するには:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

ネストされたフィールドの名前を変更するには:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行するとします:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

以前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のようになります:

- root
| - colA
| - colB
| +-field001
| +-field2

「Delta Lakeの列マッピングによる列名の変更とドロップ」を参照してください。

スキーマを明示的に更新して列を削除する

プレビュー

この機能はパブリックプレビュー段階にあります。

この機能は、Databricks Runtime 11.0以降で利用できます。

データファイルを書き換えずにメタデータのみの操作として列を削除するには、テーブルの列マッピングを有効にする必要があります。「Delta Lake列マッピングを使用した列の名前変更と削除」を参照してください。

重要

メタデータから列を削除しても、ファイル内の列の基礎となるデータは削除されません。削除された列データを消去するには、REORG TABLEを使ってファイルを書き換えます。その後、VACUUMを使用して、ドロップされた列データを含むファイルを物理的に削除することができます。

列を削除するには:

ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

明示的にスキーマを更新して列の型や名前を変更する

テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchemaオプションを使用します。

次の例は、列の型を変更する方法を示しています:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

次の例は、列名の変更を示しています:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

自動スキーマ更新による列の追加

DataFrameには存在するがテーブルには存在しない列は、次の場合に書き込みトランザクションの一部として自動的に追加されます:

  • write またはwriteStream持っています .option("mergeSchema", "true")

  • spark.databricks.delta.schema.autoMerge.enabledtrue

両方のオプションが指定された場合、DataFrameWriterのオプションが優先されます。追加された列は、その列が存在する構造体の末尾に追加されます。大文字と小文字は、新しい列を追加するときに保持されます。

  • mergeSchema INSERT INTOまたは.write.insertInto()と一緒に使用することはできません。

Delta Lakeマージのための自動スキーマ進化

スキーマ進化により、ユーザーはマージ時にターゲットテーブルとソース・テーブル間のスキーマの不一致を解決することができます。以下の2つのケースに対応する:

  1. ソーステーブルの列がターゲットテーブルに存在しません。新しい列がターゲットスキーマに追加され、その値がソース値を使用して挿入または更新されます。

  2. ターゲットテーブルの列がソーステーブルに存在しません。ターゲットのスキーマは変更されません。追加のターゲット列の値は変更されないままになるか(UPDATEの場合)、NULLに設定されます(INSERTの場合)。

重要

スキーマ進化を使用するには、 mergeコマンドを実行する前に、Spark セッション構成spark.databricks.delta.schema.autoMerge.enabledtrueに設定する必要があります。

  • Databricks Runtime 12.2以降では、ソーステーブルに存在する列は、挿入にまたは更新アクションの名前で指定できます。Databricks Runtime 12.1以前では、マージによるスキーマ進化に使用できるのはINSERT *アクションまたはUPDATE SET *アクションのみです。

ここでは、スキーマ進化を伴う場合と伴わない場合のmerge操作の影響の例をいくつか示します。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: key, value

ソース列: key, value, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

テーブルのスキーマは変更されません。列keyvalueのみが更新/挿入されます。

テーブルスキーマが(key, value, new_value)に変更されます。一致する既存のレコードは、ソース内のvaluenew_valueで更新されます。新しい行がスキーマ(key, value, new_value)とともに挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

UPDATE ターゲット列old_valueがソースにないため、INSERTアクションはエラーをスローします。

テーブルスキーマが(key, old_value, new_value)に変更されます。一致する既存のレコードは、ソース内のnew_valueで更新され、old_valueは変更されません。old_valueに指定されたkeynew_value、およびNULLを使用して新しいレコードが挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET new_value = s.new_value

UPDATEnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブル スキーマが (key, old_value, new_value)に変更されます。 一致する既存のレコードはソースの new_value で更新され、 old_value は変更されず、一致しないレコードは new_valueに入力 NULL 。注 (1) を参照してください。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
  THEN INSERT (key, new_value) VALUES (s.key, s.new_value)

INSERTnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブル スキーマが (key, old_value, new_value)に変更されます。 新しいレコードは、 old_valueに指定された keynew_value、および NULL で挿入されます。既存のレコードは new_value NULL 入力され、 old_value は変更されません。注 (1) を参照してください。

(1)この動作はDatabricks Runtime 12.2以降で利用できます。この状況では、Databricks Runtime 12.1以前でエラーが発生します。

Delta Lake マージでの列の除外

Databricks Runtime 12.0 以降では、マージ条件で EXCEPT 句を使用することで、列を明示的に除外できます。EXCEPT キーワードの動作は、スキーマ進化が有効かどうかによって異なります。

スキーマ進化が無効な場合、EXCEPT キーワードがターゲットテーブルの列のリストに適用され、UPDATE または INSERT アクションから列を除外できるようになります。除外された列は null に設定されます。

スキーマ進化が有効な場合、EXCEPT キーワードがソーステーブルの列のリストに適用され、スキーマ進化から列を除外することができます。ターゲットに存在しないソースの新しい列が EXCEPT 句にリストされている場合、その列はターゲットスキーマに追加されません。除外される列がターゲットに既に存在する場合、null に設定されます。

次の例は、構文を示しています。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated)

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。新しい行は、idtitle の値を使って挿入されます。除外されたフィールド last_updatednull に設定されます。フィールド review はターゲットにないため無視されます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。スキーマは、フィールド review を追加するように進化します。新しい行は、null に設定される last_updated を除き、すべてのソースフィールドを使用して挿入されます。

ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated, internal_count)

INSERTinternal_countがターゲットテーブルに存在しないため、エラーがスローされます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。review フィールドはターゲットテーブルに追加されますが、internal_count フィールドは無視されます。新たに挿入された行では、last_updatednull に設定されます。

構造体の配列の自動スキーマ進化

Delta MERGE INTOは、名前による構造体フィールドの解決と、構造体の配列のスキーマの進化をサポートします。スキーマの進化を有効にすると、ターゲットテーブルのスキーマが構造体の配列に対して進化し、配列内のネストされた構造体でも機能します。

Databricks Runtime 12.2以降では、ソーステーブルに存在する構造体フィールドは挿入または更新コマンドの名前で指定できます。Databricks Runtime 12.1以前では、マージによるスキーマ進化に使用できるのはINSERT *またはUPDATE SET *コマンドのみです。

ここでは、構造体の配列に対するスキーマ進化を伴う場合と伴わない場合のマージ操作の影響の例をいくつか示します。

ソーススキーマ

ターゲットスキーマ

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

array<struct<b: string, a: string>>

array<struct<a: int, b: int>>

テーブルスキーマは変更されません。列は名前で解決され、更新または挿入されます。

テーブルスキーマは変更されません。列は名前で解決され、更新または挿入されます。

array<struct<a: int, c: string, d: string>>

array<struct<a: string, b: string>>

update cdがターゲットテーブルに存在しないため、insertエラーをスローします。

テーブルスキーマはarray<struct<a: string, b: string, c: string, d: string>>に変更されます。cdターゲットテーブル内の既存のエントリのNULLとして挿入されます。updateinsert文字列にキャストされたaNULLとしてのbを使用してソーステーブルのエントリを埋めます。

array<struct<a: string, b: struct<c: string, d: string>>>

array<struct<a: string, b: struct<c: string>>>

update dはターゲットテーブルに存在しないため、insertはエラーをスローします。

対象テーブルスキーマは、配列<struct<a: string, b: struct<c: string, d: string>>>に変更されます。dは、ターゲットテーブルの既存のエントリーに対してNULLとして挿入されます。

スキーマ更新でNullType列を扱う

ParquetはNullTypeをサポートしていないため、Deltaテーブルに書き込むときにNullType列はDataFrameから削除されますが、スキーマには引き続き保存されます。その列に対して別のデータ型を受け取ると、Delta Lakeはスキーマを新しいデータ型にマージします。Delta Lakeが既存の列に対してNullTypeを受け取った場合、古いスキーマは保持され、新しい列は書き込み中に削除されます。

NullType ストリーミングではサポートされていません。ストリーミングを使用する場合はスキーマを設定する必要があるため、これは非常にまれです。NullTypeは、ArrayTypeMapTypeなどの複合型にも受け入れられません。

テーブルスキーマを置き換える

デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhereを使用せずにmode("overwrite")を使用してテーブルを上書きする場合でも、書き込まれるデータのスキーマを上書きする必要がある場合があります。overwriteSchemaオプションをtrueに設定して、テーブルのスキーマとパーティションを置き換えます:

df.write.option("overwriteSchema", "true")

重要

動的パーティションの上書きを使用する場合、overwriteSchematrueとして指定することはできません。