Manuseio de consultas grandes em fluxo de trabalho interativo

Um desafio com o fluxo de trabalho de dados interativos é lidar com grandes consultas. Isso inclui consultas que geram muitas linhas de saída, buscam muitas partições externas ou compute em conjuntos de dados extremamente grandes. Essas consultas podem ser extremamente lentas, saturar o compute recurso e dificultar que outras pessoas compartilhem o mesmo compute.

O Query Watchdog é um processo que evita que as consultas monopolizem o recurso compute, examinando as causas mais comuns de consultas grandes e encerrando as consultas que ultrapassam um limite. Este artigo descreve como ativar e configurar o Query Watchdog.

Importante

O Query Watchdog está ativado para toda a computação multifuncional criada usando a UI.

Exemplo de uma consulta disruptiva

Um analista está realizando algumas consultas ad hoc em um data warehouse just-in-time. O analista usa uma autoescala compartilhada compute que facilita a utilização de um único compute por vários usuários ao mesmo tempo. Suponha que haja duas tabelas com um milhão de linhas cada.

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)

spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_x")
spark.range(1000000)
  .withColumn("join_key", lit(" "))
  .createOrReplaceTempView("table_y")

Esses tamanhos de tabela são gerenciáveis no Apache Spark. No entanto, cada uma delas inclui uma coluna join_key com uma cadeia vazia em cada linha. Isso pode ocorrer se os dados não estiverem perfeitamente limpos ou se houver uma distorção significativa dos dados, em que algumas chaves são mais predominantes do que outras. Essas chaves join vazias são muito mais predominantes do que qualquer outro valor.

No código a seguir, o analista está unindo essas duas tabelas em sua chave, o que produz uma saída de um trilhão de resultados, e todos eles são produzidos em um único executor (o executor que obtém o " " key):

SELECT
  id, count(id)
FROM
  (SELECT
    x.id
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key)
GROUP BY id

Essa consulta parece estar em execução. Mas, sem saber sobre os dados, o analista vê que há "apenas" uma única tarefa restante durante a execução do Job. A consulta nunca é concluída, deixando o analista frustrado e confuso sobre o motivo de não ter funcionado.

Nesse caso, há apenas um problema join key. Outras vezes, pode haver muito mais.

Ativar e configurar o Query Watchdog

Para ativar e configurar o Query Watchdog, são necessários os seguintes passos.

  • Habilite o Watchdog com spark.databricks.queryWatchdog.enabled.

  • Configure o tempo de execução da tarefa com spark.databricks.queryWatchdog.minTimeSecs.

  • Exibir a saída com spark.databricks.queryWatchdog.minOutputRows.

  • Configure a taxa de saída com spark.databricks.queryWatchdog.outputRatioThreshold.

Para evitar que uma consulta crie um número excessivo de linhas de saída para o número de linhas de entrada, o senhor pode ativar o Query Watchdog e configurar o número máximo de linhas de saída como um múltiplo do número de linhas de entrada. Neste exemplo, usamos uma proporção de 1000 (o default).

spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)

A última configuração declara que uma determinada tarefa nunca deve produzir mais de 1.000 vezes o número de linhas de entrada.

Dica

A taxa de saída é totalmente personalizável. Recomendamos começar com valores mais baixos e ver qual limite funciona melhor para o senhor e sua equipe. Uma faixa de 1.000 a 10.000 é um bom ponto de partida.

O Query Watchdog não só evita que os usuários monopolizem o recurso compute para trabalhos que nunca serão concluídos, como também economiza tempo ao fazer falhar rapidamente uma consulta que nunca teria sido concluída. Por exemplo, a consulta a seguir falhará após vários minutos porque excede a proporção.

SELECT
  z.id
  join_key,
  sum(z.id),
  count(z.id)
FROM
  (SELECT
    x.id,
    y.join_key
  FROM
    table_x x
  JOIN
    table_y y
  on x.join_key = y.join_key) z
GROUP BY join_key, z.id

Aqui está o que o senhor veria:

Consulta ao watchdog

Normalmente, é suficiente ativar o Query Watchdog e definir a relação do limite de saída/entrada, mas o senhor também tem a opção de definir duas propriedades adicionais: spark.databricks.queryWatchdog.minTimeSecs e spark.databricks.queryWatchdog.minOutputRows. Essas propriedades especificam o tempo mínimo que uma determinada tarefa em uma consulta deve executar antes de ser cancelada e o número mínimo de linhas de saída para uma tarefa nessa consulta.

Por exemplo, o senhor pode definir minTimeSecs com um valor mais alto se quiser dar a ele a chance de produzir um grande número de linhas por tarefa. Da mesma forma, o senhor pode definir spark.databricks.queryWatchdog.minOutputRows como dez milhões se quiser interromper uma consulta somente depois que uma tarefa dessa consulta tiver produzido dez milhões de linhas. Se o valor for menor, a consulta será bem-sucedida, mesmo que a proporção de saída/entrada tenha sido excedida.

spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)

Dica

Se o senhor configurar o Query Watchdog em um Notebook, a configuração não persistirá entre as reinicializações do compute. Se quiser configurar o Query Watchdog para todos os usuários de um compute, recomendamos que o senhor use uma configuraçãocompute .

Detectar consultas em conjuntos de dados extremamente grandes

Outra consulta grande típica pode examinar uma grande quantidade de dados de grandes tabelas/conjuntos de dados. As operações de varredura podem durar muito tempo e saturar o compute recurso (até mesmo a leitura de metadados de uma tabela Hive grande pode levar um tempo significativo). O senhor pode definir maxHivePartitions para evitar a obtenção de muitas partições de uma tabela Hive grande. Da mesma forma, o senhor também pode definir maxQueryTasks para limitar as consultas em um site extremamente grande dataset.

spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)

Quando o senhor deve ativar o Query Watchdog?

O Query Watchdog deve ser ativado para análises ad hoc compute em que o analista SQL e o cientista de dados estão compartilhando um determinado compute e um administrador precisa garantir que as consultas "funcionem bem" umas com as outras.

Quando o senhor deve desativar o Query Watchdog?

Em geral, não aconselhamos o cancelamento antecipado de consultas usadas em um cenário de ETL porque, normalmente, não há um ser humano no loop para corrigir o erro. Recomendamos que o senhor desative o Query Watchdog para todas as análises, exceto as ad hoc compute.