Padrões comuns de carregamento de dados

O Auto Loader simplifica uma série de tarefas comuns de ingestão de dados. Esta referência rápida fornece exemplos para vários padrões populares.

Filtrando diretórios ou arquivos usando padrões glob

Padrões glob podem ser usados para filtrar diretórios e arquivos quando fornecidos no caminho.

Padrão

Descrição

?

Corresponde a qualquer caractere único

*

Corresponde a zero ou mais caracteres

[abc]

Corresponde a um único caractere do conjunto de caracteres {a,b,c}.

[a-z]

Corresponde a um único caractere do intervalo de caracteres {a…z}.

[^a]

Corresponde a um único caractere que não é do conjunto de caracteres ou intervalo {a}. Observe que o caractere ^ deve ocorrer imediatamente à direita do colchete de abertura.

{ab,cd}

Corresponde a uma string do conjunto de strings {ab, cd}.

{ab,c{de, fh}}

Corresponde a uma string do conjunto de strings {ab, cde, cfh}.

Use o path para fornecer padrões de prefixo, por exemplo:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

Você precisa usar a opção pathGlobFilter para fornecer padrões de sufixo explicitamente. O path fornece apenas um filtro de prefixo.

Por exemplo, se você deseja analisar apenas arquivos png em um diretório que contém arquivos com sufixos diferentes, você pode fazer:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Observação

O comportamento de globbing default do Auto Loader é diferente do comportamento default de outras fontes de arquivo Spark. Adicione .option("cloudFiles.useStrictGlobber", "true") à sua leitura para usar globbing que corresponda ao comportamento default do Spark em relação às fontes de arquivo. Consulte a tabela a seguir para saber mais sobre globbing:

Padrão

Caminho de arquivo

globber default

Strict globber

/a/b

/a/b/c/arquivo.txt

Sim

Sim

/a/b

/a/b_dir/c/arquivo.txt

Não

Não

/a/b

/a/b.txt

Não

Não

/a/b/

/a/b.txt

Não

Não

/a/*/c/

/a/b/c/arquivo.txt

Sim

Sim

/a/*/c/

/a/b/c/d/arquivo.txt

Sim

Sim

/a/*/c/

/a/b/x/y/c/arquivo.txt

Sim

Não

/a/*/c

/a/b/c_file.txt

Sim

Não

/a/*/c/

/a/b/c_file.txt

Sim

Não

/a/*/c/

/a/*/cookie/arquivo.txt

Sim

Não

/a/b*

/a/b.txt

Sim

Sim

/a/b*

/a/b/arquivo.txt

Sim

Sim

/a/{0.txt,1.txt}

/a/0.txt

Sim

Sim

/a/*/{0.txt,1.txt}

/a/0.txt

Não

Não

/a/b/[cde-h]/i/

/a/b/c/i/arquivo.txt

Sim

Sim

Habilitar ETL fácil

Uma maneira fácil de colocar seus dados no Delta Lake sem perder nenhum dado é usar o seguinte padrão e habilitar a inferência de esquema com o Auto Loader. Databricks recomenda executar o código a seguir em um Job do Databricks para que ele reinicie automaticamente sua transmissão quando o esquema de seus dados de origem for alterado. Por default, o esquema é inferido como tipos strings , qualquer erro de análise (não deve haver nenhum se tudo permanecer como strings) irá para _rescued_data e qualquer nova coluna falhará na transmissão e evoluirá o esquema.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Evite a perda de dados em dados bem estruturados

Quando você conhece seu esquema, mas deseja saber sempre que receber dados inesperados, o Databricks recomenda usar o rescuedDataColumn.

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Se você deseja que sua transmissão pare de processar se for introduzido um novo campo que não corresponda ao seu esquema, você pode adicionar:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Ativar pipeline de dados semiestruturado flexível

Quando você recebe dados de um fornecedor que introduz novas colunas na informação que eles fornecem, você pode não saber exatamente quando eles o fazem ou pode não ter largura de banda para atualizar seu pipeline de dados. Agora você pode aproveitar a evolução do esquema para reiniciar a transmissão e permitir que o Auto Loader atualize o esquema inferido automaticamente. Você também pode aproveitar schemaHints para alguns dos campos "sem esquema" que o fornecedor pode fornecer.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformar dados JSON aninhados

Como o Auto Loader infere as colunas JSON de nível superior como strings, você pode ficar com objetos JSON aninhados que requerem transformações adicionais. Você pode usar as APIs de acesso a dados semiestruturados para transformar ainda mais o conteúdo JSON complexo.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferir dados JSON aninhados

Quando você tiver dados aninhados, poderá usar a opção cloudFiles.inferColumnTypes para inferir a estrutura aninhada de seus dados e outros tipos de coluna.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Carregar arquivos CSV sem cabeçalhos

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Aplicar um esquema em arquivos CSV com cabeçalhos

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingerir imagem ou dados binários para Delta Lake para ML

Depois que os dados são armazenados no Delta Lake, você pode executar a inferência distribuída nos dados. Consulte Executar inferência distribuída usando pandas UDF.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintaxe Auto Loader para DLT

Delta Live Tables fornece sintaxe Python ligeiramente modificada para Auto Loader e adiciona suporte SQL para Auto Loader.

Os exemplos a seguir usam o Auto Loader para criar dataset a partir de arquivos CSV e JSON:

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Você pode usar opções de formato suportadas com o Auto Loader. Usando a função map() , você pode passar opções para o método cloud_files() . As opções são par key-valor, onde a key e os valores são strings. O seguinte descreve a sintaxe para trabalhar com o Auto Loader em SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

O exemplo a seguir lê dados de arquivos CSV delimitados tabcom um cabeçalho:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Você pode usar schema para especificar o formato manualmente; você deve especificar schema para formatos que não suportam inferência de esquema:

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Observação

Delta Live Tables configura e gerencia automaticamente os diretórios de esquema e ponto de verificação ao usar o Auto Loader para ler arquivos. Entretanto, se você configurar manualmente qualquer um desses diretórios, a execução de uma refresh completa não afetará o conteúdo dos diretórios configurados. A Databricks recomenda a utilização dos diretórios configurados automaticamente para evitar efeitos secundários inesperados durante o processamento.