Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage. Auto Loader can load data files from AWS S3 (s3://), Azure Data Lake Storage Gen2 (ADLS Gen2, abfss://), Google Cloud Storage (GCS, gs://), Azure Blob Storage (wasbs://), and ADLS Gen1 (adl://) in addition to Databricks File System (DBFS, dbfs:/). Auto Loader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Auto Loader can scale from backfilling storage accounts that contain billions of files to pipelines that load millions of files per hour.
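
For example, a minimal Auto Loader stream reads from an input path with the cloudFiles source and writes to a Delta table with a checkpoint. The following Python sketch uses placeholder paths and assumes a JSON input format:

# Minimal sketch of an Auto Loader stream; all paths are placeholders.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")                # format of the incoming files
  .option("cloudFiles.schemaLocation", "/tmp/schema") # where the inferred schema is tracked
  .load("/path/to/input"))                            # directory that receives new files

(df.writeStream.format("delta")
  .option("checkpointLocation", "/tmp/checkpoint")    # records which files were processed
  .start("/path/to/output/table"))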

How Auto Loader works

Auto Loader supports two modes for detecting new files: directory listing and file notification.

  • Directory listing: Auto Loader identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configuration other than access to your data on cloud storage. In Databricks Runtime 9.1 and above, Auto Loader can automatically detect whether files are arriving in lexical order in your cloud storage and significantly reduce the number of API calls it needs to make to detect new files. See Incremental Listing for more details.

  • File notification: Auto Loader can automatically set up a notification service and queue service that subscribe to file events from the input directory. File notification mode is more performant and scalable for large input directories or a high volume of files, but requires additional cloud permissions to set up. See Leveraging file notifications for more details.

The availability of these modes is listed below.

| Cloud Storage | Directory Listing | File Notifications |
|---|---|---|
| AWS S3 | All versions | All versions |
| ADLS Gen2 | All versions | All versions |
| GCS | All versions | Databricks Runtime 9.1 and above |
| Azure Blob Storage | All versions | All versions |
| ADLS Gen1 | Databricks Runtime 7.3 and above | Unsupported |
| DBFS | All versions | For mount points only |

As files are discovered, their metadata is persisted in a scalable key-value store (RocksDB) in the checkpoint location of your Auto Loader pipeline. This key-value store ensures that data is processed exactly once. You can switch file discovery modes across stream restarts and still obtain exactly-once data processing guarantees. In fact, this is how Auto Loader can both perform a backfill on a directory containing existing files and concurrently process new files that are being discovered through file notifications.

In case of failures, Auto Loader can resume from where it left off using information stored in the checkpoint location, and it continues to provide exactly-once guarantees when writing data into Delta Lake. You don’t need to maintain or manage any state yourself to achieve fault tolerance or exactly-once semantics.

When to use COPY INTO and when to use Auto Loader

The COPY INTO command is another convenient way to load data incrementally into a Delta table with exactly-once guarantees. Here are a few things to consider when choosing between Auto Loader and COPY INTO:

  • If you’re going to ingest files in the order of thousands, you can use COPY INTO. If you are expecting files in the order of millions or more over time, use Auto Loader. Auto Loader can discover files more cheaply compared to COPY INTO and can split the processing into multiple batches.

  • If your data schema is going to evolve frequently, Auto Loader provides better primitives around schema inference and evolution. See Schema inference and evolution for more details.

  • Loading a subset of re-uploaded files can be a bit easier to manage with COPY INTO. With Auto Loader, it’s harder to reprocess a select subset of files. However, you can use COPY INTO to reload the subset of files while an Auto Loader stream is running simultaneously.

Benefits over Apache Spark FileStreamSource

In Apache Spark, you can read files incrementally using spark.readStream.format(fileFormat).load(directory). Auto Loader provides the following benefits over the file source:

  • Scalability: Auto Loader can discover billions of files efficiently. Backfills can be performed asynchronously to avoid wasting any compute resources.

  • Performance: The cost of discovering files with Auto Loader scales with the number of files that are being ingested instead of the number of directories that the files may land in. See Optimized directory listing.

  • Schema inference and evolution support: Auto Loader can detect schema drifts, notify you when schema changes happen, and rescue data that would have been otherwise ignored or lost. See Schema inference and evolution.

  • Cost: Auto Loader uses native cloud APIs to get lists of files that exist in storage. In addition, Auto Loader’s file notification mode can help reduce your cloud costs further by avoiding directory listing altogether. Auto Loader can automatically set up file notification services on storage to make file discovery much cheaper.

Quickstart

The following code example demonstrates how Auto Loader detects new data files as they arrive in cloud storage. You can run the example code from within a notebook attached to a Databricks cluster.

  1. Create the file upload directory, for example:

    user_dir = '<my-name>@<my-organization.com>'
    upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
    val user_dir = "<my-name>@<my-organization.com>"
    val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
  2. Create the following sample CSV files, and then upload them to the file upload directory by using the DBFS file browser:

    WA.csv:

    city,year,population
    Seattle metro,2019,3406000
    Seattle metro,2020,3433000
    

    OR.csv:

    city,year,population
    Portland metro,2019,2127000
    Portland metro,2020,2151000
    
  3. Run the following code to start Auto Loader.

    checkpoint_path = '/tmp/delta/population_data/_checkpoints'
    write_path = '/tmp/delta/population_data'
    
    # Set up the stream to begin reading incoming files from the
    # upload_path location.
    df = spark.readStream.format('cloudFiles') \
      .option('cloudFiles.format', 'csv') \
      .option('header', 'true') \
      .schema('city string, year int, population long') \
      .load(upload_path)
    
    # Start the stream.
    # Use the checkpoint_path location to keep a record of all files that
    # have already been uploaded to the upload_path location.
    # For those that have been uploaded since the last check,
    # write the newly-uploaded files' data to the write_path location.
    df.writeStream.format('delta') \
      .option('checkpointLocation', checkpoint_path) \
      .start(write_path)
    
    val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
    val write_path = "/tmp/delta/population_data"
    
    // Set up the stream to begin reading incoming files from the
    // upload_path location.
    val df = spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema("city string, year int, population long")
      .load(upload_path)
    
    // Start the stream.
    // Use the checkpoint_path location to keep a record of all files that
    // have already been uploaded to the upload_path location.
    // For those that have been uploaded since the last check,
    // write the newly-uploaded files' data to the write_path location.
    df.writeStream.format("delta")
      .option("checkpointLocation", checkpoint_path)
      .start(write_path)
    
  4. With the code from step 3 still running, run the following code to query the data in the write directory:

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    '''
    
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    */
    
  5. With the code from step 3 still running, create the following additional CSV files, and then upload them to the upload directory by using the DBFS file browser:

    ID.csv:

    city,year,population
    Boise,2019,438000
    Boise,2020,447000
    

    MT.csv:

    city,year,population
    Helena,2019,81653
    Helena,2020,82590
    

    Misc.csv:

    city,year,population
    Seattle metro,2021,3461000
    Portland metro,2021,2174000
    Boise,2021,455000
    Helena,2021,81653
    
  6. With the code from step 3 still running, run the following code to query the existing data in the write directory, in addition to the new data from the files that Auto Loader has detected in the upload directory and then written to the write directory:

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    '''
    
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    */
    
  7. To clean up, cancel the running code in step 3, and then run the following code, which deletes the upload, checkpoint, and write directories:

    dbutils.fs.rm(write_path, True)
    dbutils.fs.rm(upload_path, True)
    
    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)
    

See also Tutorial: Continuously ingest data into Delta Lake with Auto Loader.

Schema inference and evolution

Note

Available in Databricks Runtime 8.2 and above.

Auto Loader supports schema inference and evolution with CSV, JSON, binary (binaryFile), and text file formats. See Schema inference and evolution in Auto Loader for details.

Scaling Auto Loader to large volumes of data

Using Trigger.AvailableNow and rate limiting

Note

Available in Databricks Runtime 10.1 for Scala only.

Available in Databricks Runtime 10.2 and above for Python and Scala.

Auto Loader can be scheduled to run in Databricks Jobs as a batch job by using Trigger.AvailableNow. The AvailableNow trigger will instruct Auto Loader to process all files that arrived before the query start time. New files that are uploaded after the stream has started will be ignored until the next trigger.

With Trigger.AvailableNow, file discovery happens asynchronously with data processing, and data can be processed across multiple micro-batches with rate limiting. Auto Loader by default processes a maximum of 1000 files in every micro-batch. Use cloudFiles.maxFilesPerTrigger and cloudFiles.maxBytesPerTrigger to configure how many files or how many bytes should be processed in a micro-batch. The file limit is a hard limit, but the byte limit is a soft limit, meaning that more bytes can be processed than the provided maxBytesPerTrigger. When both options are provided together, Auto Loader processes as many files as are needed to hit one of the limits.
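
For example, the following Python sketch runs Auto Loader as a rate-limited batch job; the paths and limit values are placeholders, and availableNow requires Databricks Runtime 10.2 or above for Python:

# Sketch: process the current backlog in rate-limited micro-batches, then stop.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.maxFilesPerTrigger", 500)    # hard limit on files per micro-batch
  .option("cloudFiles.maxBytesPerTrigger", "10g")  # soft limit on bytes per micro-batch
  .load("/path/to/input"))                         # placeholder input path

(df.writeStream.format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(availableNow=True)                      # process all files available now, then stop
  .start("/path/to/output/table"))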

Optimized directory listing

Note

Available in Databricks Runtime 9.0 and above.

Auto Loader can discover files on cloud storage systems using directory listing more efficiently than other alternatives. For example, if you had files being uploaded every 5 minutes as /some/path/YYYY/MM/DD/HH/fileName, to find all the files in these directories, the Apache Spark file source would list all subdirectories in parallel, causing 1 (base directory) + 365 (per day) * 24 (per hour) = 8761 LIST API directory calls to storage. By receiving a flattened response from storage, Auto Loader reduces the number of API calls to the number of files in storage divided by the number of results returned by each API call (1000 with S3, 5000 with ADLS Gen2, and 1024 with GCS), greatly reducing your cloud costs.

Incremental Listing

Note

Available in Databricks Runtime 9.1 LTS and above.

For lexicographically generated files, Auto Loader can now leverage lexical file ordering and optimized listing APIs to improve the efficiency of directory listing by listing from recently ingested files rather than listing the contents of the entire directory.

By default, Auto Loader will automatically detect whether a given directory is applicable for incremental listing by checking and comparing file paths of previously completed directory listings. To ensure eventual completeness of data in auto mode, Auto Loader will automatically trigger a full directory list after completing 7 consecutive incremental lists. You can control the frequency of full directory lists by setting cloudFiles.backfillInterval to trigger asynchronous backfills at a given interval.

You can explicitly enable or disable incremental listing by setting cloudFiles.useIncrementalListing to "true" or "false" (default "auto"). When explicitly enabled, Auto Loader will not trigger full directory lists unless a backfill interval is set. Services such as AWS Kinesis Firehose, AWS DMS, and Azure Data Factory can be configured to upload files to a storage system in lexical order. See the Appendix for more examples of lexical directory structures.
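
For example, the following Python sketch (placeholder paths) explicitly enables incremental listing while keeping a weekly full list as a completeness safety net:

# Sketch: force incremental listing and schedule a weekly asynchronous backfill.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useIncrementalListing", "true")  # list only from recently ingested files
  .option("cloudFiles.backfillInterval", "1 week")     # periodic full list for eventual completeness
  .load("/path/to/lexically/ordered/input"))           # placeholder path with lexically ordered file names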

Leveraging file notifications

When files do not arrive in lexical order in a bucket, you can use file notifications to scale Auto Loader to ingest millions of files an hour. Auto Loader can set up file notifications for you automatically when you set the option cloudFiles.useNotifications to true and provide the necessary permissions to create cloud resources. You may also need to provide additional options that authorize Auto Loader to create these resources. The following table summarizes which resources are created by Auto Loader.

| Cloud Storage | Subscription Service | Queue Service | Prefix (1) | Limit (2) |
|---|---|---|---|---|
| AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 per S3 bucket |
| ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 per storage account |
| GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | 100 per GCS bucket |
| Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 per storage account |

  1. Auto Loader will name the resources with this prefix

  2. How many concurrent file notification pipelines can be launched
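
For example, the following Python sketch enables file notification mode on an S3 path; the bucket, prefix, and region are placeholders, and the stream owner must have the permissions described in the Appendix:

# Sketch: let Auto Loader create and subscribe to its own notification services.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")  # use file notifications instead of directory listing
  .option("cloudFiles.region", "us-west-2")       # placeholder region of the source bucket (AWS)
  .load("s3://my-bucket/path/to/input"))          # placeholder bucket and prefix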

If you cannot provide Auto Loader with the necessary permissions to create file notification services, you can ask your cloud administrators to use the setUpNotificationServices method in the next section in a Databricks Scala notebook to create file notification services for you. Alternatively, your cloud administrators can set up the file notification services manually, and can provide you with the queue identifier to leverage file notifications. See File notification options for more details.

You can switch between file notifications and directory listing at any time and still maintain exactly-once data processing guarantees.

Note

Cloud providers do not guarantee 100% delivery of all file events under very rare conditions and do not provide any strict SLAs on the latency of the file events. Databricks recommends that you trigger regular backfills with Auto Loader by using the cloudFiles.backfillInterval option to guarantee that all files are discovered within a given SLA if data completeness is a requirement. Triggering regular backfills will not cause duplicates.

If you require running more than the limited number of file notification pipelines for a given storage account, you can:

  • Consider rearchitecting how files are uploaded to leverage incremental listing instead of file notifications

  • Leverage a service such as AWS Lambda, Azure Functions, or Google Cloud Functions to fan out notifications from a single queue that listens to an entire container or bucket into directory specific queues

File notification events

AWS S3 provides an ObjectCreated event when a file is uploaded to an S3 bucket regardless of whether it was uploaded by a put or multi-part upload.

ADLS Gen2 provides different event notifications for files appearing in your Gen2 container.

  • Auto Loader listens for the FlushWithClose event for processing a file.

  • Auto Loader streams created with Databricks Runtime 8.3 and after support the RenameFile action for discovering files. RenameFile actions will require an API request to the storage system to get the size of the renamed file.

  • Auto Loader streams created with Databricks Runtime 9.0 and after support the RenameDirectory action for discovering files. RenameDirectory actions will require API requests to the storage system to list the contents of the renamed directory.

Google Cloud Storage provides an OBJECT_FINALIZE event when a file is uploaded, which includes overwrites and file copies. Failed uploads do not generate this event.

Managing file notification resources

You can use Scala APIs to manage the notification and queuing services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) to create a queue and a subscription with the name <prefix>-<resource-suffix> (the prefix depends on the storage system, as summarized in Leveraging file notifications). If a resource with the same name already exists, Databricks reuses it instead of creating a new one. This function returns a queue identifier that you can pass to the cloudFiles source using the corresponding option in File notification options. This enables the cloudFiles source user to have fewer permissions than the user who creates the resources. See Permissions.

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

| Cloud Storage | Setup API | List API | Tear down API |
|---|---|---|---|
| AWS S3 | All versions | All versions | All versions |
| ADLS Gen2 | All versions | All versions | All versions |
| GCS | Databricks Runtime 9.1 and above | Databricks Runtime 9.1 and above | Databricks Runtime 9.1 and above |
| Azure Blob Storage | All versions | All versions | All versions |
| ADLS Gen1 | Unsupported | Unsupported | Unsupported |

Event retention

Note

Available in Databricks Runtime 8.4 and above.

Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to provide exactly-once ingestion guarantees. For high volume datasets, you can use the cloudFiles.maxFileAge option to expire events from the checkpoint location to reduce your storage costs and Auto Loader startup time. The minimum value that you can set for cloudFiles.maxFileAge is "14 days". Deletes in RocksDB appear as tombstone entries, so you should expect storage usage to increase temporarily as events expire before it starts to level off.

Warning

cloudFiles.maxFileAge is provided as a cost control mechanism for high volume datasets, ingesting in the order of millions of files every hour. Tuning cloudFiles.maxFileAge incorrectly can lead to data quality issues. Therefore, Databricks doesn’t recommend tuning this parameter unless absolutely required.

Trying to tune the cloudFiles.maxFileAge option can lead to unprocessed files being ignored by Auto Loader, or to already processed files expiring and then being re-processed, causing duplicate data. Here are some things to consider when choosing a cloudFiles.maxFileAge value:

  • If your stream restarts after a long time, file notification events that are pulled from the queue that are older than cloudFiles.maxFileAge are ignored. Similarly, if you use directory listing, files that may have appeared during the down time that are older than cloudFiles.maxFileAge are ignored.

  • If you use directory listing mode with cloudFiles.maxFileAge set to, for example, "1 month", then stop your stream and restart it with cloudFiles.maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.

The best approach to tuning cloudFiles.maxFileAge is to start from a generous expiration, for example "1 year", and work downwards to something like "9 months". If you set this option the first time you start the stream, you will not ingest data older than cloudFiles.maxFileAge; therefore, if you want to ingest old data, do not set this option when you start your stream.
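
If you do decide to set it, the option takes an interval string, as in this Python sketch (placeholder path; the value shown follows the generous-first guidance above):

# Sketch: expire tracked file events after one year; only for very high-volume streams.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.maxFileAge", "1 year")  # start generous and tighten carefully
  .load("/path/to/high/volume/input"))        # placeholder input path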

Running Auto Loader in production

Databricks recommends that you follow the streaming best practices for running Auto Loader in production.

Monitoring Auto Loader

Listen to stream updates

To further monitor Auto Loader streams, Databricks recommends using Apache Spark’s Streaming Query Listener interface.

Auto Loader reports metrics to the Streaming Query Listener at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

In Databricks Runtime 10.1 and later, when using file notification mode, the metrics will also include the approximate number of file events that are in the cloud queue as approximateQueueSize for AWS and Azure.
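
You can also read these metrics programmatically from the most recent query progress, for example with the following Python sketch, where query is assumed to be the StreamingQuery handle returned by writeStream.start():

# Sketch: inspect the Auto Loader backlog from the latest streaming progress.
progress = query.lastProgress          # None until the first micro-batch completes
if progress is not None:
    for source in progress["sources"]:
        metrics = source.get("metrics", {})
        print(source["description"],
              metrics.get("numFilesOutstanding"),
              metrics.get("numBytesOutstanding"))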

Using Auto Loader in Delta Live Tables

Auto Loader can be used in Delta Live Tables in SQL as well as Python. When using Auto Loader in Delta Live Tables, you do not need to provide a schema location or checkpoint location as these locations will be managed by Delta Live Tables for your pipelines.

The following examples use Auto Loader to create datasets from CSV and JSON files:

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

You can use supported format options with Auto Loader. The following example reads data from tab-delimited CSV files:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t"))

Note

Delta Live Tables automatically configures and manages the schema and checkpoint directories when using Auto Loader to read files. However, if you manually configure either of these directories, performing a full refresh does not affect the contents of the configured directories. Databricks recommends using the automatically configured directories to avoid unexpected side effects during processing.

Cost considerations

When running Auto Loader, your main sources of cost are compute resources and file discovery.

To reduce compute costs, Databricks recommends using Databricks Jobs to schedule Auto Loader as batch jobs with Trigger.AvailableNow (in Databricks Runtime 10.1 and later) or Trigger.Once instead of running it continuously, as long as you don’t have low-latency requirements.

File discovery costs can come in the form of LIST operations on your storage accounts in directory listing mode, and API requests to the subscription service and queue service in file notification mode. To reduce file discovery costs, Databricks recommends:

  • Providing a ProcessingTime trigger when running Auto Loader continuously in directory listing mode (see the sketch after this list)

  • Architecting file uploads to your storage account in lexical ordering to leverage Incremental Listing when possible

  • Using Databricks Runtime 9.0 or later in directory listing mode, especially for deeply nested directories

  • Leveraging file notifications when incremental listing is not possible

  • Using resource tags to tag resources created by Auto Loader to track your costs
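
For example, a ProcessingTime trigger for a continuously running directory listing stream might look like the following Python sketch (placeholder paths and interval):

# Sketch: list the input directory at most once per minute to reduce LIST request costs.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .load("/path/to/input"))                             # placeholder input path

(df.writeStream.format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="1 minute")                  # micro-batch (and directory listing) interval
  .start("/path/to/output/table"))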

Configuring Auto Loader

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

File format options

With Auto Loader you can ingest JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC files. See Format Options for the options for these file formats.

Common Auto Loader options

You can configure the following options for directory listing or file notification mode.

Option

cloudFiles.allowOverwrites

Type: Boolean

Whether to allow input directory file changes to overwrite existing data. Available in Databricks Runtime 7.6 and above.

Default value: false

cloudFiles.backfillInterval

Type: Interval String

Auto Loader can trigger asynchronous backfills at a given interval, e.g. 1 day to backfill once a day, or 1 week to backfill once a week. File event notification systems do not guarantee 100% delivery of all files that have been uploaded therefore you can use backfills to guarantee that all files eventually get processed, available in Databricks Runtime 8.4 (Unsupported) and above. If using the incremental listing, you can also use regular backfills to guarantee the eventual completeness, available in Databricks Runtime 9.1 LTS and above.

Default value: None

cloudFiles.format

Type: String

The data file format in the source path. Allowed values include:

Default value: None (required option)

cloudFiles.includeExistingFiles

Type: Boolean

Whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. This option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect.

Default value: true

cloudFiles.inferColumnTypes

Type: Boolean

Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON datasets. See schema inference for more details.

Default value: false

cloudFiles.maxBytesPerTrigger

Type: Byte String

The maximum number of new bytes to be processed in every trigger. You can specify a byte string such as 10g to limit each microbatch to 10 GB of data. This is a soft maximum. If you have files that are 3 GB each, Databricks processes 12 GB in a microbatch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once().

Default value: None

cloudFiles.maxFileAge

Type: Interval String

How long a file event is tracked for deduplication purposes. Databricks does not recommend tuning this parameter unless you are ingesting data at the order of millions of files an hour. See the section on Event retention for more details.

Default value: None

cloudFiles.maxFilesPerTrigger

Type: Integer

The maximum number of new files to be processed in every trigger. When used together with cloudFiles.maxBytesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once().

Default value: 1000

cloudFiles.partitionColumns

Type: String

A comma separated list of Hive style partition columns that you would like inferred from the directory structure of the files. Hive style partition columns are key-value pairs combined by an equality sign such as <base_path>/a=x/b=1/c=y/file.format. In this example, the partition columns are a, b, and c. By default these columns will be automatically added to your schema if you are using schema inference and provide the <base_path> to load data from. If you provide a schema, Auto Loader expects these columns to be included in the schema. If you do not want these columns as part of your schema, you can specify "" to ignore them. In addition, you can use this option when you want columns to be inferred from the file path in complex directory structures, as in the example below:

<base_path>/year=2022/week=1/file1.csv
<base_path>/year=2022/month=2/day=3/file2.csv
<base_path>/year=2022/month=2/day=4/file3.csv

Specifying cloudFiles.partitionColumns as year,month,day will return year=2022 for file1.csv, but the month and day columns will be null. month and day will be parsed correctly for file2.csv and file3.csv.

Default value: None

cloudFiles.schemaEvolutionMode

Type: String

The mode for evolving the schema as new columns are discovered in the data. By default, columns are inferred as strings when inferring JSON datasets. See schema evolution for more details.

Default value: "addNewColumns" when a schema is not provided. "none" otherwise.

cloudFiles.schemaHints

Type: String

Schema information that you provide to Auto Loader during schema inference. See schema hints for more details.

Default value: None

cloudFiles.schemaLocation

Type: String

The location to store the inferred schema and subsequent changes. See schema inference for more details.

Default value: None (required when inferring the schema)

cloudFiles.validateOptions

Type: Boolean

Whether to validate Auto Loader options and return an error for unknown or inconsistent options.

Default value: true
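
The following Python sketch (placeholder paths) combines several of the common options above: a schema location, exact column-type inference, partition columns derived from the path, and skipping existing files:

# Sketch: schema inference with several common Auto Loader options.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "/path/to/schema")   # required when inferring the schema
  .option("cloudFiles.inferColumnTypes", "true")            # infer ints, doubles, timestamps, etc.
  .option("cloudFiles.partitionColumns", "year,month,day")  # Hive-style columns taken from the path
  .option("cloudFiles.includeExistingFiles", "false")       # only process files that arrive after setup
  .load("/path/to/input"))                                  # placeholder input path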

Directory Listing options

The following options are relevant to directory listing mode.

Option

cloudFiles.useIncrementalListing

Type: String

Whether to use incremental listing rather than full listing in directory listing mode. By default, Auto Loader makes a best effort to automatically detect whether a given directory is applicable for incremental listing. You can explicitly use incremental listing or the full directory listing by setting this option to true or false, respectively.

Available in Databricks Runtime 9.1 LTS and above.

Default value: auto

Available values: auto, true, false

File notification options

The following options are relevant to file notification mode.

Option

cloudFiles.fetchParallelism

Type: Integer

Number of threads to use when fetching messages from the queueing service.

Default value: 1

cloudFiles.pathRewrites

Type: A JSON string

Required only if you specify a queueUrl that receives file notifications from multiple S3 buckets and you want to leverage mount points configured for accessing data in these containers. Use this option to rewrite the prefix of the bucket/key path with the mount point. Only prefixes can be rewritten. For example, for the configuration {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, the path s3://<databricks-mounted-bucket>/path/2017/08/fileA.json is rewritten to dbfs:/mnt/data-warehouse/2017/08/fileA.json.

Default value: None

cloudFiles.resourceTags

Type: Map(String, String)

A series of key-value tag pairs to help associate and identify related resources, for example:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
  .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

For more information on AWS, see Amazon SQS cost allocation tags and Configuring tags for an Amazon SNS topic. (1)

For more information on Azure, see Naming Queues and Metadata and the coverage of properties.labels in Event Subscriptions. Auto Loader stores these key-value tag pairs in JSON as labels. (1)

For more information on GCP, see Reporting usage with labels. (1)

Default value: None

cloudFiles.useNotifications

Type: Boolean

Whether to use file notification mode to determine when there are new files. If false, use directory listing mode. See How Auto Loader works.

Default value: false

(1) Auto Loader adds the following key-value tag pairs by default on a best-effort basis:

  • vendor: Databricks

  • path: The location from where the data is loaded. Unavailable in GCP due to labeling limitations.

  • checkpointLocation: The location of the stream’s checkpoint. Unavailable in GCP due to labeling limitations.

  • streamId: A globally unique identifier for the stream.

These key names are reserved and you cannot overwrite their values.

AWS specific options

Provide the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:

Option

cloudFiles.region

Type: String

The region where the source S3 bucket resides and where the AWS SNS and SQS services will be created.

Default value: In Databricks Runtime 9.0 and above the region of the EC2 instance. In Databricks Runtime 8.4 and below you must specify the region.

Provide the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:

Option

cloudFiles.queueUrl

Type: String

The URL of the SQS queue. If provided, Auto Loader directly consumes events from this queue instead of setting up its own AWS SNS and SQS services.

Default value: None

You can use the following options to provide credentials to access AWS SNS and SQS when IAM roles are not available or when you’re ingesting data from different clouds.

Option

cloudFiles.awsAccessKey

Type: String

The AWS access key ID for the user. Must be provided with cloudFiles.awsSecretKey.

Default value: None

cloudFiles.awsSecretKey

Type: String

The AWS secret access key for the user. Must be provided with cloudFiles.awsAccessKey.

Default value: None

cloudFiles.roleArn

Type: String

The ARN of an IAM role to assume. The role can be assumed from your cluster’s instance profile or by providing credentials with cloudFiles.awsAccessKey and cloudFiles.awsSecretKey.

Default value: None

cloudFiles.roleExternalId

Type: String

An identifier to provide while assuming a role using cloudFiles.roleArn.

Default value: None

cloudFiles.roleSessionName

Type: String

An optional session name to use while assuming a role using cloudFiles.roleArn.

Default value: None

cloudFiles.stsEndpoint

Type: String

An optional endpoint to provide for accessing AWS STS when assuming a role using cloudFiles.roleArn.

Default value: None
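
For example, the following Python sketch assumes an IAM role for setting up and accessing the notification services; the ARN, external ID, region, and path are placeholders:

# Sketch: file notification mode on AWS using an assumed IAM role.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .option("cloudFiles.region", "us-west-2")                                # placeholder region
  .option("cloudFiles.roleArn", "arn:aws:iam::123456789012:role/my-role")  # placeholder role ARN
  .option("cloudFiles.roleExternalId", "my-external-id")                   # placeholder external ID
  .load("s3://my-bucket/path/to/input"))                                   # placeholder bucket and prefix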

Azure specific options

You must provide values for all of the following options if you specify cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:

Option

cloudFiles.clientId

Type: String

The client ID or application ID of the service principal.

Default value: None

cloudFiles.clientSecret

Type: String

The client secret of the service principal.

Default value: None

cloudFiles.connectionString

Type: String

The connection string for the storage account, based on either account access key or shared access signature (SAS).

Default value: None

cloudFiles.resourceGroup

Type: String

The Azure Resource Group under which the storage account is created.

Default value: None

cloudFiles.subscriptionId

Type: String

The Azure Subscription ID under which the resource group is created.

Default value: None

cloudFiles.tenantId

Type: String

The Azure Tenant ID under which the service principal is created.

Default value: None

Important

Automated notification setup is available in Azure China and Government regions with Databricks Runtime 9.1 and later. You must provide a queueName to use Auto Loader with file notifications in these regions for older DBR versions.

Provide the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:

Option

cloudFiles.queueName

Type: String

The name of the Azure queue. If provided, the cloud files source directly consumes events from this queue instead of setting up its own Azure Event Grid and Queue Storage services. In that case, your cloudFiles.connectionString requires only read permissions on the queue.

Default value: None
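
For example, the following Python sketch consumes events from a queue that was set up in advance; the queue name, secret scope and key, and path are placeholders:

# Sketch: use an existing Azure Queue Storage queue instead of letting Auto Loader create one.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .option("cloudFiles.queueName", "my-existing-queue")                                  # placeholder queue name
  .option("cloudFiles.connectionString", dbutils.secrets.get("my-scope", "queue-sas"))  # placeholder secret
  .load("abfss://container@account.dfs.core.windows.net/path/to/input"))                # placeholder path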

Google specific options

Auto Loader can automatically set up notification services for you by leveraging Google Service Accounts. You can configure your cluster to assume a service account by following Google service setup. The permissions that your service account needs are specified in Required permissions for setting up file notification resources. Otherwise, you can provide the following options for authentication if you want Auto Loader to set up the notification services for you.

Option

cloudFiles.client

Type: String

The client ID of the Google Service Account.

Default value: None

cloudFiles.clientEmail

Type: String

The email of the Google Service Account.

Default value: None

cloudFiles.privateKey

Type: String

The private key that’s generated for the Google Service Account.

Default value: None

cloudFiles.privateKeyId

Type: String

The id of the private key that’s generated for the Google Service Account.

Default value: None

cloudFiles.projectId

Type: String

The id of the project that the GCS bucket is in. The Google Cloud Pub/Sub subscription will also be created within this project.

Default value: None

Provide the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:

Option

cloudFiles.subscription

Type: String

The name of the Google Cloud Pub/Sub subscription. If provided, the cloud files source consumes events from this queue instead of setting up its own GCS Notification and Google Cloud Pub/Sub services.

Default value: None

References

For an overview and demonstration of Auto Loader, watch this YouTube video (59 minutes).

For details on how to use Auto Loader, see:

Common data loading patterns

Filtering directories or files using glob patterns

Glob patterns can be used for filtering directories and files when provided in the path.

| Pattern | Description |
|---|---|
| ? | Matches any single character |
| * | Matches zero or more characters |
| [abc] | Matches a single character from character set {a,b,c} |
| [a-z] | Matches a single character from the character range {a…z} |
| [^a] | Matches a single character that is not from character set or range {a}. Note that the ^ character must occur immediately to the right of the opening bracket. |
| {ab,cd} | Matches a string from the string set {ab, cd} |
| {ab,c{de,fh}} | Matches a string from the string set {ab, cde, cfh} |

Use the path for providing prefix patterns, for example:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

Important

You need to use the option pathGlobFilter for explicitly providing suffix patterns. The path only provides a prefix filter.

For example, if you would like to parse only png files within a directory that contains files with different suffixes, you can do:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobFilter", "*.png") \
  .load(<base_path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobFilter", "*.png")
  .load(<base_path>)

Frequently asked questions (FAQ)

Does Auto Loader process the file again when the file gets appended or overwritten?

Files are processed exactly once unless cloudFiles.allowOverwrites is enabled. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, Databricks suggests that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

If my data files do not arrive continuously, but in regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger.Once or Trigger.AvailableNow (available in Databricks Runtime 10.2 and later) Structured Streaming job and schedule it to run after the anticipated file arrival time. Auto Loader works well with both infrequent and frequent updates. Even if the eventual updates are very large, Auto Loader scales well to the input size. Auto Loader’s efficient file discovery techniques and schema evolution capabilities make it the go-to method for incremental data ingestion.

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information of a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new stream.

Do I need to create event notification services beforehand?

No. If you choose file notification mode and provide the required permissions, Auto Loader can create file notification services for you. See Leveraging file notifications.

How do I clean up the event notification resources created by Auto Loader?

You can use the cloud resource manager to list and tear down resources. You can also delete these resources manually, using the cloud provider’s UI or APIs.

Can I run multiple streaming queries from different input directories on the same bucket/container?

Yes, as long as they are not parent-child directories; for example, prod-logs/ and prod-logs/usage/ would not work because /usage is a child directory of /prod-logs.

Can I use this feature when there are existing file notifications on my bucket/container?

Yes, as long as your input directory does not conflict with the existing notification prefix (for example, the above parent-child directories).

Troubleshooting

Error:

java.lang.RuntimeException: Failed to create event grid subscription.

If you see this error message when you run Auto Loader for the first time, Event Grid is not registered as a resource provider in your Azure subscription. To register it in the Azure portal:

  1. Go to your subscription.

  2. Click Resource Providers under the Settings section.

  3. Register the provider Microsoft.EventGrid.

Error:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

If you see this error message when you run Auto Loader for the first time, ensure you have given the Contributor role to your service principal for Event Grid as well as your storage account.

Appendix

Required permissions for setting up file notification resources

ADLS Gen2 and Azure Blob Storage

You must have read permissions for the input directory. See Azure Blob Storage and Azure Data Lake Storage Gen2.

To use file notification mode, you must provide authentication credentials for setting up and accessing the event notification services. In Databricks Runtime 8.1 and above, you only need a service principal for authentication. For Databricks Runtime 8.0 and below, you must provide both a service principal and a connection string.

  • Service principal - using Azure built-in roles

    Create an Azure Active Directory app and service principal in the form of client ID and client secret.

    Assign this app the following roles to the storage account in which the input path resides:

    • Contributor: This role is for setting up resources in your storage account, such as queues and event subscriptions.

    • Storage Queue Data Contributor: This role is for performing queue operations such as retrieving and deleting messages from the queues. This role is required in Databricks Runtime 8.1 and above only when you provide a service principal without a connection string.

    Assign this app the following role to the related resource group:

    For more information, see Assign Azure roles using the Azure portal.

  • Service principal - using custom role

    If you are concerned about the excessive permissions required for the preceding roles, you can create a Custom Role with at least the following permissions, listed below in Azure role JSON format:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Then, you may assign this custom role to your app.

    For more information, see Assign Azure roles using the Azure portal.

  • Connection string

    Auto Loader requires a connection string to authenticate for Azure Queue Storage operations, such as creating a queue and reading and deleting messages from it. The queue is created in the same storage account where the input directory path is located. You can find your connection string in your account key or shared access signature (SAS).

    If you are using Databricks Runtime 8.1 or above, you do not need a connection string.

    If you are using Databricks Runtime 8.0 or below, you must provide a connection string. When configuring an SAS token, you must provide the following permissions:

Auto loader permissions

AWS S3

You must have read permissions for the input directory. See S3 connection details for more details.

To use file notification mode, attach the following JSON policy document to your IAM user or role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

where:

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running %fs mounts.

  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. If you don’t want to specify the region, use *.

  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. If you don’t want to specify the account number, use *.

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when creating SQS and SNS services. Since Databricks sets up the notification services in the initial run of the stream, you can use a policy with reduced permissions after the initial run (for example, stop the stream and then restart it).

Note

The preceding policy is concerned only with the permissions needed for setting up file notification services, namely S3 bucket notification, SNS, and SQS services and assumes you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the DatabricksAutoLoaderSetup statement in the JSON document:

  • s3:ListBucket

  • s3:GetObject

Reduced permissions after initial setup

The resource setup permissions described above are required only during the initial run of the stream. After the first run, you can switch to the following IAM policy with reduced permissions.

Important

With the reduced permissions, you won’t be able to start new streaming queries or recreate resources in case of failures (for example, if the SQS queue has been accidentally deleted); you also won’t be able to use the cloud resource management API to list or tear down resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}
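
For illustration, here is a minimal Python sketch (the bucket, input, output, and checkpoint paths are placeholders) of restarting an existing file notification stream after the initial setup. Because the checkpoint already records the SQS queue and SNS topic created during the first run, the reduced-permission policy above is sufficient for this restart:

# Placeholder paths; reusing the original checkpoint lets Auto Loader pick up the
# SQS queue and SNS topic created during the initial run instead of creating new ones.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .load("s3://<bucket-name>/input"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "s3://<bucket-name>/_checkpoints/auto-loader")
   .start("s3://<bucket-name>/output"))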
Securely ingest data in a different AWS account

Auto Loader can load data across AWS accounts by assuming an IAM role. With the temporary security credentials created by AssumeRole, Auto Loader can load cloud files across accounts. To set up Auto Loader for cross-AWS-account access, follow the corresponding documentation and make sure you do the following (a usage sketch follows the list):

  • Verify that you have the AssumeRole meta role assigned to the cluster.

  • Configure the cluster’s Spark configuration to include the following properties:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
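
With the AssumeRole Spark configuration above in place on the cluster, starting an Auto Loader stream over the cross-account bucket looks the same as for any other path; a minimal sketch with placeholder values:

# Hypothetical bucket owned by the other AWS account; access is granted through the
# AssumeRole credentials configured on the cluster.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("s3://<cross-account-bucket-name>/path"))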
    

GCS

You must have list and get permissions on your GCS bucket and on all the objects. For details, see the Google documentation on IAM permissions.

To use file notification mode, you need to add permissions for the GCS service account and the account used to access the Google Cloud Pub/Sub resources.

Add the Pub/Sub Publisher role to the GCS service account. This will allow the account to publish event notification messages from your GCS buckets to Google Cloud Pub/Sub.

For the service account used to access the Google Cloud Pub/Sub resources, you need to add the following permissions:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

To do this, you can either create an IAM custom role with these permissions or assign pre-existing GCP roles to cover these permissions.
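
Once these Pub/Sub permissions are in place, file notification mode on GCS (Databricks Runtime 9.1 and above) is enabled the same way as on the other clouds; a minimal sketch with a placeholder bucket path:

# Placeholder path; Auto Loader sets up the Pub/Sub notification resources on the
# initial run of the stream using the permissions described above.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .load("gs://<bucket-name>/input"))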

Finding the GCS Service Account

In the Google Cloud Console for the corresponding project, navigate to Cloud Storage > Settings. On that page, you should see a section titled “Cloud Storage Service Account” containing the email of the GCS service account.


Creating a Custom Google Cloud IAM Role for File Notification Mode

In the Google Cloud console for the corresponding project, navigate to IAM & Admin > Roles. Then either create a new role or update an existing one. On the role creation or edit screen, click Add Permissions and add the desired permissions to the role in the menu that appears.


Lexical ordering of files

For files to be lexically ordered, new files need to be uploaded with a prefix that is lexicographically greater than existing files. Some examples of lexically ordered directories are shown below.

Versioned files

Delta Lake tables commit to their transaction logs in lexical order.

<path_to_table>/_delta_log/00000000000000000000.json
<path_to_table>/_delta_log/00000000000000000001.json <- guaranteed to be written after version 0
<path_to_table>/_delta_log/00000000000000000002.json <- guaranteed to be written after version 1
...

AWS DMS uploads CDC files to AWS S3 in a versioned manner.

database_schema_name/table_name/LOAD00000001.csv
database_schema_name/table_name/LOAD00000002.csv
...

Date partitioned files

Files can be uploaded in a date-partitioned format to leverage incremental listing. Some examples are:

// <base_path>/yyyy/MM/dd/HH:mm:ss-randomString
<base_path>/2021/12/01/10:11:23-b1662ecd-e05e-4bb7-a125-ad81f6e859b4.json
<base_path>/2021/12/01/10:11:23-b9794cf3-3f60-4b8d-ae11-8ea320fad9d1.json
...

// <base_path>/year=yyyy/month=MM/day=dd/hour=HH/minute=mm/randomString
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/442463e5-f6fe-458a-8f69-a06aa970fc69.csv
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/8f00988b-46be-4112-808d-6a35aead0d44.csv <- this may be uploaded before the file above as long as processing happens less frequently than a minute

When files are uploaded with date partitioning, some things to keep in mind are:

  • Months, days, hours, and minutes need to be left-padded with zeros to ensure lexical ordering (for example, upload as hour=03 instead of hour=3, and 2021/05/03 instead of 2021/5/3).

  • Files don’t necessarily have to be uploaded in lexical order within the deepest directory, as long as processing happens less frequently than the parent directory’s time granularity.

A number of cloud services upload files with this kind of date-partitioned, lexically ordered layout.
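
When a directory follows one of these lexical orderings, you can let Auto Loader detect it automatically (Databricks Runtime 9.1 and above) or request incremental listing explicitly with the cloudFiles.useIncrementalListing option; a minimal sketch with a placeholder path:

# Placeholder path; files under <base_path> are assumed to arrive with lexically
# increasing names, such as the date-partitioned layouts shown above.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useIncrementalListing", "true")
      .load("<base_path>"))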

Format Options

Generic options

The following options apply to all file formats.

Option

modifiedAfter

Type: Timestamp String, for example, 2021-01-01 00:00:00.000000 UTC+0

An optional timestamp to ingest files that have a modification timestamp after the provided timestamp.

Default value: None

modifiedBefore

Type: Timestamp String, for example, 2021-01-01 00:00:00.000000 UTC+0

An optional timestamp to ingest files that have a modification timestamp before the provided timestamp.

Default value: None

pathGlobFilter

Type: String

A potential glob pattern to provide for choosing files. Equivalent to PATTERN in COPY INTO.

Default value: None

recursiveFileLookup

Type: Boolean

Whether to load data recursively within the base directory and skip partition inference.

Default value: false
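
As an illustration, a minimal sketch that combines several of the generic options above (the path and timestamp are placeholders):

# Placeholder path; ingest only *.json files modified after the given timestamp,
# scanning subdirectories recursively and skipping partition inference.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("modifiedAfter", "2021-01-01 00:00:00.000000 UTC+0")
      .option("pathGlobFilter", "*.json")
      .option("recursiveFileLookup", "true")
      .load("<base_path>"))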

JSON options

Option

allowBackslashEscapingAnyCharacter

Type: Boolean

Whether to allow backslashes to escape any character that succeeds it. If not enabled, only characters that are explicitly listed by the JSON specification can be escaped.

Default value: false

allowComments

Type: Boolean

Whether to allow the use of Java, C, and C++ style comments ('/', '*', and '//' varieties) within parsed content or not.

Default value: false

allowNonNumericNumbers

Type: Boolean

Whether to allow the set of not-a-number (NaN) tokens as legal floating number values.

Default value: true

allowNumericLeadingZeros

Type: Boolean

Whether to allow integral numbers to start with additional (ignorable) zeroes (for example, 000001).

Default value: false

allowSingleQuotes

Type: Boolean

Whether to allow use of single quotes (apostrophe, character '\'') for quoting strings (names and String values).

Default value: true

allowUnquotedControlChars

Type: Boolean

Whether to allow JSON strings to contain unescaped control characters (ASCII characters with value less than 32, including tab and line feed characters) or not.

Default value: false

allowUnquotedFieldNames

Type: Boolean

Whether to allow use of unquoted field names (which are allowed by JavaScript, but not by the JSON specification).

Default value: false

badRecordsPath

Type: String

The path to store files for recording the information about bad JSON records.

Default value: None

columnNameOfCorruptRecord

Type: String

The column for storing records that are malformed and cannot be parsed. If the mode for parsing is set as DROPMALFORMED, this column will be empty.

Default value: _corrupt_record

dateFormat

Type: String

The format for parsing date strings.

Default value: yyyy-MM-dd

dropFieldIfAllNull

Type: Boolean

Whether to ignore columns of all null values or empty arrays and structs during schema inference.

Default value: false

encoding or charset

Type: String

The name of the encoding of the JSON files. See java.nio.charset.Charset for list of options. You cannot use UTF-16 and UTF-32 when multiline is true.

Default value: UTF-8

inferTimestamp

Type: Boolean

Whether to try and infer timestamp strings as a TimestampType. When set to true, schema inference may take noticeably longer.

Default value: false

lineSep

Type: String

A string between two consecutive JSON records.

Default value: None, which covers \r, \r\n, and \n

locale

Type: String

A java.util.Locale identifier. Influences default date, timestamp, and decimal parsing within the JSON.

Default value: US

mode

Type: String

Parser mode around handling malformed records. One of 'PERMISSIVE', 'DROPMALFORMED', or 'FAILFAST'.

Default value: PERMISSIVE

multiLine

Type: Boolean

Whether the JSON records span multiple lines.

Default value: false

prefersDecimal

Type: Boolean

Whether to infer floats and doubles as DecimalType during schema inference.

Default value: false

primitivesAsString

Type: Boolean

Whether to infer primitive types like numbers and booleans as StringType.

Default value: false

rescuedDataColumn

Type: String

Whether to collect all data that can’t be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to Rescued data column.

Default value: None

timestampFormat

Type: String

The format for parsing timestamp strings.

Default value: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

timeZone

Type: String

The java.time.ZoneId to use when parsing timestamps and dates.

Default value: None
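
A minimal sketch of combining some of the JSON options above with Auto Loader (paths are placeholders):

# Placeholder paths; parse multi-line JSON records, attempt to infer timestamp
# columns, and record information about unparseable records under a bad-records path.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("multiLine", "true")
      .option("inferTimestamp", "true")
      .option("badRecordsPath", "<path-to-bad-records>")
      .load("<base_path>"))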

CSV options

Option

badRecordsPath

Type: String

The path to store files for recording the information about bad CSV records.

Default value: None

charToEscapeQuoteEscaping

Type: Char

The character used to escape the character used for escaping quotes. For example, for the following record: [ " a\\", b ]:

  • If the character to escape the '\' is undefined, the record won’t be parsed. The parser will read characters: [a],[\],["],[,],[ ],[b] and throw an error because it cannot find a closing quote.

  • If the character to escape the '\' is defined as '\', the record will be read with 2 values: [a\] and [b].

Default value: '\0'

columnNameOfCorruptRecord

Type: String

A column for storing records that are malformed and cannot be parsed. If the mode for parsing is set as DROPMALFORMED, this column will be empty.

Default value: _corrupt_record

comment

Type: Char

Defines the character that represents a line comment when found in the beginning of a line of text. Use '\0' to disable comment skipping.

Default value: '#'

dateFormat

Type: String

The format for parsing date strings.

Default value: yyyy-MM-dd

emptyValue

Type: String

String representation of an empty value.

Default value: ""

encoding or charset

Type: String

The name of the encoding of the CSV files. See java.nio.charset.Charset for the list of options. UTF-16 and UTF-32 cannot be used when multiline is true.

Default value: UTF-8

enforceSchema

Type: Boolean

Whether to forcibly apply the specified or inferred schema to the CSV files. If the option is enabled, headers of CSV files are ignored. This option is ignored by default when using Auto Loader to rescue data and allow schema evolution.

Default value: true

escape

Type: Char

The escape character to use when parsing the data.

Default value: '\'

header

Type: Boolean

Whether the CSV files contain a header. Auto Loader assumes that files have headers when inferring the schema.

Default value: false

ignoreLeadingWhiteSpace

Type: Boolean

Whether to ignore leading whitespaces for each parsed value.

Default value: false

ignoreTrailingWhiteSpace

Type: Boolean

Whether to ignore trailing whitespaces for each parsed value.

Default value: false

inferSchema

Type: Boolean

Whether to infer the data types of the parsed CSV records or to assume all columns are of StringType. Requires an additional pass over the data if set to true.

Default value: false

lineSep

Type: String

A string between two consecutive CSV records.

Default value: None, which covers \r, \r\n, and \n

locale

Type: String

A java.util.Locale identifier. Influences default date, timestamp, and decimal parsing within the CSV.

Default value: US

maxCharsPerColumn

Type: Int

Maximum number of characters expected from a value to parse. Can be used to avoid memory errors. Defaults to -1, which means unlimited.

Default value: -1

maxColumns

Type: Int

The hard limit of how many columns a record can have.

Default value: 20480

mergeSchema

Type: Boolean

Whether to infer the schema across multiple files and to merge the schema of each file. Enabled by default for Auto Loader when inferring the schema.

Default value: false

mode

Type: String

Parser mode around handling malformed records. One of 'PERMISSIVE', 'DROPMALFORMED', and 'FAILFAST'.

Default value: PERMISSIVE

multiLine

Type: Boolean

Whether the CSV records span multiple lines.

Default value: false

nanValue

Type: String

The string representation of a not-a-number value when parsing FloatType and DoubleType columns.

Default value: "NaN"

negativeInf

Type: String

The string representation of negative infinity when parsing FloatType or DoubleType columns.

Default value: "-Inf"

nullValue

Type: String

String representation of a null value.

Default value: ""

parserCaseSensitive (deprecated)

Type: Boolean

While reading files, whether to align columns declared in the header with the schema case sensitively. This is true by default for Auto Loader. Columns that differ by case will be rescued in the rescuedDataColumn if enabled. This option has been deprecated in favor of readerCaseSensitive.

Default value: false

positiveInf

Type: String

The string representation of positive infinity when parsing FloatType or DoubleType columns.

Default value: "Inf"

quote

Type: Char

The character used for escaping values where the field delimiter is part of the value.

Default value: '"'

rescuedDataColumn

Type: String

Whether to collect all data that can’t be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to Rescued data column.

Default value: None

sep or delimiter

Type: String

The separator string between columns.

Default value: ","

timestampFormat

Type: String

The format for parsing timestamp strings.

Default value: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

timeZone

Type: String

The java.time.ZoneId to use when parsing timestamps and dates.

Default value: None

unescapedQuoteHandling

Type: String

The strategy for handling unescaped quotes. Allowed options:

  • STOP_AT_CLOSING_QUOTE: If unescaped quotes are found in the input, accumulate the quote character and proceed parsing the value as a quoted value, until a closing quote is found.

  • BACK_TO_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This will make the parser accumulate all characters of the current parsed value until the delimiter defined by sep is found. If no delimiter is found in the value, the parser will continue accumulating characters from the input until a delimiter or line ending is found.

  • STOP_AT_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This will make the parser accumulate all characters until the delimiter defined by sep, or a line ending is found in the input.

  • SKIP_VALUE: If unescaped quotes are found in the input, the content parsed for the given value will be skipped (until the next delimiter is found) and the value set in nullValue will be produced instead.

  • RAISE_ERROR: If unescaped quotes are found in the input, a TextParsingException will be thrown.

Default value: STOP_AT_DELIMITER
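
A minimal sketch of applying some of the CSV options above with Auto Loader (the path is a placeholder):

# Placeholder path; headered, semicolon-delimited CSV files in which the literal
# string NULL represents a null value.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .option("sep", ";")
      .option("nullValue", "NULL")
      .load("<base_path>"))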

PARQUET options

Option

datetimeRebaseMode

Type: String

Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars. Allowed values: EXCEPTION, LEGACY, and CORRECTED.

Default value: LEGACY

int96RebaseMode

Type: String

Controls the rebasing of the INT96 timestamp values between Julian and Proleptic Gregorian calendars. Allowed values: EXCEPTION, LEGACY, and CORRECTED.

Default value: LEGACY

mergeSchema

Type: Boolean

Whether to infer the schema across multiple files and to merge the schema of each file.

Default value: false
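
A minimal sketch of the PARQUET options above with Auto Loader (the path is a placeholder):

# Placeholder path; merge schemas across Parquet files and fail, rather than
# silently rebase, DATE and TIMESTAMP values that differ between the Julian and
# Proleptic Gregorian calendars.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .option("mergeSchema", "true")
      .option("datetimeRebaseMode", "EXCEPTION")
      .load("<base_path>"))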

AVRO options

Option

avroSchema

Type: String

Optional schema provided by a user in Avro format. When reading Avro, this option can be set to an evolved schema, which is compatible with, but different from, the actual Avro schema. The deserialization schema will be consistent with the evolved schema. For example, if you set an evolved schema containing one additional column with a default value, the read result will contain the new column too.

Default value: None

datetimeRebaseMode

Type: String

Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars. Allowed values: EXCEPTION, LEGACY, and CORRECTED.

Default value: LEGACY

mergeSchema

Type: Boolean

Whether to infer the schema across multiple files and to merge the schema of each file. mergeSchema for Avro does not relax data types.

Default value: false
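
A hedged sketch of passing an evolved reader schema through avroSchema, as described above (the path and the schema itself are hypothetical):

# Hypothetical evolved schema; it adds a "source" column with a default value, so
# the read result contains the new column even for files written without it.
evolved_schema = """
{
  "type": "record",
  "name": "event",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "source", "type": "string", "default": "unknown"}
  ]
}
"""

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "avro")
      .option("avroSchema", evolved_schema)
      .load("<base_path>"))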

BINARYFILE options

Binary files do not have any additional configuration options.

TEXT options

Option

encoding

Type: String

The name of the encoding of the TEXT files. See java.nio.charset.Charset for list of options.

Default value: UTF-8

lineSep

Type: String

A string between two consecutive TEXT records.

Default value: None, which covers \r, \r\n, and \n

wholeText

Type: Boolean

Whether to read a file as a single record.

Default value: false
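
A minimal sketch of the TEXT options above with Auto Loader (the path is a placeholder):

# Placeholder path; read each text file as a single record, for example to keep an
# entire log file together in one row.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "text")
      .option("wholeText", "true")
      .load("<base_path>"))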

ORC options

Option

mergeSchema

Type: Boolean

Whether to infer the schema across multiple files and to merge the schema of each file.

Default value: false