Delta Live Tablesでデータ品質を管理する

期待値を使用して、データセットの内容に対するデータ品質制約を定義します。期待により、テーブルに到着するデータがデータ品質要件を満たしていることを保証し、パイプライン更新ごとにデータ品質についての洞察を得ることができます。PythonデコレータまたはSQL制約句を使用して、クエリーに期待値を適用します。

Delta Live Tablesのエクスペクテーションとは?

エクスペクテーションとは、Delta Live Tablesデータセット宣言に追加するオプションの句で、クエリーを通過する各レコードにデータ品質チェックを適用します。

期待値は、次の3つの要素で構成されます:

  • 説明。一意の識別子として機能し、制約のメトリックを追跡できるようにします。

  • 指定された条件に基づいて常に真または偽を返すブーリアンテートメント。

  • レコードが期待を裏切った場合に実行するアクション。ブーリアン値がfalseを返すことを意味します。

次のマトリックスは、無効なレコードに適用できる3つのアクションを示しています:

操作

結果

warn (デフォルト)

無効なレコードはターゲットに書き込まれ、失敗はデータセットのメトリックとして報告される。

drop

無効なレコードは、データがターゲットに書き込まれる前に削除されます。失敗はデータセットのメトリクスとして報告されます。

fail

無効なレコードがあると、更新が成功しません。再処理の前に手動による介入が必要です。

Delta Live Tablesイベントログをクエリーすることで、期待に反するレコードの数などのデータ品質メトリクスを表示できます。「Delta Live Tablesパイプラインの監視」を参照してください。

Delta Live Tablesデータセット宣言構文の完全なリファレンスは、「Delta Live Tables Python言語リファレンス」または「Delta Live Tables SQL言語リファレンス」を参照してください。

任意エクスペクテーションに複数の句を含めることができますが、複数のエクスペクテーションに基づくアクションの定義をサポートしているのは Python だけです。 「複数のエクスペクテーション」を参照してください。

無効な記録を保持する

期待に反するレコードを保持したい場合は、expect演算子を使用します。期待に違反するレコードは、有効なレコードとともにターゲットデータセットに追加されます:

@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

無効なレコードを削除する

無効なレコードがそれ以上処理されないようにするには、expect or drop演算子を使用します。期待に反するレコードは、ターゲットデータセットから削除されます:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無効なレコードで失敗

無効なレコードを許容できない場合は、expect or fail演算子を使用して、レコードがバリデーションに失敗したときに直ちに実行を停止します。操作がテーブル更新の場合、システムはトランザクションをアトミックにロールバックする:

@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

期待値違反によりパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理できるようにパイプラインコードを修正する必要があります。

期待に失敗すると、変換のSparkクエリープランが変更され、違反の検出とレポートに必要な情報が追跡されます。多くのクエリーでは、この情報を使用して、違反が発生した入力レコードを特定できます。例外の例を次に示します:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

複数のエクスペクテーション

Pythonパイプラインで1つ以上のデータ品質制約を使用して期待値を定義できます。これらのデコレータは、Pythonディクショナリを引数として受け入れ、キーはエクスペクテーションの名前、値はエクスペクテーション制約です。

検証に失敗したレコードを対象データセットに含める必要がある場合、複数のデータ品質制約を指定するにはexpect_allを使用します:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

expect_all_or_dropを使用して、検証に失敗したレコードをターゲットデータセットから削除する必要がある場合に、複数のデータ品質制約を指定します:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

expect_all_or_failを使用して、検証に失敗したレコードがパイプラインの実行を停止する必要がある場合に、複数のデータ品質制約を指定します:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

エクスペクテーションのコレクションを変数として定義し、それをパイプライン内の1つ以上のクエリーに渡すこともできます:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

無効なデータを隔離する

次の例では、一時テーブルおよびビューと組み合わせて期待値を使用します。このパターンは、パイプラインの更新中に期待値チェックに合格したレコードのメトリクスを提供し、さまざまなダウンストリームパスを通じて有効なレコードと無効なレコードを処理する方法を提供します。

この例では、 Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行するパイプラインではサポートされていないため、この例は、 Hive metastoreに発行するように構成されたパイプラインでのみ機能します。 ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、 外部ロケーションからデータを読み取る必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

テーブル全体の行数を検証する

2つのライブテーブル間の行数を比較するという期待を定義するテーブルをパイプラインに追加できます。この期待の結果は、イベントログとDelta Live Tables UIに表示されます。次の例では、tblaテーブルとtblbテーブルの行数が等しいことを検証します:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Delta Live Tablesのエクスペクテーションに基づいた高度な検証を実行する

集計クエリーと結合クエリーを使用してライブテーブルを定義し、それらのクエリーの結果を期待値チェックの一部として使用できます。これは、複雑なデータ品質チェックを実行する場合、たとえば、派生テーブルにソーステーブルのすべてのレコードが含まれていることを確認したり、テーブル間での数値列の同等性を保証したりする場合に便利です。TEMPORARYキーワードを使用すると、これらのテーブルがターゲットスキーマに公開されないようにすることができます。

次の例では、予期されるすべてのレコードがreportテーブルに存在することを検証します:

CREATE TEMPORARY LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

以下の例では、主キーの一意性を確保するために集約を使用しています:

CREATE TEMPORARY LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

エクスペクテーションをポータブルかつ再利用可能にする

データ品質ルールは、パイプラインの実装とは別に管理できます。

Databricksでは、各ルールをタグで分類したDeltaテーブルにルールを格納することをお勧めします。データセット定義でこのタグを使用して、適用するルールを決定します。

次の例では、ルールを維持するためにrulesという名前のテーブルを作成します:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

次のPythonの例では、rulesテーブルに保存されているルールに基づいてデータ品質のエクスペクテーションを定義します。get_rules()関数は、rulesテーブルからルールを読み取り、関数に渡されたtag 引数に一致するルールを含むPythonディクショナリを返します。ディクショナリは、データ品質制約を強制するために@dlt.expect_all_*()デコレータに適用されます。たとえば、validityでタグ付けされたルールに違反したレコードは、raw_farmers_marketテーブルから削除されます:

この例では、 Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行するパイプラインではサポートされていないため、この例は、 Hive metastoreに発行するように構成されたパイプラインでのみ機能します。 ただし、このパターンは Unity Catalog 対応パイプラインでも機能しますが、 外部ロケーションからデータを読み取る必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

ルールを維持するためにrulesという名前のテーブルを作成する代わりに、メイン ルールに対する Python モジュールを、たとえばノートブックと同じフォルダー内のrules_module.pyという名前のファイルに作成できます。

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

次に、モジュールをインポートし、 rulesテーブルからではなくモジュールから読み取るようにget_rules()関数を変更して、前述のノートブックを変更します。

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )