Usar o feed de dados de alteração do Delta Lake no Databricks
Observação
Este artigo descreve como registrar e consultar informações de alteração em nível de linha para tabelas Delta usando o recurso de feed de dados alterados. Para saber como atualizar tabelas em um pipeline Delta Live Tables com base em alterações nos dados de origem, consulte Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables.
O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.
Você pode ler os eventos de alteração em consultas em lote usando Spark SQL, Apache Spark DataFrames e transmissão estruturada.
Importante
O feed de dados de alteração funciona em conjunto com o histórico da tabela para fornecer informações de alteração. Devido à criação de um histórico separado ao clonar uma tabela Delta, o feed de dados de alterações em tabelas clonadas não coincide com o da tabela original.
Casos de uso
O feed de dados de alteração não está habilitado por padrão. Os seguintes casos de uso devem ser considerados ao decidir quando habilitar o feed de dados de alterações
Tabelas Silver e Gold: melhore o desempenho do Delta Lake processando somente as alterações no nível da linha após
MERGE
UPDATE
DELETE
as operações iniciais , ou para acelerar e simplificar as operações ETL e ELT.Visualizações materializadas: crie visualizações agregadas atualizadas de informações para uso em BI e análises sem precisar reprocessar as tabelas subjacentes completas, atualizando apenas onde ocorreram mudanças.
Transmitir alterações: envie um feed de dados de alterações para sistemas downstream, como Kafka ou RDBMS, que podem usá-lo para processamento incremental em estágios posteriores dos pipelines de dados.
Tabela de trilha de auditoria: capture o feed de dados de alterações como uma tabela Delta oferece armazenamento perpétuo e capacidade de consulta eficiente para ver todas as mudanças ao longo do tempo, incluindo quando ocorrem exclusões e quais atualizações foram feitas.
Habilitar o feed de dados de alteração
Você deve habilitar explicitamente a opção de alteração de feed de dados empregando um dos seguintes métodos:
Nova tabela: definir a propriedade
delta.enableChangeDataFeed = true
da tabela no comandoCREATE TABLE
.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabela existente: defina a propriedade da tabela
delta.enableChangeDataFeed = true
no comandoALTER TABLE
.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Todas as novas tabela:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Importante
Apenas as alterações feitas após a ativação do feed de dados de alterações são registradas; as alterações anteriores em uma tabela não são registradas.
Alterar armazenamento de dados
O Databricks registra dados alterados para operações UPDATE
, DELETE
e MERGE
na pasta _change_data
no diretório da tabela. Algumas operações, como operações somente de inserção e exclusões de partições completas, não geram dados no diretório _change_data
porque o Databricks pode calcular com eficiência o feed de dados alterados diretamente do log de transações.
Os arquivos da pasta _change_data
seguem a política de retenção da tabela. Portanto, se você executar o comando VACUUM, os dados do feed de dados alterados também serão excluídos.
Ler alterações em consultas em lote
Você pode fornecer a versão ou o carimbo de data/hora do início e do fim. As versões inicial e final e carimbos de data/hora são incluídos nas consultas. Para ler as alterações de uma versão inicial específica para a versão mais recente da tabela, especifique apenas a versão inicial ou o carimbo de data/hora.
Você especifica uma versão como um número inteiro e um carimbo de data/hora como uma string no formato yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
Se você fornecer uma versão mais baixo ou um carimbo de data/hora mais antigo do que aquele que registrou eventos de mudança, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não estava habilitado
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
Leia as alterações nas consultas de transmissão
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
Para obter os dados alterados durante a leitura da tabela, defina a opção readChangeFeed
como true
. O startingVersion
ou startingTimestamp
são opcionais e, se não forem fornecidos, o fluxo retornará o snapshot mais recente da tabela no momento do fluxo como um INSERT
e alterações futuras como dados alterados. Opções como limites de taxa (maxFilesPerTrigger
, maxBytesPerTrigger
) e excludeRegex
também são compatíveis na leitura de dados alterados.
Observação
A limitação de taxa pode ser atômica para versões diferentes da versão de captura instantânea inicial. Ou seja, toda a versão do commit será limitada por taxa ou todo o commit será retornado.
Por padrão, se um usuário passar uma versão ou carimbo de data/hora que exceda o último commit em uma tabela, será gerado o erro timestampGreaterThanLatestCommit
. No Databricks Runtime 11.3 LTSe acima, o feed de dados de alterações pode lidar com o caso de versão fora de alcance se o usuário definir a seguinte configuração para true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Se você informar uma versão de início maior do que o último commit em uma tabela ou um carimbo de data/hora de início mais recente do que o último commit em uma tabela, então, quando a configuração anterior está ativada, um resultado de leitura vazio será retornado.
Se você fornecer uma versão final maior do que o último commit em uma tabela ou um carimbo de data/hora final mais recente do que o último commit em uma tabela, então, quando a configuração anterior estiver ativada no modo de leitura em lote, todas as alterações entre a versão de início e o último commit serão recuperadas.
Qual é o esquema para o feed de dados de alteração?
Quando você lê o feed de dados alterados de uma tabela, é utilizado o esquema da versão mais recente da tabela.
Observação
A maioria das operações de alteração e evolução de esquema são totalmente suportadas. A tabela com mapeamento de coluna ativado não oferece suporte para todos os casos de uso e demonstra comportamento diferente. Consulte Alterar limitações de alimentação de dados para tabelas com mapeamento de coluna ativado.
Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que indicam o tipo de evento de mudança:
Nome da coluna |
Tipo |
Valores |
---|---|---|
|
String |
|
|
Long |
O log Delta ou a versão da tabela que contém a alteração. |
|
Carimbo de data/hora |
O carimbo de data/hora associado quando a confirmação foi criada. |
(1) preimage
é o valor antes da atualização, postimage
é o valor após a atualização.
Observação
Você não pode ativar o feed de dados alterados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar ativar o feed de dados alterados.
Alterar as limitações do feed de dados para tabelas com mapeamento de coluna ativado
Com o mapeamento de colunas habilitado em uma tabela Delta, você pode excluir ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes.Com o mapeamento de colunas ativado, o feed de dados de alterações possui limitações após a realização de alterações de esquema não aditivas, como renomear ou remover uma coluna, alterar o tipo de dados ou mudanças na nulidade.
Importante
Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.
Em Databricks Runtime 12.2 LTS e abaixo, as tabelas com mapeamento de coluna habilitado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja a transmissão com mapeamento de colunas e alterações no esquema.
Em Databricks Runtime 11.3 LTS e abaixo, o senhor não pode ler o feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram renomeação ou eliminação de coluna.
Em Databricks Runtime 12.2 LTS e acima, o senhor pode realizar leituras de lotes no feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versões especificado abranger uma alteração de esquema não aditiva.
Perguntas frequentes (FAQ)
Qual é o custo indireto de habilitar o feed de dados de alteração?
Não há impacto considerável. Os registros de dados de alterações são gerados na hora durante o processo de execução da consulta e, em geral, são muito menores do que o tamanho total dos arquivos reescritos.