Leituras e gravações de transmissão da tabela Delta

O Delta Lake está profundamente integrado ao Spark Structured Streaming por meio de readStream e writeStream. O Delta Lake supera muitas das limitações normalmente associadas aos sistemas e arquivos de transmissão, incluindo:

  • Aglutinação de pequenos arquivos produzidos por ingestão de baixa latência.

  • Manter processamento “exatamente uma vez” com mais de uma transmissão (ou trabalho simultâneo em lote).

  • Descobrir com eficiência quais arquivos são novos ao usar arquivos como fonte para uma transmissão.

Tabela Delta como fonte

a transmissão estruturada lê tabelas Delta de forma incremental. Enquanto uma query de transmissão está ativa em uma tabela Delta, novos registros são processados de forma idempotente à medida que novas versões da tabela commit na tabela de origem.

Os exemplos de código a seguir mostram a configuração de uma leitura de transmissão usando o nome da tabela ou o caminho do arquivo.

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")
spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Importante

Se o esquema de uma tabela Delta mudar após o início de uma leitura de transmissão na tabela, a consulta falhará. Para a maioria das alterações de esquema, você pode reiniciar a transmissão para resolver a incompatibilidade de esquema e continuar o processamento.

Em Databricks Runtime 12.2 LTS e abaixo, o senhor não pode fazer a transmissão de uma tabela Delta com mapeamento de coluna ativado que tenha passado por uma evolução não aditiva do esquema, como renomear ou eliminar colunas. Para obter detalhes, consulte transmissão com mapeamento de coluna e alterações de esquema.

Limitar taxa de entrada

As seguintes opções estão disponíveis para controlar micro-batches:

  • maxFilesPerTrigger: quantos arquivos novos devem ser considerados em cada micro-batch. O padrão é 1000.

  • maxBytesPerTrigger: Quantos dados são processados em cada micro-batch. Essa opção define um "soft max", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer a consulta de transmissão avançar nos casos em que a menor unidade de entrada é maior que esse limite. Isso não é definido por padrão.

Se você utilizar o maxBytesPerTrigger em conjunto com o maxFilesPerTrigger, o micro-batch processará dados até que o limite de maxFilesPerTrigger ou maxBytesPerTrigger seja atingido.

Observação

Nos casos em que as transações da tabela de origem são limpas devido à configuração logRetentionDuration e a query de transmissão tenta processar essas versões, por default a query falha em evitar a perda de dados. Você pode definir a opção failOnDataLoss como false para ignorar dados perdidos e continuar o processamento.

Transmitir um feed de captura de dados de alteração de Delta Lake (CDC)

O Delta Lake altera o feed de dados de registros de alterações em uma tabela Delta, incluindo atualizações e exclusões. Quando habilitado, você pode transmitir de um feed de dados de alteração e lógica de gravação para processar inserções, atualizações e exclusões em tabelas downstream. Embora a saída de dados do feed de dados de alteração seja ligeiramente diferente da tabela Delta descrita, isso apresenta uma solução para propagar alterações incrementais em tabelas downstream em uma arquitetura medalhão.

Importante

Em Databricks Runtime 12.2 LTS e abaixo, o senhor não pode transmitir a partir do feed de dados de alteração para uma tabela Delta com mapeamento de coluna ativado que passou por uma evolução não aditiva do esquema, como renomear ou eliminar colunas. Veja a transmissão com mapeamento de colunas e alterações no esquema.

Ignorar atualizações e exclusões

O transmissão estruturada não trata a entradas que não forem acréscimos e lança uma exceção se ocorrerem modificações na tabela que estiver sendo usada como fonte. Há duas estratégias principais para lidar com alterações que não podem ser propagadas automaticamente downstream:

  • Você pode excluir a saída e o ponto de verificação e reiniciar a transmissão desde o início.

  • Você pode definir uma destas duas opções:

    • ignoreDeletes: ignora transações que excluem dados nos limites da partição.

    • skipChangeCommits: ignora transações que excluam ou modifiquem registros existentes. skipChangeCommits subsume ignoreDeletes.

Observação

Em Databricks Runtime 12.2 LTS e acima, skipChangeCommits substitui a configuração anterior ignoreChanges. No Databricks Runtime 11.3 LTS e versões inferiores, ignoreChanges é a única opção suportada.

A semântica de ignoreChanges difere muito de skipChangeCommits. Com ignoreChanges ativado, os arquivos de dados reescritos na tabela de origem são reemitidos após uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE (dentro de partições) ou OVERWRITE. As linhas inalteradas geralmente são emitidas junto com as novas linhas, portanto os consumidores downstream devem ser capazes de lidar com as duplicidades. As exclusões não são propagadas downstream. ignoreChanges subsume ignoreDeletes.

skipChangeCommits ignora totalmente as operações de alteração de arquivos. Os arquivos de dados reescritos na tabela de origem devido à operação de alteração de dados como UPDATE, MERGE INTO, DELETE e OVERWRITE são ignorados completamente. Para refletir as alterações nas tabelas de origem upstream, você deve implementar lógica separada para propagar essas alterações.

As cargas de trabalho configuradas com ignoreChanges continuam a operar usando semântica conhecida, mas a Databricks recomenda usar skipChangeCommits para todas as novas cargas de trabalho. A migração de cargas de trabalho usando ignoreChanges para skipChangeCommits requer lógica de refatoração.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events com colunas date, user_email e action particionada por date. Você sai da tabela user_events e precisa excluir dados dela devido ao GDPR.

Quando você exclui nos limites da partição (ou seja, WHERE está em uma coluna de partição), os arquivos já estão segmentados por valor, portanto, a exclusão apenas remove esses arquivos dos metadados. Ao excluir uma partição inteira de dados, você pode usar o seguinte:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

Se você excluir dados em diversas partições (neste exemplo, filtrando em user_email), use a seguinte sintaxe:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

Se você atualizar um user_email com a instrução UPDATE , o arquivo que contém o user_email em questão será reescrito. Use skipChangeCommits para ignorar os arquivos de dados alterados.

Especificar posição inicial

Você pode usar as opções a seguir para especificar o ponto de partida da fonte de transmissão do Delta Lake sem processar a tabela inteira.

  • startingVersion: A versão Delta Lake para começar. A Databricks recomenda omitir esta opção para a maioria das cargas de trabalho. Quando não definida, a transmissão começará a partir da última versão disponível incluindo um Snapshot completo da tabela naquele momento.

    Se especificado, a transmissão lê todas as alterações na tabela Delta começando pela versão especificada (inclusive). Se a versão especificada não estiver mais disponível, a transmissão não começará. Você pode obter as versões commit na coluna version da saída do comando DESCRIBE história .

    Para retornar apenas as alterações mais recentes, especifique latest.

  • startingTimestamp: O timestamp de onde começar. Todas as alterações de tabela confirmadas no timestamp ou depois dele (inclusive) são lidas pelo leitor de streaming. Se o timestamp fornecido for anterior a todos os commits da tabela, a leitura de streaming começará com o timestamp mais antigo disponível. Um de:

    • Uma sequência de carimbo de data/hora. Por exemplo, "2019-01-01T00:00:00.000Z".

    • Uma string de datas. Por exemplo, "2019-01-01".

Você não pode definir as duas opções ao mesmo tempo. Eles entram em vigor somente ao iniciar uma nova query de transmissão. Se uma query de transmissão tiver começado e o progresso tiver sido registrado em seu checkpoint, essas opções serão ignoradas.

Importante

Embora você possa iniciar a fonte de transmissão a partir de uma versão específica ou carimbo de data/hora, o esquema da fonte de transmissão é sempre o esquema mais recente da tabela Delta. Você deve garantir que não haja nenhuma alteração de esquema incompatível na tabela Delta após a versão especificada ou o carimbo de data/hora. Caso contrário, a fonte de transmissão poderá retornar resultados incorretos ao ler os dados com um esquema incorreto.

Exemplo

Por exemplo, suponha que você tenha uma tabela user_events. Se quiser ler as alterações desde a versão 5, use:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Se quiser ler as alterações desde 18-10-2018, use:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Processar o snapshot inicial sem que os dados sejam descartados

Observação

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima. Esse recurso está em Public Preview.

Ao usar uma tabela Delta como fonte de transmissão, a consulta primeiro processa todos os dados presentes na tabela. A tabela Delta desta versão é chamada de snapshot inicial. Por padrão, os arquivos de dados da tabela Delta são processados com base em qual arquivo foi modificado pela última vez. No entanto, a hora da última modificação não representa necessariamente a ordem de tempo do evento de registro.

Em uma consulta de transmissão estável com uma marca d'água definida, o processamento de arquivos por tempo de modificação pode resultar em registros processados na ordem incorreta. Isso pode fazer com que os registros sejam descartados como eventos tardios pela marca d'água.

Você pode evitar o problema de perda de dados ativando a seguinte opção:

  • withEventTimeOrder: se o snapshot inicial deve ser processado com a ordem do horário do evento.

Com a ordem de tempo do evento ativada, o intervalo de tempo do evento dos dados do snapshot inicial é dividido em intervalos de tempo. Cada micro lote processa um bloco filtrando dados dentro do intervalo de tempo. As opções de configuração maxFilesPerTrigger e maxBytesPerTrigger ainda são aplicáveis para controlar o tamanho do microbatch, mas apenas de forma aproximada devido à natureza do processamento.

O gráfico abaixo mostra esse processo:

Snapshot inicial

Informações importantes sobre esse recurso:

  • O problema de perda de dados só acontece quando o snapshot Delta inicial de uma consulta de transmissão com monitoração de estado é processado na ordem padrão.

  • Você não pode alterar withEventTimeOrder depois que a consulta de stream é iniciada enquanto o snapshot inicial ainda estiver sendo processado. Para reiniciar com withEventTimeOrder alterado, você precisa excluir o ponto de verificação.

  • Se você estiver executando uma consulta de fluxo com EventTimeOrder habilitado, não poderá fazer o downgrade para uma versão DBR que não ofereça suporte a esse recurso até que o processamento inicial do snapshot seja concluído. Se você precisar fazer downgrade, poderá aguardar a conclusão do snapshot inicial ou excluir o ponto de verificação e reiniciar a consulta.

  • Esse recurso não é suportado nos seguintes cenários incomuns:

    • A coluna de tempo do evento é uma coluna gerada e há transformações sem projeção entre a fonte Delta e a marca d'água.

    • Há uma marca d'água que tem mais de uma fonte Delta na consulta de fluxo.

  • Com a ordem de tempo do evento habilitada, o desempenho do processamento de snapshot inicial do Delta pode ser mais lento.

  • Cada microlote verifica o Snapshot inicial para filtrar dados dentro do intervalo de tempo do evento correspondente. Para uma ação de filtro mais rápida, é aconselhável usar uma coluna de origem Delta como a hora do evento para que o salto de dados possa ser aplicado (verifique o salto de dados para Delta Lake para saber quando é aplicável). Além disso, o particionamento da tabela ao longo da coluna de horário do evento pode acelerar ainda mais o processamento. Você pode verificar Spark UI para ver quantos arquivos delta são verificados para microlotes específicos.

Exemplo

Suponha que você tenha uma tabela user_events com uma coluna event_time. Sua consulta de transmissão é uma consulta de agregação. Se quiser garantir que nenhum dado seja perdido durante o processamento inicial do snapshot, você pode usar:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Observação

Você também pode habilitar isso com a configuração Spark no cluster que se aplicará a todas as consultas de transmissão: spark.databricks.delta.withEventTimeOrder.enabled true

Mesa Delta como pia

Você também pode gravar dados em uma tabela Delta usando o a transmissão estruturada. O registro de transações permite que o Delta Lake garanta o processamento de uma única vez, mesmo se houver outros fluxos ou consultas em lote sendo executados simultaneamente na tabela.

Observação

A função Delta Lake VACUUM remove todos os arquivos não gerenciados pelo Delta Lake, mas ignora todos os diretórios que se começam com _. Você pode armazenar pontos de verificação com segurança ao lado de outros dados e metadados para uma tabela Delta usando uma estrutura de diretórios como <table-name>/_checkpoints.

Métricas

Você pode descobrir o número de bytes e o número de arquivos ainda a serem processados em um processo de consulta de transmissão como a métrica numFilesOutstanding e numBytesOutstanding. Outras métricas são:

  • numNewListedFiles: Número de arquivos Delta Lake que foram listados para calcular a lista de pendências desse lote.

    • backlogEndOffset: A versão da tabela usada para calcular o backlog.

Se estiver executando a transmissão em um notebook, você poderá ver essas métricas na guia Raw Data (Dados brutos) no painel de progresso da consulta de fluxo:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Modo anexar

Por padrão, os fluxos são executados no modo Anexar, o que adiciona novos registros à tabela.

Você pode usar o método de caminho:

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

ou o método toTable , como segue:

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modo completo

Você também pode usar o Structured Streaming para substituir toda a tabela por cada lote. Um exemplo de caso de uso é calcular um resumo usando agregação:

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)
spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

O exemplo anterior atualiza continuamente uma tabela que contém o número agregado de eventos por cliente.

Para aplicativos com requisitos de latência mais leniente, você pode economizar recursos de computação com gatilhos únicos. Utilize-os para atualizar tabelas de agregação de resumo em um determinado cronograma, processando apenas os novos dados que chegaram desde a última atualização.

Executando junções estáticas de fluxo

Você pode confiar nas garantias transacionais e no protocolo de controle de versão do Delta Lake para realizar uniões de fluxo estático. Uma junção de fluxo estático une a versão válida mais recente de uma tabela Delta (os dados estáticos) a um fluxo de dados com uma junção sem estado.

Quando o Databricks processa um micro-batch de dados em uma união estática de fluxo, a última versão válida dos dados da tabela Delta estática se une aos registros presentes no micro-batch atual. Como a união é sem estado, você não precisa configurar a marca d'água e pode processar os resultados com baixa latência. Os dados na tabela Delta estática usada na união devem estar mudando lentamente.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Upsert a partir de consultas de transmissão com foreachBatch

Você pode utilizar uma combinação de merge e foreachBatch para escrever upserts complexos de uma consulta de transmissão em uma tabela Delta. Consulte Usar foreachBatch para gravar em bancos de dados arbitrárias.

Esse padrão tem muitos aplicativos, incluindo o seguinte:

  • Escrever agregados de transmissão no modo de atualização: isso é muito mais eficiente que o modo completo.

  • Gravar uma transmissão de alterações do banco de dados em uma tabela Delta: A query de mesclagem para gravar dados alterados pode ser usada em foreachBatch para aplicar continuamente uma transmissão de alterações a uma tabela Delta.

  • Grave uma transmissão de dados na tabela Delta com desduplicação: a query mesclagem somente de inserção para desduplicação pode ser usada em foreachBatch para gravar dados continuamente (com duplicatas) em uma tabela Delta com desduplicação automática.

Observação

  • Faça com que sua instrução merge dentro de foreachBatch esteja idempotente, pois as reinicializações da consulta de transmissão podem aplicar a operação no mesmo lote de dados várias vezes.

  • Quando merge é usado em foreachBatch, a taxa de dados de entrada da consulta de transmissão (informada por StreamingQueryProgress e visível no gráfico de taxa de notebook) pode ser informada como um múltiplo da taxa real em que os dados são gerados na fonte. Isso ocorre porque merge lê os dados de entrada várias vezes, fazendo com que as métricas de entrada sejam multiplicadas. Se isso for um gargalo, você pode armazenar em cache o DataFrame em lote antes de merge e depois desarmazená-lo. merge

O seguinte exemplo demonstra como você pode utilizar SQL dentro do foreachBatch para realizar esta tarefa:

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Você também pode optar por usar as APIs do Delta Lake para executar transmissão de upserts, como no exemplo a seguir:

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

A tabela idempotente grava foreachBatch

Observação

A Databricks recomenda configurar uma gravação de transmissão separada para cada pia que você deseja atualizar. Usar foreachBatch para gravar em várias tabelas serializa as gravações, o que reduz o paralelismo e aumenta a latência geral.

As tabelas Delta suportam as seguintes opções DataFrameWriter para tornar as gravações em diversas tabelas dentro de foreachBatch idempotentes:

  • txnAppId: uma strings exclusiva que você pode transmitir em cada gravação do DataFrame. Por exemplo, você pode usar o ID StreamingQuery como txnAppId.

  • txnVersion: um número crescente monotonicamente que atua como versão da transação.

Delta Lake usa a combinação de txnAppId e txnVersion para identificar gravações duplicadas e ignorá-las.

Se uma gravação de lotes for interrompida com falha, a nova execução dos lotes usará o mesmo aplicativo e ID de lotes para ajudar o tempo de execução a identificar corretamente gravações duplicadas e ignorá-las. O ID do aplicativo (txnAppId) pode ser qualquer strings exclusiva gerada pelo usuário e não precisa estar relacionado ao ID de transmissão. Consulte Usar foreachBatch para gravar em coletores de dados arbitrários.

Aviso

Se você excluir o checkpoint de transmissão e reiniciar a query com um novo checkpoint, deverá fornecer um txnAppId diferente. Novos pontos de verificação começam com um ID de lote de 0. Delta Lake usa o ID do lote e txnAppId como uma key exclusiva e ignora lotes com valores já vistos.

O exemplo de código a seguir demonstra esse padrão:

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}