Carregar dados com Delta Live Tables

Você pode carregar dados de qualquer fonte de dados suportada pelo Apache Spark em Databricks usando Delta Live Tables. Você pode definir dataset (tabelas e view) em Delta Live Tables em qualquer query que retorne um Spark DataFrame, incluindo DataFrames e Pandas transmitidos para Spark DataFrames. Para tarefas de ingestão de dados, Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. tabelas de transmissão são boas para ingerir dados do armazenamento de objetos cloud usando o Auto Loader ou de barramentos de mensagens como Kafka. Os exemplos abaixo demonstram alguns padrões comuns.

Importante

Nem todas as fontes de dados têm suporte a SQL. Você pode misturar SQL e Python Notebook em um pipeline Delta Live Tables para usar SQL para todas as operações além da ingestão.

Para obter detalhes sobre como trabalhar com biblioteca não pacote em Delta Live Tables por default, consulte gerenciar dependências de Python para o pipeline Delta Live Tables .

Carregar arquivos do armazenamento de objetos na nuvem

Databricks recomenda usar o Auto Loader com Delta Live Tables para a maioria das tarefas de ingestão de dados do armazenamento de objetos cloud . O Auto Loader e o Delta Live Tables são projetados para carregar de forma incremental e idempotente dados cada vez maiores à medida que chegam ao armazenamento em cloud . Os exemplos a seguir usam Auto Loader para criar dataset de arquivos CSV e JSON:

Observação

Para carregar arquivos com o Auto Loader em um pipeline habilitado para Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar o Unity Catalog com Delta Live Tables, consulte Usar o Unity Catalog com seu pipelineDelta Live Tables.

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Consulte O que é o Auto Loader? e Sintaxe SQL do Auto Loader.

Aviso

Se o senhor usar Auto Loader com notificações de arquivo e executar um refresh completo para sua tabela pipeline ou de transmissão, deverá limpar manualmente o recurso. O senhor pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.

Carregar dados de um barramento de mensagem

O senhor pode configurar o pipeline Delta Live Tables para ingerir dados de barramentos de mensagens com tabelas de transmissão. Databricks recomenda a combinação de tabelas de transmissão com execução contínua e autoescala aprimorada para fornecer a ingestão mais eficiente para o carregamento de baixa latência de barramentos de mensagens. Consulte Otimizar a utilização de cluster do pipeline Delta Live Tables com autoescala aprimorada.

Por exemplo, o código a seguir configura uma tabela transmitida para ingerir dados do Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Você pode escrever operações downstream em SQL puro para realizar transformações transmitidas nesses dados, como no exemplo a seguir:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Para obter um exemplo de como trabalhar com os Hubs de Eventos, consulte Usar os Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables.

Consulte Configurar transmissão de fonte de dados.

Carregar dados de sistemas externos

Delta Live Tables suporta o carregamento de dados de qualquer fonte de dados suportada pelo site Databricks. Consulte Conectar-se à fonte de dados. O senhor também pode carregar o uso externo de dados lakehouse Federation para fontes de dados compatíveis. Como o lakehouse Federation requer o Databricks Runtime 13.3 LTS ou acima, para usar o lakehouse Federation, o pipeline deve estar configurado para usar o canal de visualização.

Algumas fontes de dados não têm suporte equivalente em SQL. Se o senhor não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar um notebook Python para ingerir dados da fonte. O senhor pode adicionar código-fonte Python e SQL ao mesmo pipeline do Delta Live Tables. O exemplo a seguir declara um view materializado para acessar o estado atual dos dados em uma tabela PostgreSQL remota:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregue conjuntos de dados pequenos ou estáticos do armazenamento de objetos na nuvem

Você pode carregar dataset pequeno ou estático usando a sintaxe de carregamento do Apache Spark. Delta Live Tables suporta todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas Delta Live Tables:

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Observação

A construção SQL SELECT * FROM format.`path`; é comum a todos os ambientes SQL no Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL com Delta Live Tables.

Acesse com segurança credenciais de armazenamento com segredos em um pipeline

O Databricks senhor pode usar os segredos do para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade do Spark na configuração do cluster de configurações do pipeline. Consulte Configurar computação para um pipeline do Delta Live Tables.

O exemplo a seguir usa um segredo para armazenar um acesso key necessário para ler dados de entrada de um armazenamento Azure Data Lake Storage Gen2 (ADLS Gen2) account usando Auto Loader. O senhor pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, a chave AWS para acessar S3, ou a senha para um Apache Hive metastore.

Para saber mais sobre como trabalhar com o Azure Data Lake Storage Gen2, consulte Conectar-se ao Azure Data Lake Storage Gen2 e ao Blob Storage.

Observação

Você deve adicionar o prefixo spark.hadoop. à key de configuração spark_conf que define o valor do segredo.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Substituir

  • <storage-account-name> com o nome account de armazenamento ADLS Gen2.

  • <scope-name> com o nome do Databricks Secret Scope .

  • <secret-name> com o nome da key que contém a key acesso da account de armazenamento do Azure.

import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Substituir

  • <container-name> com o nome do contêiner account de armazenamento do Azure que armazena os dados de entrada.

  • <storage-account-name> com o nome account de armazenamento ADLS Gen2.

  • <path-to-input-dataset> com o caminho para o dataset de entrada.

Carregar dados dos Hubs de Eventos do Azure

Azure O Event Hubs é um serviço de transmissão de dados que oferece uma interface compatível com Apache Kafka . O senhor pode usar o conector de transmissão estruturada Kafka, incluído no tempo de execução Delta Live Tables, para carregar mensagens de Azure Event Hubs. Para saber mais sobre como carregar e processar mensagens de Azure Event Hubs, consulte Use Azure Event Hubs as a Delta Live Tables fonte de dados.