プロトコルバッファ の読み取りと書き込み

Databricks は、Apache Spark 構造体とプロトコル バッファー (protobuf) 間のシリアル化と逆シリアル化のネイティブ サポートを提供します。 Protobuf サポートは、Apache Spark DataFrame トランスフォーマーとして実装され、構造化ストリーミングまたはバッチ操作で使用できます。

プロトコル バッファー を逆シリアル化およびシリアル化する方法

Databricks Runtime 12.2 LTS 以降では、 from_protobuf関数とto_protobuf関数を使用してデータをシリアル化および逆シリアル化できます。 Protobuf シリアル化は、ストリーミング ワークロードでよく使用されます。

protobuf 関数の基本的な構文は、読み取り関数と書き込み関数で似ています。 これらの関数は、使用する前にインポートする必要があります。

from_protobuf バイナリ列を構造体にキャストし、構造体列をバイナリにキャスト to_protobufoptions 引数で指定されたスキーマレジストリ、または descFilePath 引数で指定された記述子ファイルを指定する必要があります。

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

次の例は、 from_protobuf() を使用してバイナリ プロトブフ レコードを処理し、 to_protobuf()を使用して Spark SQL 構造体をバイナリ プロトコルに変換する方法を示しています。

Confluent スキーマレジストリ での protobuf の使用

Databricks では、 Confluent スキーマレジストリ を使用した Protobuf の定義がサポートされています。

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

外部の Confluent スキーマレジストリ に対する認証

外部の Confluent スキーマレジストリに対して認証するには、スキーマレジストリオプションを更新して認証資格情報と API キーを含めます。

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }
val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity Catalogボリュームでトラストストア ファイルとキーストア ファイルを使用する

Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリューム内のトラストストア ファイルとキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の例に従って、スキーマレジストリオプションを更新します。

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }
val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

記述子ファイルで の Protobuf の使用

コンピュートクラスターで使用できる protobuf 記述子ファイルを参照することもできます。 ファイルの場所に応じて、ファイルを読み取るための適切なアクセス許可があることを確認してください。

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)
import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 関数でサポートされているオプション

Protobuf 関数では、次のオプションがサポートされています。

  • mode: Protobuf レコードの逆シリアル化中のエラーの処理方法を決定します。 このエラーは、レコードの実際のスキーマと from_protobuf()で予期されるスキーマの不一致など、さまざまなタイプの不正な形式のレコードが原因である可能性があります。

    • :

      • FAILFAST(デフォルト): 不正な形式のレコードが検出され、タスクが失敗すると、エラーがスローされます。

      • PERMISSIVE: 不正な形式のレコードに対して NULL が返されます。 このオプションは、多くのレコードを削除する可能性があるため、慎重に使用してください。 これは、ソース内のレコードのごく一部が正しくない場合に便利です。

  • 再帰フィールド.max.depth: 再帰フィールドのサポートを追加します。 Spark SQL スキーマでは、再帰フィールドはサポートされていません。 このオプションを指定しない場合、再帰フィールドは許可されません。 Protobufで再帰フィールドをサポートするには、指定された深さまで拡張する必要があります。

    • :

      • -1 (デフォルト): 再帰フィールドは許可されません。

      • 0: 再帰フィールドがドロップされます。

      • 1: 単一レベルの再帰を許可します。

      • [2-10]: 多重再帰のしきい値を最大 10 レベルまで指定します。

        値を 0 より大きい値に設定すると、ネストされたフィールドが設定された深さまで拡張され、再帰フィールドが可能になります。 非常に大きなスキーマを誤って作成することを避けるために、10 より大きい値は許可されません。 Protobuf メッセージの深さが構成された制限を超えている場合、返される Spark 構造体は再帰制限の後に切り捨てられます。

    • : 次の再帰フィールドを持つ Protobuf について考えてみます。

      message Person { string name = 1; Person friend = 2; }
      

      次に、この設定に異なる値を持つ終了スキーマを示します。

      • オプションを 1 に設定: STRUCT<name: STRING>

      • オプションを 2 に設定します。 STRUCT<name STRING, friend: STRUCT<name: STRING>>

      • オプションを 3 に設定します。 STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>

  • convert.any.fields.to.JSON:このオプションを使用すると、Protobuf Any フィールドを JSON に変換できます。 この機能は慎重に有効にする必要があります。 JSON の変換と処理は非効率的です。 さらに、JSON 文字列フィールドでは Protobuf スキーマの安全性が失われ、ダウンストリーム処理でエラーが発生しやすくなります。

    • :

      • False (デフォルト): 実行時に、このようなワイルドカード フィールドには、任意の Protobuf メッセージをバイナリ データとして含めることができます。 デフォルトでは、このようなフィールドは通常の Protobuf メッセージのように扱われます。 スキーマが (STRUCT<type_url: STRING, value: BINARY>)の 2 つのフィールドがあります。 デフォルトでは、バイナリ value フィールドはいかなる方法でも解釈されません。 ただし、バイナリ データは、一部のアプリケーションで実際に動作するのに便利ではない場合があります。

      • True: この値を True に設定すると、実行時に Any フィールドを JSON 文字列に変換できます。 このオプションを使用すると、バイナリが解析され、Protobuf メッセージが JSON 文字列に逆シリアル化されます。

    • : 次のように定義された 2 つの Protobuf 型について考えてみます。

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      このオプションを有効にすると、 from_protobuf("col", messageName ="ProtoWithAny") のスキーマは STRUCT<event_name: STRING, details: STRING>になります。

      実行時に、 details フィールドに Protobuf メッセージが含まれている場合 Person 、戻り値は ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')のようになります。

    • 必要条件:

      • Anyフィールドで使用されるすべての Protobuf 型の定義は、from_protobuf()に渡される Protobuf 記述子ファイルで使用できる必要があります。

      • Protobuf Any 見つからない場合は、そのレコードのエラーになります。

      • この機能は現在、schema-registry ではサポートされていません。

  • emit.default.values に追加します。 Protobuf を Spark 構造体に逆シリアル化するときに、値が 0 のフィールドのレンダリングを有効にします。 このオプションは慎重に使用してください。 通常、このようなセマンティクスの細かい違いに依存することはお勧めできません。

      • False (デフォルト): シリアル化された Protobuf のフィールドが空の場合、Spark 構造体の結果のフィールドは既定で null になります。 このオプションを有効にせず、 null をデフォルト値として扱う方が簡単です。

      • True: このオプションを有効にすると、このようなフィールドには対応するデフォルト値が入力されます。

    • : 次のように構築された Protobuf を持つ次の Protobuf について考えてみ Person(age=0, middle_name="")

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • このオプションを False に設定すると、 from_protobuf() を呼び出した後の Spark 構造体はすべて null ( {"name": null, "age": null, "middle_name": "", "salary": null}. 2 つのフィールド (agemiddle_name) に値が設定されていても、Protobuf では既定値であるため、ワイヤ形式では含まれません。

      • このオプションを True に設定すると、 from_protobuf() を呼び出した後の Spark 構造体は {"name": "", "age": 0, "middle_name": "", "salary": null}になります。 salary フィールドは、明示的に optional として宣言され、入力レコードに設定されていないため、ヌルのままです。

  • enums.as.intsです。 有効にすると、Protobuf の列挙型フィールドが Spark の整数フィールドとしてレンダリングされます。

      • False (デフォルト)

      • True: 有効にすると、Protobuf の列挙型フィールドが Spark の整数フィールドとしてレンダリングされます。

    • : 次の Protobuf について考えてみます。

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      次のような Protobuf メッセージがあるPerson(job = ENGINEER)

      • このオプションを無効にすると、対応する Spark 構造体が {"job": "ENGINEER"}されます。

      • このオプションを有効にすると、対応する Spark 構造体が {"job": 1}になります。

      これらのフィールドのスキーマは、それぞれのケースで異なることに注意してください (デフォルトの文字列ではなく整数)。 このような変更は、ダウンストリーム テーブルのスキーマに影響を与える可能性があります。

スキーマレジストリオプション

以下のスキーマレジストリオプションは、Protobuf 関数でスキーマレジストリを使用する場合に関連します。

  • schema.registry.subject (スキーマ・レジストリ・サブジェクト)

    • *必須

    • Schema Registry のスキーマのサブジェクトを指定します(例: "client-event")。

  • schema.registry.address (スキーマ・レジストリ・アドレス)

    • *必須

    • スキーマレジストリの URL ( https://schema-registry.example.com:8081

  • schema.registry.protobuf.name

    • オプション

    • デフォルト: <NONE>

    • サブジェクトのスキーマレジストリエントリには、1 つの proto ファイルと同様に、複数の Protobuf 定義を含めることができます。 このオプションを指定しない場合は、最初の Protobuf がスキーマに使用されます。 Protobuf メッセージの名前がエントリ内の最初のメッセージでない場合は、そのメッセージの名前を指定します。 たとえば、"Person" と "Location" の 2 つの Protobuf 定義がこの順序で含まれているエントリがあるとします。 ストリームが "Person" ではなく "Location" に対応する場合は、このオプションを "Location" (またはパッケージ "com.example.protos.Location" を含むフルネーム) に設定します。

  • schema.registry.スキーマ進化.mode

    • デフォルト: "restart"。

    • サポートされているモード:

      • 「再起動」

      • 「なし」

    • このオプションは、 from_protobuf()のスキーマ進化モードを設定します。 クエリーの開始時に、Sparkは指定されたサブジェクトの最新のschema-idを記録します。 これにより、 from_protobuf()のスキーマが決まります。 新しいスキーマは、クエリーの開始後に Schema Registry に公開される場合があります。 受信レコードで新しい schema-id が検出された場合、それはスキーマの変更を示します。 このオプションは、スキーマへのこのような変更の処理方法を決定します。

      • restart (デフォルト):新しいスキーマIDに気付いたときに UnknownFieldException をトリガーします。 これにより、クエリーが終了します。 Databricks では、クエリー スキーマの変更の取得に失敗したときに再起動するようにワークフローを構成することをお勧めします。

      • none: スキーマ ID の変更は無視されます。 新しい schema-id を持つレコードは、クエリーの開始時に確認されたのと同じスキーマで解析されます。 新しい Protobuf 定義には下位互換性があることが期待され、新しいフィールドは無視されます。

  • confluent.schema.registry です。<schema-registy-client-option>

    • オプション

    • Schema-registry は、 Confluent Schema Registry クライアントを使用して Confluent Schema-registry に接続します。 クライアントがサポートする構成オプションは、プレフィックス「confluent.schema.registry」で指定できます。 たとえば、次の 2 つの設定では、"USER_INFO" 認証資格情報が提供されます。

      • "confluent.schema.registry.basic.auth.credentials.ソース": 「USER_INFO」

      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"