Databricks での構造化ストリーミング パターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミングの使用を開始する

構造化ストリーミングを初めて使用する場合は、「 最初の構造化ストリーミングワークロードを実行する」を参照してください。

Pythonでの構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散、低待機時間、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD と DataFrame APIsの両方をサポートし、ストリーミング データの書き込みをネイティブにサポートしています。 *重要*対応するバージョンの スパークCassandraコネクタアセンブリを使用する必要があります。

次の例では、Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペース名、テーブル名などの接続構成も指定します。

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

foreachBatch()をPythonで使用したAzure Synapse Analyticsへの書き込み

streamingDF.writeStream.foreachBatch() を使用すると、既存のバッチ データ ライターを再利用して、ストリーミングの出力を Azure Synapse アナリティクスに書き込むことができます。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics のクエリー データ」を参照してください。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

ストリーム-ストリーム結合

これら 2 つのノートブックは、Python と Scala でストリームとストリームの結合を使用する方法を示しています。

Python ノートブックでのStream-Strem joins

ノートブックを新しいタブで開く

ストリームストリームはScalaノートブックに参加します

ノートブックを新しいタブで開く