Google Pub/Subをサブスクライブする
Databricks は、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub にサブスクライブするための組み込みコネクタを提供します。 このコネクタは、サブスクライバーからのレコードに対して exactly-once 処理セマンティクスを提供します。
注:
Pub/Sub が重複レコードをパブリッシュし、レコードが順不同でサブスクライバーに到着することがあります。 重複レコードや順序が正しくないレコードを処理する Databricks コードを記述する必要があります。
構文の例
十分な権限を持つ Google サービス アカウントがクラスターに接続されている場合は、次の基本構文を使用して Pub/Sub からの構造化ストリーミングの読み取りを構成できます。 Google サービス アカウントを参照してください。
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "fe-demo-prod-dnd") // required
.option("projectId", "fe-prod-dbx") // required
.load()
次の例のように、承認オプションを直接渡すこともできます。
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
その他の構成オプションについては、 Pub/Sub ストリーミング読み取りのオプションを構成するをご覧ください。
Pub/Subへのアクセスを構成する
Databricks では、Google サービス アカウント (GSA) を使用して Pub/Sub への接続を管理することを推奨しています。
GSA を使用する場合、ストリームに追加の許可オプションを直接指定する必要はありません。
注:
GSA は、共有アクセス・モードで構成されたコンピュートではサポートされません。
Databricks では、承認オプションを指定するときにシークレットを使用することをお勧めします。 接続を承認するには、次のオプションが必要です。
clientEmail
clientId
privateKey
privateKeyId
次の表に、設定された資格情報に必要なロールを示します。
ロール |
必須またはオプション |
使用方法 |
---|---|---|
|
*必須 |
サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する |
|
*必須 |
サブスクリプションからデータを取得する |
|
オプション |
サブスクリプションが存在しない場合は作成可能にし、ストリーム終了時にサブスクリプションを削除する |
Pub/Sub スキーマ
ストリームのスキーマは、次の表に示すように、Pub/Sub からフェッチされたレコードと一致します。
フィールド |
タイプ |
---|---|
|
|
|
|
|
|
|
|
Pub/Sub ストリーミング読み取りのオプションを構成する
次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>")
構文を使用した構造化ストリーミング読み取りの一部として構成されます。
注:
一部の Pub/Sub 構成オプションでは、マイクロバッチの代わりにフェッチの概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミングコネクタの必然的な結果と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。
オプション |
デフォルト値 |
説明 |
---|---|---|
|
ストリーム初期化時に存在するエグゼキューターの数の半分に設定します。 |
サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
|
|
|
|
なし |
トリガーされた各マイクロバッチ中に処理されるバッチサイズのソフト制限。 |
|
1000 |
レコードを処理する前にタスクごとにフェッチするレコードの数。 |
|
10秒 |
レコードを処理する前に各タスクがフェッチする期間。 Databricks では、既定値の使用が推奨されています。 |
Pub/Subの増分バッチ処理セマンティクス
Trigger.AvailableNow
を使用して、Pub/Sub ソースから使用可能なレコードを増分バッチで消費できます。
Databricks では、 Trigger.AvailableNow
設定で読み取りを開始すると、タイムスタンプが記録されます。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプよりもタイムスタンプが小さい新しく公開されたレコードが含まれます。
増分バッチ処理の構成を参照してください。