Databricks での構造化ストリーミング パターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミング の使用を開始する

構造化ストリーミングを初めて使用する場合は、「 最初の構造化ストリーミングワークロードを実行する」を参照してください。

Python での構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散、低待機時間、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD と DataFrame APIsの両方をサポートし、ストリーミング データの書き込みをネイティブにサポートしています。 *重要*対応するバージョンの スパークCassandraコネクタアセンブリを使用する必要があります。

次の例では、Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペース名、テーブル名などの接続構成も指定します。

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Azure Synapse アナリティクスへの書き込み using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() を使用すると、既存のバッチ データ ライターを再利用して、ストリーミングの出力を Azure Synapse アナリティクスに書き込むことができます。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics のクエリー データ」を参照してください。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

ストリーム-ストリーム結合

これら 2 つのノートブックは、Python と Scala でストリームとストリームの結合を使用する方法を示しています。

ストリーム-ストリームは Python ノートブック に参加します

ノートブックを新しいタブで開く

ストリームストリームはScalaノートブック に参加します

ノートブックを新しいタブで開く