アダプティブクエリー実行

注:

Spark UI機能は、このリリースの時点で Databricks on Google Cloud では使用できません。

適応クエリー実行 (AQE) は、クエリー実行中に発生するクエリー再最適化です。

ランタイム再最適化の動機は、シャッフルおよびブロードキャスト交換 (AQE ではクエリー ステージと呼ばれます) の最後に、Databricks に最新の正確な統計があることです。 その結果、Databricks は、より優れた物理戦略を選択したり、最適なシャッフル後のパーティション サイズと数を選択したり、スキュー結合処理などのヒントを必要としていた最適化を実行したりできます。

これは、統計収集がオンになっていない場合や、統計が古くなっている場合に非常に役立ちます。 また、複雑なクエリーの最中やデータスキューの発生後など、静的に導出された統計が不正確な場所にも役立ちます。

資格

AQE はデフォルトで有効になっています。 4つの主要な機能があります。

  • ソートマージ結合をブロードキャストハッシュ結合に動的に変更します。

  • シャッフル交換後にパーティションを動的に結合します (小さなパーティションを妥当なサイズのパーティションに結合します)。 非常に小さなタスクは I/O スループットが悪く、スケジュールのオーバーヘッドとタスク設定のオーバーヘッドに悩まされる傾向があります。 小さなタスクを組み合わせることで、リソースが節約され、クラスターのスループットが向上します。

  • ソートマージ結合とシャッフルハッシュ結合のスキューを動的に処理し、スキューされたタスクをほぼ均等なサイズのタスクに分割し(必要に応じてレプリケートします)。

  • 空のリレーションを動的に検出して伝播します。

アプリケーション

AQE は、次のようなすべてのクエリーに適用されます。

  • 非ストリーミング

  • 少なくとも 1 つの交換 (通常、結合、集計、またはウィンドウがある場合)、1 つのサブクエリ、またはその両方を含む。

すべての AQE 適用クエリーが必ずしも再最適化されるわけではありません。 再最適化では、静的にコンパイルされたものとは異なるクエリープランが出てくる場合と思い付かない場合があります。 クエリーのプランが AQE によって変更されたかどうかを判別するには、次のセクション「 クエリー・プラン」を参照してください。

クエリープラン

このセクションでは、さまざまな方法でクエリー プランを調べる方法について説明します。

このセクションの内容:

Spark UI

AdaptiveSparkPlan ノード

AQE 適用クエリーには、通常、各メインクエリーまたはサブクエリーのルートノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。 クエリーの実行前または実行中は、対応する AdaptiveSparkPlan ノードの isFinalPlan フラグは次のように表示されます false。クエリーの実行が完了すると、 isFinalPlan フラグは次のように変わります。 true.

進化する計画

クエリー プラン図は、実行の進行に伴って進化し、実行中の最新のプランを反映します。 すでに実行されている(メトリクスが使用可能な)ノードは変更されませんが、実行されていないノードは、再最適化の結果として時間の経過とともに変更される可能性があります。

以下は、クエリー計画図の例です。

クエリー 平面図

DataFrame.explain()

AdaptiveSparkPlan ノード

AQE 適用クエリーには、通常、各メインクエリーまたはサブクエリーのルートノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。 クエリーの実行前または実行中は、対応する AdaptiveSparkPlan ノードの isFinalPlan フラグは次のように表示されます false。クエリーの実行が完了すると、 isFinalPlan フラグが trueに変わります。

現在および当初の計画

AdaptiveSparkPlan ノードの下には、実行が完了したかどうかに応じて、初期計画 (AQE 最適化を適用する前の計画) と現在計画または最終計画の両方があります。 現在の計画は、実行が進むにつれて進化します。

ランタイム統計

各シャッフルおよびブロードキャストステージには、データ統計が含まれています。

ステージの実行前またはステージの実行中に、統計はコンパイル時の推定値であり、フラグ isRuntimefalseです。 Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

ステージの実行が完了すると、統計は実行時に収集された統計になり、フラグ isRuntimetrueになります。 Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

次に、 DataFrame.explain 例を示します。

  • 実行前

    実行前
  • 実行中

    実行中
  • 実行後

    実行後

SQL EXPLAIN

AdaptiveSparkPlan ノード

AQE 適用クエリには、通常は各メイン クエリーまたはサブクエリーのルート ノードとして、1 つ以上の AdaptiveSparkPlan ノードが含まれます。

現在の計画はありません

SQL EXPLAIN クエリーを実行しないため、現在の計画は常に初期計画と同じであり、最終的に AQE によって実行される内容を反映しません。

以下は、SQL の説明の例です。

SQL の説明

効果

クエリー プランは、1 つ以上の AQE 最適化が有効になると変更されます。 これらの AQE 最適化の効果は、現行計画と最終計画、および現行計画と最終計画の初期計画ノードと特定の計画ノードの違いによって示されます。

  • ソートマージ結合をブロードキャストハッシュ結合に動的に変更:現在/最終計画と初期計画の間の異なる物理結合ノード

    結合戦略文字列
  • パーティションの動的結合: プロパティを持つノード CustomShuffleReader Coalesced

    カスタムシャッフルリーダー
    カスタムシャッフルリーダー文字列
  • スキュー結合を動的に処理する: フィールド isSkew が true のノード SortMergeJoin

    スキュー結合プラン
    スキュー結合文字列
  • 空のリレーションを動的に検出して伝播する: プランの一部 (または全体) は、リレーション フィールドが空であるノード LocalTableScan に置き換えられます。

    ローカルテーブルスキャン
    ローカル テーブル スキャン文字列

構成

アダプティブクエリー実行の有効化と無効化

財産

spark.databricks.optimizer.adaptive.enabled

種類: Boolean

アダプティブクエリー実行を有効にするか無効にするか。

デフォルト値: true

自動最適化シャッフルを有効にする

財産

spark.sql.shuffle.partitions

種類: Integer

結合または集計のデータをシャッフルするときに使用するパーティションの既定の数。 値を auto に設定すると、自動最適化シャッフルが有効になり、クエリー プランとクエリー入力データ サイズに基づいてこの数値が自動的に決定されます。

注: 構造化ストリーミングの場合、同じチェックポイント・ロケーションからのクエリー再始動の間にこの構成を変更することはできません。

デフォルト値: 200

ソートマージ結合をブロードキャストハッシュ結合に動的に変更

財産

spark.databricks.adaptive.autoBroadcastJoinThreshold

種類: Byte String

ランタイム時にブロードキャスト参加への切り替えをトリガーするしきい値。

デフォルト値: 30MB

パーティションを動的に結合する

財産

spark.sql.adaptive.coalescePartitions.enabled

種類: Boolean

パーティションの結合を有効にするか無効にするか。

デフォルト値: true

spark.sql.adaptive.advisoryPartitionSizeInBytes

種類: Byte String

合体後の目標サイズ。 結合されたパーティションサイズは、このターゲットサイズに近くなりますが、それより大きくはありません。

デフォルト値: 64MB

spark.sql.adaptive.coalescePartitions.minPartitionSize

種類: Byte String

結合後のパーティションの最小サイズ。 結合されたパーティションサイズは、このサイズより小さくなりません。

デフォルト値: 1MB

spark.sql.adaptive.coalescePartitions.minPartitionNum

種類: Integer

結合後のパーティションの最小数。 この設定は明示的に spark.sql.adaptive.coalescePartitions.minPartitionSizeをオーバーライドするため、推奨されません。

デフォルト値: 2x いいえ。 クラスター コア数

スキュー結合を動的に処理する

財産

spark.sql.adaptive.skewJoin.enabled

種類: Boolean

スキュー結合処理を有効にするか無効にするか。

デフォルト値: true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

種類: Integer

パーティション サイズの中央値を掛けたときに、パーティションが歪んでいるかどうかの判断に寄与する要因。

デフォルト値: 5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

種類: Byte String

パーティションが歪んでいるかどうかの判断に寄与するしきい値。

デフォルト値: 256MB

パーティションは、 (partition size > skewedPartitionFactor * median partition size)(partition size > skewedPartitionThresholdInBytes) の両方が trueされている場合、歪んでいると見なされます。

空のリレーションを動的に検出して伝播する

財産

spark.databricks.adaptive.emptyRelationPropagation.enabled

種類: Boolean

動的な空リレーション伝播を有効にするか無効にするか。

デフォルト値: true

よくある質問(FAQ)

AQE が小さな結合テーブルをブロードキャストしなかったのはなぜですか?

ブロードキャストされる予定のリレーションのサイズがこのしきい値を下回っているが、まだブロードキャストされない場合:

  • 結合タイプを確認します。 ブロードキャストは、特定の結合タイプではサポートされません (たとえば、 LEFT OUTER JOIN の左リレーションはブロードキャストできません)。

  • また、リレーションに空のパーティションが多数含まれている場合もあり、その場合、タスクの大部分はソートマージ結合ですばやく終了するか、スキュー結合処理で最適化できる可能性があります。 AQE は、空でないパーティションの割合が spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoinより低い場合、このようなソート・マージ結合をブロードキャスト・ハッシュ結合に変更することを回避します。

AQE を有効にしたブロードキャスト参加戦略のヒントを使用する必要がありますか?

はい。 静的に計画されたブロードキャスト結合は、結合の両側でシャッフルを実行するまでブロードキャスト結合に切り替えない可能性があるため、通常、AQE によって動的に計画されたブロードキャスト結合よりもパフォーマンスが高くなります (その時点で実際の関係サイズが取得されます)。 したがって、ブロードキャストのヒントを使用することは、クエリをよく知っている場合は、依然として良い選択です。 AQE は、静的最適化と同じようにクエリー ヒントを尊重しますが、ヒントの影響を受けない動的最適化を適用できます。

スキュー結合ヒントと AQE スキュー結合の最適化の違いは何ですか?どちらを使うべきですか?

AQE スキュー結合は完全に自動であり、一般に対応するヒントよりもパフォーマンスが向上するため、スキュー結合ヒントを使用するのではなく、AQE スキュー結合処理に依存することをお勧めします。

AQE で結合順序が自動的に調整されないのはなぜですか?

動的結合の並べ替えは AQE の一部ではありません。

AQE でデータ スキューが検出されなかったのはなぜですか?

AQE がパーティションを歪んだパーティションとして検出するには、次の 2 つのサイズ条件を満たす必要があります。

  • パーティションサイズが spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes よりも大きい(デフォルトは256MB)

  • パーティション サイズが、すべてのパーティションの中央値サイズに歪んだパーティション係数 spark.sql.adaptive.skewJoin.skewedPartitionFactor を掛けた値よりも大きくなっています (デフォルト 5)

さらに、スキュー処理のサポートは、特定の結合タイプでは制限されています (たとえば、 LEFT OUTER JOINでは、左側のスキューのみを最適化できます)。

レガシー

「アダプティブ実行」という用語は Spark 1.6 から存在していましたが、Spark 3.0 の新しい AQE は根本的に異なります。 機能面では、Spark 1.6 は "パーティションの動的結合" 部分のみを行います。 技術的なアーキテクチャの観点からは、新しいAQEは、ランタイム統計に基づくクエリーの動的な計画と再計画のフレームワークであり、この記事で説明したようなさまざまな最適化をサポートし、より多くの潜在的な最適化を可能にするために拡張できます。