Práticas recomendadas: Delta Lake

Este artigo descreve as práticas recomendadas ao usar o Delta Lake.

Para otimizações do Databricks, consulte Recomendações de otimização no Databricks.

Ao excluir e recriar uma tabela no mesmo local, você deve sempre usar uma instrução CREATE OR REPLACE TABLE . Consulte Eliminar ou substituir uma tabela Delta.

Usar clusters líquidos para otimizar a omissão de dados

A Databricks recomenda o uso de clusters líquidos em vez de particionamento, Z-order, ou outras estratégias de organização de dados para otimizar a disposição dos dados para o salto de dados. Consulte Usar clusters líquidos para tabelas Delta.

Arquivos compactos

A Databricks recomenda a execução frequente do comando OPTIMIZE para compactar arquivos pequenos.

Observação

Esta operação não remove os arquivos antigos. Para removê-los, execute o comando VACUUM.

Substituir o conteúdo ou esquema de uma tabela

Ocasionalmente, você pode querer substituir uma tabela Delta. Por exemplo:

  • Você descobre que os dados na tabela estão incorretos e deseja substituir o conteúdo.

  • Você deseja regravar toda a tabela para fazer alterações incompatíveis no esquema (como alterar os tipos de coluna).

Embora você possa excluir todo o diretório de uma tabela Delta e criar uma nova tabela no mesmo caminho, ela não é recomendada porque:

  • Excluir um diretório não é eficiente. Um diretório que contém arquivos muito grandes pode levar horas ou até dias para ser excluído.

  • Você perde todo o conteúdo dos arquivos excluídos; é difícil recuperar se você excluir a tabela errada.

  • A exclusão do diretório não é atômica. Enquanto você estiver excluindo a tabela, uma consulta simultânea lendo a tabela pode falhar ou ver uma tabela parcial.

Se você não precisar alterar o esquema da tabela, poderá excluir dados de uma tabela Delta e inserir seus novos dados ou atualizar a tabela para corrigir os valores incorretos.

Se quiser alterar o esquema da tabela, você pode substituir toda a tabela atomicamente. Por exemplo:

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table
REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table
dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

Existem vários benefícios com essa abordagem:

  • Substituir uma tabela é muito mais rápido porque ela não precisa listar o diretório recursivamente ou excluir nenhum arquivo.

  • A versão antiga da tabela ainda existe. Se você excluir a tabela errada, você poderá recuperar facilmente os dados antigos utilizando viagens de tempo. Consulte Trabalhar com a história da tabela do Delta Lake.

  • É uma operação atômica. Consultas simultâneas ainda podem ler a tabela enquanto você exclui a tabela.

  • Devido às garantias de transação do Delta Lake ACID, se a substituição da tabela falhar, ela estará em seu estado anterior.

Além disso, se desejar excluir arquivos antigos para economizar custos de armazenamento após substituir a tabela, você pode usar VACUUM para excluí-los. É otimizado para exclusão de arquivos e geralmente é mais rápido do que excluir o diretório inteiro.

Cache Spark

A Databricks não recomenda que o senhor use o cache do Spark pelos seguintes motivos:

  • Você perde qualquer perda de dados que possa vir de filtros adicionais adicionados ao cache DataFrame.

  • Os dados que são armazenados em cache podem não ser atualizados se a tabela for acessada usando um identificador diferente.

Diferenças entre o Delta Lake e o Parquet no Apache Spark

O Delta Lake lida com as seguintes operações automaticamente. Você nunca deve executar estas operações manualmente:

  • REFRESH TABLE: As tabelas delta sempre retornam as informações mais atualizadas, portanto, não há necessidade de ligar REFRESH TABLE manualmente após as alterações.

  • Adicionar e remover partições: o Delta Lake rastreia automaticamente o conjunto de partições presentes em uma tabela e atualiza a lista à medida que os dados são adicionados ou removidos. Como resultado, não há necessidade de executar ALTER TABLE [ADD|DROP] PARTITION ou MSCK.

  • Carregar uma única partição: não é necessário ler partições diretamente. Por exemplo, você não precisa executar o spark.read.format("parquet").load("/data/date=2017-01-01"). Em vez disso, utilize uma cláusula WHERE para pular dados, como spark.read.table("<table-name>").where("date = '2017-01-01'").

  • Não modifique manualmente os arquivos de dados: o Delta Lake usa o log de transações para confirmar as alterações na tabela atomicamente. Não modifique, adicione ou exclua arquivos de dados Parquet diretamente em uma tabela Delta, porque isso pode levar à perda de dados ou à corrupção da tabela.

Melhorar o desempenho do merge do Delta Lake

Você pode reduzir o tempo necessário para merge usando as seguintes abordagens:

  • Reduza o espaço de busca por correspondências: por padrão, a merge operação pesquisa toda a tabela Delta para encontrar correspondências na tabela de origem. Uma forma de acelerar merge é reduzir o espaço de pesquisa adicionando restrições conhecidas na condição de correspondência. Por exemplo, suponha que você tenha uma tabela particionada por country date e e queira usar merge para atualizar as informações do último dia e de um país específico. Adicionar a condição a seguir torna a consulta mais rápida, pois ela procura correspondências somente nas partições relevantes:

    events.date = current_date() AND events.country = 'USA'
    

    Além disso, essa consulta também reduz as chances de conflitos com outras operações simultâneas. Consulte Níveis de isolamento e conflitos de gravação no Databricks para obter mais detalhes.

  • Arquivos compactos: se os dados forem armazenados em muitos arquivos pequenos, a leitura dos dados para pesquisar correspondências pode se tornar lenta. Você pode compactar arquivos pequenos em arquivos maiores para melhorar o rendimento de leitura. Consulte Compactar arquivos de dados com otimização no Delta Lake para detalhes.

  • Controle as partições aleatórias para gravações: a operação merge embaralha dados várias vezes para calcular e gravar os dados atualizados. O número de tarefas usadas para embaralhar é controlado pela configuração da sessão do Spark spark.sql.shuffle.partitions. A definição desse parâmetro não apenas controla o paralelismo, mas também determina o número de arquivos de saída. Aumentar o valor aumenta o paralelismo, mas também gera um número maior de arquivos de dados menores.

  • Ativar gravações otimizadas: para tabelas particionadas, merge pode produzir um número muito maior de arquivos pequenos do que o número de partições aleatórias. Isso ocorre porque cada tarefa aleatória pode gravar vários arquivos em várias partições e pode se tornar um gargalo de desempenho. Você pode reduzir o número de arquivos ativando gravações otimizadas. Consulte Gravações otimizadas para Delta Lake em Databricks.

  • Ajustar tamanhos de arquivo na tabela: o Databricks pode detectar automaticamente se uma tabela Delta tem operações merge frequentes que reescrevem arquivos e podem optar por reduzir o tamanho dos arquivos reescritos em antecipação a novas reescritas de arquivos no futuro. Consulte a seção sobre como ajustar o tamanho dos arquivos para obter detalhes.

  • Low Shuffle Merge: Low Shuffle Merge fornece uma implementação otimizada de MERGE que fornece melhor desempenho para as cargas de trabalho mais comuns. Além disso, preserva as otimizações de disposição de dados existentes, como Z-ordering em dados não modificados.

Gerenciar a recenticidade de dados

No início de cada consulta, as tabelas Delta são atualizadas automaticamente para a versão mais recente da tabela. Este processo pode ser observado em cadernos quando o status do comando relata: Updating the Delta table's state. No entanto, ao executar a análise histórica em uma tabela, talvez você não precise necessariamente de dados atualizados até o último minuto, especialmente para tabelas em que os dados de transmissão estão sendo ingeridos com frequência. Nesses casos, as consultas podem ser executadas em instantâneos obsoletos da tabela Delta. Essa abordagem pode reduzir a latência para obter resultados de consultas.

Você pode configurar a tolerância para dados obsoletos definindo a configuração da sessão do Spark spark.databricks.delta.stalenessLimit com um valor strings de tempo como 1h ou 15m (por 1 hora ou 15 minutos, respectivamente). Esta configuração é específica da sessão e não afeta outros clientes que acessam a tabela. Se o estado da tabela tiver sido atualizado dentro do limite de inatividade, uma query na tabela retornará resultados sem aguardar a atualização mais recente da tabela. Essa configuração nunca impede a atualização da tabela e, quando dados obsoletos são retornados, a atualização é processada em segundo plano. Se a última atualização da tabela for anterior ao limite de inatividade, a query não retornará resultados até que a atualização do estado da tabela seja concluída.

Pontos de verificação aprimorados para consultas de baixa latência

O Delta Lake grava pontos de verificação como um estado agregado de uma tabela Delta em uma frequência otimizada. Esses pontos de verificação servem como ponto de partida para calcular o estado mais recente da tabela. Sem pontos de verificação, o Delta Lake teria que ler uma grande coleção de arquivos JSON ("arquivos delta") que representam confirmações no log de transações para calcular o estado de uma tabela. Além disso, as estatísticas em nível de coluna que o Delta Lake usa para executar o salto de dados são armazenadas no ponto de verificação.

Importante

Os pontos de verificação do Delta Lake são diferentes dos pontos de verificação do streaming estruturado.

As estatísticas no nível da coluna são armazenadas como um struct e um JSON (para compatibilidade com versões anteriores). O formato struct torna as leituras do Delta Lake muito mais rápidas, porque:

  • Delta Lake não executa análise JSON cara para obter estatísticas em nível de coluna.

  • Os recursos de remoção da coluna Parquet reduzem significativamente a E/S necessária para ler as estatísticas de uma coluna.

O formato struct possibilita uma série de otimizações que reduzem a sobrecarga das operações de leitura do Delta Lake de segundos para dezenas de milissegundos, o que diminui significativamente a latência para consultas rápidas.

Gerenciar estatísticas em nível de coluna em pontos de controle

Você gerencia como as estatísticas são gravadas nos pontos de verificação usando as propriedades da tabela delta.checkpoint.writeStatsAsJson e delta.checkpoint.writeStatsAsStruct. Se ambas as propriedades da tabela forem false, Delta Lake não poderá ignorar dados.

  • As gravações em lote registram estatísticas em ambos os formatos JSON e struct.delta.checkpoint.writeStatsAsJson é true.

  • delta.checkpoint.writeStatsAsStruct é indefinido por padrão.

  • Os leitores usam a coluna struct quando disponível e, caso contrário, voltam a usar a coluna JSON.

Importante

Pontos de verificação aprimorados não quebram a compatibilidade com leitores Delta Lake de código aberto. No entanto, falsedefinir delta.checkpoint.writeStatsAsJson como false pode ter implicações em leitores exclusivos do Delta Lake.Entre em contato com seus fornecedores para saber mais sobre as implicações de desempenho.

Habilitar pontos de verificação aprimorados para consultas de Transmissão Estruturada

Se suas cargas de trabalho de transmissão estruturada não tiverem requisitos de baixa latência (latências de subminuto), você poderá habilitar pontos de verificação aprimorados executando o seguinte comando SQL:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

Você também pode melhorar a latência de gravação do ponto de verificação configurando as seguintes propriedades da tabela:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Se a omissão de dados não for útil em seu aplicativo, você pode definir ambas as propriedades como falso. Então, nenhuma estatística é coletada ou escrita. A Databricks não recomenda essa configuração.