Trabalhar com o histórico da tabela Delta Lake
Cada operação que modifica uma tabela Delta Lake cria uma nova versão da tabela. Você pode usar as informações do histórico para auditar operações, reverter uma tabela ou consultar uma tabela em um ponto específico no tempo usando a viagem do tempo.
Observação
O Databricks não recomenda o uso do histórico de tabelas do Delta Lake como uma solução de backup de longo prazo para arquivamento de dados. O Databricks recomenda usar apenas os últimos sete dias para operações de viagem do tempo, a menos que você tenha definido as configurações de retenção de dados e de log para um valor maior.
Recuperar histórico de uma tabela Delta
Você pode recuperar informações, incluindo as operações, o usuário e o carimbo de data/hora de cada gravação em uma tabela Delta executando o comando history
. As operações são retornadas em ordem cronológica inversa.
A retenção do histórico da tabela é determinada pela configuração da tabela delta.logRetentionDuration
, que é de 30 dias por padrão.
Observação
viagem do tempo e table história são controladas por diferentes limiares de retenção. Veja O que é Delta Lake viagem do tempo?.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Para obter detalhes de sintaxe do Spark SQL, consulte DESCRIBE HISTORY.
Consulte a documentação da API Delta Lake para obter detalhes da sintaxe Scala/Java/Python.
O Catalog Explorer fornece uma view visual desta informação detalhada da tabela e da história das tabelas Delta. Além do esquema da tabela e dos dados de amostra, você pode clicar na história tab para ver a tabela história exibida com DESCRIBE HISTORY
.
Esquema de histórico
A saída da operação history
tem as seguintes colunas.
Coluna |
Tipo |
Descrição |
---|---|---|
version |
long |
Versão da tabela gerada pela operação. |
timestamp |
timestamp |
Quando essa versão foi confirmada. |
userId |
string |
ID do usuário que executou a operação. |
userName |
string |
Nome do usuário que executou a operação. |
operation |
string |
Nome da operação. |
operationParameters |
map |
Parâmetros da operação (por exemplo, predicados.) |
job |
struct |
Detalhes do job que executou a operação. |
notebook |
struct |
Detalhes do notebook a partir do qual a operação foi executada. |
clusterId |
string |
ID do cluster no qual a operação foi executada. |
readVersion |
long |
Versão da tabela que foi lida para realizar a operação de gravação. |
isolationLevel |
string |
Nível de isolamento usado para essa operação. |
isBlindAppend |
boolean |
Se essa operação anexou dados. |
operationMetrics |
map |
Métricas da operação (por exemplo, número de linhas e arquivos modificados). |
userMetadata |
string |
Metadados de confirmação definidos pelo usuário, se tiverem sido especificados |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Observação
Algumas das outras colunas não estarão disponíveis se você gravar em uma tabela Delta usando os seguintes métodos:
As colunas adicionadas no futuro serão sempre adicionadas após a última coluna.
Chaves de métricas de operações
A operação history
retorna uma coleção de métricas de operações no mapa de colunas operationMetrics
.
As tabelas a seguir listam as principais definições do mapa por operação.
Operação |
Nome da métrica |
Descrição |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO |
||
numFiles |
Número de arquivos gravados. |
|
numOutputBytes |
Tamanho em bytes do conteúdo gravado. |
|
numOutputRows |
Número de linhas gravadas. |
|
STREAMING UPDATE |
||
numAddedFiles |
Número de arquivos adicionados. |
|
numRemovedFiles |
Número de arquivos removidos. |
|
numOutputRows |
Número de linhas gravadas. |
|
numOutputBytes |
Tamanho da gravação em bytes. |
|
DELETE |
||
numAddedFiles |
Número de arquivos adicionados. Não fornecido quando as partições da tabela são excluídas. |
|
numRemovedFiles |
Número de arquivos removidos. |
|
numDeletedRows |
Número de linhas removidas. Não fornecido quando as partições da tabela são excluídas. |
|
numCopiedRows |
Número de linhas copiadas no processo de exclusão de arquivos. |
|
executionTimeMs |
Tempo gasto para executar toda a operação. |
|
scanTimeMs |
Tempo gasto para verificar os arquivos em busca de correspondências. |
|
rewriteTimeMs |
Tempo gasto para regravar os arquivos correspondentes. |
|
TRUNCATE |
||
numRemovedFiles |
Número de arquivos removidos. |
|
executionTimeMs |
Tempo gasto para executar toda a operação. |
|
MERGE |
||
numSourceRows |
Número de linhas no DataFrame de origem. |
|
numTargetRowsInserted |
Número de linhas inseridas na tabela de destino. |
|
numTargetRowsUpdated |
Número de linhas atualizadas na tabela de destino. |
|
numTargetRowsDeleted |
Número de linhas excluídas na tabela de destino. |
|
numTargetRowsCopied |
Número de linhas de destino copiadas. |
|
numOutputRows |
Número total de linhas gravadas. |
|
numTargetFilesAdded |
Número de arquivos adicionados ao coletor (destino). |
|
numTargetFilesRemoved |
Número de arquivos removidos do coletor (destino). |
|
executionTimeMs |
Tempo gasto para executar toda a operação. |
|
scanTimeMs |
Tempo gasto para verificar os arquivos em busca de correspondências. |
|
rewriteTimeMs |
Tempo gasto para regravar os arquivos correspondentes. |
|
UPDATE |
||
numAddedFiles |
Número de arquivos adicionados. |
|
numRemovedFiles |
Número de arquivos removidos. |
|
numUpdatedRows |
Número de linhas atualizadas. |
|
numCopiedRows |
O número de linhas que acabaram de ser copiadas no processo de atualização de arquivos. |
|
executionTimeMs |
Tempo gasto para executar toda a operação. |
|
scanTimeMs |
Tempo gasto para verificar os arquivos em busca de correspondências. |
|
rewriteTimeMs |
Tempo gasto para regravar os arquivos correspondentes. |
|
FSCK |
numRemovedFiles |
Número de arquivos removidos. |
CONVERT |
numConvertedFiles |
Número de arquivos Parquet que foram convertidos. |
OPTIMIZE |
||
numAddedFiles |
Número de arquivos adicionados. |
|
numRemovedFiles |
Número de arquivos otimizados. |
|
numAddedBytes |
Número de bytes adicionados depois que a tabela foi otimizada. |
|
numRemovedBytes |
Número de bytes removidos. |
|
minFileSize |
Tamanho do menor arquivo após a tabela ser otimizada. |
|
p25FileSize |
Tamanho do arquivo do 25º percentil após a tabela ser otimizada. |
|
p50FileSize |
Tamanho mediano do arquivo após a tabela ser otimizada. |
|
p75FileSize |
Tamanho do arquivo do 75º percentil após a tabela ser otimizada. |
|
maxFileSize |
Tamanho do maior arquivo após a tabela ser otimizada. |
|
CLONE |
||
sourceTableSize |
Tamanho em bytes da tabela de origem na versão clonada. |
|
sourceNumOfFiles |
Número de arquivos na tabela de origem na versão clonada. |
|
numRemovedFiles |
Número de arquivos removidos da tabela de destino se uma tabela Delta anterior tiver sido substituída. |
|
removedFilesSize |
Tamanho total em bytes dos arquivos removidos da tabela de destino se uma tabela Delta anterior tiver sido substituída. |
|
numCopiedFiles |
Número de arquivos que foram copiados para o novo local. 0 para clones rasos. |
|
copiedFilesSize |
Tamanho total em bytes dos arquivos que foram copiados para o novo local. 0 para clones rasos. |
|
RESTORE |
||
tableSizeAfterRestore |
Tamanho da tabela em bytes após a restauração. |
|
numOfFilesAfterRestore |
Número de arquivos na tabela após a restauração. |
|
numRemovedFiles |
Número de arquivos removidos pela operação de restauração. |
|
numRestoredFiles |
Número de arquivos adicionados como resultado da restauração. |
|
removedFilesSize |
Tamanho em bytes dos arquivos removidos pela restauração. |
|
restoredFilesSize |
Tamanho em bytes dos arquivos adicionados pela restauração. |
|
VACUUM |
||
numDeletedFiles |
Número de arquivos excluídos. |
|
numVacuumedDirectories |
Número de diretórios aspirados. |
|
numFilesToDelete |
Número de arquivos a serem excluídos. |
O que é viagem do tempo do Delta Lake?
A viagem do tempo do Delta Lake é compatível com a consulta de versões anteriores da tabela com base no carimbo de data/hora ou na versão da tabela (conforme registrado no log de transações). Você pode usar a viagem do tempo para aplicações como as seguintes:
Recriar análises, relatórios ou resultados (por exemplo, o resultado de um modelo do machine learning).Isso pode ser útil para depuração ou auditoria, especialmente em indústrias regulamentadas.
Escrever consultas temporais complexas.
Corrigir erros em seus dados.
Fornecer isolamento de instantâneos para um conjunto de consultas para tabelas que mudam rapidamente.
Importante
As versões de tabelas acessíveis com viagem do tempo são determinadas por uma combinação do limite de retenção para arquivos de log de transações e da frequência e retenção especificadas para operações VACUUM
.Se você executar VACUUM
diariamente com os valores padrão, sete dias de dados estarão disponíveis para viagem do tempo.
Sintaxe da viagem do tempo do Delta
Você consulta uma tabela Delta com viagem do tempo adicionando uma cláusula após a especificação do nome da tabela.
timestamp_expression
pode ser qualquer um dos seguintes:'2018-10-18T22:15:12.013Z'
, isto é, uma string que pode ser convertida em um carimbo de data/horacast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, ou seja, uma string de datacurrent_timestamp() - interval 12 hours
date_sub(current_date(), 1)
Qualquer outra expressão que seja ou possa ser convertida em um carimbo de data/hora
version
é um valor longo que pode ser obtido da saída deDESCRIBE HISTORY table_spec
.
Nem timestamp_expression
nem version
podem ser subconsultas.
Somente strings de data ou carimbo de data/hora são aceitas. Por exemplo, "2019-01-01"
e "2019-01-01T00:00:00.000Z"
. Consulte o seguinte código para ver um exemplo de sintaxe:
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
Você também pode utilizar a sintaxe @
para especificar o carimbo de data/hora ou versão como parte do nome da tabela. O carimbo de data/hora deve estar no formato yyyyMMddHHmmssSSS
. Você pode especificar uma versão após @
precedendo um v
à versão. Consulte o seguinte código para ver um exemplo de sintaxe:
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
O que são pontos de verificação de logs de transações?
O Delta Lake registra as versões da tabela como arquivos JSON dentro do diretório _delta_log
, que é armazenado junto com os dados da tabela. Para otimizar a consulta dos pontos de verificação, o Delta Lake agrega as versões da tabela aos arquivos dos pontos de verificação do Parquet, evitando a necessidade de ler todas as versões do JSON do histórico da tabela. O Databricks otimiza a frequência de pontos de verificação para o tamanho dos dados e a carga de trabalho. Os usuários não devem precisar interagir diretamente com os pontos de verificação. A frequência dos pontos de verificação está sujeita a alterações sem aviso prévio.
Configurar a retenção de dados para consultas de viagem do tempo
Para consultar uma versão anterior da tabela, você deve reter tanto os arquivos de log quanto os dados dessa versão.
Os arquivos de dados são excluídos quando VACUUM
é executado em uma tabela. O Delta Lake gerencia a remoção automática dos arquivos de log após verificar as versões da tabela.
Como a maioria das tabelas Delta tem VACUUM
executado regularmente, as consultas pontuais devem respeitar o limite de retenção para VACUUM
, que é de sete dias por padrão.
Para aumentar o limite de retenção de dados para tabelas Delta, você deve configurar as seguintes propriedades da tabela:
delta.logRetentionDuration = "interval <interval>"
: controla por quanto tempo o histórico de uma tabela é mantido. O padrão éinterval 30 days
.delta.deletedFileRetentionDuration = "interval <interval>"
: determina que o limite que oVACUUM
utiliza para remover arquivos de dados não é mais referenciado na versão da tabela atual. O padrão éinterval 7 days
.
Você pode especificar as propriedades da Delta durante a criação da tabela ou defini-las com uma instrução ALTER TABLE
. Consulte a referência das propriedades da tabela Delta.
Observação
Você deve definir essas duas propriedades para garantir que o histórico da tabela seja mantido por mais tempo para tabelas com operações VACUUM
frequentes. Por exemplo, para acessar 30 dias de dados históricos, configure delta.deletedFileRetentionDuration = "interval 30 days"
(que corresponde à configuração padrão para delta.logRetentionDuration
).
Aumentar o limite de retenção de dados pode fazer com que seus custos de armazenamento subam, pois mais arquivos de dados são mantidos.
Restaurar uma tabela Delta para um estado anterior
Você pode restaurar uma tabela Delta para seu estado anterior usando o comando RESTORE
. Uma tabela Delta mantém internamente versões históricas da tabela que permitem que ela seja restaurada para um estado anterior. Uma versão correspondente ao estado anterior ou um carimbo de data/hora de quando o estado anterior foi criado são compatíveis como opções pelo comando RESTORE
.
Importante
Você pode restaurar uma tabela já restaurada.
Você pode restaurar uma tabela clonada.
Você deve ter permissão para
MODIFY
na tabela que está sendo restaurada.Você não pode restaurar uma tabela para uma versão mais antiga onde os arquivos de dados foram excluídos manualmente ou pelo
vacuum
. A restauração para essa versão ainda é possível parcialmente sespark.sql.files.ignoreMissingFiles
estiver definido comotrue
.O formato do carimbo de data/hora para restaurar para um estado anterior é
yyyy-MM-dd HH:mm:ss
. Também há compatibilidade com o fornecimento de apenas uma string de data (yyyy-MM-dd
).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
Para obter detalhes da sintaxe, consulte RESTORE.
Importante
A restauração é considerada uma operação de alteração de dados. As entradas de log do Delta Lake adicionadas pelo comando RESTORE
contêm dataChange definido como true. Se houver um aplicativo downstream, como um job de transmissão estruturada que processa as atualizações de uma tabela Delta Lake, as entradas do log de alterações de dados adicionadas pela operação de restauração serão consideradas como novas atualizações de dados, e o processamento delas poderá resultar em dados duplicados.
Por exemplo:
Versão da tabela |
Operação |
Atualizações de log da Delta |
Registros em atualizações de log de alterações de dados |
---|---|---|---|
0 |
INSERT |
AddFile(/path/to/file-1, dataChange = true) |
(name = Viktor, age = 29, (name = George, age = 55) |
1 |
INSERT |
AddFile(/path/to/file-2, dataChange = true) |
(name = George, age = 39) |
2 |
OPTIMIZE |
AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) |
(Sem registros, pois a compactação do Optimize não altera os dados na tabela) |
3 |
RESTORE(version=1) |
RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) |
(name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
No exemplo anterior, o comando RESTORE
resulta em atualizações que já foram vistas ao ler a tabela Delta versão 0 e 1. Se uma query de transmissão estiver lendo essa tabela, esses arquivos serão considerados como dados recém-adicionados e serão processados novamente.
Restaurar métricas
RESTORE
informa as seguintes métricas como um DataFrame de uma única linha quando a operação é concluída:
table_size_after_restore
: O tamanho da tabela após a restauração.num_of_files_after_restore
: O número de arquivos na tabela após a restauração.num_removed_files
: Número de arquivos removidos (excluídos logicamente) da tabela.num_restored_files
: número de arquivos restaurados devido à reversão.removed_files_size
: Tamanho total em bytes dos arquivos removidos da tabela.restored_files_size
: Tamanho total em bytes dos arquivos restaurados.
Exemplos de uso da viagem do tempo no Delta Lake
Corrigir exclusões acidentais em uma tabela para o usuário:
111
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
Corrigir atualizações incorretas acidentais em uma tabela:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
Consultar o número de novos clientes adicionados na última semana.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Como localizar a última versão do commit na sessão do Spark?
Para obter o número da versão do último commit gravado pelo SparkSession
atual em todos os threads e todas as tabelas, consulte a configuração SQL spark.databricks.delta.lastCommitVersionInSession
.
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Se nenhum commit tiver sido feito pelo SparkSession
, consultar a chave retornará um valor vazio.
Observação
Se você compartilhar o mesmo SparkSession
em vários threads, será semelhante a compartilhar uma variável em vários threads; você poderá ter condições de corrida, pois o valor da configuração é atualizado simultaneamente.