ストリーミングデータのクエリー
Databricks を使用して、構造化ストリーミングを使用してストリーミングデータソースをクエリーできます。 Databricks は、Python と Scala でのストリーミング ワークロードを幅広くサポートし、SQL を使用したほとんどの構造化ストリーミング機能をサポートしています。
次の例は、ノートブックでの対話型開発中にストリーミング データを手動で検査するためにメモリ シンクを使用する方法を示しています。 ノートブック UI の行出力制限により、ストリーミングクエリーによって読み取られるすべてのデータが監視されない場合があります。 本番運用ワークロードでは、ストリーミングクエリーは、ターゲットテーブルまたは外部システムに書き込むことによってのみトリガーする必要があります。
手記
ストリーミング データに対する対話型クエリーの SQL サポートは、汎用コンピュートに接続されて実行されているノートブックに限定されます。 Delta Live Tables でストリーミング テーブルを宣言するときにも SQL を使用できます。 「 Delta Live Tables とは」を参照してください。
ストリーミングシステムからのデータのクエリー
Databricks には、次のストリーミング システム用のストリーミング データ リーダーが用意されています。
Kafka
Kinesis
PubSub (英語)
Pulsar
これらのシステムに対してクエリーを初期化するときには、構成済みの環境と読み取り元として選択したシステムによって異なる構成の詳細を指定する必要があります。 「ストリーミングデータソースの構成」を参照してください。
ストリーミング システムに関連する一般的なワークロードには、レイクハウスへのデータ取り込みや、外部システムにデータをシンクするためのストリーム処理などがあります。 ストリーミング ワークロードの詳細については、「 Databricks でのストリーミング」を参照してください。
次の例は、Kafka からのインタラクティブなストリーミング読み取りを示しています。
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
ストリーミング読み込みとしてのテーブルへのクエリー
Databricks は、デフォルトで Delta Lake を使用してすべてのテーブルを作成します。 Delta テーブルに対してストリーミング クエリを実行すると、テーブルのバージョンがコミットされたときにクエリによって新しいレコードが自動的に取得されます。 デフォルトでは、ストリーミング クエリでは、ソース テーブルに追加されたレコードのみが含まれることが想定されます。 更新や削除を含むストリーミング データを操作する必要がある場合、Databricks では Delta Live Tables とAPPLY CHANGES INTO
使用をお勧めします。 APPLY CHANGES APIs参照してください: Delta Live Tablesを使用してチェンジデータキャプチャを簡素化します。
次の例は、テーブルからの対話型ストリーミング読み取りの実行を示しています。
display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name
Auto Loaderによるクラウドオブジェクトのデータへのクエリー
Auto Loader(Databricks クラウド データ コネクタ) を使用して、クラウドオブジェクトストレージからデータをストリーミングできます。コネクタは、Unity Catalog ボリュームまたはその他のクラウドオブジェクトストレージの場所に格納されているファイルで使用できます。 Databricks では、ボリュームを使用してクラウドオブジェクトストレージ内のデータへのアクセスを管理することをお勧めします。 「データソースへの接続」を参照してください。
Databricks は、一般的な構造化形式、半構造化形式、非構造化形式で格納されているクラウドオブジェクトストレージ内のデータのストリーミング インジェスト用にこのコネクタを最適化します。 Databricks では、スループットを最大化し、レコードの破損やスキーマの変更による潜在的なデータ損失を最小限に抑えるために、取り込まれたデータをほぼ生の形式で保存することをお勧めします。
クラウドオブジェクトストレージからのデータの取り込みに関するその他の推奨事項については、 「Databricks レイクハウスへのデータの取り込み」を参照してください。
次の例は、ボリューム内の JSON ファイルのディレクトリから読み取られる対話型ストリーミングを示しています。
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')