Consultar dados de transmissão

Você pode usar o Databricks para query a fonte de dados usando transmissão estructurada. O Databricks fornece amplo suporte para cargas de trabalho de transmissão em Python e Scala e oferece suporte à maioria das funcionalidades de transmissão estruturada com SQL.

Os exemplos a seguir demonstram o uso de um coletor de memória para inspeção manual de dados de transmissão durante o desenvolvimento interativo no Notebook. Devido aos limites de saída de linha na IU Notebook , talvez você não observe todos os dados lidos pela transmissão query. Em cargas de trabalho de produção, você só deve acionar query de transmissão gravando-as em uma tabela de destino ou sistema externo.

Observação

O suporte SQL para query interativa em dados de transmissão é limitado ao Notebook rodando anexado à compute para todos os fins. Você também pode usar SQL ao declarar tabelas de transmissão com Delta Live Tables. Consulte O que são Delta Live Tables?.

Consulta de dados de sistemas de transmissão

A Databricks fornece leitores de dados de transmissão para os seguintes sistemas de transmissão:

  • Kafka

  • Kinesis

  • PubSub

  • Pulsar

Você deve fornecer detalhes de configuração ao inicializar query nesses sistemas, que variam dependendo do ambiente configurado e do sistema escolhido para leitura. Consulte Configurar transmissão de fonte de dados.

Cargas de trabalho comuns que envolvem sistemas de transmissão incluem ingestão de dados para o lakehouse e processamento de transmissão para drenar dados para sistemas externos. Para obter mais informações sobre cargas de trabalho de transmissão, consulte transmissão em Databricks.

Os exemplos a seguir demonstram uma transmissão interativa lida do Kafka:

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Consultar uma tabela como uma transmissão lida

Databricks cria todas as tabelas usando Delta Lake por default. Quando o senhor executa uma consulta de transmissão em uma tabela Delta, a consulta obtém automaticamente novos registros quando uma versão da tabela é confirmada. Pelo site default, as consultas de transmissão esperam que as tabelas de origem contenham apenas registros anexados. Se o senhor precisar trabalhar com dados de transmissão que contenham atualizações e exclusões, o site Databricks recomenda o uso de Delta Live Tables e APPLY CHANGES INTO. Consulte APLICAR ALTERAÇÕES em API: Simplificar a captura de dados de alterações (CDC) em Delta Live Tables.

Os exemplos a seguir demonstram a realização de uma leitura de transmissão interativa de uma tabela:

display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name

Consulte dados no armazenamento de objetos clouds com o Auto Loader

Você pode transmitir dados do armazenamento de objetos clouds usando o Auto Loader, o conector de dados clouds do Databricks. Você pode usar o conector com arquivos armazenados em volumes do Unity Catalog ou em outros locais de armazenamento de objetos clouds . Databricks recomenda o uso de volumes para gerenciar o acesso aos dados no armazenamento de objetos clouds . Consulte Conectar-se à fonte de dados.

O Databricks otimiza esse conector para transmissão de ingestão de dados em armazenamento de objetos clouds que são armazenados em formatos populares estruturados, semiestruturados e não estruturados. A Databricks recomenda armazenar os dados ingeridos em um formato quase bruto para maximizar a taxa de transferência e minimizar a possível perda de dados devido a registros corrompidos ou alterações de esquema.

Para obter mais recomendações sobre a ingestão de dados do armazenamento de objetos em nuvens, consulte Ingestão de dados em um Databricks lakehouse.

Os exemplos a seguir demonstram uma transmissão interativa lida de um diretório de arquivos JSON em um volume:

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')