Configure a inferência e a evolução do esquema no Auto Loader

É possível configurar o Auto Loader para detectar automaticamente o esquema dos dados carregados, permitindo que você inicialize tabelas sem a necessidade de declarar explicitamente o esquema dos dados e evolua o esquema da tabela à medida que novas colunas são introduzidas.Isso elimina a necessidade de rastrear manualmente e aplicar alterações de esquema ao longo do tempo.

O Auto Loader também pode “resgatar” dados inesperados (por exemplo, de diferentes tipos de dados) em uma coluna de JSON blob, que você pode escolher acessar posteriormente usando as APIs semiestruturadas de acesso a dados.

Os seguintes formatos são compatíveis com a inferência e evolução do esquema:

Formato de arquivo

Versões compatíveis

JSON

Todas versões

CSV

Todas versões

XML

Databricks Runtime 14.3 LTS e acima

Avro

Databricks Runtime 10.4 LTS e acima

Parquet

Databricks Runtime 11.3 LTS e acima

ORC

Sem compatibilidade

Text

Não aplicável (esquema fixo)

Binaryfile

Não aplicável (esquema fixo)

Sintaxe para inferência e evolução de esquemas

A especificação de um diretório de destino para a opção cloudFiles.schemaLocation permite a inferência e a evolução do esquema. Você pode optar por usar o mesmo diretório que especificou para checkpointLocation. Caso você use o Delta Live Tables, o Databricks gerenciará automaticamente o local do esquema e outras informações de ponto de verificação.

Observação

Se você tiver mais de uma localização de dados de origem sendo carregada na tabela de destino, cada carga de trabalho de ingestão do Auto Loader requer um ponto de verificação de transmissão separado.

O exemplo a seguir usa parquet para cloudFiles.format. Use csv, avro ou json para outras fontes de arquivo. Todas as outras configurações de leitura e escrita permanecem iguais às configurações padrão para cada formato.

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Como funciona a inferência de esquema do Auto Loader?

Para deduzir o esquema na primeira leitura dos dados, o Auto Loader amostra os primeiros 50 GB ou 1000 arquivos que encontra, o que acontecer primeiro.O Auto Loader armazena as informações de esquema em um _schemas de diretório no cloudFiles.schemaLocation configurado para controlar as alterações de esquema nos dados de entrada ao longo do tempo.

Observação

Para alterar o tamanho da amostra usada, você pode definir as configurações do SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(string de bytes, por exemplo 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(inteiro)

Em default, a inferência de esquema do Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipos. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Auto Loader infere todas as colunas como strings (incluindo campos aninhados em arquivos JSON). Para formatos com esquema tipado (Parquet e Avro), o Auto Loader coleta amostras de um subconjunto de arquivos e mescla os esquemas de arquivos individuais. Esse comportamento está resumido na tabela a seguir:

Formato de arquivo

Tipo de dados inferidos padrão

JSON

String

CSV

String

XML

String

Avro

Tipos codificados no esquema Avro

Parquet

Tipos codificados no esquema de Parquet

O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes JSON, CSV e XML com base em dados de amostra. Para ativar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.

Observação

Ao inferir o esquema para dados CSV, o Auto Loader assume que os arquivos contêm cabeçalhos. Se seus arquivos CSV não contiverem cabeçalhos, forneça a opção .option("header", "false"). Além disso, o Auto Loader merge os esquemas de todos os arquivos da amostra para criar um esquema global. O Auto Loader pode então ler cada arquivo de acordo com seu cabeçalho e analisar o CSV corretamente.

Observação

Quando uma coluna possui tipos de dados diferentes em dois arquivos Parquet, o Auto Loader escolhe o tipo mais amplo. Você pode usar schemaHints para substituir essa escolha. Quando você especifica dicas de esquema, o Auto Loader não converte a coluna para o tipo especificado, mas informa ao leitor Parquet para ler a coluna como o tipo especificado. No caso de incompatibilidade, a coluna é resgatada na coluna de dados resgatada.

Como funciona a evolução do esquema do Auto Loader?

O Auto Loader detecta a adição de novas colunas à medida que processa seus dados. Quando o Auto Loader detecta uma nova coluna, o fluxo para com um UnknownFieldException. Antes que sua transmissão gere esse erro, o Auto Loader realiza a inferência de esquema no último micro lote de dados e atualiza a localização do esquema com o esquema mais recente, mesclando novas colunas no final do esquema.Os tipos de dados das colunas existentes permanecem inalterados.

O Databricks recomenda configurar as transmissões do Auto Loader com fluxos de trabalho para reiniciar automaticamente após essas alterações de esquema.

O Auto Loader é compatível com os seguintes modos para evolução de esquema, que você definiu na opção cloudFiles.schemaEvolutionMode:

Modo

Comportamento na leitura de nova coluna

addNewColumns (padrão)

A transmissão da falha. Novas colunas são adicionadas ao esquema. As colunas existentes não desenvolvem tipos de dados.

rescue

O esquema nunca é evoluído e a transmissão não falha devido a mudanças de esquema. Todas as novas colunas são registradas na coluna de dados resgatados.

failOnNewColumns

A transmissão da falha. O fluxo não é reiniciado, a menos que o esquema fornecido seja atualizado ou que o arquivo de dados problemáticos sejam removidos.

none

Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção rescuedDataColumn seja definida. A transmissão não falha devido a alterações de esquema.

Como as partições funcionam com o Auto Loader?

O Auto Loader tenta inferir colunas de partição a partir da estrutura de diretórios subjacente dos dados se os dados estiverem organizados em particionamento no estilo Hive.Por exemplo, o caminho do arquivo base_path/event=click/date=2021-04-01/f0.json resulta na inferência de date e event como colunas de partição. Se a estrutura de diretório subjacente contiver partições de Hive conflitantes ou não contiver particionamento no estilo de Hive, as colunas de partição serão ignoradas.

Os formatos de arquivo binário (binaryFile) e text têm esquemas de dados fixos, mas são compatíveis com inferência de coluna de partição. O Databricks recomenda configurar o cloudFiles.schemaLocation para estes formatos de arquivo. Isso evita possíveis erros ou perda de informações e impede a inferência de colunas de partições sempre que um Auto Loader começa.

Colunas de partição não são consideradas para a evolução do esquema. Se você tivesse uma estrutura inicial de diretórios como base_path/event=click/date=2021-04-01/f0.json, e depois começasse a receber novos arquivos como base_path/event=click/date=2021-04-01/hour=01/f1.json, o Auto Loader ignorará a coluna de horas. Para capturar informações para novas colunas de partição, defina cloudFiles.partitionColumns para event,date,hour.

Observação

A opção cloudFiles.partitionColumns usa uma lista separada por vírgulas de nomes de colunas. Somente colunas que existem como key=value pares em sua estrutura de diretório são analisadas.

Qual é a coluna de dados resgatados?

Quando o Auto Loader infere o esquema, uma coluna de dados resgatados é automaticamente adicionada ao seu esquema como _rescued_data. Você pode renomear a coluna ou incluí-la nos casos em que você fornece um esquema, configurando a opção rescuedDataColumn.

A coluna de dados resgatados garante que as colunas que não correspondam ao esquema sejam resgatadas em vez de serem descartadas.A coluna de dados resgatadas contém todos os dados que não foram analisados pelos seguintes motivos:

  • A coluna está ausente no esquema.

  • Incompatibilidades de tipos.

  • Incompatibilidades de casos.

A coluna de dados resgatados contém um JSON que contém as colunas resgatadas e o caminho do arquivo de origem do registro.

Observação

Os analisadores JSON e CSV são compatíveis com três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando utilizado junto com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam eliminados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos são eliminados ou resultam em erros, como JSON ou CSV incompletos ou malformados. Se o senhor usar badRecordsPath ao analisar JSON ou CSV, as incompatibilidades de tipo de dados não serão consideradas como registros ruins ao usar rescuedDataColumn. Somente registros JSON ou CSV incompletos e malformados são armazenados em badRecordsPath.

Alterar o comportamento que diferencia maiúsculas de minúsculas

A menos que a diferenciação de maiúsculas e minúsculas esteja ativada, as colunas abc, Abc e ABC são consideradas a mesma coluna para fins de inferência de esquema. O caso escolhido é arbitrário e depende dos dados amostrados. Você pode usar dicas de esquema para impor qual caso deve ser usado. Depois que uma seleção é feita e o esquema é inferido, o Auto Loader não considera as variantes de maiúsculas e minúsculas que não foram selecionadas consistentes com o esquema.

Quando a coluna de dados resgatados está ativada, os campos nomeados em um caso diferente daquele do esquema são carregados na coluna _rescued_data . Altere esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê os dados sem diferenciar maiúsculas de minúsculas.

Substitua a inferência de esquema com dicas de esquema

Você pode usar dicas de esquema para aplicar as informações de esquema que você conhece e espera em um esquema inferido. Quando você sabe que uma coluna possui um tipo de dado específico, ou se deseja escolher um tipo de dado mais genérico (por exemplo, um double ao invés de um integer), você pode fornecer um número arbitrário de dicas para tipos de dados de coluna como uma string usando a sintaxe de especificação de esquema SQL, como o seguinte:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consulte a documentação sobre tipos de dados para obter a lista de tipos de dados suportados.

Se uma coluna não estiver presente no início da transmissão, você também pode usar sugestões de esquema para adicionar essa coluna ao esquema inferido.

Aqui está um exemplo de um esquema inferido para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Especificando as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

você obtém:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Observação

O suporte a dicas de esquema de matriz e mapa está disponível no Databricks Runtime 9.1 LTSe acima.

Aqui está um exemplo de um esquema inferido com tipos de dados complexos para ver o comportamento com dicas de esquema.

Esquema inferido:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Especificando as seguintes dicas de esquema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

você obtém:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Observação

As dicas de esquema serão usadas somente se você não fornecer um esquema ao Auto Loader. Você pode usar dicas de esquema se cloudFiles.inferColumnTypes estiver ativado ou desativado.