Usar o feed de dados de alteração do Delta Lake no Databricks

O feed de dados de alterações permite que o Databricks acompanhe as alterações no nível de linha entre as diferentes versões de uma tabela Delta.Quando ativado em uma tabela Delta, o tempo de execução registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados da linha, juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.

Importante

O feed de dados de alteração funciona em conjunto com o histórico da tabela para fornecer informações de alteração. Devido à criação de um histórico separado ao clonar uma tabela Delta, o feed de dados de alterações em tabelas clonadas não coincide com o da tabela original.

Processar dados de alteração de forma incremental

Databricks recomenda o uso do change data feed em combinação com a transmissão estruturada para processar de forma incremental as alterações das tabelas do site Delta. O senhor deve usar a transmissão estruturada para que o site Databricks rastreie automaticamente as versões do feed de dados de alterações da sua tabela.

Observação

Delta Live Tables oferece funcionalidade para facilitar a propagação de dados de alteração e armazenar resultados como SCD (dimensões que mudam lentamente (SCD)) tipo 1 ou tabelas tipo 2. Consulte a seção APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com Delta Live Tables.

Para ler o feed de dados de alteração de uma tabela, o senhor deve ativar o feed de dados de alteração nessa tabela. Consulte Ativar feed de dados de alteração.

Defina a opção readChangeFeed como true ao configurar uma transmissão em relação a uma tabela para ler o feed de dados de alteração, conforme mostrado no exemplo de sintaxe a seguir:

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Em default, a transmissão retorna o último Snapshot da tabela quando a transmissão começou como INSERT e as alterações futuras como dados de alteração.

Os dados de alteração são confirmados como parte da transação Delta Lake e ficam disponíveis ao mesmo tempo em que os novos dados são confirmados na tabela.

Opcionalmente, o senhor pode especificar uma versão inicial. Consulte Devo especificar uma versão inicial?

O feed de dados de alteração também suporta a execução de lotes, o que requer a especificação de uma versão inicial. Veja Ler alterações nas consultas de lotes.

Opções como limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex também são compatíveis com a leitura de dados de alteração.

A limitação de taxa pode ser atômica para versões diferentes da versão de captura instantânea inicial. Ou seja, toda a versão do commit será limitada por taxa ou todo o commit será retornado.

Devo especificar uma versão inicial?

Opcionalmente, o senhor pode especificar uma versão inicial se quiser ignorar as alterações que ocorreram antes de uma determinada versão. O senhor pode especificar uma versão usando um carimbo de data/hora ou o número de ID da versão registrado no log de transações Delta.

Observação

Uma versão inicial é necessária para leituras de lotes, e muitos padrões de lotes podem se beneficiar da definição de uma versão final opcional.

Quando o senhor estiver configurando cargas de trabalho de transmissão estruturada que envolvam alimentação de dados alterados, é importante entender como a especificação de uma versão inicial afeta o processamento.

Muitas cargas de trabalho de transmissão, especialmente o novo pipeline de processamento de dados, se beneficiam do comportamento do default. Com o comportamento default, o primeiro lote é processado quando a transmissão registra primeiro todos os registros existentes na tabela como INSERT operações no feed de dados de alteração.

Se a tabela de destino já contiver todos os registros com as alterações apropriadas até um determinado ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como eventos INSERT.

O exemplo a seguir mostra a sintaxe da recuperação de uma falha de transmissão em que o ponto de verificação foi corrompido. Neste exemplo, suponha as seguintes condições:

  1. O feed de dados de alteração foi ativado na tabela de origem na criação da tabela.

  2. A tabela downstream de destino processou todas as alterações até a versão 75, inclusive.

  3. O histórico de versões da tabela de origem está disponível para as versões 70 e superiores.

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Nesse exemplo, o senhor também deve especificar um novo local de ponto de verificação.

Importante

Se o senhor especificar uma versão inicial, a transmissão falhará ao começar de um novo ponto de verificação se a versão inicial não estiver mais presente no histórico da tabela. O Delta Lake limpa automaticamente as versões históricas, o que significa que todas as versões iniciais especificadas acabam sendo excluídas.

Consulte Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma tabela?

Ler alterações em consultas em lote

O senhor pode usar a sintaxe de consulta de lotes para ler todas as alterações a partir de uma determinada versão ou para ler as alterações dentro de um intervalo especificado de versões.

Você especifica uma versão como um número inteiro e um carimbo de data/hora como uma string no formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

As versões começar e terminar são inclusivas nas consultas. Para ler as alterações de uma determinada versão do começar para a versão mais recente da tabela, especifique apenas a versão inicial.

Se você fornecer uma versão mais baixo ou um carimbo de data/hora mais antigo do que aquele que registrou eventos de mudança, ou seja, quando o feed de dados de alterações foi habilitado, um erro será gerado indicando que o feed de dados de alterações não estava habilitado

Os exemplos de sintaxe a seguir demonstram o uso das opções de versão inicial e final com leituras de lotes:

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")
// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Observação

Por padrão, se um usuário passar uma versão ou carimbo de data/hora que exceda o último commit em uma tabela, será gerado o erro timestampGreaterThanLatestCommit. No Databricks Runtime 11.3 LTSe acima, o feed de dados de alterações pode lidar com o caso de versão fora de alcance se o usuário definir a seguinte configuração para true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se você informar uma versão de início maior do que o último commit em uma tabela ou um carimbo de data/hora de início mais recente do que o último commit em uma tabela, então, quando a configuração anterior está ativada, um resultado de leitura vazio será retornado.

Se você fornecer uma versão final maior do que o último commit em uma tabela ou um carimbo de data/hora final mais recente do que o último commit em uma tabela, então, quando a configuração anterior estiver ativada no modo de leitura em lote, todas as alterações entre a versão de início e o último commit serão recuperadas.

Qual é o esquema para o feed de dados de alteração?

Quando você lê o feed de dados alterados de uma tabela, é utilizado o esquema da versão mais recente da tabela.

Observação

A maioria das operações de alteração e evolução de esquema são totalmente suportadas. A tabela com mapeamento de coluna ativado não oferece suporte para todos os casos de uso e demonstra comportamento diferente. Consulte Alterar limitações de alimentação de dados para tabelas com mapeamento de coluna ativado.

Além das colunas de dados do esquema da tabela Delta, o feed de dados de alterações contém colunas de metadados que indicam o tipo de evento de mudança:

Nome da coluna

Tipo

Valores

_change_type

String

insert, update_preimage, update_postimage, delete (1)

_commit_version

Long

O log Delta ou a versão da tabela que contém a alteração.

_commit_timestamp

Carimbo de data/hora

O carimbo de data/hora associado quando a confirmação foi criada.

(1) preimage é o valor antes da atualização, postimage é o valor após a atualização.

Observação

Você não pode ativar o feed de dados alterados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie as colunas na tabela para resolver esse conflito antes de tentar ativar o feed de dados alterados.

Habilitar o feed de dados de alteração

O senhor só pode ler o feed de dados de alteração para tabelas habilitadas. O senhor deve habilitar explicitamente a opção de alterar o feed de dados usando um dos seguintes métodos:

  • Nova tabela: definir a propriedade delta.enableChangeDataFeed = true da tabela no comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabela existente: defina a propriedade da tabela delta.enableChangeDataFeed = true no comando ALTER TABLE .

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Todas as novas tabela:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Somente as alterações feitas após o senhor ativar o feed de dados de alteração são registradas. As alterações anteriores em uma tabela não são capturadas.

Alterar armazenamento de dados

A ativação do feed de dados de alteração causa um pequeno aumento nos custos de armazenamento de uma tabela. Os registros de dados de alteração são gerados durante a execução da consulta e, em geral, são muito menores do que o tamanho total dos arquivos regravados.

O Databricks registra os dados de alteração das operações UPDATE, DELETE e MERGE na pasta _change_data do diretório da tabela. Algumas operações, como as operações somente de inserção e as exclusões de partições completas, não geram dados no diretório _change_data porque o site Databricks pode eficientemente compute o feed de dados de alteração diretamente da transação log.

Todas as leituras de arquivos de dados na pasta _change_data devem passar por APIs Delta Lake compatíveis.

Os arquivos na pasta _change_data seguem a política de retenção da tabela. Os dados de alteração do feed de dados são excluídos quando o comando VACUUM é executado.

Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma tabela?

O feed de dados de alterações não se destina a servir como um registro permanente de todas as alterações em uma tabela. O feed de dados de alterações registra apenas as alterações que ocorrem depois que ele é ativado.

O feed de dados de alteração e o site Delta Lake permitem que o senhor sempre reconstrua um Snapshot completo de uma tabela de origem, o que significa que é possível começar uma nova transmissão lida em uma tabela com o feed de dados de alteração ativado e capturar a versão atual dessa tabela e todas as alterações que ocorrerem depois.

O senhor deve tratar os registros no feed de dados de alteração como transitórios e acessíveis apenas para uma janela de retenção especificada. O log de transações Delta remove versões de tabela e suas versões correspondentes de feed de dados de alteração em intervalos regulares. Quando uma versão é removida do log de transações, o senhor não pode mais ler o feed de dados de alteração dessa versão.

Se o seu caso de uso exigir a manutenção de um histórico permanente de todas as alterações em uma tabela, o senhor deve usar a lógica incremental para gravar registros do feed de dados de alteração em uma nova tabela. O exemplo de código a seguir demonstra o uso de trigger.AvailableNow, que aproveita o processamento incremental da transmissão estruturada, mas processa os dados disponíveis como uma carga de trabalho de lotes. O senhor pode programar essa carga de trabalho de forma assíncrona com seu pipeline de processamento principal para criar um backup do feed de dados de alterações para fins de auditoria ou reprodução total.

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Alterar as limitações do feed de dados para tabelas com mapeamento de coluna ativado

Com o mapeamento de colunas habilitado em uma tabela Delta, você pode excluir ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes.Com o mapeamento de colunas ativado, o feed de dados de alterações possui limitações após a realização de alterações de esquema não aditivas, como renomear ou remover uma coluna, alterar o tipo de dados ou mudanças na nulidade.

Importante

  • Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica de lote.

  • Em Databricks Runtime 12.2 LTS e abaixo, as tabelas com mapeamento de coluna habilitado que sofreram alterações de esquema não aditivas não suportam leituras de transmissão no feed de dados de alteração. Veja a transmissão com mapeamento de colunas e alterações no esquema.

  • Em Databricks Runtime 11.3 LTS e abaixo, o senhor não pode ler o feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram renomeação ou eliminação de coluna.

Em Databricks Runtime 12.2 LTS e acima, o senhor pode realizar leituras de lotes no feed de dados de alteração para tabelas com mapeamento de coluna ativado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falharão se o intervalo de versões especificado abranger uma alteração de esquema não aditiva.