Apache Pulsarからのストリーム
プレビュー
この機能は パブリックプレビュー版です。
Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Databricks 上の Apache Pulsar からデータをストリーミングできます。
構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して、正確に 1 回限りの処理セマンティクスを提供します。
構文の例
以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
トピックを指定するには、常に service.url
と次のいずれかのオプションを指定する必要があります。
topic
topics
topicsPattern
オプションの完全なリストについては、「 Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。
Pulsarへの認証
Databricks は、Pulsar に対するトラストストアとキーストアの認証をサポートしています。 Databricks では、構成の詳細を格納するときにシークレットを使用することをお勧めします。
ストリーム構成時に次のオプションを設定できます。
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
ストリームで PulsarAdmin
を使用する場合は、次の設定も行います。
pulsar.admin.authPluginClassName
pulsar.admin.authParams
次に、認証オプションの設定例を示します。
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsarのスキーマ
Pulsarから読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。
Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。
Pulsarのスキーマのないトピックや単純なデータ型のトピックの場合、ペイロードは
value
列にロードされます。リーダーがスキーマの異なる複数のトピックを読み取るように構成されている場合は、生のコンテンツを
value
列に読み込むようにallowDifferentTopicSchemas
を設定します。
Pulsarレコードには、以下のメタデータフィールドがあります。
列 |
タイプ |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Pulsar ストリーミング読み取りのオプションを構成する
すべてのオプションは、 .option("<optionName>", "<optionValue>")
構文を使用した構造化ストリーミング読み取りの一部として構成されます。 オプションを使用して認証を設定することもできます。 Pulsarへの認証を参照してください。
次の表に、Pulsar に必要な構成を示します。 オプション topic
、 topics
、または topicsPattern
のいずれか 1 つだけを指定する必要があります。
オプション |
デフォルト値 |
説明 |
---|---|---|
|
なし |
Pulsar サービスの Pulsar |
|
なし |
使用するトピックのトピック名文字列。 |
|
なし |
使用するトピックのコンマ区切りリスト。 |
|
なし |
使用するトピックで照合する Java 正規表現文字列。 |
次の表に、Pulsar でサポートされるその他のオプションを示します。
オプション |
デフォルト値 |
説明 |
---|---|---|
|
なし |
Sparkアプリケーションの進行状況を追跡するためにコネクタによって使用される定義済みのサブスクリプション名。 |
|
なし |
Sparkアプリケーションの進行状況を追跡するためのランダムなサブスクリプションを生成するためにコネクタによって使用されるプレフィックス。 |
|
120000 |
Pulsar からメッセージを読み取るためのタイムアウト (ミリ秒単位)。 |
|
|
目的のトピックが作成されるまでコネクタが待機するかどうか。 |
|
|
データが失われたとき (たとえば、トピックが削除された場合や、保持ポリシーのためにメッセージが削除された場合など) にクエリーを失敗させるかどうかを制御します。 |
|
|
スキーマが異なる複数のトピックを読み取る場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが |
|
|
|
|
なし |
マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、 |
|
なし |
Pulsar |
また、次のパターンを使用して、Pulsar クライアント、管理者、およびリーダーの構成を指定することもできます。
パターン |
コニフィギュレーション オプションへのリンク |
---|---|
|
|
|
|
|
開始オフセットJSONの構築
メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets
オプションに渡すことができます。 次のコード例は、この構文を示しています。
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()