Auto Loader ファイル通知モードとは何ですか?

ファイル通知モードでは、 Auto Loader は、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、1 時間に数百万のファイルを取り込むように Auto Loader をスケーリングできます。 ディレクトリリストモードと比較すると、ファイル通知モードは、大きな入力ディレクトリや大量のファイルに対してよりパフォーマンスとスケーラビリティがありますが、追加のクラウド権限が必要です。

ファイル通知とディレクトリリストをいつでも切り替えることができ、一度のデータ処理の保証は維持できます。

警告:

Auto Loaderのソース パスの変更は、ファイル通知モードではサポートされていません。ファイル通知モードが使用されていて、パスが変更された場合、ディレクトリの更新時に新しいディレクトリに既に存在するファイルの取り込みに失敗する可能性があります。

Auto Loader ファイル通知モード で使用されるクラウドリソース

重要

ファイル通知モード用にクラウドインフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 見る:

Auto Loader では、 cloudFiles.useNotifications オプションを [true] に設定し、クラウド リソースを作成するために必要なアクセス許可を付与すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限を付与するために Auto Loader 追加のオプション を提供する必要がある場合があります。

次の表は、 Auto Loaderによって作成されるリソースをまとめたものです。

クラウドストレージ

サブスクリプションサービス

キュー サービス

接頭辞*

制限**

AWS S3

AWS SNS

AWS SQS

Databricks-auto-Ingest

S3 バケットあたり 100

ADLS Gen2

Azure Event Grid

Azure Queue Storage

Databricks

ストレージ アカウントあたり 500

GCS

Google Pub/Sub

Google Pub/Sub

Databricks-auto-Ingest

GCS バケットあたり 100

Azure Blob Storage

Azure Event Grid

Azure Queue Storage

Databricks

ストレージ アカウントあたり 500

* Auto Loader は、このプレフィックスでリソースに名前を付けます。

** 起動できる並列ファイル通知パイプラインの数

特定のストレージ アカウントに対して限られた数を超えるファイル通知パイプラインを実行する必要がある場合は、次のことができます。

  • AWS Lambda、Azure Functions、Google Cloud Functions などのサービスを活用して、コンテナまたはバケット全体をリッスンする単一のキューからディレクトリ固有のキューに通知をファンアウトします。

ファイル通知イベント

AWS S3 は、ファイルがプットアップロードまたはマルチパートアップロードのどちらでアップロードされたかに関係なく、ファイルが S3 バケットにアップロードされたときに ObjectCreated イベントを提供します。

ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対してさまざまなイベント通知が提供されます。

  • Auto Loader は、ファイルを処理するための FlushWithClose イベントをリッスンします。

  • Auto Loader ストリームは、ファイルを検出するための RenameFile アクションをサポートします。 RenameFile アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージ・システムへのAPIリクエストが必要です。

  • Databricks Runtime 9.0 以降で作成された Auto Loader ストリームでは、ファイルを検出するための RenameDirectory アクションがサポートされています。RenameDirectory アクションでは、名前を変更したディレクトリの内容を一覧表示するために、ストレージ・システムへのAPI要求が必要です。

Google クラウド ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。

クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの待機時間に関する厳格なSLAを提供していません。 Databricks では、データの完全性が要件である場合、 cloudFiles.backfillInterval オプションを使用して定期的なバック Auto Loader フィルをトリガーし、特定の SLA 内ですべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。

ADLS Gen2 および Azure Blob ストレージ のファイル通知を構成するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 「 Azure Blob Storage」を参照してください。

ファイル通知モードを使用するには、イベント通知サービスの設定とアクセスのための認証資格情報を提供する必要があります。 認証にはサービスプリンシパルのみが必要です。

  • サービスプリンシパル - Azure の組み込みロールの使用

    Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービス プリンシパルをクライアント ID とクライアント シークレットの形式で作成します。

    このアプリに、入力パスが存在するストレージ アカウントに次のロールを割り当てます。

    • 共同作成者: このロールは、キューやイベント サブスクリプションなど、ストレージ アカウントにリソースを設定するためのものです。

    • ストレージ キュー データ共同作成者: このロールは、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。 このロールは、接続文字列なしでサービス プリンシパルを提供する場合にのみ必要です。

    このアプリに、関連するリソース グループに次のロールを割り当てます。

    詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。

  • サービスプリンシパル - カスタムロールの使用

    上記のロールに必要な過剰なアクセス許可が懸念される場合は、少なくとも次のアクセス許可を持つカスタム ロールを Azure ロール の JSON 形式で作成できます。

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    次に、このカスタム ロールをアプリに割り当てることができます。

    詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。

自動ローダーのアクセス許可

一般的なエラー のトラブルシューティング

エラー:

java.lang.RuntimeException: Failed to create event grid subscription.

Auto Loader を初めて実行するときにこのエラー メッセージが表示された場合、Event Grid は Azure サブスクリプションにリソース プロバイダーとして登録されていません。これを Azure ポータルに登録するには、次のようにします。

  1. サブスクリプションに移動します。

  2. [設定] セクションの [リソース プロバイダー ] をクリックします。

  3. プロバイダー Microsoft.EventGridを登録します。

エラー:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Auto Loader を初めて実行するときにこのエラー メッセージが表示された場合は、Event Grid のサービスプリンシパルとストレージ アカウントに 共同作成者 ロールが付与されていることを確認してください。

AWS S3 のファイル通知を設定するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 詳細については、「 S3 接続の詳細 」を参照してください。

ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

ここで:

  • <bucket-name>: ストリームがファイルを読み取る S3 バケット名 ( auto-logsなど)。 * をワイルドカードとして使用できます (例: databricks-*-logs)。DBFS パスの基盤となる S3 バケットを確認するには、 %fs mountsを実行してノートブック内のすべての DBFS マウントポイントを一覧表示できます。

  • <region>: S3 バケットが存在する AWS リージョン ( us-west-2など)。 リージョンを指定しない場合は、 *を使用します。

  • <account-number>: S3 バケットを所有する AWS アカウント番号 ( 123456789012など)。 アカウント番号を指定しない場合は、 *を使用します。

SQS および SNS ARN 仕様の文字列 databricks-auto-ingest-* は、 cloudFiles ソースが SQS および SNS サービスを作成するときに使用する名前プレフィックスです。 Databricks はストリームの初期実行時に通知サービスを設定するため、最初の実行後にアクセス許可が制限されたポリシーを使用できます (たとえば、ストリームを停止してから再起動します)。

前述のポリシーは、ファイル通知サービス (S3 バケット通知、SNS、および SQS サービス) の設定に必要なアクセス許可にのみ関係しており、S3 バケットへの読み取りアクセス権がすでにあることを前提としています。 S3 読み取り専用アクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup ステートメントの Action リストに以下を追加します。

  • s3:ListBucket

  • s3:GetObject

初期セットアップ 後のアクセス許可の制限

上記のリソース設定のアクセス許可は、ストリームの初期実行時にのみ必要です。 最初の実行後、アクセス許可が制限された次の IAM ポリシーに切り替えることができます。

重要

アクセス許可が制限されているため、新しいストリーミング クエリーを開始したり、障害が発生した場合にリソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除された場合)。また、クラウド リソース管理 API を使用してリソースを一覧表示または破棄することもできません。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

GCS のファイル通知を設定するために必要な権限

GCS バケットとすべてのオブジェクトに対する listget のアクセス許可が必要です。 詳細については、 IAM アクセス許可に関する Google のドキュメントを参照してください。

ファイル通知モードを使用するには、 GCS サービス アカウント と、Google Cloud Pub/Sub リソースへのアクセスに使用するアカウントの権限を追加する必要があります。

Pub/Sub Publisher ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google Cloud Pub/Sub にイベント通知メッセージを発行できます。

Google Cloud Pub/Sub リソースに使用されるサービス アカウントについては、次の権限を追加する必要があります。

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

これを行うには、これらの権限を持つ IAM カスタムロールを作成する か、これらの権限をカバーする 既存の GCP ロール を割り当てます。

GCS サービスアカウントの 検索

対応するプロジェクトの Google クラウド コンソールで、[ Cloud Storage > Settings] に移動します。 「クラウドストレージサービスアカウント」セクションには、GCSサービスアカウントのEメールが含まれています。

GCS サービス アカウント

ファイル通知モード 用のカスタム Google クラウド IAMロールの作成

対応するプロジェクトの Google クラウド コンソールで、[ IAM & Admin > Roles] に移動します。 次に、上部にロールを作成するか、既存のロールを更新します。 ロールの作成または編集画面で、[ Add Permissions] をクリックします。 必要な権限をロールに追加できるメニューが表示されます。

GCP IAM カスタムロール

ファイル通知リソース を手動で構成または管理する

特権ユーザーは、ファイル通知リソースを手動で構成または管理できます。

  • クラウドプロバイダーを介してファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については、「 ファイル通知オプション 」を参照してください。

  • 次の例に示すように、Scala APIs を使用して、通知サービスとキューイング サービスを作成または管理します。

クラウドインフラストラクチャを構成または変更するための適切な権限が必要です。 AzureS3、または GCS のアクセス許可に関するドキュメントを参照してください。

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

setUpNotificationServices(<resource-suffix>) を使用して、 <prefix>-<resource-suffix> という名前のキューとサブスクリプションを作成します (プレフィックスは、「 Auto Loader ファイル通知モードで使用されるクラウド リソース」にまとめられているストレージ システムによって異なります)。同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成するのではなく、既存のリソースを再利用します。 この関数は、 ファイル通知オプションの識別子を使用して cloudFiles ソースに渡すことができるキュー識別子を返します。これにより、 cloudFiles ソース ユーザーは、リソースを作成したユーザーよりも少ないアクセス許可を持つことができます。

setUpNotificationServicesを呼び出す場合にのみ newManager する "path" オプションを提供します。 listNotificationServicestearDownNotificationServicesには必要ありません。これは、ストリーミングクエリーを実行するときに使用する path と同じです。

次のマトリックスは、ストレージの種類ごとに、どの API メソッドがどの Databricks Runtime でサポートされているかを示しています。

クラウドストレージ

セットアップ API

リスト API

ティアダウンAPI

AWS S3

すべてのバージョン

すべてのバージョン

すべてのバージョン

ADLS Gen2

すべてのバージョン

すべてのバージョン

すべてのバージョン

GCS

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Azure Blob Storage

すべてのバージョン

すべてのバージョン

すべてのバージョン

ADLS Gen1

サポート

サポート

サポート