Lidar com registros e arquivos ruins

O Databricks fornece várias opções para lidar com arquivos que contêm registros inválidos. Exemplos de dados incorretos incluem:

  • Registros incompletos ou corrompidos: Observados principalmente em formatos de arquivo baseados em texto, como JSON e CSV. Por exemplo, um registro JSON que não possui uma chave de fechamento ou um registro CSV que não possui tantas colunas quanto o cabeçalho ou o primeiro registro do arquivo CSV.

  • Tipos de dados incompatíveis: quando o valor de uma coluna não tem o tipo de dados especificado ou inferido.

  • Nomes de campo incorretos: podem ocorrer em todos os formatos de arquivo, quando o nome da coluna especificado no arquivo ou registro tem uma capitalização diferente do esquema especificado ou inferido.

  • Arquivos corrompidos: quando um arquivo não pode ser lido, o que pode ser devido a metadados ou corrupção de dados em tipos de arquivos binários, como Avro, Parquet e ORC. Em raras ocasiões, pode ser causado por falhas transitórias de longa duração no sistema de armazenamento subjacente.

  • Arquivos ausentes: Um arquivo que foi descoberto durante o tempo de análise query e não existe mais no tempo de processamento.

Usar badRecordsPath

Quando você define badRecordsPath, o caminho especificado registra exceções para registros ou arquivos incorretos encontrados durante o carregamento de dados.

Além de registros e arquivos corrompidos, erros que indicam arquivos excluídos, exceção de conexão de rede, exceção de IO e assim por diante são ignorados e registrados no badRecordsPath.

Observação

O uso da opção badRecordsPath em uma fonte de dados baseada em arquivo tem algumas limitações importantes:

  • Não é transacional e pode levar a resultados inconsistentes.

  • Erros transitórios são tratados como falhas.

Não foi possível encontrar o arquivo de entrada

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .format("parquet").load("/input/parquetFile")

// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")

df.show()

No exemplo acima, como df.show() não consegue localizar o arquivo de entrada, o Spark cria um arquivo de exceção no formato JSON para registrar o erro. Por exemplo, /tmp/badRecordsPath/20170724T101153/bad_files/xyz é o caminho do arquivo de exceção. Este arquivo está no diretório badRecordsPath especificado, /tmp/badRecordsPath. 20170724T101153 é a hora de criação deste DataFrameReader. bad_files é o tipo de exceção. xyz é um arquivo que contém um registro JSON, que possui o caminho do arquivo inválido e a mensagem de exceção/razão.

O arquivo de entrada contém um registro incorreto

// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.format("text").save("/tmp/input/jsonFile")

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .schema("a int, b int")
  .format("json")
  .load("/tmp/input/jsonFile")

df.show()

Neste exemplo, o DataFrame contém apenas o primeiro registro analisável ({"a": 1, "b": 2}). O segundo registro inválido ({bad-record) é registrado no arquivo de exceção, que é um arquivo JSON localizado em /tmp/badRecordsPath/20170724T114715/bad_records/xyz. O arquivo de exceção contém o registro incorreto, o caminho do arquivo que contém o registro e a mensagem de exceção/razão. Depois de localizar os arquivos de exceção, você pode usar um leitor JSON para processá-los.