Delta Live Tablesフローを使用した増分データのロードと処理

この記事では、フローとは何か、また、Delta Live Tablesパイプラインでフローを使用し、ソースからターゲットのストリーミングテーブルへデータを段階的に処理する方法について説明します。Delta Live Tablesでは、フローは2つの方法で定義されます。

  1. フローは、ストリーミングテーブルを更新するクエリを作成すると自動的に定義されます。

  2. Delta Live Tables は、複数のストリーミングソースからのストリーミングテーブルへの追加など、より複雑な処理のためのフローを明示的に定義する機能も提供します。

この記事では、ストリーミングテーブルを更新するクエリを定義するときに作成される暗黙的なフローについて説明し、より複雑なフローを定義するための構文の詳細を示します。

フローとは

Delta Live Tablesでは、フローはソースデータを段階的に処理してターゲットのストリーミングテーブルを更新するストリーミングクエリです。パイプラインで作成するほとんどのDelta Live Tablesデータセットでは、フローをクエリの一部として定義し、フローを明示的に定義する必要はありません。たとえば、ストリーミングテーブルを作成するために個別のテーブルステートメントとフローステートメントを使用する代わりに、単一のDDLコマンドでDelta Live Tablesにストリーミングテーブルを作成します。

注:

この CREATE FLOW の例は説明のみを目的として提供されており、有効なDelta Live Tables構文ではないキーワードが含まれています。

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

クエリによって定義されたデフォルトのフローに加えて、Delta Live Tables Python および SQL インターフェースは、フローの追加機能を提供します。追加フローは、単一のストリーミングテーブルを更新するために複数のストリーミングソースからデータを読み取る必要がある処理をサポートします。たとえば、既存のストリーミングテーブルとフローがあり、この既存のストリーミングテーブルに書き込む新しいストリーミングソースを追加したい場合は、フローの追加機能を使用できます。

フローの追加を使用して複数のソースストリームからストリーミングテーブルに書き込む

注:

追加フロー処理を使用するには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。

Pythonインターフェースの @append_flow デコレータ、または SQLインターフェースの CREATE FLOW 句を使用して、複数のストリーミングソースからストリーミングテーブルに書き込みます。次のような処理には、フローの追加を使用します。

  • 完全な更新を必要とせずに、既存のストリーミングテーブルにデータを追加するストリーミングソースを追加します。たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。新しい地域が展開されると、完全な更新を実行せずに、新しい地域のデータをテーブルに追加できます。例:複数のKafkaトピックからストリーミングテーブルへの書き込みを参照してください。

  • 欠落している履歴データを追加して、ストリーミング・テーブルを更新します(バックフィル)。たとえば、Apache Kafkaトピックによって書き込まれる既存のストリーミングテーブルがあるとします。また、ストリーミングテーブルに一度だけ挿入する必要があるテーブルに履歴データが保存されており、データを挿入する前に複雑な集計を実行することが処理に含まれているため、データをストリーミングすることはできません。例: 1 回限りのデータバックフィルを実行するを参照してください。

  • クエリでUNION句を使用する代わりに、複数のソースからのデータを結合して単一のストリーミングテーブルに書き込みます。UNIONの代わりに追加フロー処理を使用すると、完全更新を実行せずにターゲットテーブルを段階的に更新できます。例:UNIONの代わりにフローの追加処理を使用するを参照してください。

追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにすることができます。Pythonクエリの場合は、 create_streaming_table()関数を使用してターゲットテーブルを作成します。

重要

  • 期待値によるデータ品質制約を定義する必要がある場合は、create_streaming_table()関数の一部としてターゲットテーブルに期待値を定義するか、既存のテーブル定義に期待値を定義します。@append_flow定義で期待値を定義することはできません。

  • フローはフロー名で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。

    • パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。

    • 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。

@append_flowの構文は次のとおりです。

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

例: 複数の Kafka トピックからストリーミングテーブルへの書き込み

次の例では、kafka_targetというストリーミングテーブルを作成し、2つのKafkaトピックからそのストリーミングテーブルに書き込みます。

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )
CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQLクエリで使用されるread_kafka()テーブル値関数の詳細については、SQL言語リファレンスのread_kafkaを参照してください。

例: 1 回限りのデータバックフィルを実行する

次の例では、ストリーミングテーブルに履歴データを追加するクエリを実行しています。

注:

バックフィルクエリが定期的に、または継続的に実行されるパイプラインの一部である場合に、1回限りのバックフィルを確実に実行するには、パイプラインを1回実行した後にクエリを削除してください。バックフィルディレクトリに新しいデータを追加するには、クエリはそのままにしておきます。

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

例: UNIONの代わりにフローの追加処理を使用する

UNION句を含むクエリを使用する代わりに、フローの追加クエリを使用して複数のソースを結合し、1 つのストリーミングテーブルに書き込むことができます。UNIONの代わりにフローの追加クエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミングテーブルに追加できます。

次のPythonの例には、UNION句で複数のデータソースを組み合わせたクエリが含まれています。

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

以下の例では、UNIONクエリを フローの追加クエリに置き換えています:

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );