ストリーミング Avro データの読み取りと書き込み
Apache Avro は、ストリーミングの世界で一般的に使用されているデータシリアル化システムです。 一般的な解決策は、Apache Kafka に Avro 形式のデータを配置し、Confluent スキーマレジストリにメタデータを配置してから、Kafka と スキーマレジストリの両方に接続するストリーミングフレームワークでクエリーを実行することです。
Databricks では、Kafka の Avro データとスキーマ レジストリのメタデータを使用してストリーミング パイプラインを構築するための from_avro
関数と to_avro
関数 がサポートされています。この関数は、列を Avro 形式のバイナリとしてエンコード to_avro
、Avro バイナリ データを列にデコード from_avro
。 どちらの関数も 1 つの列を別の列に変換し、入出力 SQL データ・タイプは複合タイプまたはプリミティブ・タイプにすることができます。
注
from_avro
および to_avro
関数は、次の機能を備えています。
Python、Scala、およびJavaで利用できます。
バッチとストリーミングの両方のクエリで SQL 関数に渡すことができます。
「Avro ファイル データソース」も参照してください。
手動で指定したスキーマの例
from_json や to_json と同様に、from_avro
と to_avro
は任意のバイナリ列で使用できます。次の例のように、Avro スキーマを手動で指定できます。
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema example
スキーマを JSON 文字列として指定することもできます。 たとえば、 /tmp/user.avsc
が次の場合です。
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON 文字列を作成できます。
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
次に、 from_avro
のスキーマを使用します。
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
スキーマレジストリの例
クラスターにスキーマレジストリサービスがある場合は、 from_avro
がそれを操作できるため、Avro スキーマを手動で指定する必要はありません。
次の例は、キーと値が型 STRING
と INT
のサブジェクト "t-key" と "t-value" としてスキーマレジストリに既に登録されていると仮定して、Kafka トピック "t" を読み取る方法を示しています。
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
to_avro
の場合、次の理由により、デフォルトの出力 Avro スキーマがスキーマレジストリサービスのターゲットサブジェクトのスキーマと一致しない場合があります。
Spark SQL 型から Avro スキーマへのマッピングは 1 対 1 ではありません。 「 Spark SQL -> Avro 変換でサポートされている型」を参照してください。
変換された出力 Avro スキーマがレコードタイプの場合、レコード名は
topLevelRecord
であり、デフォルトでは名前空間はありません。
デフォルトの出力スキーマ to_avro
がターゲット・サブジェクトのスキーマと一致する場合は、以下を実行できます。
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
それ以外の場合は、 to_avro
関数でターゲット サブジェクトのスキーマを指定する必要があります。
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
外部の Confluent スキーマレジストリに対する認証
Databricks Runtime 12.2 LTS 以降では、外部の Confluent スキーマ レジストリに対して認証できます。 次の例は、認証資格情報と API キーを含めるようにスキーマ レジストリ オプションを構成する方法を示しています。
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Unity Catalogボリュームでトラストストア ファイルとキーストア ファイルを使用する
Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリューム内のトラストストア ファイルとキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の構文を使用して 、前の例 の構成を更新します。
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
from_avro
でスキーマ進化モードを使用する
Databricks Runtime 14.2 以降では、 from_avro
でスキーマ進化モードを使用できます。 スキーマ進化モードを有効にすると、スキーマ進化を検出した後にジョブが UnknownFieldException
をスローします。 Databricks では、スキーマ進化モードを使用してジョブを構成して、タスクの失敗時に自動的に再起動することをお勧めします。 構造化ストリーミングについては、本番運用に関する考慮事項を参照してください。
スキーマ進化は、ソースデータのスキーマが時間の経過とともに進化し、データソースからすべてのフィールドを取り込むことが予想される場合に便利です。 クエリーがデータソースでクエリーするフィールドをすでに明示的に指定している場合、追加されたフィールドはスキーマの進化に関係なく無視されます。
avroSchemaEvolutionMode
オプションを使用して、スキーマ進化を有効にします。次の表では、スキーマ進化モードのオプションについて説明します。
オプション |
挙動 |
---|---|
|
デフォルト。 スキーマの進化を無視し、ジョブを続行します。 |
|
スキーマ進化の検出時に |
注
この構成は、ストリーミング ジョブ間で変更し、同じチェックポイントを再利用できます。 スキーマ進化を無効にすると、列が削除される可能性があります。
解析モードを構成する
解析モードを構成して、スキーマ進化モードが無効で、スキーマが下位互換性のない方法で進化した場合に、失敗するか null レコードを出力するかを決定できます。 デフォルト設定では、互換性のないスキーマの変更を監視すると、 from_avro
失敗します。
mode
オプションを使用して、解析モードを指定します。次の表に、解析モードのオプションを示します。
オプション |
挙動 |
---|---|
|
デフォルト。 解析エラーは、 |
|
構文解析エラーは無視され、ヌル・レコードが出力されます。 |
注
スキーマ進化を有効にすると、 FAILFAST
はレコードが破損している場合にのみ例外をスローします。
スキーマ進化の使用例と解析モードの設定
次の例は、Confluent Schema Registry でスキーマ進化を有効にし FAILFAST
解析モードを指定する方法を示しています。
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)