Delta テーブルにリキッドクラスタリングを使用する

Delta Lake リキッドクラスタリングは、テーブルのパーティション分割と ZORDER を置き換えて、データレイアウトの決定を簡素化し、クエリーのパフォーマンスを最適化します。 リキッドクラスタリングは、既存のデータを書き換えることなくクラスターキーを再定義する柔軟性を提供し、時間の経過とともに分析のニーズに応じてデータレイアウトを進化させることができます。

重要

Databricks 、リキッドクラスタリングが有効になっているすべてのテーブルにDatabricks Runtime 15.2 以降を使用することをお勧めします。 制限付きのパブリック プレビュー サポートは、Databricks Runtime 13.3 LTS 以降で利用できます。

リキッドクラスタリングが有効になっているテーブルでは、Databricks Runtime 13.3 LTS 以降で行レベルのコンカレンシーがサポートされています。 行レベルのコンカレンシーは、Databricks Runtime 14.2 以降で、削除ベクトルが有効になっているすべてのテーブルで一般公開されています。 「 Databricks での分離レベルと書き込みの競合」を参照してください。

リキッドクラスタリングは何に使用されますか?

Databricks では、すべての新しい Delta テーブルにリキッドクラスタリングを推奨しています。 クラスターの恩恵を受けるシナリオの例を次に示します。

  • 多くの場合、カーディナリティの高い列でフィルター処理されたテーブル。

  • データ分布に大きな偏りがあるテーブル。

  • 急速に大きくなり、保守とチューニングの作業が必要なテーブル。

  • 並列書き込み要件を持つテーブル。

  • 時間の経過と共に変化するアクセス パターンを持つテーブル。

  • 一般的なパーティション キーによって、パーティションが多すぎるか少なすぎるテーブル。

リキッドクラスタリングを有効にする

既存のテーブルまたはテーブルの作成中に、リキッドクラスタリングを有効にすることができます。 クラスタリングはパーティション分割やZORDERと互換性がないため、テーブル内のデータのすべてのレイアウトおよび最適化操作を管理するには Databricks を使用する必要があります。 リキッドクラスタリングを有効にした後、通常OPTIMIZEジョブを実行してクラスターデータを増分します。 クラスタリングをトリガーする方法を参照してください。

リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントに CLUSTER BY フレーズを追加します。

Databricks Runtime 14.2 以降では、Python または Scala で DataFrame APIsと DeltaTable API を使用して、リキッドクラスタリングを有効にすることができます。

-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) USING DELTA CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
# Create an empty table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.format("delta").clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.format("delta").clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

警告

リキッドクラスタリングを有効にして作成されたテーブルには、作成時に多数のDeltaテーブル機能が有効になっており、 Deltaライター バージョン 7 とリーダー バージョン 3 を使用します。 これらの機能の一部は、有効化をオーバーライドできます。 「デフォルトの機能有効化をオーバーライドする (オプション)」を参照してください。

テーブル プロトコルのバージョンはダウングレードできず、クラスタリングが有効になっているテーブルは、有効になっているすべての Delta リーダー プロトコル テーブル機能をサポートしていない Delta Lake クライアントでは読み取ることができません。 「Databricks は Delta Lake 機能の互換性をどのように管理しますか?」を参照してください。 。

次の構文を使用して、既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にすることができます。

ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

デフォルトの機能有効化をオーバーライドする (オプション)

リキッドクラスタリングの有効化中にDeltaテーブル機能を有効にするデフォルトの動作をオーバーライドできます。 これにより、これらのテーブル機能に関連付けられているリーダーおよびライターのプロトコルがアップグレードされなくなります。 次のステップを完了するには、既存のテーブルが必要です。

  1. ALTER TABLE を使用して、1 つ以上の機能を無効にするテーブル プロパティを設定します。たとえば、削除クロを無効にするには、次のコマンドを実行します。

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. 次のコマンドを実行して、テーブルで液体クラスタリングを有効にします。

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

次の表に、オーバーライドできる Delta 機能と、有効化が Databricks Runtime バージョンとの互換性にどのような影響を与えるかについての情報を示します。

Delta機能

Runtime互換性

有効化をオーバーライドするプロパティ

リキッドクラスタリングの無効化の影響

削除ベクトル

読み取りと書き込みには、Databricks Runtime 12.2 lTS 以上が必要です。

'delta.enableDeletionVectors' = false

行レベルの同時実行性が無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルの同時実行性との書き込みの競合」を参照してください。

DELETEMERGE 、およびUPDATEコマンドの実行が遅くなる可能性があります。

行の追跡

書き込みには Databricks Runtime 13.3 LTS 以上が必要です。 どの Databricks Runtime バージョンからでも読み取ることができます。

'delta.enableRowTracking' = false

行レベルの同時実行性が無効になっているため、トランザクションとクラスタリング操作が競合する可能性が高くなります。 「行レベルの同時実行性との書き込みの競合」を参照してください。

チェックポイント V2

読み取りと書き込みには Databricks Runtime 13.3 LTS 以降が必要です。

'delta.checkpointPolicy' = 'classic'

リキッドクラスタリングの動作には影響しません。

クラスターキーの選択

Databricks では、一般的に使用されるクエリー フィルターに基づいてクラスター キーを選択することをお勧めします。 クラスター キーは任意の順序で定義できます。 2 つの列が相関している場合は、そのうちの 1 つをクラスター キーとして追加するだけで済みます。

クラスタリング キーとして最大 4 つの列を指定できます。 クラスタリング キーに対して収集された統計情報を持つ列のみを指定できます。 デフォルトでは、Delta テーブルの最初の 32 列に統計が収集されます。 Delta統計列の指定」を参照してください。

クラスタリングは、クラスタリング キーに対して次のデータ型をサポートします。

  • Date

  • Timestamp

  • TimestampNTZ (Databricks Runtime 14.3 LTS 以上が必要)

  • String

  • Integer

  • Long

  • Short

  • Float

  • Double

  • Decimal

  • Byte

既存のテーブルを変換する場合は、次の推奨事項を考慮してください。

現在のデータ最適化手法

クラスター キーの推奨事項

Hiveスタイルのパーティション分割

パーティション列をクラスター キーとして使用します。

Z-order のインデックス作成

ZORDER BY 列をクラスター キーとして使用します。

Hiveスタイルのパーティション分割と Z-order

パーティション列と ZORDER BY 列の両方をクラスター キーとして使用します。

カーディナリティを減らすために生成された列 (タイムスタンプの日付など)

元の列をクラスター キーとして使用し、生成された列は作成しないでください。

クラスター化されたテーブルへのデータの書き込み

リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。Databricksでは、Databricks Runtime 13.3 LTS 以降を使用する必要があります。

書き込み時にクラスター化される操作には、次のものがあります。

  • INSERT INTO オペレーションズ

  • CTASRTAS ステートメント

  • COPY INTO Parquet 形式から

  • spark.write.format("delta").mode("append")

構造化ストリーミング書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。 その他の制限が適用されます。 「制限事項」を参照してください。

書き込み時のクラスタリングは、トランザクション内のデータがサイズのしきい値を満たした場合にのみトリガーされます。 これらのしきい値はクラスタリング列の数によって異なり、Unity Catalog で管理されるテーブルの場合は他の Delta テーブルよりも低くなります。

クラスタリング列の数

Unity Catalog 管理テーブルのしきい値サイズ

他のDeltaテーブルのしきい値サイズ

1

64メガバイト

256 MB

2

256 MB

1 GB

3

512 MB

2ギガバイト

4

1 GB

4ギガバイト

すべての操作がリキッドクラスタリングに適用されるわけではないため、Databricks では、すべてのデータが効率的にクラスター化されるように、 OPTIMIZE を頻繁に実行することをお勧めします。

クラスターをトリガーする方法

クラスターをトリガーするには、 Databricks Runtime 13.3 LTS 以降を使用する必要があります。 次の例のように、テーブルで OPTIMIZE コマンドを使用します。

OPTIMIZE table_name;

リキッドクラスタリングはインクリメンタルであり、クラスタリングする必要があるデータに対応するために、必要な場合にのみデータが書き換えられます。 クラスタ化されるデータと一致しないクラスタキーを持つデータファイルは書き換えられません。

最適なパフォーマンスを得るには、Databricks では、通常の OPTIMIZE ジョブをクラスター データにスケジュールすることをお勧めします。 多くの更新または挿入が発生しているテーブルの場合、Databricks では、1 時間または 2 時間ごとに OPTIMIZE ジョブをスケジュールすることをお勧めします。 リキッドクラスタリングは増分であるため、クラスター化されたテーブルのほとんどの OPTIMIZE ジョブは迅速に実行されます。

クラスター化されたテーブルからデータを読み取る

クラスター化されたテーブルのデータは、削除ベクトルの読み取りをサポートする任意の Delta Lake クライアントを使用して読み取ることができます。 最適なクエリ結果を得るには、次の例のように、クエリー フィルターにクラスター キーを含めます。

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

クラスターキーの変更

テーブルのクラスターキーは、次の例のように ALTER TABLE コマンドを実行することでいつでも変更できます。

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

クラスターキーを変更すると、後続の OPTIMIZE および書き込み操作では新しいクラスターアプローチが使用されますが、既存のデータは書き換えられません。

次の例のように、キーを NONEに設定してクラスターをオフにすることもできます。

ALTER TABLE table_name CLUSTER BY NONE;

クラスター キーを NONE に設定しても、既にクラスター化されているデータは書き換えられませんが、今後の OPTIMIZE 操作でクラスター化キーが使用されなくなります。

テーブルのクラスター化方法を確認する

次の例に示すように、 DESCRIBE コマンドを使用してテーブルのクラスターキーを表示できます。

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

リキッドクラスタリングを持つテーブルの互換性

Databricks Runtime 14.1 以降でリキッドクラスタリングで作成されたテーブルは、デフォルトで v2 チェックポイントを使用します。v2 チェックポイントを持つテーブルは、 Databricks Runtime 13.3 LTS 以降で読み書きできます。

Databricks Runtime 12.2 LTS以降では、v2 チェックポイントを無効にしてテーブル プロトコルをダウングレードし、リキッドクラスタリングを使用してテーブルを読み取ることができます。 Drop Delta テーブルの機能を参照してください。

制限

次の制限があります。

  • Databricks Runtime 15.1 以前では、書き込み時のクラスタリングは、フィルター、結合、または集計を含むソース クエリをサポートしていません。

  • 構造化ストリーミング ワークロードは、クラスターオンライトをサポートしていません。

  • 構造化ストリーミング書き込みを使用して、リキッドクラスタリングが有効になっているテーブルを作成することはできません。 構造化ストリーミングを使用すると、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。