Gerenciar a qualidade dos dados com o Delta Live Tables
Você utiliza expectativas para estabelecer restrições de qualidade de dados para o conteúdo de um conjunto de dados. As expectativas permitem que você garanta que os dados que chegam às tabelas atendam aos requisitos de qualidade de dados e forneçam insights sobre a qualidade de dados para cada atualização do pipeline.Você aplica expectativas para consultas usando decoradores Python ou cláusulas de restrição SQL.
Quais são as expectativas do Delta Live Tables?
As expectativas são cláusulas opcionais que você adiciona às declarações de conjuntos de dados no Delta Live Tables, que aplicam verificações de qualidade de dados em cada registro que passa por uma consulta.
Uma expectativa consiste em três coisas:
Uma descrição, que atua como um identificador exclusivo e permite que você rastreie métricas para a restrição.
Uma instrução Boolean que sempre retorna verdadeiro ou falso com base em alguma condição declarada.
Uma ação a tomar quando um registro falha na expectativa, o que significa que o Boolean retorna falso.
A matriz a seguir mostra as três ações que você pode aplicar a registros inválidos:
Ação |
Resultado |
---|---|
avisar (default) |
Registros inválidos são gravados no alvo; a falha é relatada como uma métrica para o dataset. |
Registros inválidos são eliminados antes que os dados sejam gravados no destino; a falha é relatada como uma métrica para o dataset. |
|
Registros inválidos impedem que a atualização seja bem-sucedida. É necessária uma intervenção manual antes do reprocessamento. |
você pode visualizar métricas de qualidade de dados, como o número de registros que violam uma expectativa, consultando o log de eventos do Delta Live Tables. Consulte Monitorar pipelines do Delta Live Tables.
Para obter uma referência completa da sintaxe de declaração do dataset do Delta Live Tables, consulte Referência da linguagem Python do Delta Live Tables ou Referência da linguagem SQL do Delta Live Tables.
Observação
Embora você possa incluir várias cláusulas em qualquer expectativa, apenas o Python oferece suporte à definição de ações com base em várias expectativas. Consulte Múltiplas expectativas.
Reter registros inválidos
Use o operador expect
quando quiser manter registros que violem a expectativa. Registros que violam a expectativa são adicionados ao dataset de destino junto com registros válidos:
@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Solte registros inválidos
Use o operador expect or drop
para evitar o processamento adicional de registros inválidos. Os registros que violam as expectativas são descartados do dataset de destino:
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Falha em registros inválidos
Quando registros inválidos forem inaceitáveis, use o operador expect or fail
para interromper a execução imediatamente quando um registro falhar na validação. Se a operação for uma atualização de tabela, o sistema reverte atomicamente a transação:
@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Quando um pipeline falha devido a uma violação de expectativa, é necessário corrigir o código do pipeline para lidar corretamente com os dados inválidos antes de executar o pipeline novamente.
As expectativas de falha modificam o plano de consulta Spark de suas transformações para rastrear as informações necessárias para detectar e relatar violações.Para muitas consultas, é possível usar essas informações para identificar qual registro de entrada resultou na violação. A seguir, um exemplo de exceção:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Múltiplas expectativas
Você pode definir expectativas com um ou mais critérios de qualidade de dados em pipelines Python.Esses decoradores aceitam um dicionário Python como argumento, onde a chave é o nome da expectativa e o valor é a restrição da expectativa.
Use expect_all
para especificar várias restrições de qualidade de dados quando os registros que falham na validação devem ser incluídos no dataset de destino:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Utilize o expect_all_or_drop
para especificar múltiplas restrições de qualidade de dados quando registros que falham na validação devem ser descartados do dataset de destino:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Use expect_all_or_fail
para especificar várias restrições de qualidade de dados quando os registros que falham na validação devem interromper a execução do pipeline:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Você também pode definir uma coleção de expectativas como uma variável e passá-la para uma ou mais consultas em seu pipeline:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Colocar dados inválidos em quarentena
O exemplo a seguir utiliza expectativas em combinação com tabelas e visualizações temporárias. Esse padrão oferece métricas para registros que passam pelas verificações de expectativa durante as atualizações do pipeline e proporciona uma maneira de processar registros válidos e inválidos por caminhos downstream distintos.
Observação
Este exemplo lê dados de amostra incluídos no datasetDatabricks. Como o dataset Databricks não é compatível com um pipeline que publica no Unity Catalog, este exemplo funciona apenas com um pipeline configurado para publicar no Hive metastore. No entanto, esse padrão também funciona com pipeline habilitado para Unity Catalog, mas você deve ler dados de locais externos. Para saber mais sobre como usar o Unity Catalog com Delta Live Tables, consulte Usar o Unity Catalog com seu pipelineDelta Live Tables.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
dlt.read("raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
dlt.read("farmers_market_quarantine")
.filter("is_quarantined=true")
)
Validar a contagem de linhas nas tabelas
O senhor pode adicionar uma tabela adicional ao site pipeline que defina uma expectativa para comparar as contagens de linhas entre duas visualizações materializadas ou tabelas de transmissão. Os resultados dessa expectativa aparecem no evento log e na UI Delta Live Tables. O exemplo a seguir valida contagens de linhas iguais entre as tabelas tbla
e tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Realize validação avançada com as expectativas do Delta Live Tables
O senhor pode definir a visualização materializada usando consultas agregadas e join e usar os resultados dessas consultas como parte da verificação de expectativas. Isso é útil se o senhor deseja realizar verificações complexas de qualidade de dados, por exemplo, garantindo que uma tabela derivada contenha todos os registros da tabela de origem ou garantindo a igualdade de uma coluna numérica entre tabelas. O senhor pode usar a palavra-chave TEMPORARY
para impedir que essas tabelas sejam publicadas no esquema de destino.
O seguinte exemplo valida que todos os registros esperados estão presentes na tabela report
:
CREATE TEMPORARY MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
O exemplo a seguir usa um agregado para garantir a exclusividade de uma chave primária:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Torne as expectativas portáteis e reutilizáveis
Você pode manter as regras de qualidade de dados separadamente das implementações de pipeline.
A Databricks recomenda armazenar as regras em uma tabela Delta com cada regra categorizada por uma tag. Use essa marca nas definições de dataset para determinar quais regras devem ser aplicadas.
O exemplo seguinte cria uma tabela denominada rules
para manter as regras:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
O exemplo de Python a seguir define as expectativas de qualidade dos dados com base nas regras armazenadas na rules
tabela. A get_rules()
função lê as regras da rules
tabela e retorna um dicionário Python contendo regras que correspondem ao tag
argumento passado para a função. O dicionário é aplicado nos @dlt.expect_all_*()
decoradores para impor restrições de qualidade de dados. Por exemplo, todos os registros que falharem nas regras marcadas com validity
serão retirados da raw_farmers_market
tabela:
Observação
Este exemplo lê dados de amostra incluídos no datasetDatabricks. Como o dataset Databricks não é compatível com um pipeline que publica no Unity Catalog, este exemplo funciona apenas com um pipeline configurado para publicar no Hive metastore. No entanto, esse padrão também funciona com pipeline habilitado para Unity Catalog, mas você deve ler dados de locais externos. Para saber mais sobre como usar o Unity Catalog com Delta Live Tables, consulte Usar o Unity Catalog com seu pipelineDelta Live Tables.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Em vez de criar uma tabela chamada rules
para manter as regras, o senhor poderia criar um módulo Python para as regras principais, por exemplo, em um arquivo chamado rules_module.py
na mesma pasta do site Notebook:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Em seguida, modifique o site Notebook anterior importando o módulo e alterando a função get_rules()
para ler do módulo em vez de ler da tabela rules
:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)