XML ファイルの読み取りと書き込み
プレビュー
この機能は パブリックプレビュー版です。
この記事では、XML ファイルの読み取りと書き込みの方法について説明します。
Extensible Markup Language (XML) は、データをテキスト形式で書式設定、保存、および共有するためのマークアップ言語です。 これは、ドキュメントから任意のデータ構造に至るまで、データをシリアル化するための一連のルールを定義します。
ネイティブ XML ファイル形式のサポートにより、バッチ処理またはストリーミング用の XML データの取り込み、クエリ、解析が可能になります。 スキーマとデータ型を自動的に推測して進化させ、 from_xml
などの SQL 式をサポートし、XML ドキュメントを生成できます。 外部の jar は必要なく、Auto Loader、 read_files
、 COPY INTO
とシームレスに動作します。 オプションで、各行レベルの XML レコードを XML スキーマ定義 (XSD) に照らして検証できます。
XML レコードの構文解析
XML 仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされるわけではありません。 rowTag
オプションを指定して、DataFrame
Row
にマップする XML 要素を指定する必要があります。rowTag
要素が最上位のstruct
になります。rowTag
の子要素は、最上位の struct
のフィールドになります。
このレコードのスキーマを指定することも、自動的に推論することもできます。 パーサーは rowTag
要素のみを調べるため、DTD と外部エンティティーは除外されます。
次の例は、さまざまな rowTag
オプションを使用した XML ファイルのスキーマ推論と解析を示しています。
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
オプション rowTag
"books" として XML ファイルを読み取ります。
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
アウトプット:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
rowTag
"book" を含む XML ファイルを読み取ります。
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
アウトプット:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
データソースオプション
XML のデータ ソース オプションは、次の方法で指定できます。
次の
.option/.options
方法があります。データフレームリーダー
データフレームライター
データストリームリーダー
データストリームライター
次の組み込み関数:
CREATE TABLE USING DATA_SOURCEの
OPTIONS
句
オプションの一覧については、「 Auto Loader オプション」を参照してください。
XSD のサポート
オプションで、XML スキーマ定義 (XSD) によって各行レベルの XML レコードを検証できます。 XSD ファイルは rowValidationXSDPath
オプションで指定します。 それ以外の場合、XSD は、提供または推論されるスキーマには影響しません。 検証に失敗したレコードは「破損」としてマークされ、オプションのセクションで説明されている破損レコード処理モードオプションに基づいて処理されます。
XSDToSchema
を使用して、XSD ファイルから Spark DataFrame スキーマを抽出できます。 単純型、複合型、およびシーケンス型のみをサポートし、基本的な XSD 機能のみをサポートします。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
次の表は、XSD データ型から Spark データ型への変換を示しています。
XSD データ型 |
Spark データ型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
入れ子になったXMLの解析
既存の DataFrame の文字列値列の XML データは、スキーマと解析結果を新しいstruct
列として返すschema_of_xml
とfrom_xml
を使用して解析できます。 引数として schema_of_xml
および from_xml
に渡される XML データは、1 つの整形式の XML レコードでなければなりません。
schema_of_xml
構文
schema_of_xml(xmlStr [, options] )
引数
xmlStr
: 単一の整形式 XML レコードを指定する文字列式。options
: ディレクティブを指定するオプションのMAP<STRING,STRING>
リテラル。
戻り値
n 個の文字列フィールドを持つ構造体の定義を保持する文字列。列名は XML 要素および属性名から派生します。 フィールド値には、派生したフォーマット済み SQL タイプが保持されます。
from_xml
構文
from_xml(xmlStr, schema [, options])
引数
xmlStr
: 単一の整形式 XML レコードを指定する文字列式。schema
: 文字列式またはschema_of_xml
関数の呼び出し。options
: ディレクティブを指定するオプションのMAP<STRING,STRING>
リテラル。
戻り値
スキーマ定義に一致するフィールド名と型を持つ構造体。 スキーマは、たとえば CREATE TABLE
で使用されるように、コンマで区切られた列名とデータ型のペアとして定義する必要があります。 データ ソース オプションに示されているほとんどのオプションは、次の例外を除いて適用できます。
rowTag
: XML レコードが 1 つしかないため、rowTag
オプションは適用されません。mode
(デフォルト:PERMISSIVE
): 解析中に破損したレコードを処理するモードを許可します。PERMISSIVE
: 破損したレコードに遭遇すると、不正な形式の文字列をcolumnNameOfCorruptRecord
で構成されたフィールドに挿入し、不正な形式のフィールドをnull
に設定します。 破損したレコードを保持するには、ユーザー定義のスキーマにcolumnNameOfCorruptRecord
という名前の文字列型フィールドを設定します。 スキーマにフィールドがない場合、解析中に破損したレコードがドロップされます。 スキーマを推論すると、出力スキーマにcolumnNameOfCorruptRecord
フィールドが暗黙的に追加されます。FAILFAST
: 破損したレコードに出会うと例外をスローします。
構造変換
DataFrame と XML の構造の違いにより、XML データからDataFrame
へ、およびDataFrame
から XML データへの変換ルールがいくつかあります。 属性の処理は、オプション excludeAttribute
で無効にできることに注意してください。
XML から DataFrame への変換
属性: 属性は、見出しの接頭辞が attributePrefix
のフィールドとして変換されます。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
以下のスキーマを生成します。
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
属性または子要素を含む要素内の文字データ: これらは valueTag
フィールドに解析されます。 文字データが複数ある場合は、 valueTag
フィールドが array
タイプに変換されます。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
以下のスキーマを生成します。
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame から XML への変換
配列内の配列としての要素: 要素が ArrayType
としてフィールドArrayType
を持つDataFrame
から XML ファイルを書き込むと、その要素にネストされたフィールドが追加されます。これは、XML データの読み取りおよび書き込みでは発生しませんが、他のソースから読み取ったDataFrame
を書き込む場合には発生しません。 したがって、XML ファイルの読み取りと書き込みのラウンドトリップは同じ構造になりますが、他のソースから読み取ったDataFrame
の書き込みは異なる構造になる可能性があります。
以下のスキーマを含む DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
そして、以下のデータで:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
以下の XML ファイルを生成します。
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
DataFrame
内の名前のない配列の要素名は、オプションarrayElementName
で指定されます (デフォルト: item
)。
レスキューされたデータ列
レスキューされたデータ列により、ETL 中にデータが失われたり見逃したりすることがなくなります。 レスキューされたデータ列を有効にして、レコード内の 1 つ以上のフィールドに次のいずれかの問題があるために解析されなかったデータをキャプチャできます。
指定されたスキーマに存在しない
指定されたスキーマのデータ型と一致しません
指定されたスキーマのフィールド名と大文字と小文字が一致しません
レスキューされたデータ列は、レスキューされた列とレコードのソース ファイル パスを含む JSON ドキュメントとして返されます。 レスキューされたデータ列からソース ファイル パスを削除するには、次の SQL 構成を設定します。
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
復旧されたデータ列を有効にするには、データを読み取るときにオプション rescuedDataColumn
を列名に設定します ( _rescued_data
with spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
など)。
XML パーサーは、レコードを解析するときに、 PERMISSIVE
、 DROPMALFORMED
、 FAILFAST
の 3 つのモードをサポートします。 rescuedDataColumn
と一緒に使用すると、データ型の不一致によって、DROPMALFORMED
モードでレコードがドロップされたり、FAILFAST
モードでエラーがスローされたりすることはありません。破損したレコード (不完全な XML または不正な形式の XML) のみがドロップされるか、エラーがスローされます。
Auto Loader でのスキーマの推論と進化
このトピックと適用可能なオプションの詳細については、「Auto Loaderでのスキーマ推論と進化の設定」を参照してください。ロードされた XML データのスキーマを自動的に検出するように Auto Loader を設定できるため、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されるたびにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を経時的に手動で追跡して適用する必要がなくなります。
デフォルトでは、Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型 (JSON、CSV、および XML) をエンコードしない形式の場合、 Auto Loader 、XML ファイル内のネストされたフィールドを含むすべての列を文字列として推論します。 Apache Spark DataFrameReader
は、スキーマ推論に異なる動作を使用し、サンプル データに基づいて XML ソース内の列のデータ型を選択します。 Auto Loader でこの動作を有効にするには、オプションcloudFiles.inferColumnTypes
をtrue
に設定します。
Auto Loader は、データの処理中に新しい列の追加を検出します。 Auto Loader が新しい列を検出すると、ストリームはUnknownFieldException
で停止します。ストリームでこのエラーがスローされる前に、 Auto Loader はデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることで、スキーマの場所を最新のスキーマで更新します。 既存の列のデータ型は変更されません。 Auto Loader は、オプション cloudFiles.schemaEvolutionMode
で設定する スキーマ進化のさまざまなモードをサポートしています。
スキーマ ヒントを使用すると、推論されたスキーマに対して、既知のスキーマ情報と期待するスキーマ情報を適用できます。列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、整数ではなく double 形式) を選択する場合は、SQL スキーマ指定構文を使用して、列データ型のヒントを任意の数として文字列として提供できます。 レスキューされたデータ列が有効になっている場合、スキーマのケース以外の名前のフィールドが _rescued_data
列にロードされます。 この動作を変更するには、オプション [readerCaseSensitive
] を [false
] に設定します。この場合、 Auto Loader は大文字と小文字を区別しない方法でデータを読み取ります。
例:
このセクションの例では、 Apache Spark GitHub リポジトリでダウンロードできる XML ファイルを使用します。
XMLの読み取りと書き込み
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
データの読み取り時にスキーマを手動で指定できます。
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML データソースはデータ型を推論できます。
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL で列の名前と型を指定することもできます。 この場合、スキーマは自動的に推論されません。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO を使用して XML をロードする
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
行検証による XML の読み取り
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
入れ子になった XML の解析 (from_xml と schema_of_xml)
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API を使用した from_xml と schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Auto Loaderを使用して XML をロードする
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)