Use os Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables

Este artigo explica como usar Delta Live Tables para processar mensagens dos Hubs de Eventos do Azure. Você não pode usar o conector de hubs de eventos estruturados de transmissão porque esta biblioteca não está disponível como parte do Databricks Runtime, e o Delta Live Tables não permite que você use bibliotecas JVM de terceiros.

Como o Delta Live Tables pode se conectar aos Hubs de Eventos do Azure?

Os Hubs de Eventos do Azure fornecem um endpoint compatível com Apache Kafka que você pode usar com o conector Kafka estructurada de transmissão, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. Para obter mais informações sobre a compatibilidade dos hubs de eventos do Azure e do Apache Kafka, consulte usar os hubs de eventos do Azure em aplicativos Apache Kafka.

As passos a seguir descrevem a conexão de um pipeline Delta Live Tables a uma instância de hubs de eventos existente e o consumo de eventos de um tópico. Para concluir essas passos, você precisa dos seguintes valores de conexão dos Hubs de Eventos:

  • O nome do namespace dos Hubs de Eventos.

  • O nome da instância do Hub de Eventos no namespace dos Hubs de Eventos.

  • Um nome de política de acesso compartilhado e key de política para Hubs de Eventos. Por default, uma política RootManageSharedAccessKey é criada para cada namespace dos Hubs de Eventos. Esta política tem permissões manage, send e listen . Se o seu pipeline lê apenas dos Hubs de Eventos, o Databricks recomenda criar uma nova política apenas com permissão de escuta.

Para obter mais informações sobre as strings de conexão dos Hubs de Eventos, consulte Obter stringsde conexão de Hubs de Eventos.

Observação

  • Os hubs de eventos do Azure fornecem opções OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso aos seus recursos seguros. Estas instruções usam autenticação baseada em SAS.

  • Se o senhor obtiver as cadeias de conexão dos Event Hubs no portal Azure, elas poderão não conter o valor EntityPath. O valor EntityPath é necessário somente ao usar o conector do Event Hubs de transmissão estruturada. Para usar o conector de transmissão estruturada Kafka, é necessário fornecer apenas o nome do tópico.

Armazene a chave de política em um segredo do Databricks

Como a key de política é uma informação confidencial, Databricks recomenda não codificar o valor em seu código de pipeline. Em vez disso, use os segredos do Databricks para armazenar e gerenciar o acesso à key.

O exemplo a seguir usa a CLI do Databricks para criar um Secret Scope e armazenar a key nesse Secret Scope. No código do pipeline, use a função dbutils.secrets.get() com scope-name e shared-policy-name para recuperar o valor da key .

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Para obter mais informações sobre os segredos do Databricks, consulte Gerenciamento de segredos.

Crie um Notebook e adicione o código do pipeline para consumir eventos

O exemplo a seguir lê eventos de IoT de um tópico, mas o senhor pode adaptar o exemplo aos requisitos do seu aplicativo. Como prática recomendada, a Databricks recomenda o uso das configurações do pipeline do Delta Live Tables para configurar as variáveis do aplicativo. Seu código de pipeline usa a função spark.conf.get() para recuperar valores. Para obter mais informações sobre como usar as configurações do pipeline para parametrizar o pipeline, consulte Usar parâmetros com o pipeline do Delta Live Tables .

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Criar o pipeline

Crie um novo pipeline com as configurações a seguir, substituindo os valores de espaço reservado pelos valores apropriados para seu ambiente.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Substituir

  • <container-name> com o nome de um contêiner account de armazenamento do Azure.

  • <storage-account-name> com o nome de uma account de armazenamento ADLS Gen2.

  • <eh-namespace> com o nome do namespace dos Hubs de Eventos.

  • <eh-policy-name> com a key Secret Scope para a key de política dos Hubs de Eventos.

  • <eventhub> com o nome de sua instância de Hubs de Eventos.

  • <secret-scope-name> com o nome do Databricks Secret Scope que contém a key de política dos Hubs de Eventos.

Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS default , mas usa uma account de armazenamento do Azure data lake Storage Gen2 (ADLS Gen2). Para obter mais informações sobre como configurar a autenticação para uma account de armazenamento ADLS Gen2, consulte Acessar credenciais de armazenamento com segurança com segredos em um pipeline.