Read and write XML files
Preview
This feature is in Public Preview.
This article describes how to read and write XML files.
Extensible Markup Language (XML) is a markup language for formatting, storing, and sharing data in textual format. It defines a set of rules for serializing data ranging from documents to arbitrary data structures.
Native XML file format support enables ingestion, querying, and parsing of XML data for batch processing or streaming. It can automatically infer and evolve schema and data types, supports SQL expressions like from_xml
, and can generate XML documents. It doesn’t require external jars and works seamlessly with Auto Loader, read_files
and COPY INTO
. You can optionally validate each row-level XML record against an XML Schema Definition (XSD).
Parse XML records
XML specification mandates a well-formed structure. However, this specification doesn’t immediately map to a tabular format. You must specify the rowTag
option to indicate the XML element that maps to a DataFrame
Row
. The rowTag
element becomes the top-level struct
. The child elements of rowTag
become the fields of the top-level struct
.
You can specify the schema for this record or let it be inferred automatically. Because the parser only examines the rowTag
elements, DTD and external entities are filtered out.
The following examples illustrate schema inference and parsing of an XML file using different rowTag
options:
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Read the XML file with rowTag
option as “books”:
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Output:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Read the XML file with rowTag
as “book”:
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Output:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Data source options
Data source options for XML can be specified the following ways:
The
.option/.options
methods of the following:DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
The following built-in functions:
The
OPTIONS
clause of CREATE TABLE USING DATA_SOURCE
For a list of options, see Auto Loader options.
XSD support
You can optionally validate each row-level XML record by an XML Schema Definition (XSD). The XSD file is specified in the rowValidationXSDPath
option. The XSD does not otherwise affect the schema provided or inferred. A record that fails the validation is marked as “corrupted” and handled based on the corrupt record handling mode option described in the option section.
You can use XSDToSchema
to extract a Spark DataFrame schema from a XSD file. It supports only simple, complex, and sequence types, and only supports basic XSD functionality.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
The following table shows the conversion of XSD data types to Spark data types:
XSD Data Types |
Spark Data Types |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Parse nested XML
XML data in a string-valued column in an existing DataFrame can be parsed with schema_of_xml
and from_xml
that returns the schema and the parsed results as new struct
columns. XML data passed as an argument to schema_of_xml
and from_xml
must be a single well-formed XML record.
schema_of_xml
Syntax
schema_of_xml(xmlStr [, options] )
Arguments
xmlStr
: A STRING expression specifying a single well-formed XML record.options
: An optionalMAP<STRING,STRING>
literal specifying directives.
Returns
A STRING holding a definition of a struct with n fields of strings where the column names are derived from the XML element and attribute names. The field values hold the derived formatted SQL types.
from_xml
Syntax
from_xml(xmlStr, schema [, options])
Arguments
xmlStr
: A STRING expression specifying a single well-formed XML record.schema
: A STRING expression or invocation of theschema_of_xml
function.options
: An optionalMAP<STRING,STRING>
literal specifying directives.
Returns
A struct with field names and types matching the schema definition. Schema must be defined as comma-separated column name and data type pairs as used in, for example, CREATE TABLE
. Most options shown in the data source options are applicable with the
following exceptions:
rowTag
: Because there is only one XML record, therowTag
option is not applicable.mode
(default:PERMISSIVE
): Allows a mode for dealing with corrupt records during parsing.PERMISSIVE
: When it meets a corrupted record, puts the malformed string into a field configured bycolumnNameOfCorruptRecord
, and sets malformed fields tonull
. To keep corrupt records, you can set a string type field namedcolumnNameOfCorruptRecord
in a user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When inferring a schema, it implicitly adds acolumnNameOfCorruptRecord
field in an output schema.FAILFAST
: Throws an exception when it meets corrupted records.
Structure conversion
Due to the structure differences between DataFrame and XML, there are some conversion rules from XML data to DataFrame
and from DataFrame
to XML data. Note that handling attributes can be disabled with the option excludeAttribute
.
Conversion from XML to DataFrame
Attributes: Attributes are converted as fields with the heading prefix attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produces a schema below:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Character data in an element containing attribute(s) or child element(s): These are parsed into the valueTag
field. If there are multiple occurrences of character data, the valueTag
field is converted to an array
type.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
produces a schema below:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversion from DataFrame to XML
Element as an array in an array: Writing a XML file from DataFrame
having a field
ArrayType
with its element as ArrayType
would have an additional nested field for the
element. This would not happen in reading and writing XML data but writing a DataFrame
read from other sources. Therefore, roundtrip in reading and writing XML files has the same
structure but writing a DataFrame
read from other sources is possible to have a different
structure.
DataFrame with a schema below:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
and with data below:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produces a XML file below:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
The element name of the unnamed array in the DataFrame
is specified by the option arrayElementName
(Default: item
).
Rescued data column
The rescued data column ensures that you never lose or miss out on data during ETL. You can enable the rescued data column to capture any data that wasn’t parsed because one or more fields in a record have one of the following issues:
Absent from the provided schema
Does not match the data type of the provided schema
Has a case mismatch with the field names in the provided schema
The rescued data column is returned as a JSON document containing the columns that were rescued, and the source file path of the record. To remove the source file path from the rescued data column, you can set the following SQL configuration:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
You can enable the rescued data column by setting the option rescuedDataColumn
to a column name when reading data, such as _rescued_data
with spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
The XML parser supports three modes when parsing records: PERMISSIVE
, DROPMALFORMED
, and FAILFAST
. When used together with rescuedDataColumn
, data type mismatches do not cause records to be dropped in DROPMALFORMED
mode or throw an error in FAILFAST
mode. Only corrupt records (incomplete or malformed XML) are dropped or throw errors.
Schema inference and evolution in Auto Loader
For a detailed discussion of this topic and applicable options, see Configure schema inference and evolution in Auto Loader. You can configure Auto Loader to automatically detect the schema of loaded XML data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.
By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don’t encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings, including nested fields in XML files. The Apache Spark DataFrameReader
uses a different behavior for schema inference, selecting data types for columns in XML sources based on sample data. To enable this behavior with Auto Loader, set the option cloudFiles.inferColumnTypes
to true
.
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException
. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. Auto Loader supports different modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode
.
You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax. When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data
column. You can change this behavior by setting the option readerCaseSensitive
to false
, in which case Auto Loader reads data in a case-insensitive way.
Examples
The examples in this section use an XML file available for download in the Apache Spark GitHub repo.
Read and write XML
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
You can manually specify the schema when reading data:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML data source can infer data types:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
You can also specify column names and types in DDL. In this case, the schema is not inferred automatically.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Load XML using COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Read XML with row validation
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Parse nested XML (from_xml and schema_of_xml)
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml and schema_of_xml with SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Load XML with Auto Loader
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)