Execução de consulta adaptável

Observação

O recurso Spark UI não está disponível no Databricks on Google Cloud a partir desta versão.

A execução query adaptável (AQE) é a reotimização query que ocorre durante a execução query .

A motivação para a reotimização Runtime é que o Databricks tem as estatísticas precisas mais atualizadas no final de uma troca aleatória e de transmissão (referida como um estágio query no AQE). Como resultado, o Databricks pode optar por uma estratégia física melhor, escolher um tamanho e número de partição pós-embaralhamento ideal ou fazer otimizações que costumavam exigir dicas, por exemplo, manipulação join inclinação.

Isso pode ser muito útil quando a coleta de estatísticas não está ativada ou quando as estatísticas estão desatualizadas. Também é útil em locais onde as estatísticas derivadas estaticamente são imprecisas, como no meio de uma query complicada ou após a ocorrência de distorção de dados.

Capacidades

AQE está habilitado por default. Possui 4 recursos principais:

  • Altera dinamicamente a join merge de classificação para join de hash de transmissão.

  • Aglutina dinamicamente as partições (combina pequenas partições em partições de tamanho razoável) após a troca aleatória. Tarefas muito pequenas têm uma taxa de transferência de E/S pior e tendem a sofrer mais com a sobrecarga do programa e com a configuração da tarefa. A combinação de pequenas tarefas economiza recursos e melhora clusters Taxa de transferência.

  • Manipula dinamicamente a join de merge de classificação e join aleatória, dividindo (e replicando, se necessário) tarefas distorcidas em tarefas de tamanho aproximadamente igual.

  • Detecta e propaga dinamicamente relações vazias.

Aplicativo

AQE se aplica a todas query que são:

  • Não transmissão

  • Conter pelo menos uma troca (geralmente quando há uma join, agregação ou janela), umaquery ou ambas.

Nem todas query aplicadas por AQE são necessariamente otimizadas novamente. A reotimização pode ou não apresentar um plano query diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado pelo AQE, consulte a seção a seguir, Planos de consulta.

Planos de consulta

Esta seção discute como você pode examinar os planos query de diferentes maneiras.

Spark UI

AdaptiveSparkPlan

query aplicada por AQE contém um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada query principal ouquery. Antes da execução query ou durante a execução, o sinalizador isFinalPlan do nó AdaptiveSparkPlan correspondente é exibido como false; após a conclusão da execução query , o sinalizador isFinalPlan muda para true.

Plano em evolução

O diagrama do plano query evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais as métricas estão disponíveis) não serão alterados, mas aqueles que não foram podem mudar ao longo do tempo como resultado de reotimizações.

Veja a seguir um exemplo de diagrama de plano query :

diagrama de plano query

DataFrame.explain()

AdaptiveSparkPlan

query aplicada por AQE contém um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada query principal ouquery. Antes da execução query ou durante a execução, o sinalizador isFinalPlan do nó AdaptiveSparkPlan correspondente é exibido como false; após a conclusão da execução query , o sinalizador isFinalPlan muda para true.

Plano atual e inicial

Em cada nó AdaptiveSparkPlan haverá o plano inicial (o plano antes de aplicar qualquer otimização AQE) e o plano atual ou final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução avança.

Estatísticas Runtime

Cada estágio de shuffle e broadcast contém estatísticas de dados.

Antes da execução do estágio ou quando o estágio está em execução, as estatísticas são estimativas de tempo de compilação e o sinalizador isRuntime é false, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Após a conclusão da execução do estágio, as estatísticas são aquelas coletadas em Runtime e o sinalizador isRuntime se tornará true, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

A seguir está um exemplo de DataFrame.explain :

  • Antes da execução

    Antes da execução
  • Durante a execução

    Durante a execução
  • Após a execução

    Após a execução

SQL EXPLAIN

AdaptiveSparkPlan

query aplicada por AQE contém um ou mais nós AdaptiveSparkPlan, geralmente como o nó raiz de cada query principal ouquery.

Nenhum plano atual

Como SQL EXPLAIN não executa a query, o plano atual é sempre igual ao plano inicial e não reflete o que eventualmente seria executado pelo AQE.

Veja a seguir um exemplo de explicação de SQL:

SQL explicar

Eficácia

O plano query mudará se uma ou mais otimizações AQE entrarem em vigor. O efeito dessas otimizações AQE é demonstrado pela diferença entre os planos atual e final e o plano inicial e os nós do plano específico nos planos atual e final.

  • Altere dinamicamente join merge de classificação para join de hash de transmissão: diferentes nós join física entre o plano atual/final e o plano inicial

    join stringsde estratégia
  • Unir partições dinamicamente: nó CustomShuffleReader com propriedade Coalesced

    Leitor aleatório personalizado
    stringsde leitura aleatórias personalizadas
  • Trate dinamicamente join de inclinação: nó SortMergeJoin com campo isSkew como verdadeiro.

    Inclinar plano join
    Inclinar strings join
  • Detectar e propagar relações vazias dinamicamente: parte (ou todo) do plano é substituído pelo nó LocalTableScan com o campo de relação como vazio.

    Varredura da tabela local
    stringsde verificação de tabela local

Configuração

Ativar e desativar a execução de consulta adaptável

Propriedade

spark.databricks.optimizer.adaptive.enabled

Tipo: Boolean

Se deve habilitar ou desabilitar a execução query adaptável.

valor default : true

Ativar reprodução aleatória otimizada automaticamente

Propriedade

spark.sql.shuffle.partitions

Tipo: Integer

O número default de partições a serem usadas ao embaralhar dados para join ou agregações. A configuração do valor auto ativa o embaralhamento otimizado automaticamente, que determina automaticamente esse número com base no plano query e no tamanho dos dados de entrada query .

Nota: Para a transmissão estruturada, esta configuração não pode ser alterada entre as reinicializações query a partir do mesmo ponto de verificação.

valor default : 200

Altere dinamicamente a junção de mesclagem de classificação para junção de hash de transmissão

Propriedade

spark.databricks.adaptive.autoBroadcastJoinThreshold

Tipo: Byte String

O limite para acionar a alternância para join de transmissão em Runtime.

valor default : 30MB

Unir partições dinamicamente

Propriedade

spark.sql.adaptive.coalescePartitions.enabled

Tipo: Boolean

Se deve ativar ou desativar a união de partições.

valor default : true

spark.sql.adaptive.advisoryPartitionSizeInBytes

Tipo: Byte String

O tamanho de destino após a coalescência. Os tamanhos de partição aglutinados serão próximos, mas não maiores que esse tamanho de destino.

valor default : 64MB

spark.sql.adaptive.coalescePartitions.minPartitionSize

Tipo: Byte String

O tamanho mínimo das partições após a união. Os tamanhos das partições combinadas não serão menores que esse tamanho.

valor default : 1MB

spark.sql.adaptive.coalescePartitions.minPartitionNum

Tipo: Integer

O número mínimo de partições após a união. Não recomendado, porque a configuração substitui explicitamente spark.sql.adaptive.coalescePartitions.minPartitionSize.

valor default : 2x não. de núcleos clusters

Lidar dinamicamente com junção de inclinação

Propriedade

spark.sql.adaptive.skewJoin.enabled

Tipo: Boolean

Se deve habilitar ou desabilitar o tratamento join de inclinação.

valor default : true

spark.sql.adaptive.skewJoin.skewedPartitionFactor

Tipo: Integer

Um fator que, quando multiplicado pelo tamanho médio da partição, contribui para determinar se uma partição está distorcida.

valor default : 5

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Tipo: Byte String

Um limite que contribui para determinar se uma partição está distorcida.

valor default : 256MB

Uma partição é considerada distorcida quando (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true.

Detecte e propague dinamicamente relações vazias

Propriedade

spark.databricks.adaptive.emptyRelationPropagation.enabled

Tipo: Boolean

Se deve habilitar ou desabilitar a propagação dinâmica de relação vazia.

valor default : true

Perguntas frequentes (FAQ)

Por que o AQE não transmitiu uma pequena tabela de junção?

Se o tamanho da relação que se espera ser transmitido cair abaixo desse limite, mas ainda não for transmitido:

  • Verifique o tipo join . A transmissão não é compatível com determinados tipos join , por exemplo, a relação à esquerda de um LEFT OUTER JOIN não pode ser transmitida.

  • Também pode ser que a relação contenha muitas partições vazias, caso em que a maioria das tarefas pode ser concluída rapidamente com join merge de classificação ou pode ser potencialmente otimizada com manipulação join de inclinação. O AQE evita alterar essa join merge de classificação para join hash de transmissão se a porcentagem de partições não vazias for menor que spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Ainda devo usar uma dica de estratégia de junção de transmissão com AQE ativado?

Sim. Uma join de transmissão planejada estaticamente geralmente tem mais desempenho do que uma planejada dinamicamente por AQE, pois AQE pode não alternar para join de transmissão até depois de realizar o shuffle para ambos os lados da join (quando os tamanhos reais da relação são obtidos). Portanto, usar uma dica de transmissão ainda pode ser uma boa escolha se você conhecer bem sua query . O AQE respeitará as dicas de query da mesma forma que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre dica de junção de inclinação e otimização de junção de inclinação AQE? Qual devo usar?

Recomenda-se confiar na manipulação join de inclinação AQE em vez de usar a dica join inclinação, porque join de inclinação AQE é totalmente automática e, em geral, tem um desempenho melhor do que a contraparte da dica.

Por que o AQE não ajustou minha ordem de junção automaticamente?

A reordenação join dinâmica não faz parte do AQE.

Por que o AQE não detectou minha distorção de dados?

Há duas condições de tamanho que devem ser satisfeitas para que o AQE detecte uma partição como uma partição distorcida:

  • O tamanho da partição é maior que spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB)

  • O tamanho da partição é maior que o tamanho médio de todas as partições vezes o fator de partição distorcida spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5)

Além disso, o suporte à manipulação de distorção é limitado para determinados tipos join , por exemplo, em LEFT OUTER JOIN, somente a distorção no lado esquerdo pode ser otimizada.

Legado

O termo “Adaptive Execution” existe desde o Spark 1.6, mas o novo AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte “unir partições dinamicamente”. Em termos de arquitetura técnica, o novo AQE é uma estrutura de planejamento dinâmico e replanejamento de query com base em estatísticas Runtime , que suporta uma variedade de otimizações como as que descrevemos neste artigo e pode ser estendida para permitir otimizações mais potenciais.