Auto Loaderファイル通知モードとは何ですか?
ファイル通知モードでは、 Auto Loader は、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、1 時間に数百万のファイルを取り込むようにスケール Auto Loader できます。 ディレクトリリストモードと比較すると、ファイル通知モードは、大規模な入力ディレクトリや大量のファイルに対してパフォーマンスと拡張性に優れていますが、追加のクラウド権限が必要です。
ファイル通知とディレクトリリストをいつでも切り替えることができ、正確なデータ処理の保証を維持します。
警告
Auto Loaderのソース パスの変更は、ファイル通知モードではサポートされていません。ファイル通知モードが使用されている場合、パスが変更されると、ディレクトリの更新時に新しいディレクトリに既に存在するファイルの取り込みに失敗する可能性があります。
ファイル通知モードは、シングル ユーザー コンピュートでのみサポートされています。
Auto Loader ファイル通知モードで使用されるクラウドリソース
重要
ファイル通知モードのクラウド インフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 見る:
Auto Loader は、オプション cloudFiles.useNotifications
を true
に設定し、クラウド リソースを作成するために必要なアクセス許可を提供すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限を に付与するための 追加オプション を提供する必要がある場合があります。Auto Loader
次の表は、 Auto Loaderによって作成されるリソースをまとめたものです。
クラウドストレージ |
サブスクリプションサービス |
キューサービス |
接頭辞* |
制限** |
---|---|---|---|---|
Amazon S3 |
AWS SNSの |
AWS SQSの |
Databricks の自動取り込み |
S3バケットあたり100個 |
ADLS Gen2 |
Azure イベント グリッド |
Azure キュー ストレージ |
Databricks |
ストレージ アカウントあたり 500 |
GCSの |
Google Pub/Sub |
Google Pub/Sub |
Databricks の自動取り込み |
GCSバケットあたり100個 |
Azure Blobストレージ |
Azure イベント グリッド |
Azure キュー ストレージ |
Databricks |
ストレージ アカウントあたり 500 |
* Auto Loader は、このプレフィックスを使用してリソースに名前を付けます。
** 起動できる並列ファイル通知パイプラインの数
特定のストレージ アカウントに対して制限された数を超えるファイル通知パイプラインを実行する必要がある場合は、次のことができます。
AWS Lambda、Azure Functions、Google Cloud Functions などのサービスを活用して、コンテナまたはバケット全体をリッスンする 1 つのキューからディレクトリ固有のキューに通知を分散します。
ファイル通知イベント
Amazon S3 は、ファイルが S3 バケットにアップロードされたときに、プットアップロードとマルチパートアップロードのどちらによってアップロードされたかに関係なく、 ObjectCreated
イベントを提供します。
ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対してさまざまなイベント通知が提供されます。
Auto Loader は、ファイルを処理するために
FlushWithClose
イベントをリッスンします。Auto Loader ストリームは、ファイルを検出するための
RenameFile
アクションをサポートします。RenameFile
アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージシステムへのAPIリクエストが必要です。Auto LoaderDatabricks Runtime9.0
RenameDirectory
以降で作成された ストリームは、ファイルを検出するための アクションをサポートします。RenameDirectory
アクションでは、名前が変更されたディレクトリの内容を一覧表示するために、ストレージシステムへのAPIリクエストが必要です。
Google Cloud ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE
イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。
注:
クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの遅延に関する厳格なSLAを提供していません。 Databricks では、cloudFiles.backfillInterval
オプションを使用して Auto Loader で定期的なバックフィルをトリガーし、データの完全性が要件である場合は、特定の SLA 内のすべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。
ADLS Gen2 および Azure Blob ストレージのファイル通知を構成するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 「Azure Blob Storage」を参照してください。
ファイル通知モードを使用するには、イベント通知サービスを設定してアクセスするための認証資格情報を指定する必要があります。
認証に必要なのはサービスプリンシパルだけです。
サービスプリンシパル - Azure 組み込みロールの使用
Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービスプリンシパルをクライアント ID とクライアント シークレットの形式で作成します。
このアプリに、入力パスが存在するストレージ アカウントに次のロールを割り当てます。
共同作成者: このロールは、キューやイベント サブスクリプションなど、ストレージ アカウント内のリソースを設定するためのものです。
ストレージ キュー データ共同作成者: このロールは、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。 このロールは、接続文字列なしでサービスプリンシパルを提供する場合にのみ必要です。
このアプリに、関連するリソース グループに次のロールを割り当てます。
EventGrid EventSubscription 共同作成者: このロールは、イベント サブスクリプションの作成や一覧表示など、Azure Event Grid (Event Grid) サブスクリプション操作を実行するためのものです。
詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。
サービスプリンシパル - カスタムロールの使用
上記のロールに必要な過剰なアクセス許可が懸念される場合は、少なくとも次のアクセス許可を持つ カスタム ロール を作成できます (以下、Azure ロール JSON 形式)。
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
その後、このカスタムロールをアプリに割り当てることができます。
詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。
Amazon S3 のファイル通知を設定するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 詳細については、 S3 接続の詳細 を参照してください。
ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
ここで、
<bucket-name>
: ストリームがファイルを読み取る S3 バケット名 (例:auto-logs
)。 ワイルドカードとして*
を使用できます (例:databricks-*-logs
)。 DBFS パスの基になる S3 バケットを見つけるには、%fs mounts
を実行して、ノートブック内のすべての DBFS マウント ポイントを一覧表示できます。<region>
: S3 バケットが存在する AWS リージョン (例:us-west-2
)。 地域を指定しない場合は、*
を使用します。<account-number>
: S3 バケットを所有する AWS アカウント番号 (例:123456789012
)。 アカウント番号を指定しない場合は、*
を使用します。
SQS および SNS ARN 仕様の文字列 databricks-auto-ingest-*
は、 cloudFiles
ソースが SQS および SNS サービスを作成するときに使用する名前プレフィックスです。 Databricks はストリームの初回実行時に通知サービスを設定するため、初回実行後に権限を減らしたポリシーを使用できます (たとえば、ストリームを停止してから再起動するなど)。
注:
上記のポリシーは、ファイル通知サービス (S3 バケット通知、SNS、SQS サービス) の設定に必要なアクセス許可のみに関係しており、S3 バケットへの読み取りアクセスがすでにあることを前提としています。 S3 読み取り専用のアクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup
ステートメントのAction
リストに以下を追加します。
s3:ListBucket
s3:GetObject
初期セットアップ後のアクセス許可の制限
上記のリソース設定権限は、ストリームの初回実行時にのみ必要です。 最初の実行後、アクセス許可を減らした次の IAM ポリシーに切り替えることができます。
重要
アクセス許可が減ると、障害が発生した場合に新しいストリーミング クエリを開始したり、リソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除されたなど)。また、クラウド リソース管理 API を使用してリソースを一覧表示または破棄することもできません。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
GCSのファイル通知を設定するために必要な権限
GCS バケットとすべてのオブジェクトに対する list
および get
のアクセス許可が必要です。 詳細については、 IAM のアクセス許可に関する Google のドキュメントを参照してください。
ファイル通知モードを使用するには、 GCS サービス アカウント と Google クラウド Pub/Sub リソースへのアクセスに使用するアカウントの権限を追加する必要があります。
Pub/Sub Publisher
ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google クラウド Pub/Sub にイベント通知メッセージを公開できます。
Google Cloud Pub/Subリソースに使用するサービスアカウントについては、以下の権限を追加する必要があります。
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
これを行うには、これらのアクセス許可を持つ IAM カスタムロールを作成するか、 既存の GCP ロール を割り当ててこれらのアクセス許可をカバーします。
ファイル通知リソースを手動で構成または管理する
特権ユーザーは、ファイル通知リソースを手動で構成または管理できます。
クラウドプロバイダーを通じてファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については、「 ファイル通知オプション 」を参照してください。
次の例に示すように、 Scala APIs を使用して通知とキューイング サービスを作成または管理します。
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
setUpNotificationServices(<resource-suffix>)
を使用して、<prefix>-<resource-suffix>
という名前のキューとサブスクリプションを作成します (プレフィックスは、Auto Loader ファイル通知モードで使用されるクラウド リソースに要約されているストレージ システムによって異なります。同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成する代わりに既存のリソースを再利用します。 この関数は、ファイル通知オプションの識別子を使用してcloudFiles
ソースに渡すことができるキュー識別子を返します。これにより、 cloudFiles
ソース ユーザーは、リソースを作成するユーザーよりも少ないアクセス許可を持つことができます。
を呼び出す場合にのみnewManager
する"path"
オプションを提供します (setUpNotificationServices
;listNotificationServices
やtearDownNotificationServices
には必要ありません。これは、ストリーミング クエリを実行するときに使用するのと同じ path
です。
次のマトリックスは、ストレージの種類ごとに、どの Databricks Runtime でどの API メソッドがサポートされているかを示しています。
クラウドストレージ |
セットアップAPI |
リストAPI |
ティアダウンAPI |
---|---|---|---|
Amazon S3 |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
ADLS Gen2 |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
GCSの |
Databricks Runtime 9.1 以降 |
Databricks Runtime 9.1 以降 |
Databricks Runtime 9.1 以降 |
Azure Blobストレージ |
すべてのバージョン |
すべてのバージョン |
すべてのバージョン |
ADLS Gen1 |
サポートされていません |
サポートされていません |
サポートされていません |
一般的なエラーのトラブルシューティング
このセクションでは、ファイル通知モードで Auto Loader を使用する場合の一般的なエラーとその解決方法について説明します。
Event Grid サブスクリプションを作成できませんでした
Auto Loader を初めて実行するときに次のエラー メッセージが表示される場合は、Event Grid が Azure サブスクリプションにリソース プロバイダーとして登録されていません。
java.lang.RuntimeException: Failed to create event grid subscription.
Event Grid をリソース プロバイダーとして登録するには、次の操作を行います。
Azure portal で、サブスクリプションに移動します。
「設定」セクションの 「リソース・プロバイダ 」をクリックします。
登録する the provider
Microsoft.EventGrid
.
Event Grid サブスクリプション操作を実行するために必要な承認
Auto Loader を初めて実行するときに次のエラー メッセージが表示される場合は、共同作成者ロールが Event Grid のサービス プリンシパルとストレージ アカウントに割り当てられていることを確認します。
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Event Grid クライアントがプロキシをバイパス
Databricks Runtime 15.2 以降では、Auto Loader の Event Grid 接続では、デフォルトによるシステム プロパティのプロキシ設定が使用されます。Databricks Runtime 13.3 LTS、14.3 LTS、および 15.0 から 15.2 では、 Spark Config プロパティを spark.databricks.cloudFiles.eventGridClient.useSystemProperties true
に設定することで、プロキシを使用するように Event Grid 接続を手動で構成できます。 「Databricks での Spark 構成プロパティの設定」を参照してください。