padrões estruturados transmitidos em Databricks

Contém Notebook e amostras de código para padrões comuns para trabalhar com transmissão estruturada em Databricks.

Começando a começar com transmissão estruturada

Se você é novo na transmissão estruturada, veja execução sua primeira carga de trabalho da transmissão estruturada.

Escreva para Cassandra como um coletor para transmissão estruturada em Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.

transmissão estructurada funciona com Cassandra através do Spark Cassandra Connector. Esse conector oferece suporte a APIs RDD e DataFrame e possui suporte nativo para gravar dados transmitidos. *Importante* Você deve usar a versão correspondente do spark-Cassandra-connector-assembly.

O exemplo a seguir se conecta a um ou mais hosts em clusters de banco de dados Cassandra. Ele também especifica as configurações de conexão, como a localização do ponto de verificação e o espaço-chave específico e os nomes das tabelas:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Gravar no Azure Synapse analítico usando foreachBatch() em Python

streamingDF.writeStream.foreachBatch() permite que você reutilize gravadores de dados de lotes existentes para gravar a saída de uma query de transmissão para o Azure Synapse analítico. Consulte a documentação do foreachBatch para obter detalhes.

Para executar este exemplo, você precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector Azure Synapse Analytics, consulte dadosquery em Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

transmissão-transmissão junta

Esses dois Notebook mostram como usar join de transmissão-transmissão em Python e Scala.

transmissão-transmissão junta-se ao Python Notebook

Abra o bloco de anotações em outra guia

transmissão-transmissão junta-se ao NotebookScala

Abra o bloco de anotações em outra guia