Auto Loader ファイル通知モードとは何ですか?
ファイル通知モードでは、 Auto Loader は、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、1 時間に数百万のファイルを取り込むように Auto Loader をスケーリングできます。 ディレクトリリストモードと比較すると、ファイル通知モードは、大きな入力ディレクトリや大量のファイルに対してよりパフォーマンスとスケーラビリティがありますが、追加のクラウド権限が必要です。
ファイル通知とディレクトリリストをいつでも切り替えることができ、一度のデータ処理の保証は維持できます。
警告:
Auto Loaderのソース パスの変更は、ファイル通知モードではサポートされていません。ファイル通知モードが使用されていて、パスが変更された場合、ディレクトリの更新時に新しいディレクトリに既に存在するファイルの取り込みに失敗する可能性があります。
Auto Loader ファイル通知モード で使用されるクラウドリソース
重要
ファイル通知モード用にクラウドインフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 見る:
Auto Loader では、 cloudFiles.useNotifications
オプションを [true
] に設定し、クラウド リソースを作成するために必要なアクセス許可を付与すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限を付与するために Auto Loader 追加のオプション を提供する必要がある場合があります。
次の表は、 Auto Loaderによって作成されるリソースをまとめたものです。
クラウドストレージ |
サブスクリプションサービス |
キュー サービス |
接頭辞* |
制限** |
---|---|---|---|---|
AWS S3 |
AWS SNS |
AWS SQS |
Databricks-auto-Ingest |
S3 バケットあたり 100 |
ADLS Gen2 |
Azure Event Grid |
Azure Queue Storage |
Databricks |
ストレージ アカウントあたり 500 |
GCS |
Google Pub/Sub |
Google Pub/Sub |
Databricks-auto-Ingest |
GCS バケットあたり 100 |
Azure Blob Storage |
Azure Event Grid |
Azure Queue Storage |
Databricks |
ストレージ アカウントあたり 500 |
* Auto Loader は、このプレフィックスでリソースに名前を付けます。
** 起動できる並列ファイル通知パイプラインの数
特定のストレージ アカウントに対して限られた数を超えるファイル通知パイプラインを実行する必要がある場合は、次のことができます。
AWS Lambda、Azure Functions、Google Cloud Functions などのサービスを活用して、コンテナまたはバケット全体をリッスンする単一のキューからディレクトリ固有のキューに通知をファンアウトします。
ファイル通知イベント
AWS S3 は、ファイルがプットアップロードまたはマルチパートアップロードのどちらでアップロードされたかに関係なく、ファイルが S3 バケットにアップロードされたときに ObjectCreated
イベントを提供します。
ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対してさまざまなイベント通知が提供されます。
Auto Loader は、ファイルを処理するための
FlushWithClose
イベントをリッスンします。Auto Loader ストリームは、ファイルを検出するための
RenameFile
アクションをサポートします。RenameFile
アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージ・システムへのAPIリクエストが必要です。Databricks Runtime 9.0 以降で作成された Auto Loader ストリームでは、ファイルを検出するための
RenameDirectory
アクションがサポートされています。RenameDirectory
アクションでは、名前を変更したディレクトリの内容を一覧表示するために、ストレージ・システムへのAPI要求が必要です。
Google クラウド ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE
イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。
注
クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの待機時間に関する厳格なSLAを提供していません。 Databricks では、データの完全性が要件である場合、 cloudFiles.backfillInterval
オプションを使用して定期的なバック Auto Loader フィルをトリガーし、特定の SLA 内ですべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。
ADLS Gen2 および Azure Blob ストレージ のファイル通知を構成するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 「 Azure Blob Storage」を参照してください。
ファイル通知モードを使用するには、イベント通知サービスの設定とアクセスのための認証資格情報を提供する必要があります。 認証にはサービスプリンシパルのみが必要です。
サービスプリンシパル - Azure の組み込みロールの使用
Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービス プリンシパルをクライアント ID とクライアント シークレットの形式で作成します。
このアプリに、入力パスが存在するストレージ アカウントに次のロールを割り当てます。
共同作成者: このロールは、キューやイベント サブスクリプションなど、ストレージ アカウントにリソースを設定するためのものです。
ストレージ キュー データ共同作成者: このロールは、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。 このロールは、接続文字列なしでサービス プリンシパルを提供する場合にのみ必要です。
このアプリに、関連するリソース グループに次のロールを割り当てます。
EventGrid イベント サブスクリプション共同作成者: このロールは、イベント サブスクリプションの作成や一覧表示などのイベント グリッド サブスクリプション操作を実行するためのものです。
詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。
サービスプリンシパル - カスタムロールの使用
上記のロールに必要な過剰なアクセス許可が懸念される場合は、少なくとも次のアクセス許可を持つカスタム ロールを Azure ロール の JSON 形式で作成できます。
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
次に、このカスタム ロールをアプリに割り当てることができます。
詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。
一般的なエラー のトラブルシューティング
エラー:
java.lang.RuntimeException: Failed to create event grid subscription.
Auto Loader を初めて実行するときにこのエラー メッセージが表示された場合、Event Grid は Azure サブスクリプションにリソース プロバイダーとして登録されていません。これを Azure ポータルに登録するには、次のようにします。
サブスクリプションに移動します。
[設定] セクションの [リソース プロバイダー ] をクリックします。
プロバイダー
Microsoft.EventGrid
を登録します。
エラー:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Auto Loader を初めて実行するときにこのエラー メッセージが表示された場合は、Event Grid のサービスプリンシパルとストレージ アカウントに 共同作成者 ロールが付与されていることを確認してください。
AWS S3 のファイル通知を設定するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 詳細については、「 S3 接続の詳細 」を参照してください。
ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
ここで:
<bucket-name>
: ストリームがファイルを読み取る S3 バケット名 (auto-logs
など)。*
をワイルドカードとして使用できます (例:databricks-*-logs
)。DBFS パスの基盤となる S3 バケットを確認するには、%fs mounts
を実行してノートブック内のすべての DBFS マウントポイントを一覧表示できます。<region>
: S3 バケットが存在する AWS リージョン (us-west-2
など)。 リージョンを指定しない場合は、*
を使用します。<account-number>
: S3 バケットを所有する AWS アカウント番号 (123456789012
など)。 アカウント番号を指定しない場合は、*
を使用します。
SQS および SNS ARN 仕様の文字列 databricks-auto-ingest-*
は、 cloudFiles
ソースが SQS および SNS サービスを作成するときに使用する名前プレフィックスです。 Databricks はストリームの初期実行時に通知サービスを設定するため、最初の実行後にアクセス許可が制限されたポリシーを使用できます (たとえば、ストリームを停止してから再起動します)。
注
前述のポリシーは、ファイル通知サービス (S3 バケット通知、SNS、および SQS サービス) の設定に必要なアクセス許可にのみ関係しており、S3 バケットへの読み取りアクセス権がすでにあることを前提としています。 S3 読み取り専用アクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup
ステートメントの Action
リストに以下を追加します。
s3:ListBucket
s3:GetObject
初期セットアップ 後のアクセス許可の制限
上記のリソース設定のアクセス許可は、ストリームの初期実行時にのみ必要です。 最初の実行後、アクセス許可が制限された次の IAM ポリシーに切り替えることができます。
重要
アクセス許可が制限されているため、新しいストリーミング クエリーを開始したり、障害が発生した場合にリソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除された場合)。また、クラウド リソース管理 API を使用してリソースを一覧表示または破棄することもできません。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
GCS のファイル通知を設定するために必要な権限
GCS バケットとすべてのオブジェクトに対する list
と get
のアクセス許可が必要です。 詳細については、 IAM アクセス許可に関する Google のドキュメントを参照してください。
ファイル通知モードを使用するには、 GCS サービス アカウント と、Google Cloud Pub/Sub リソースへのアクセスに使用するアカウントの権限を追加する必要があります。
Pub/Sub Publisher
ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google Cloud Pub/Sub にイベント通知メッセージを発行できます。
Google Cloud Pub/Sub リソースに使用されるサービス アカウントについては、次の権限を追加する必要があります。
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
これを行うには、これらの権限を持つ IAM カスタムロールを作成する か、これらの権限をカバーする 既存の GCP ロール を割り当てます。
ファイル通知リソース を手動で構成または管理する
特権ユーザーは、ファイル通知リソースを手動で構成または管理できます。
クラウドプロバイダーを介してファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については、「 ファイル通知オプション 」を参照してください。
次の例に示すように、Scala APIs を使用して、通知サービスとキューイング サービスを作成または管理します。
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
setUpNotificationServices(<resource-suffix>)
を使用して、 <prefix>-<resource-suffix>
という名前のキューとサブスクリプションを作成します (プレフィックスは、「 Auto Loader ファイル通知モードで使用されるクラウド リソース」にまとめられているストレージ システムによって異なります)。同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成するのではなく、既存のリソースを再利用します。 この関数は、 ファイル通知オプションの識別子を使用して cloudFiles
ソースに渡すことができるキュー識別子を返します。これにより、 cloudFiles
ソース ユーザーは、リソースを作成したユーザーよりも少ないアクセス許可を持つことができます。
setUpNotificationServices
を呼び出す場合にのみ newManager
する "path"
オプションを提供します。 listNotificationServices
や tearDownNotificationServices
には必要ありません。これは、ストリーミングクエリーを実行するときに使用する path
と同じです。
次のマトリックスは、ストレージの種類ごとに、どの API メソッドがどの Databricks Runtime でサポートされているかを示しています。
クラウドストレージ |
セットアップ API |
リスト API |
ティアダウンAPI |
---|---|---|---|
AWS S3 |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
ADLS Gen2 |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
GCS |
Databricks Runtime 9.1 以降 |
Databricks Runtime 9.1 以降 |
Databricks Runtime 9.1 以降 |
Azure Blob Storage |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
ADLS Gen1 |
サポート |
サポート |
サポート |