構造化ストリーミングの初回ワークロードを実行する

この記事では、Databricksで最初の構造化ストリーミングクエリを実行するために必要なコード例と基本概念を説明します。構造化ストリーミングは、ほぼリアルタイムおよび増分処理のワークロードに使用できます。

構造化ストリーミングは、デルタライブテーブルのストリーミングテーブルを支えるテクノロジーの一つです。Databricksは、すべての新しいETL、インジェスト、および構造化ストリーミングワークロードに Delta Live Tablesを使用することを推奨しています「 Delta Live Tables とは」を参照してください。

Delta Live Tablesでは、ストリーミングテーブルを宣言するためのわずかに変更された構文が提供されていますが、ストリーミングの読み取りと変換を構成するための一般的な構文は、Databricksのすべてのストリーミングユースケースに適用されます。また、Delta Live Tablesは、状態情報、メタデータ、および多数の構成を管理することにより、ストリーミングを簡素化します。

データストリームから読み取る

構造化ストリーミングを使用すると、サポートされているデータソースからデータを段階的に取り込むことができます。Databricks構造化ストリーミングワークロードで使用される最も一般的なデータソースには、次のようなものがあります。

  • クラウドオブジェクトストレージ内のデータファイル

  • メッセージバスとキュー

  • Delta Lake

Databricksでは、クラウドオブジェクトストレージからのストリーミング取り込みにAuto Loaderを使用することを推奨しています。Auto Loaderは、構造化ストリーミングでサポートされるほとんどのファイル形式をサポートします。「Auto Loader とは」を参照してください。

各データソースには、データバッチの読み込み方法を指定するためのオプションがいくつか用意されています。リーダーの構成中に、設定する必要がある主なオプションは次のカテゴリーに分類されます。

  • データソースやフォーマットを指定するオプション(ファイルタイプ、区切り文字、スキーマなど)

  • ソースシステムへのアクセスを構成するオプション(ポート設定や認証情報など)

  • ストリームのどこから開始するかを指定するオプション(Kafkaのオフセットや、既存のファイルをすべて読み込むなど)

  • 各バッチで処理されるデータ量を制御するオプション(バッチごとの最大オフセット、ファイル、バイト数など)

Auto Loaderを使用してオブジェクトストレージからストリーミングデータを読み込む

次の例は、形式とオプションを示すために cloudFiles を使用するAuto Loaderを使用してJSONデータを読み込む方法を示しています。schemaLocation オプションを使用すると、スキーマの推論と展開が可能になります。次のコードをDatabricksノートブックのセルに貼り付け、セルを実行して、raw_df という名前のストリーミングデータフレームを作成します。

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Databricks上の他の読み込み操作と同様に、ストリーミング読み込みを設定しても、実際にはデータは読み込まれません。ストリームが開始される前に、データに対するアクションをトリガーする必要があります。

ストリーミング DataFrame で display() を呼び出すと、ストリーミング ジョブが開始されます。 ほとんどの構造化ストリーミングのユース ケースでは、ストリームをトリガーするアクションはシンクへのデータの書き込みである必要があります。 「 運用環境向けの構造化ストリーミング コードの準備」を参照してください。

ストリーミング変換を実行する

構造化ストリーミングは、DatabricksやSpark SQLで利用可能な変換のほとんどをサポートしています。MLflowモデルをUDFとしてロードし、変換としてストリーミング予測を行うこともできます。

次のコード例では、Spark SQL関数を使用して、取り込まれたJSONデータを追加情報でエンリッチする簡単な変換を完了します。

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

結果の transformed_df には、データソースに到着したときに各レコードを読み込んで変換するためのクエリ命令が含まれています。

構造化ストリーミングは、データソースを無制限または無限のデータセットとして扱います。このように、いくつかの変換は、無限のアイテムをソートする必要があるため、構造化ストリーミングワークロードではサポートされません。

ほとんどの集計と多くの結合では、透かし、ウィンドウ、出力モードを使用して状態情報を管理する必要があります。「透かしを適用してデータ処理のしきい値を制御する」を参照してください。

データシンクへの書き込み

データシンクは、ストリーミング書き込み操作のターゲットです。Databricksストリーミングワークロードで使用される一般的なシンクには次のものがあります。

  • Delta Lake

  • メッセージバスとキュー

  • キー値のデータベース

データソースと同様に、ほとんどのデータシンクには、データをターゲットシステムに書き込む方法を制御するための多くのオプションがあります。ライターの構成中に、設定する必要がある主なオプションは次のカテゴリーに分類されます。

  • 出力モード(デフォルトでは追加)。

  • チェックポイントの場所(各ライターに必要)。

  • トリガー間隔。「構造化ストリーミングのトリガー間隔を構成する」を参照してください。

  • データシンクまたは形式(ファイルタイプ、区切り文字、スキーマなど)を指定するオプション。

  • ターゲットシステムへのアクセスを構成するオプション(ポート設定や資格情報など)。

Delta Lakeへの増分バッチ書き込みを実行する

以下の例では、指定されたファイルパスとチェックポイントを使用してデルタレイクに書き込みます。

重要

設定するストリーミングライターごとに、必ず一意のチェックポイント場所を指定してください。チェックポイントは、ストリームの一意のIDを提供し、処理されたすべてのレコードと、ストリーミングクエリに関連する状態情報を追跡します。

トリガーの availableNow 設定は、ソースデータセットの未処理のレコードをすべて処理してからシャットダウンするように構造化ストリーミングに指示するため、ストリームを実行したままにすることを心配することなく、次のコードを安全に実行できます。

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

この例では、データソースに新しいレコードが到着しないため、このコードを繰り返し実行しても新しいレコードは取り込まれません。

警告

構造化ストリーミングの実行により、自動終了によるコンピューティングリソースのシャットダウンを防ぐことができます。予期せぬ出費を避けるため、ストリーミングクエリーは必ず終了させてください。

本番運用のために構造化ストリーミングコードを準備する

Databricksでは、ほとんどの構造化ストリーミングワークロードにDelta Live Tablesを使用することを推奨しています。次の推奨事項は、本番運用のために構造化ストリーミングワークロードを準備するための開始点となります。

  • displaycount などの結果を返す不要なコードをノートブックから削除します。

  • 構造化ストリーミングワークロードを対話型クラスターで実行しないでください。常にストリームをジョブとしてスケジュールします。

  • ストリーミングジョブが自動的に回復するようにするには、無限の再試行を行うようにジョブを構成します。

  • 構造化ストリーミングを使用するワークロードには自動スケーリングを使用しないでください。

その他の推奨事項については、「構造化ストリーミングの本番運用に関する考慮事項」を参照してください。

Delta Lakeからデータを読み取り、変換し、Delta Lakeに書き込む

Delta Lake では、構造化ストリーミングをソースとシンクの両方として操作するための広範なサポートがあります。 「テーブル ストリーミングの読み取りと書き込みDelta 」を参照してください。

次の例は、デルタテーブルからすべての新しいレコードを増分的に読み込み、別のデルタテーブルのスナップショットと結合して、デルタテーブルに書き込む構文例を示しています。

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

ソーステーブルの読み込み、ターゲットテーブルおよび指定されたチェックポイントの場所への書き込みを行うには、適切な権限が設定されている必要があります。データソースとシンクに関連する値を使用して、角括弧(<>)で示されたすべてのパラメーターを入力します。

Delta Live Tablesは、Delta Lakeパイプラインを作成するための完全な宣言型構文を提供し、トリガーやチェックポイントなどのプロパティを自動的に管理します。「 Delta Live Tables とは」を参照してください。

Kafkaからデータを読み込み、変換し、Kafkaに書き込む

Apache Kafkaやその他のメッセージングバスでは、大規模なデータセットでレイテンシーが最も低くなります。Databricksを使用して、Kafkaから取り込んだデータに変換を適用し、データをKafkaに書き戻すことができます。

クラウドオブジェクトストレージにデータを書き込むと、レイテンシーのオーバーヘッドが増えます。メッセージングバスからのデータをDelta Lakeに保存したいが、ストリーミングワークロードのレイテンシーを可能な限り低くする必要がある場合、データをレイクハウスに取り込み、ダウンストリームのメッセージングバスシンクにほぼリアルタイムの変換を適用するために、個別のストリーミングジョブを構成することを推奨します。

次のコード例は、KafkaからのデータをDeltaテーブルのデータと結合し、Kafkaに書き戻すことでデータをエンリッチする簡単なパターンを示しています。

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka サービスにアクセスするための適切なアクセス許可が構成されている必要があります。 山括弧(<>)で示されているすべてのパラメーターを、データソースとシンクに関連する値を使用して入力します。 「Apache Kafka と Databricks を使用したストリーム処理」を参照してください。