Captura simplificada de dados de alterações (CDC) com a API APPLY CHANGES em Delta Live Tables

Delta Live Tables simplifica a captura de dados de alterações (CDC) (CDC) com a API APPLY CHANGES . Anteriormente, a instrução MERGE INTO era comumente usada para processar registros CDC no Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou exigir lógica complexa para reordenar registros.

Ao processar automaticamente registros fora de sequência, a API APPLY CHANGES no Delta Live Tables garante o processamento correto de registros CDC e elimina a necessidade de desenvolver lógica complexa para lidar com registros fora de sequência.

A API APPLY CHANGES é compatível com as interfaces SQL e Python do Delta Live Tables, incluindo suporte para atualização de tabelas com SCD tipo 1 e tipo 2:

  • Use SCD tipo 1 para atualizar registros diretamente. O histórico não é retido para registros atualizados.

  • Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.

Para obter sintaxe e outras referências, consulte:

Observação

Este artigo descreve como atualizar tabelas em seu pipeline Delta Live Tables com base nas alterações nos dados de origem. Para saber como registrar e consultar informações de alteração em nível de linha para tabelas Delta, consulte Usar o feed de dados de alteração do Delta Lake no Databricks.

Como o CDC é implementado com o Delta Live Tables?

Você deve especificar uma coluna nos dados de origem na qual sequenciar registros, o que o Delta Live Tables interpreta como uma representação crescente e monótona da ordenação adequada dos dados de origem.As Delta Live Tables lidam automaticamente com os dados que chegam fora do pedido. Para alterações do tipo SCD 2, as Delta Live Tables propagam os valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino. Deve haver uma atualização distinta por chave em cada valor de sequenciamento, e os valores de sequenciamento NULL não são suportados.

Para executar o processamento de CDC com Delta Live Tables, primeiro crie uma tabela de transmissão e, em seguida, use uma instrução APPLY CHANGES INTO para especificar a origem, key e a sequência do feed de alteração. Para criar a tabela de transmissão de destino, use a instrução CREATE OR REFRESH STREAMING TABLE em SQL ou a função create_streaming_table() em Python. Para criar a instrução que define o processamento CDC, use a instrução APPLY CHANGES em SQL ou a função apply_changes() em Python. Para obter detalhes de sintaxe, consulte captura de dados de alterações (CDC) with SQL in Delta Live Tables ou captura de dados de alterações (CDC) with Python in Delta Live Tables.

Quais objetos de dados são usados para o processamento do Delta Live Tables CDC?

Quando você declara a tabela de destino no Hive metastore, são criadas duas estruturas de dados:

  • Uma visualização usando o nome atribuído à tabela de destino.

  • Uma tabela de apoio interna utilizada pelas Delta Live Tables para gerenciar o processamento de CDC. Essa tabela é nomeada aplicando-se o prefixo __apply_changes_storage_ no nome da tabela de destino.

Por exemplo, se você declarar uma tabela de destino chamada dlt_cdc_target, verá uma visualização chamada dlt_cdc_target e uma tabela chamada __apply_changes_storage_dlt_cdc_target no metastore. A criação de uma view possibilita que as Delta Live Tables filtrem as informações extras (por exemplo, tombstones e versões) necessárias para lidar com dados fora de ordem. Para ver os dados processados, consulte a view de destino. Como o esquema da tabela do __apply_changes_storage_ pode mudar para suportar futuros recursos ou melhorias, você não deve consultar a tabela para uso de produção. Se você adicionar dados manualmente à tabela, os registros serão assumidos antes de outras alterações, pois as colunas da versão estão faltando.

Se um pipeline for publicado no Unity Catalog, as tabelas de suporte internas não poderão ser acessadas pelos usuários.

Obtenha dados sobre registros processados por uma consulta Delta Live Tables CDC

As seguintes métricas são capturadas pela query apply changes:

  • num_upserted_rows: o número de linhas de saída inseridas no dataset durante uma atualização.

  • num_deleted_rows: o número de linhas de saída existentes excluídas do dataset durante uma atualização.

As num_output_rows métricas, que são geradas para fluxos não CDC, não são capturadas para apply changes query.

Limitações

O destino da query APPLY CHANGES INTO ou da função apply_changes não pode ser utilizado como origem para uma tabela de transmissão. Uma tabela que lê o destino de uma query APPLY CHANGES INTO ou função apply_changes deve ser uma view materializada.

SCD tipo 1 e SCD tipo 2 no Databricks

As seções a seguir fornecem exemplos que demonstram as consultas Delta Live Tables SCD tipo 1 e tipo 2 que atualizam tabelas de destino com base em eventos de origem que:

  1. Criam novos registros de usuários.

  2. Excluem um registro de usuário.

  3. Atualizar registros de usuários. No exemplo do SCD tipo 1, as últimas UPDATE operações chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.

Os exemplos a seguir pressupõem familiaridade com a configuração e atualização de pipelines do Delta Live Tables. Consulte Tutorial: Execute seu primeiro pipeline das Delta Live Tables.

Para executar esses exemplos, você deve começar criando um dataset de amostra. Consulte Gerar dados de teste.

A seguir encontram-se registros de entrada para esses exemplos:

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

Se você cancelar o comentário da linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

Observação

Todos os exemplos seguintes incluem opções para especificar as operações do DELETE e TRUNCATE, mas cada uma delas é opcional.

Processar atualizações do SCD tipo 1

O seguinte exemplo de código demonstra atualizações de processamento do SCD tipo 1:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

Depois de executar o exemplo do SCD tipo 1 com o registro TRUNCATE adicional, os registros 124 e 126 são truncados devido à operação TRUNCATE em sequenceNum=3, e a tabela de destino contém o seguinte registro:

userId

name

city

125

Mercedes

Guadalajara

Processar atualizações do SCD tipo 2

O seguinte exemplo de código demonstra atualizações de processamento do SCD tipo 2:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

Uma query SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para história na tabela de destino. As alterações em outras colunas são atualizadas no local, em vez de gerar novos registros de história. O exemplo a seguir demonstra a exclusão da coluna city do acompanhamento:

O exemplo a seguir demonstra o uso do histórico de rastreamento com o SCD tipo 2:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Depois de executar este exemplo sem o registro TRUNCATE adicional, a tabela de destino conterá os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

Gerar dados de teste

O código abaixo é fornecido para gerar um dataset de exemplo para utilizar nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais apropriadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um notebook ou Databricks SQL. O seguinte código não se destina a ser executado como parte de um pipeline do Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Adicionar, alterar ou excluir dados em uma tabela de streaming de destino

Se o pipeline publicar tabelas no Unity Catalog, você poderá usar instruções de linguagem de manipulação de dados (DML), incluindo instruções de inserção, atualização, exclusão e merge , para modificar as tabelas de transmissão de destino criadas por instruções APPLY CHANGES INTO.

Observação

  • As declarações DML que modificam o esquema de tabela de uma tabela de streaming não são suportadas. Certifique-se de que suas instruções DML não tentem evoluir o esquema da tabela.

  • As instruções DML que atualizam uma tabela de streaming podem ser executadas somente em um cluster compartilhado do Unity Catalog ou em um SQL warehouse usando o Databricks Runtime 13.1e acima.

  • Como a transmissão requer fonte de dados somente anexada, se o seu processamento exigir transmissão de uma tabela de transmissão de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de transmissão de origem. Quando skipChangeCommits é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Caso o seu processamento não necessite de uma tabela de transmissão, você pode utilizar uma view materializada (que não possui a restrição somente de acréscimo) como tabela de destino.

Como o Delta Live Tables usa uma coluna SEQUENCE BY especificada e propaga valores de sequenciamento apropriados para as colunas __START_AT e __END_AT da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordem adequada dos registros. Consulte Como o CDC é implementado com Delta Live Tables?.

Para obter mais informações sobre o uso de instruções DML com tabelas de transmissão, consulte Adicionar, alterar ou excluir dados em uma tabela de transmissão.

O exemplo seguinte insere um registro ativo com uma sequência inicial de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);