Auto Loader can automatically detect the introduction of new columns to your data and restart the stream, so you don’t have to manage the tracking and handling of schema changes yourself. Auto Loader can also “rescue” data that was unexpected (for example, of differing data types) into a JSON blob column that you can choose to access later using the semi-structured data access APIs.
The following formats are supported for schema inference and evolution:

- JSON: Databricks Runtime 8.2 and above
- CSV: Databricks Runtime 8.3 and above
- Avro: Databricks Runtime 10.2 and above
- Parquet: Databricks Runtime 11.1 and above
- Text: Not applicable (fixed-schema)
- Binary file: Not applicable (fixed-schema)
The following example uses parquet for the cloudFiles.format option. Use csv, avro, or json for other file sources. All other settings for read and write stay the same for the default behaviors for each format.
```python
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))
```
```scala
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>")
```
To infer the schema, Auto Loader samples the first 50 GB or 1000 files that it discovers, whichever limit is crossed first. To avoid incurring this inference cost at every stream startup, and to be able to provide a stable schema across stream restarts, you must set the option
cloudFiles.schemaLocation. Auto Loader creates a hidden directory
_schemas at this location to track schema changes to the input data over time. If your stream contains a single
cloudFiles source to ingest data, you can provide the checkpoint location as
cloudFiles.schemaLocation. Otherwise, provide a unique directory for this option. If your input data returns an unexpected schema for your stream, check that your schema location is being used by only a single Auto Loader source.
To change the size of the sample that’s used, you can set the following SQL configurations:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes (byte string, for example 10gb)

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles (integer)
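As a hedged sketch, you might set these configurations on your cluster or in your notebook before starting the stream; the values below are illustrative, not recommendations:

```python
# Sketch: shrink the inference sample to speed up stream startup.
# The specific values here are examples only.
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "10gb")
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "500")
```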
By default, Auto Loader infers columns in text-based file formats like CSV and JSON as
string columns. In JSON datasets, nested columns are also inferred as
string columns. Because JSON and CSV data is self-describing and can support many data types, inferring the data as string can help avoid schema evolution issues such as numeric type mismatches (integers, longs, floats). If you want to retain the original Spark schema inference behavior, set the option cloudFiles.inferColumnTypes to true.
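For example, a minimal sketch of opting back into typed inference for a JSON source (paths are placeholders):

```python
# Sketch: infer typed columns (ints, doubles, structs) instead of strings.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<path_to_source_data>"))
```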
When inferring schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option
.option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly.
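A minimal sketch of ingesting headerless CSV files (paths are placeholders):

```python
# Sketch: tell Auto Loader the CSV files have no header row.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("header", "false")
  .load("<path_to_source_data>"))
```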
Auto Loader also attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, a file path such as
base_path/event=click/date=2021-04-01/f0.json would result in the inference of date and event as partition columns. The data types for these columns will be strings unless you set
cloudFiles.inferColumnTypes to true. If the underlying directory structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning, the partition columns will be ignored. You can provide the option
cloudFiles.partitionColumns as a comma-separated list of column names to always try and parse the given columns from the file path if these columns exist as
key=value pairs in your directory structure.
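For instance, a sketch of always attempting to parse specific partition columns from the file path (paths and column names follow the example above):

```python
# Sketch: always try to parse event and date from key=value path segments.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.partitionColumns", "event,date")
  .load("<path_to_source_data>"))
```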
Each Parquet file is self-describing and associated with a typed schema. To infer the schema of the Parquet data, Auto Loader samples a subset of Parquet files and merges the schemas of these individual files.
If a column has different data types in two Parquet files, Auto Loader determines if one data type can be safely
upcast to the other. If upcasting is possible, Auto Loader can merge the two schemas and choose the more encompassing data type for this column; otherwise the inference fails. For example,
a: int and
a: double can be merged as
a: double and
a: string can be merged as
a: string; but
a: int and
a: struct cannot be merged. Note that, after merging
a: int and
a: double as
a: double, Auto Loader can read Parquet files with column
a: double as normal, but for the Parquet files with
a: int, Auto Loader needs to read
a as part of the rescued data column, because the data type is different from the inferred schema. Users still have a chance to safely upcast the rescued
a: int values and backfill the column afterward.
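One hedged way to perform such a backfill in a batch job is sketched below. The table name is a placeholder, and extracting the value from the rescued JSON blob with get_json_object is an assumption about your data layout:

```python
from pyspark.sql import functions as F

# Sketch: recover rescued "a" values (originally int) and upcast them to double.
# Assumes a target table with an "a: double" column and an _rescued_data JSON blob.
df = spark.read.table("<target_table>")
recovered = df.withColumn(
    "a",
    F.coalesce(
        F.col("a"),  # already-double values pass through unchanged
        F.get_json_object(F.col("_rescued_data"), "$.a").cast("double"),
    ),
)
```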
Binary file (binaryFile) and text file formats have fixed data schemas, but also support partition column inference. The partition columns are inferred at each stream restart unless you specify cloudFiles.schemaLocation. To avoid any potential errors or information loss, Databricks recommends setting cloudFiles.schemaLocation or cloudFiles.partitionColumns as options for these file formats, because cloudFiles.schemaLocation is not a required option for these formats.
Unless case sensitivity is enabled, the columns abc, Abc, and ABC are considered the same column for the purposes of schema inference. The case that is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader does not consider the casing variants that were not selected to be consistent with the schema.
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the
_rescued_data column. Change this behavior by setting the option
readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
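A minimal sketch of disabling case-sensitive reads so that differently cased fields are not rescued (paths are placeholders):

```python
# Sketch: read data case-insensitively instead of rescuing mismatched casing.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("readerCaseSensitive", "false")
  .load("<path_to_source_data>"))
```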
The data types that are inferred may not always be exactly what you’re looking for. By using schema hints, you can superimpose the information that you know and expect on an inferred schema.
By default, Apache Spark has a standard approach for inferring the type of data columns. For example, it infers nested JSON as structs and integers as longs. In contrast, Auto Loader considers all columns as strings. When you know that a column is of a specific data type, or if you want to choose an even more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as follows:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
See the documentation on data types for the list of supported data types.
If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.
Here is an example of an inferred schema to see the behavior with schema hints. Inferred schema:
```
|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string
```
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
you will get:
```
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
```
Array and Map schema hints support is available in Databricks Runtime 9.1 LTS and above.
Here is an example of an inferred schema with complex datatypes to see the behavior with schema hints. Inferred schema:
```
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int
```
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
you will get:
```
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string
```
Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether
cloudFiles.inferColumnTypes is enabled or disabled.
Auto Loader detects the addition of new columns as it processes your data. By default, addition of a new column causes your streams to stop with an
UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema. New columns are merged to the end of the schema. The data types of existing columns remain unchanged. By setting your Auto Loader stream within a Databricks job, you can get your stream to restart automatically after such schema changes.
Auto Loader supports the following modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode:
addNewColumns: The default mode when a schema is not provided to Auto Loader. The streaming job fails with an
UnknownFieldException. New columns are added to the schema. Existing columns do not evolve data types.
addNewColumns is not allowed when the schema of the stream is provided. You can instead provide your schema as a schema hint if you want to use this mode.
failOnNewColumns: If Auto Loader detects a new column, the stream fails. It will not restart unless the provided schema is updated, or the offending data file is removed.
rescue: The stream runs with the very first inferred or provided schema. Any data type changes or new columns that are added are rescued in the rescued data column that is automatically added to your stream’s schema as
_rescued_data. In this mode, your stream will not fail due to schema changes.
none: The default mode when a schema is provided. Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescued data column is provided separately as an option.
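As a sketch, the modes above are selected with a single option; here rescue mode keeps the stream running while capturing unexpected data (paths are placeholders):

```python
# Sketch: run with the first inferred schema and rescue any new columns
# or type changes into the _rescued_data column instead of failing.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>"))
```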
Partition columns are not considered for schema evolution. If you had an initial directory structure like
base_path/event=click/date=2021-04-01/f0.json, and then start receiving new files as
base_path/event=click/date=2021-04-01/hour=01/f1.json, the hour column is ignored. To capture information for new partition columns, set cloudFiles.partitionColumns to event,date,hour.
The rescued data column ensures that you never lose or miss out on data during ETL. The rescued data column contains any data that wasn’t parsed, either because it was missing from the given schema, or because there was a type mismatch, or because the casing of the column in the record or file didn’t match with that in the schema.
The rescued data column is part of the schema returned by Auto Loader as
_rescued_data by default as the schema is inferred. You can rename the column or include it in cases where you provide a schema by setting the option rescuedDataColumn.
The rescued data column is returned as a JSON blob containing the columns that were rescued, and the source file path of the record.
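A hedged sketch of inspecting rescued records in a downstream query; the table name and the rescued field name extra_col are hypothetical, and the JSON key used for the source path is an assumption about the blob's layout:

```python
from pyspark.sql import functions as F

# Sketch: filter to rows that have rescued data and pull fields out of the blob.
df = spark.read.table("<target_table>")
rescued = (df.filter(F.col("_rescued_data").isNotNull())
             .select(F.get_json_object("_rescued_data", "$.extra_col")
                      .alias("extra_col")))
```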
The JSON and CSV parsers support three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with
rescuedDataColumn, data type mismatches do not cause records to be dropped in
DROPMALFORMED mode or throw an error in
FAILFAST mode. Only corrupt records are dropped or throw errors, such as incomplete or malformed JSON or CSV. If you use
badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the
rescuedDataColumn. Only incomplete and malformed JSON or CSV records are stored in badRecordsPath.
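A minimal sketch of combining the two options, so type mismatches are rescued while corrupt records are written out separately (paths are placeholders):

```python
# Sketch: type mismatches go to _rescued_data; incomplete or malformed
# JSON records are written to the bad-records path instead.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .option("rescuedDataColumn", "_rescued_data")
  .option("badRecordsPath", "<path_to_bad_records>")
  .load("<path_to_source_data>"))
```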