Usar o feed de dados de alteração do Delta Lake no Databricks

Observação

O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

Você pode ler os eventos de alteração em consultas em lote usando Spark SQL, Apache Spark DataFrames e transmissão estruturada.

Importante

O feed de dados de alteração funciona em conjunto com o histórico da tabela para fornecer informações de alteração. Devido à criação de um histórico separado ao clonar uma tabela Delta, o feed de dados de alterações em tabelas clonadas não coincide com o da tabela original.

Casos de uso

O feed de dados de alteração não está habilitado por padrão. Os seguintes casos de uso devem ser considerados ao decidir quando habilitar o feed de dados de alterações

  • Tabelas Silver e Gold: melhore o desempenho do Delta Lake processando somente as alterações no nível da linha após MERGE UPDATE DELETE as operações iniciais , ou para acelerar e simplificar as operações ETL e ELT.

  • Visualizações materializadas: crie visualizações agregadas atualizadas de informações para uso em BI e análises sem precisar reprocessar as tabelas subjacentes completas, atualizando apenas onde ocorreram mudanças.

  • Transmitir alterações: envie um feed de dados de alterações para sistemas downstream, como Kafka ou RDBMS, que podem usá-lo para processamento incremental em estágios posteriores dos pipelines de dados.

  • Tabela de trilha de auditoria: capture o feed de dados de alterações como uma tabela Delta oferece armazenamento perpétuo e capacidade de consulta eficiente para ver todas as mudanças ao longo do tempo, incluindo quando ocorrem exclusões e quais atualizações foram feitas.

Habilitar o feed de dados de alteração

Você deve habilitar explicitamente a opção de alteração de feed de dados empregando um dos seguintes métodos:

  • Nova tabela: definir a propriedade delta.enableChangeDataFeed = true da tabela no comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabela existente: defina a propriedade da tabela delta.enableChangeDataFeed = true no comando ALTER TABLE .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas as novas tabela:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Apenas as alterações feitas após a ativação do feed de dados de alterações são registradas; as alterações anteriores em uma tabela não são registradas.

Alterar armazenamento de dados

O Databricks registra dados alterados para operações UPDATE, DELETEe MERGE na pasta _change_data no diretório da tabela. Algumas operações, como operações somente de inserção e exclusões de partições completas, não geram dados no diretório _change_data porque o Databricks pode calcular com eficiência o feed de dados alterados diretamente do log de transações.

Os arquivos da pasta _change_data seguem a política de retenção da tabela. Portanto, se você executar o comando VACUUM, os dados do feed de dados alterados também serão excluídos.

Ler alterações em consultas em lote

Você pode fornecer a versão ou o carimbo de data/hora do início e do fim. As versões inicial e final e carimbos de data/hora são incluídos nas consultas. Para ler as alterações de uma versão inicial específica para a versão mais recente da tabela, especifique apenas a versão inicial ou o carimbo de data/hora.

Você especifica uma versão como um número inteiro e um carimbo de data/hora como uma string no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Se você fornecer uma versão mais baixo ou um carimbo de data/hora mais antigo do que aquele que registrou eventos de mudança, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não estava habilitado

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")
// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

Leia as alterações nas consultas de transmissão

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Para obter os dados alterados durante a leitura da tabela, defina a opção readChangeFeed como true. O startingVersion ou startingTimestamp são opcionais e, se não forem fornecidos, o fluxo retornará o snapshot mais recente da tabela no momento do fluxo como um INSERT e alterações futuras como dados alterados. Opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex também são compatíveis na leitura de dados alterados.

Observação

A limitação de taxa pode ser atômica para versões diferentes da versão de captura instantânea inicial. Ou seja, toda a versão do commit será limitada por taxa ou todo o commit será retornado.

Por padrão, se um usuário passar uma versão ou carimbo de data/hora que exceda o último commit em uma tabela, será gerado o erro timestampGreaterThanLatestCommit. No Databricks Runtime 11.3 LTSe acima, o feed de dados de alterações pode lidar com o caso de versão fora de alcance se o usuário definir a seguinte configuração para true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você informar uma versão de início maior do que o último commit em uma tabela ou um carimbo de data/hora de início mais recente do que o último commit em uma tabela, então, quando a configuração anterior está ativada, um resultado de leitura vazio será retornado.

Se você fornecer uma versão final maior do que o último commit em uma tabela ou um carimbo de data/hora final mais recente do que o último commit em uma tabela, então, quando a configuração anterior estiver ativada no modo de leitura em lote, todas as alterações entre a versão de início e o último commit serão recuperadas.

Qual é o esquema para o feed de dados de alteração?

Quando você lê o feed de dados alterados de uma tabela, é utilizado o esquema da versão mais recente da tabela.

Observação

A maioria das operações de alteração e evolução de esquema são totalmente suportadas. A tabela com mapeamento de coluna ativado não oferece suporte para todos os casos de uso e demonstra comportamento diferente. Consulte Alterar limitações de alimentação de dados para tabelas com mapeamento de coluna ativado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que indicam o tipo de evento de mudança:

Nome da coluna

Tipo

Valores

_change_type

String

insert, update_preimage, update_postimage, delete (1)

_commit_version

Long

O log Delta ou a versão da tabela que contém a alteração.

_commit_timestamp

Carimbo de data/hora

O carimbo de data/hora associado quando a confirmação foi criada.

(1) preimage é o valor antes da atualização, postimage é o valor após a atualização.

Observação

Você não pode ativar o feed de dados alterados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar ativar o feed de dados alterados.

Alterar as limitações do feed de dados para tabelas com mapeamento de coluna ativado

Com o mapeamento de colunas habilitado em uma tabela Delta, você pode excluir ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes.Com o mapeamento de colunas ativado, o feed de dados de alterações possui limitações após a realização de alterações de esquema não aditivas, como renomear ou remover uma coluna, alterar o tipo de dados ou mudanças na nulidade.

Importante

  • Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.

  • Em Databricks Runtime 12.2 LTS e abaixo, as tabelas com mapeamento de coluna habilitado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja a transmissão com mapeamento de colunas e alterações no esquema.

  • Em Databricks Runtime 11.3 LTS e abaixo, o senhor não pode ler o feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram renomeação ou eliminação de coluna.

Em Databricks Runtime 12.2 LTS e acima, o senhor pode realizar leituras de lotes no feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versões especificado abranger uma alteração de esquema não aditiva.

Perguntas frequentes (FAQ)

Qual é o custo indireto de habilitar o feed de dados de alteração?

Não há impacto considerável. Os registros de dados de alterações são gerados na hora durante o processo de execução da consulta e, em geral, são muito menores do que o tamanho total dos arquivos reescritos.

Qual é a política de retenção de registros de alterações?

Os registros de alterações seguem a mesma política de retenção que as versões desatualizadas da tabela e serão removidos através do comando VACUUM se estiverem fora do período de retenção especificado.

Quando novos registros são disponibilizados no feed de dados de alteração?

Os dados alterados são confirmados junto com a transação Delta Lake e ficarão disponíveis ao mesmo tempo em que os novos dados estiverem disponíveis na tabela.

Exemplo de notebook: propagar alterações com feed de dados de alteração Delta

Este notebook mostra como propagar as alterações feitas em uma tabela de prata de número absoluto de vacinações para uma tabela ouro de taxas de vacinação.

Alterar notebook do feed de dados

Abra o bloco de anotações em outra guia