Delta Lakeテーブルのスキーマを更新する

Delta Lakeを使用すると、テーブルのスキーマを更新できます。次の種類の変更がサポートされています:

  • 新しい列の追加(任意の位置)

  • 既存の列の並べ替え

  • 既存の列の名前を変更する

これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。

重要

Deltaテーブル スキーマの更新は、すべてのDelta書き込み操作と競合する操作です。

デルタテーブルスキーマを更新すると、そのテーブルから読み取るストリームが終了します。ストリームを継続したい場合は、ストリームを再起動する必要があります。 推奨される方法については、「構造化ストリーミングの制作上の考慮点」を参照してください。

スキーマを明示的に更新して列を追加する

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

デフォルトでは、NULL値の許容はtrueです。

ネストされたフィールドに列を追加するには、次のコマンドを使用します:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。

スキーマを明示的に更新して列のコメントまたは順序を変更する

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

ネストされたフィールドの列を変更するには、次のコマンドを使用します。

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRSTを実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field2
| +-field1

スキーマを明示的に更新して列を置き換える

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

例えば、以下のようなDDLを実行する場合:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

スキーマを明示的に更新して列の名前を変更する

この機能は、Databricks Runtime 10.4 LTS 以降で利用できます。

列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

列の名前を変更するには:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

ネストされたフィールドの名前を変更するには:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行するとします:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

以前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のようになります:

- root
| - colA
| - colB
| +-field001
| +-field2

「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

スキーマを明示的に更新して列を削除する

この機能は、Databricks Runtime 11.3 LTS 以降で利用できます。

データ ファイルを書き換えずにメタデータのみの操作として列を削除するには、テーブルの列マッピングを有効にする必要があります。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

重要

メタデータから列を削除しても、ファイル内の列の基礎となるデータは削除されません。削除された列データを消去するには、REORG TABLEを使ってファイルを書き換えます。その後、VACUUMを使用して、ドロップされた列データを含むファイルを物理的に削除することができます。

列を削除するには:

ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

明示的にスキーマを更新して列の型や名前を変更する

テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchemaオプションを使用します。

次の例は、列の型を変更する方法を示しています:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

次の例は、列名の変更を示しています:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

休暇の進化を有効にする

次のいずれかの方法で、訪問者進化を有効にできます。

Databricks Spark conf を設定するのではなく、書き込み操作ごとに ゲスト進化 を有効にすることをお勧めします。

書き込み操作でユーザー進化を有効にするためにオプションまたは構文を使用すると、これがSpark conf よりも優先されます。

INSERT INTOステートメントにはスキーマ進化句がありません。

新しい列を追加するための書き込みのスキーマ進化を有効にする

ソース クエリには存在するがターゲット テーブルには存在しない列は、訪問者進化が有効になっている場合、書き込みトランザクションの一部として自動的に追加されます。 「スキーマ進化を有効にする」を参照してください。

大文字と小文字は、新しい列を追加するときに保持されます。 新しい列がテーブル スキーマの末尾に追加されます。 追加の列が構造体内にある場合は、ターゲット表の構造体の末尾に追加されます。

次の例は、 Auto Loaderで mergeSchema オプションを使用する方法を示しています。 「Auto Loaderとは」を参照してください。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

次の例は、バッチ書き込み操作でmergeSchemaオプションを使用する方法を示しています。

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Delta Lakeマージのための自動スキーマ進化

スキーマ進化により、ユーザーはマージ時にターゲットテーブルとソース・テーブル間のスキーマの不一致を解決することができます。以下の2つのケースに対応する:

  1. ソーステーブルの列がターゲットテーブルに存在しません。新しい列がターゲットスキーマに追加され、その値がソース値を使用して挿入または更新されます。

  2. ターゲットテーブルの列がソーステーブルに存在しません。ターゲットのスキーマは変更されません。追加のターゲット列の値は変更されないままになるか(UPDATEの場合)、NULLに設定されます(INSERTの場合)。

自動スキーマ進化を手動で有効にする必要があります。 「スキーマ進化を有効にする」を参照してください。

Databricks Runtime 12.2 LTS 以降では、挿入アクションまたは更新アクションで、ソース テーブルに存在する列と構造体フィールドを名前で指定できます。 Databricks Runtime 11.3 LTS以下では、マージによる訪問者進化に使用できるのは INSERT * または UPDATE SET * アクションのみです。

Databricks Runtime 13.3 LTS以降では、map<int, struct<a: int, b: int>> など、マップ内にネストされた構造体で訪問者進化を使用できます。

マージのためのスキーマ進化構文

Databricks Runtime 15.2 以降では、 SQLまたはDeltaテーブルAPIsを使用して、マージ ステートメントで訪問者進化を指定できます。

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

スキーマ進化を伴うマージ操作の例

ここでは、スキーマ進化を伴う場合と伴わない場合のmerge操作の影響の例をいくつか示します。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: key, value

ソース列: key, value, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

テーブルのスキーマは変更されません。列keyvalueのみが更新/挿入されます。

テーブルスキーマが(key, value, new_value)に変更されます。一致する既存のレコードは、ソース内のvaluenew_valueで更新されます。新しい行がスキーマ(key, value, new_value)とともに挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

UPDATE ターゲット列old_valueがソースにないため、INSERTアクションはエラーをスローします。

テーブルスキーマが(key, old_value, new_value)に変更されます。一致する既存のレコードは、ソース内のnew_valueで更新され、old_valueは変更されません。old_valueに指定されたkeynew_value、およびNULLを使用して新しいレコードが挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET new_value = s.new_value

UPDATEnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブル スキーマが (key, old_value, new_value)に変更されます。 一致する既存のレコードはソースの new_value で更新され、 old_value は変更されず、一致しないレコードは new_valueに入力 NULL 。注 (1) を参照してください。

ターゲット列: key, old_value

ソース列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
  THEN INSERT (key, new_value) VALUES (s.key, s.new_value)

INSERTnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブル スキーマが (key, old_value, new_value)に変更されます。 新しいレコードは、 old_valueに指定された keynew_value、および NULL で挿入されます。既存のレコードは new_value NULL 入力され、 old_value は変更されません。注 (1) を参照してください。

(1)この動作は Databricks Runtime 12.2 LTS 以上で利用できます。Databricks Runtime 11.3 LTS 以下ではこの条件ではエラーが発生します。

Delta Lake マージでの列の除外

Databricks Runtime 12.2 LTS 以降では、マージ条件でEXCEPT句を使用して列を明示的に除外できます。 EXCEPTキーワードの動作は、訪問者進化が有効になっているかどうかによって異なります。

スキーマ進化が無効な場合、EXCEPT キーワードがターゲットテーブルの列のリストに適用され、UPDATE または INSERT アクションから列を除外できるようになります。除外された列は null に設定されます。

スキーマ進化が有効な場合、EXCEPT キーワードがソーステーブルの列のリストに適用され、スキーマ進化から列を除外することができます。ターゲットに存在しないソースの新しい列が EXCEPT 句にリストされている場合、その列はターゲットスキーマに追加されません。除外される列がターゲットに既に存在する場合、null に設定されます。

次の例は、構文を示しています。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated)

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。新しい行は、idtitle の値を使って挿入されます。除外されたフィールド last_updatednull に設定されます。フィールド review はターゲットにないため無視されます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。スキーマは、フィールド review を追加するように進化します。新しい行は、null に設定される last_updated を除き、すべてのソースフィールドを使用して挿入されます。

ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated, internal_count)

INSERTinternal_countがターゲットテーブルに存在しないため、エラーがスローされます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。review フィールドはターゲットテーブルに追加されますが、internal_count フィールドは無視されます。新たに挿入された行では、last_updatednull に設定されます。

スキーマ更新でNullType列を扱う

ParquetはNullTypeをサポートしていないため、Deltaテーブルに書き込むときにNullType列はDataFrameから削除されますが、スキーマには引き続き保存されます。その列に対して別のデータ型を受け取ると、Delta Lakeはスキーマを新しいデータ型にマージします。Delta Lakeが既存の列に対してNullTypeを受け取った場合、古いスキーマは保持され、新しい列は書き込み中に削除されます。

NullType ストリーミングではサポートされていません。ストリーミングを使用する場合はスキーマを設定する必要があるため、これは非常にまれです。NullTypeは、ArrayTypeMapTypeなどの複合型にも受け入れられません。

テーブルスキーマを置き換える

デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhereを使用せずにmode("overwrite")を使用してテーブルを上書きする場合でも、書き込まれるデータのスキーマを上書きする必要がある場合があります。overwriteSchemaオプションをtrueに設定して、テーブルのスキーマとパーティションを置き換えます:

df.write.option("overwriteSchema", "true")

重要

動的パーティションの上書きを使用する場合、overwriteSchematrueとして指定することはできません。