バッチ処理またはストリーム処理によるデータのクリーンアップと検証

レイクハウス内のデータ資産の品質を確保するには、データのクリーニングと検証が不可欠です。 この記事では、データ品質の向上を促進するように設計された Databricks 製品の概要と、カスタム ルールを実装するためのビジネス ロジックを定義するための推奨事項を示します。

Databricksでのスキーマ強制

Delta Lake は、書き込み時にスキーマと制約のチェックを強制するセマンティクスを提供し、レイクハウス内のテーブルのデータ品質を保証します。

スキーマ強制は、テーブルに書き込まれるデータが事前定義されたスキーマに準拠していることを保証します。 スキーマ検証ルールは、操作によって異なります。 スキーマ強制をご覧ください。

アプリケーションの進化に対応するために、 Deltaスキーマの変更とテーブルの進化を行うメカニズムを提供します。 フィールドのドロップやパイプラインの失敗を回避するために、訪問者進化をいつ使用するかを慎重に検討することが重要です。 スキーマを手動または自動で更新する方法の詳細については、 Delta Lake テーブルのスキーマの更新を参照してください。

テーブルの制約

制約は、通知主キー制約と外部キー制約、または強制制約の形式をとることができます。 ADD CONSTRAINT 句を参照してください。

Databricks のテーブル制約は、強制的なものか情報的なものかのいずれかです。

強制制約には、 NOT NULL 制約と CHECK 制約があります。

インフォメーショナル制約には、主キー制約と外部キー制約があります。

Databricksの制約を参照してください。

null 値または欠損値の処理

テーブルでは NOT NULL を強制できます。Deltaこれは、列に NULL の既存のレコードがない場合にのみ、既存のテーブルで有効にすることができ、NULL 値を持つ新しいレコードがテーブルに挿入されるのを防ぎます。

パターンの適用

正規表現 (regex) を使用して、データ フィールドに予想されるパターンを適用できます。 これは、特定の形式やパターンに従う必要があるテキストデータを処理する場合に特に便利です。

正規表現を使用してパターンを適用するには、SQL でREGEXPまたはRLIKE関数を使用できます。 これらの関数を使用すると、指定した正規表現パターンとデータフィールドを照合できます。

以下は、SQL でパターンを適用するために正規表現でCHECK制約を使用する方法の例です。

CREATE TABLE table_name (
  column_name STRING CHECK (column_name REGEXP '^[A-Za-z0-9]+$')
);

値の適用

制約を使用すると、テーブル内の列に値の範囲を適用できます。 これにより、指定した範囲内の有効な値のみを挿入または更新できます。

値の範囲制約を適用するには、SQL でCHECK制約を使用できます。 CHECK制約を使用すると、テーブル内のすべての行に対して真でなければならない条件を定義できます。

CHECK制約を使用して列に値の範囲を適用する方法の例を次に示します。

CREATE TABLE table_name (
  column_name INT CHECK (column_name >= 0 AND column_name <= 100)
);

Delta Live Tables を使用してエクスペクテーションを定義および構成します。

Delta Live Tables では、マテリアライズドビューまたはストリーミングテーブルを宣言する際の期待値を定義できます。 違反について警告するエクスペクション、違反レコードの削除、または違反に基づくワークロードの失敗を設定することを選択できます。 「パイプラインの期待値を使用してデータ品質を管理する」を参照してください。

データ型のキャスト

テーブルにデータを挿入または更新する場合、Databricks は情報を失うことなく安全に実行できる場合はデータ型をキャストします。

キャスト動作の詳細については、次の記事を参照してください。

カスタム・ビジネス・ロジック

フィルターと WHERE 句を使用して、不良レコードを隔離し、ダウンストリーム テーブルへの伝達を防ぐカスタム ロジックを定義できます。 CASE WHEN ... OTHERWISE 句を使用すると、条件付きロジックを定義して、予測可能な方法で期待に違反するレコードにビジネスロジックを適切に適用できます。

DECLARE current_time = now()

INSERT INTO silver_table
  SELECT * FROM bronze_table
  WHERE event_timestamp <= current_time AND quantity >= 0;

INSERT INTO quarantine_table
  SELECT * FROM bronze_table
  WHERE event_timestamp > current_time OR quantity < 0;

注:

Databricks では、特に構造化ストリーミングを使用する場合は、フィルター処理されたデータを常に別の書き込み操作として処理することを推奨しています。 .foreachBatch を使用して複数のテーブルに書き込むと、結果に一貫性がなくなる可能性があります。

たとえば、 NULL 値をエンコードできないアップストリーム システムがある場合、プレースホルダー値 -1 を使用して欠損データを表すことができます。 Databricks のすべてのダウンストリーム クエリに対してカスタム ロジックを記述して-1を含むレコードを無視するのではなく、case when ステートメントを使用して、これらのレコードを変換として動的に置き換えることができます。

INSERT INTO silver_table
  SELECT
    * EXCEPT weight,
    CASE
      WHEN weight = -1 THEN NULL
      ELSE weight
    END AS weight
  FROM bronze_table;