Lidar com registros e arquivos ruins
O Databricks fornece várias opções para lidar com arquivos que contêm registros inválidos. Exemplos de dados incorretos incluem:
Registros incompletos ou corrompidos: Observados principalmente em formatos de arquivo baseados em texto, como JSON e CSV. Por exemplo, um registro JSON que não possui uma chave de fechamento ou um registro CSV que não possui tantas colunas quanto o cabeçalho ou o primeiro registro do arquivo CSV.
Tipos de dados incompatíveis: quando o valor de uma coluna não tem o tipo de dados especificado ou inferido.
Nomes de campo incorretos: podem ocorrer em todos os formatos de arquivo, quando o nome da coluna especificado no arquivo ou registro tem uma capitalização diferente do esquema especificado ou inferido.
Arquivos corrompidos: quando um arquivo não pode ser lido, o que pode ser devido a metadados ou corrupção de dados em tipos de arquivos binários, como Avro, Parquet e ORC. Em raras ocasiões, pode ser causado por falhas transitórias de longa duração no sistema de armazenamento subjacente.
Arquivos ausentes: Um arquivo que foi descoberto durante o tempo de análise query e não existe mais no tempo de processamento.
Usar badRecordsPath
Quando você define badRecordsPath
, o caminho especificado registra exceções para registros ou arquivos incorretos encontrados durante o carregamento de dados.
Além de registros e arquivos corrompidos, erros que indicam arquivos excluídos, exceção de conexão de rede, exceção de IO e assim por diante são ignorados e registrados no badRecordsPath
.
Observação
O uso da opção badRecordsPath
em uma fonte de dados baseada em arquivo tem algumas limitações importantes:
Não é transacional e pode levar a resultados inconsistentes.
Erros transitórios são tratados como falhas.
Não foi possível encontrar o arquivo de entrada
val df = spark.read
.option("badRecordsPath", "/tmp/badRecordsPath")
.format("parquet").load("/input/parquetFile")
// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")
df.show()
No exemplo acima, como df.show()
não consegue localizar o arquivo de entrada, o Spark cria um arquivo de exceção no formato JSON para registrar o erro. Por exemplo, /tmp/badRecordsPath/20170724T101153/bad_files/xyz
é o caminho do arquivo de exceção. Este arquivo está no diretório badRecordsPath
especificado, /tmp/badRecordsPath
. 20170724T101153
é a hora de criação deste DataFrameReader
. bad_files
é o tipo de exceção. xyz
é um arquivo que contém um registro JSON, que possui o caminho do arquivo inválido e a mensagem de exceção/razão.
O arquivo de entrada contém um registro incorreto
// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.format("text").save("/tmp/input/jsonFile")
val df = spark.read
.option("badRecordsPath", "/tmp/badRecordsPath")
.schema("a int, b int")
.format("json")
.load("/tmp/input/jsonFile")
df.show()
Neste exemplo, o DataFrame contém apenas o primeiro registro analisável ({"a": 1, "b": 2}
). O segundo registro inválido ({bad-record
) é registrado no arquivo de exceção, que é um arquivo JSON localizado em /tmp/badRecordsPath/20170724T114715/bad_records/xyz
. O arquivo de exceção contém o registro incorreto, o caminho do arquivo que contém o registro e a mensagem de exceção/razão. Depois de localizar os arquivos de exceção, você pode usar um leitor JSON para processá-los.