O que é o modo de notificação de arquivo Auto Loader ?

No modo de notificação de arquivo, o Auto Loader configura automaticamente um serviço de notificação e um serviço de fila que se inscreve em eventos de arquivo do diretório de entrada. Você pode usar notificações de arquivo para escalar o Auto Loader para ingerir milhões de arquivos por hora. Quando comparado ao modo de listagem de diretórios, o modo de notificação de arquivo é mais eficaz e escalável para grandes diretórios de entrada ou um grande volume de arquivos, mas requer permissões cloud adicionais.

Você pode alternar entre notificações de arquivo e listagem de diretório a qualquer momento e ainda manter garantias de processamento de dados exatamente uma vez.

Aviso

A alteração do caminho de origem do Auto Loader não é suportada no modo de notificação de arquivo. Se o modo de notificação de arquivo for usado e o caminho for alterado, você poderá falhar ao ingerir arquivos que já estão presentes no novo diretório no momento da atualização do diretório.

Recurso de nuvem usado no modo de notificação de arquivo Auto Loader

Importante

Você precisa de permissões elevadas para configurar automaticamente a infraestrutura cloud para o modo de notificação de arquivo. Entre em contato com o administrador cloud ou administrador workspace . Ver:

O Auto Loader pode configurar notificações de arquivo para você automaticamente quando você define a opção cloudFiles.useNotifications como true e fornece as permissões necessárias para criar recursos cloud . Além disso, pode ser necessário fornecer opções adicionais para conceder autorização ao Auto Loader para criar esses recursos.

A tabela a seguir resume quais recursos são criados pelo Auto Loader.

armazenamento cloud

serviço de inscrição

serviço de fila

Prefixo *

Limite **

AWS S3

AWS SNS

AWS SQS

databricks-auto-ingestão

100 por balde S3

ADLS Gen2

Grade de Eventos do Azure

Armazenamento de filas do Azure

Databricks

500 por accountde armazenamento

GCS

Google Pub/Sub

Google Pub/Sub

databricks-auto-ingestão

100 por intervalo GCS

Armazenamento Azure Blob

Grade de Eventos do Azure

Armazenamento de filas do Azure

Databricks

500 por accountde armazenamento

* Auto Loader nomeia os recursos com este prefixo.

** Quantos pipelines de notificação de arquivos concorrentes podem ser iniciados

Se você precisar executar mais do que o número limitado de pipelines de notificação de arquivo para uma determinada account de armazenamento, poderá:

  • Aproveite um serviço como AWS Lambda, Azure Functions ou Google cloud Functions para distribuir notificações de uma única fila que escuta um contêiner ou balde inteiro em filas específicas de diretório.

Eventos de notificação de arquivo

O AWS S3 fornece um evento ObjectCreated quando um arquivo é carregado em um bucket do S3, independentemente de ter sido carregado por um put ou multi-part upload.

ADLS Gen2 fornece diferentes notificações de eventos para arquivos que aparecem em seu contêiner Gen2.

  • O Auto Loader escuta o evento FlushWithClose para processar um arquivo.

  • A transmissão Auto Loader suporta a ação RenameFile para descoberta de arquivos. As ações RenameFile exigem uma solicitação de API ao sistema de armazenamento para obter o tamanho do arquivo renomeado.

  • A transmissão Auto Loader criada com Databricks Runtime 9.0 e posterior suporta a ação RenameDirectory para descobrir arquivos. As ações RenameDirectory requerem solicitações de API para o sistema de armazenamento para listar o conteúdo do diretório renomeado.

O armazenamento cloud do Google fornece um evento OBJECT_FINALIZE quando um arquivo é upload, que inclui substituições e cópias de arquivos. upload com falha não gera este evento.

Observação

os provedores cloud não garantem a entrega de 100% de todos os eventos de arquivo em condições muito raras e não fornecem SLAs rígidos sobre a latência dos eventos de arquivo. A Databricks recomenda que você dispare preenchimentos regulares com o Auto Loader usando a opção cloudFiles.backfillInterval para garantir que todos os arquivos sejam descobertos em um determinado SLA se a integridade dos dados for um requisito. Acionar preenchimentos regulares não causa duplicatas.

Permissões necessárias para configurar a notificação de arquivo para ADLS Gen2 e Azure Blob Storage

Você deve ter permissões de leitura para o diretório de entrada. Consulte ArmazenamentoAzure Blob .

Para usar o modo de notificação de arquivo, o senhor deve fornecer credenciais de autenticação para configurar e acessar o serviço de notificação de eventos. O senhor só precisa de uma entidade de serviço para autenticação.

  • entidade de serviço - usando funções integradas do Azure

    Crie um aplicativo e uma entidade de serviço do Microsoft Entra ID (antigo Azure Active Directory) na forma de ID do cliente e segredo do cliente.

    Atribua a este aplicativo as seguintes funções à account de armazenamento na qual reside o caminho de entrada:

    • Contribuidor: esta função é para configurar recursos em sua conta de armazenamento, como filas e inscrição de eventos.

    • Colaborador de dados da fila de armazenamento: Essa função é para executar operações de fila, como recuperar e excluir mensagens das filas. Essa função é necessária somente quando o senhor fornece uma entidade de serviço sem uma string de conexão.

    Atribua a este aplicativo a seguinte função ao grupo de recursos relacionado:

    Para obter mais informações, consulte Atribuir funções do Azure usando o portal do Azure.

  • entidade de serviço - usando função personalizada

    Se estiver preocupado com as permissões excessivas necessárias para as funções anteriores, pode criar uma função personalizada com pelo menos as seguintes permissões, listadas abaixo no formato JSON da função Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Em seguida, você pode atribuir essa função personalizada ao seu aplicativo.

    Para obter mais informações, consulte Atribuir funções do Azure usando o portal do Azure.

Permissões do carregador automático

Solução de problemas de erros comuns

Erro:

java.lang.RuntimeException: Failed to create event grid subscription.

Se você vir esta mensagem de erro ao executar Auto Loader pela primeira vez, a Grade de Eventos não está registrada como um Provedor de Recursos em sua inscrição no Azure. Para registrar isso no portal do Azure:

  1. Aceda à sua inscrição.

  2. Clique em Provedores de recursos na seção Configurações.

  3. registrar o provedor Microsoft.EventGrid.

Erro:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Se você vir esta mensagem de erro ao executar Auto Loader pela primeira vez, certifique-se de ter atribuído a função de Colaborador à sua entidade de serviço para a Grade de Eventos, bem como à sua account de armazenamento.

Permissões necessárias para configurar a notificação de arquivo para AWS S3

Você deve ter permissões de leitura para o diretório de entrada. Consulte detalhes da conexão S3 para obter mais detalhes.

Para usar o modo de notificação de arquivo, anexe o seguinte documento de política JSON ao seu usuário ou função do IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

onde:

  • <bucket-name>: O nome do bucket S3 onde sua transmissão irá ler os arquivos, por exemplo, auto-logs. Você pode usar * como curinga, por exemplo, databricks-*-logs. Para descobrir o bucket S3 subjacente para seu caminho DBFS, você pode listar todos os pontos de montagem DBFS em um Notebook executando %fs mounts.

  • <region>: a região da AWS em que o bucket do S3 reside, por exemplo, us-west-2. Se você não deseja especificar a região, use *.

  • <account-number>: o número account da AWS que possui o bucket S3, por exemplo, 123456789012. Se não quiser especificar o número account , use *.

As strings databricks-auto-ingest-* na especificação SQS e SNS ARN são o prefixo de nome que a fonte cloudFiles usa ao criar serviços SQS e SNS. Como o Databricks configura os serviços de notificação na execução inicial da transmissão, você pode usar uma política com permissões reduzidas após a execução inicial (por exemplo, interromper a transmissão e reiniciá-la).

Observação

A política anterior se preocupa apenas com as permissões necessárias para configurar serviços de notificação de arquivo, ou seja, notificação de bucket S3, SNS e serviços SQS e pressupõe que você já tenha acesso de leitura ao bucket S3. Se você precisar adicionar permissões somente leitura do S3, adicione o seguinte à lista Action na instrução DatabricksAutoLoaderSetup no documento JSON:

  • s3:ListBucket

  • s3:GetObject

Permissões reduzidas após a configuração inicial

As permissões de configuração de recursos descritas acima são necessárias apenas durante a execução inicial da transmissão. Após a primeira execução, você pode alternar para a seguinte política do IAM com permissões reduzidas.

Importante

Com as permissões reduzidas, não é possível iniciar nova query de transmissão ou recriar recursos em caso de falhas (por exemplo, a fila do SQS foi deletada acidentalmente); você também não pode usar a API de gerenciamento de recursos cloud para listar ou desmontar recursos.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Permissões necessárias para configurar a notificação de arquivo para GCS

Você deve ter as permissões list e get em seu bucket GCS e em todos os objetos. Para obter detalhes, consulte a documentação do Google sobre permissões IAM.

Para usar o modo de notificação de arquivo, você precisa adicionar permissões para a conta de serviço GCS e a conta usada para acessar os recursos do Google cloud Pub/Sub.

Adicione a função Pub/Sub Publisher à account de serviço GCS. Isso permite que a account publique mensagens de notificação de eventos de seus intervalos GCS no Google cloud Pub/Sub.

Quanto à account de serviço usada para os recursos Google cloud Pub/Sub, você precisa adicionar as seguintes permissões:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para fazer isso, você pode criar uma função personalizada do IAM com essas permissões ou atribuir funções pré-existentes do GCP para abranger essas permissões.

Encontrar a conta de serviço do GCS

No Console do Google cloud para o projeto correspondente, navegue até Cloud Storage > Settings. A seção “cloud Storage Service account” contém o email da account do serviço GCS.

accountde serviço do GCS

Criação de uma IAM role para o modo de notificação de arquivo

No console cloud do Google para o projeto correspondente, navegue até IAM & Admin > Roles. Em seguida, crie uma função na parte superior ou atualize uma função existente. Na tela de criação ou edição de função, clique em Add Permissions. Aparece um menu no qual você pode adicionar as permissões desejadas à função.

Papéis personalizados de IAM do GCP

Configurar ou gerenciar manualmente recursos de notificação de arquivo

Os usuários privilegiados podem configurar ou gerenciar manualmente os recursos de notificação de arquivo.

  • Configure os serviços de notificação de arquivo manualmente por meio do provedor cloud e especifique manualmente o identificador de fila. Consulte Opções de notificação de arquivo para obter mais detalhes.

  • Utilize APIs Scala para criar ou gerenciar o serviço de notificações e enfileiramento, conforme exemplo a seguir:

Observação

Você deve ter permissões apropriadas para configurar ou modificar a infraestrutura cloud . Consulte a documentação de permissões para Azure, S3 ou GCS.

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para criar uma fila e uma inscrição com o nome <prefix>-<resource-suffix> (o prefixo depende do sistema de armazenamento resumido no recursocloud usado no modo de notificação de arquivo Auto Loader . Se houver um recurso existente com o mesmo nome, o Databricks reutilizará o recurso existente em vez de criar um novo. Esta função retorna um identificador de fila que você pode passar para a fonte cloudFiles usando o identificador em Opções de notificação de arquivo. Isso permite que o usuário de origem cloudFiles tenha menos permissões do que o usuário que cria os recursos.

Forneça a opção "path" para newManager somente se chamar setUpNotificationServices; não é necessário para listNotificationServices ou tearDownNotificationServices. Este é o mesmo path que você usa ao executar uma query de transmissão.

A matriz a seguir indica quais métodos de API têm suporte em qual Databricks Runtime para cada tipo de armazenamento:

armazenamento cloud

API de configuração

API de lista

Derrubar API

AWS S3

Todas versões

Todas versões

Todas versões

ADLS Gen2

Todas versões

Todas versões

Todas versões

GCS

Databricks Runtime 9.1e acima

Databricks Runtime 9.1e acima

Databricks Runtime 9.1e acima

Armazenamento Azure Blob

Todas versões

Todas versões

Todas versões

ADLS Gen1

sem suporte

sem suporte

sem suporte