Use os Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables

Este artigo explica como usar Delta Live Tables para processar mensagens dos Hubs de Eventos do Azure. Você não pode usar o conector de hubs de eventos estruturados de transmissão porque esta biblioteca não está disponível como parte do Databricks Runtime, e o Delta Live Tables não permite que você use bibliotecas JVM de terceiros.

Como o Delta Live Tables pode se conectar aos Hubs de Eventos do Azure?

Os Hubs de Eventos do Azure fornecem um endpoint compatível com Apache Kafka que você pode usar com o conector Kafka estructurada de transmissão, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. Para obter mais informações sobre a compatibilidade dos hubs de eventos do Azure e do Apache Kafka, consulte usar os hubs de eventos do Azure em aplicativos Apache Kafka.

As passos a seguir descrevem a conexão de um pipeline Delta Live Tables a uma instância de hubs de eventos existente e o consumo de eventos de um tópico. Para concluir essas passos, você precisa dos seguintes valores de conexão dos Hubs de Eventos:

  • O nome do namespace dos Hubs de Eventos.

  • O nome da instância do Hub de Eventos no namespace dos Hubs de Eventos.

  • Um nome de política de acesso compartilhado e key de política para Hubs de Eventos. Por default, uma política RootManageSharedAccessKey é criada para cada namespace dos Hubs de Eventos. Esta política tem permissões manage, send e listen . Se o seu pipeline lê apenas dos Hubs de Eventos, o Databricks recomenda criar uma nova política apenas com permissão de escuta.

Para obter mais informações sobre as strings de conexão dos Hubs de Eventos, consulte Obter stringsde conexão de Hubs de Eventos.

Observação

  • Os hubs de eventos do Azure fornecem opções OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso aos seus recursos seguros. Estas instruções usam autenticação baseada em SAS.

  • Se você obtiver as strings de conexão dos Hubs de Eventos no portal do Azure, elas podem não conter o valor EntityPath. O valor EntityPath é necessário apenas ao usar o conector do hub de eventos estruturado de transmissão. O uso do Conector Kafka estruturado de transmissão requer o fornecimento apenas do nome do tópico.

Armazene a chave de política em um segredo do Databricks

Como a key de política é uma informação confidencial, Databricks recomenda não codificar o valor em seu código de pipeline. Em vez disso, use os segredos do Databricks para armazenar e gerenciar o acesso à key.

O exemplo a seguir usa a CLI do Databricks para criar um Secret Scope e armazenar a key nesse Secret Scope. No código do pipeline, use a função dbutils.secrets.get() com scope-name e shared-policy-name para recuperar o valor da key .

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Para obter mais informações sobre os segredos do Databricks, consulte Gerenciamento de segredos.

Crie um Notebook e adicione o código do pipeline para consumir eventos

O exemplo a seguir lê eventos IoT de um tópico, mas você pode adaptar o exemplo para os requisitos de seu aplicativo. Como prática recomendada, Databricks recomenda usar as configurações de pipeline Delta Live Tables para configurar variáveis de aplicativo. Seu código de pipeline usa a função spark.conf.get() para recuperar valores. Para obter mais informações sobre como usar configurações de pipeline para parametrizar seu pipeline, consulte Parameterize pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Criar o pipeline

Crie um novo pipeline com as configurações a seguir, substituindo os valores de espaço reservado pelos valores apropriados para seu ambiente.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Substituir

  • <container-name> com o nome de um contêiner account de armazenamento do Azure.

  • <storage-account-name> com o nome de uma account de armazenamento ADLS Gen2.

  • <eh-namespace> com o nome do namespace dos Hubs de Eventos.

  • <eh-policy-name> com a key Secret Scope para a key de política dos Hubs de Eventos.

  • <eventhub> com o nome de sua instância de Hubs de Eventos.

  • <secret-scope-name> com o nome do Databricks Secret Scope que contém a key de política dos Hubs de Eventos.

Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS default , mas usa uma account de armazenamento do Azure data lake Storage Gen2 (ADLS Gen2). Para obter mais informações sobre como configurar a autenticação para uma account de armazenamento ADLS Gen2, consulte Acessar credenciais de armazenamento com segurança com segredos em um pipeline.