Substitua dados seletivamente com o Delta Lake

O Databricks aproveita a funcionalidade do Delta Lake para oferecer suporte a duas opções distintas para substituições seletivas:

  • A opção replaceWhere substitui atomicamente todos os registros que correspondem a um determinado predicado.

  • Você pode substituir diretórios de dados com base em como as tabelas são particionadas usando substituições de partição dinâmica.

Para a maioria das operações, a Databricks recomenda usar replaceWhere para especificar quais dados devem ser substituídos.

Importante

Se os dados foram substituídos acidentalmente, você pode usar a restauração para desfazer a alteração.

Substituição seletiva arbitrária por replaceWhere

Você pode substituir seletivamente apenas os dados que correspondem a uma expressão arbitrária. Este recurso está disponível com DataFrames no Databricks Runtime 9.1 LTS e acima e tem suporte em SQL no Databricks Runtime 12.0 (sem suporte) e acima.

O comando a seguir substitui atomicamente os eventos em janeiro na tabela de destino, que é particionada por start_date, pelos dados em replace_data:

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)
replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Este código de amostra grava os dados em replace_data, valida se todas as linhas correspondem ao predicado e executa uma substituição atômica usando a semântica overwrite . Se algum valor nas operações estiver fora da restrição, essas operações falharão com um erro por default.

Você pode alterar esse comportamento para overwrite valores dentro do intervalo de predicado e insert registros que estão fora do intervalo especificado. Para fazer isso, desative a verificação de restrição definindo spark.databricks.delta.replaceWhere.constraintCheck.enabled como falso usando uma das seguintes configurações:

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

No Databricks Runtime 9.0 e versões anteriores, replaceWhere sobrescreve os dados que correspondem a um predicado somente nas colunas de partição. O comando a seguir substitui atomicamente o mês de janeiro na tabela de destino, que é particionada por date, pelos dados em df:

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)
df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

No Databricks Runtime 9.1e acima, se você desejar voltar ao comportamento antigo, você poderá desabilitar a bandeira spark.databricks.delta.replaceWhere.dataColumns.enabled:

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Substituições de partições dinâmicas

Visualização

Esse recurso está em visualização pública.

O Databricks Runtime 11.1e acima oferece suporte ao modo de substituição de partição dinâmica para tabelas particionadas. Para tabelas com várias partições, o Databricks Runtime 12.0 e versões anteriores oferecem suporte apenas a substituições de partições dinâmicas se todas as colunas de partição forem do mesmo tipo de dado.

No modo de substituição de partições dinâmicas, as operações sobrescrevem todos os dados existentes em cada partição lógica para a qual a gravação adiciona novos dados.Todas as partições lógicas existentes para as quais a gravação não contém dados permanecem inalteradas. Esse modo só é aplicável quando os dados estão sendo gravados no modo de substituição: INSERT OVERWRITE no SQL ou uma gravação DataFrame com df.write.mode("overwrite").

Configure o modo de substituição dinâmica de partição definindo a configuração da sessão do Spark como. spark.sql.sources.partitionOverwriteMode dynamic Você também pode habilitar isso definindo partitionOverwriteMode a DataFrameWriter opção como dynamic. Se presente, a opção específica da consulta substitui o modo definido na configuração da sessão. O padrão para partitionOverwriteMode é static.

Importante

Valide se os dados gravados com a substituição dinâmica de partições tocam somente as partições esperadas. Uma única linha na partição incorreta pode levar à substituição não intencional de uma partição inteira.

O exemplo a seguir demonstra o uso de substituições dinâmicas de partições:

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)
df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Observação

  • A substituição de partição dinâmica entra em conflito com a opção replaceWhere para tabelas particionadas.

    • Se a substituição dinâmica da partição estiver ativada na configuração da sessão do Spark e replaceWhere for fornecido como uma opção DataFrameWriter, o Delta Lake substituirá os dados de acordo com a expressão replaceWhere (as opções específicas da consulta substituem as configurações da sessão).

    • Você receberá um erro se as DataFrameWriter opções tiverem a substituição de partição dinâmica e replaceWhere ativada.

  • Você não pode especificar overwriteSchema como true ao usar a substituição dinâmica de partição.