Tutorial: carregar e transformar dados usando Apache Spark DataFrames

Este tutorial mostra aos senhores como carregar e transformar dados usando o Apache Spark Python (PySpark) DataFrame API, o Apache Spark Scala DataFrame API e o SparkR SparkDataFrame API em Databricks.

Ao final deste tutorial, você entenderá o que é um DataFrame e conhecerá as seguintes tarefas:

O que é um DataFrame?

DataFrame é uma estrutura de dados bidimensional rotulada com colunas de tipos variados. Imagine o DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos em série. Os DataFrames do Apache Spark oferecem um conjunto abrangente de funções (selecionar, filtrar, unir, agregar colunas) que permitem que você resolva problemas comuns de análise de dados de forma simples.

Os DataFrames do Apache Spark são uma abstração criada sobre os Resilient Distributed Datasets (RDDs). Os DataFrames do Spark e o Spark SQL utilizam um mecanismo de planejamento e otimização unificado, permitindo que você tenha um desempenho quase idêntico em todas as linguagens compatíveis com o o Databricks (Python, SQL, Scala e R).

Requisitos

Para concluir o tutorial a seguir, você precisa atender aos seguintes requisitos:

  • Para usar os exemplos neste tutorial, seu workspace deve ter o Unity Catalog habilitado.

  • Os exemplos deste tutorial usam um volume do Unity Catalog para armazenar dados de amostra. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos.

  • O senhor deve ter as seguintes permissões no Unity Catalog:

    • READ VOLUME e WRITE VOLUME, ou ALL PRIVILEGES para o volume usado neste tutorial.

    • USE SCHEMA ou ALL PRIVILEGES para o esquema usado neste tutorial.

    • USE CATALOG ou ALL PRIVILEGES para o catálogo usado neste tutorial.

    Para definir essas permissões, consulte o administrador do Databricks ou os privilégios e objetos protegidos do Unity Catalog.

Dica

Para obter um Notebook completo para este artigo, consulte DataFrame tutorial Notebook.

o passo 1: Definir variáveis e carregar o arquivo CSV

Este passo define as variáveis a serem usadas neste tutorial e, em seguida, carrega um arquivo CSV contendo dados de nomes de bebês do health.data.ny.gov no volume Unity Catalog.

  1. Abra um novo Notebook clicando no ícone Novo ícone. Para saber como navegar pelo Databricks Notebook, consulte Databricks Notebook interface e controles.

  2. Copie e cole o código a seguir na nova célula vazia do site Notebook. Substitua <catalog-name>, <schema-name> e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog. Substitua <table_name> por um nome de tabela de sua escolha. O senhor carregará os dados do nome do bebê nessa tabela mais adiante neste tutorial.

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    
    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    
    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Pressione Shift+Enter para executar a célula e criar uma nova célula em branco.

  4. Copie e cole o código a seguir na nova célula vazia do site Notebook. Esse código copia o arquivo rows.csv de health.data.ny.gov para o volume do Unity Catalog usando o comando Databricks dbutuils.

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    
    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    
    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

o passo 2: Criar um DataFrame

Esta etapa cria um DataFrame chamado df1 com dados de teste e exibe o conteúdo dele.

  1. Copie e cole o código a seguir na nova célula vazia do notebook. Esse código cria o DataFrame com dados de teste e, em seguida, exibe o conteúdo e o esquema do DataFrame.

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

o passo 3: Carregar dados em um DataFrame a partir do arquivo CSV

Esta etapa cria um DataFrame chamado df_csv do arquivo CSV que você carregou anteriormente em seu volume do Unity Catalog. Consulte spark.read.csv.

  1. Copie e cole o código a seguir na nova célula vazia do notebook. Esse código carrega dados de nomes de bebês no DataFrame df_csv do arquivo CSV e exibe o conteúdo do DataFrame.

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    
    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    
    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Você pode enviar dados de vários formatos de arquivo compatíveis.

o passo 4: view e interagir com o senhor DataFrame

Veja e interaja com os DataFrames de nomes de bebês usando os seguintes métodos.

Renomear coluna no DataFrame

Aprenda a renomear uma coluna em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código renomeia uma coluna no DataFrame df1_csv para corresponder à respectiva coluna no DataFrame df1. Esse código usa o método withColumnRenamed() do Apache Spark.

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    
    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    
    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Combinar DataFrames

Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Este código usa o método union() do Apache Spark para combinar o conteúdo do seu primeiro DataFrame df com o DataFrame df_csv contendo os dados de nomes de bebês carregados do arquivo CSV.

    df = df1.union(df_csv)
    display(df)
    
    val df = df1.union(dfCsvRenamed)
    display(df)
    
    display(df <- union(df1, df_csv))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Filtrar linhas em um DataFrame

Descubra os nomes de bebês mais populares em seu conjunto de dados filtrando as linhas com os métodos .filter() ou .where() do Apache Spark. Use a filtragem para selecionar um subconjunto de linhas a serem retornadas ou modificadas em um DataFrame. Não há diferença no desempenho ou sintaxe, como visto nos exemplos a seguir.

Como usar o método .filter()

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método .filter() do Apache Spark para exibir essas linhas no DataFrame com uma contagem superior a 50.

    display(df.filter(df["Count"] > 50))
    
    display(df.filter(df("Count") > 50))
    
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Como usar o método .where()

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método .where() do Apache Spark para exibir essas linhas no DataFrame com uma contagem superior a 50.

    display(df.where(df["Count"] > 50))
    
    display(df.where(df("Count") > 50))
    
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Selecione colunas de um DataFrame e ordene por frequência

Aprenda a frequência de nomes de bebês com o método select() para especificar as colunas do DataFrame a serem retornadas. Use as funções orderby e desc do Apache Spark para ordenar os resultados.

O módulo PySpark.sql para Apache Spark oferece suporte às funções de SQL. Entre essas funções que usamos neste tutorial estão as funções orderBy(), desc() e expr() do Apache Spark. Você habilita o uso dessas funções importando-as para sua sessão, conforme necessário.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função desc() e, em seguida, usa o método select() do Apache Spark e as funções orderBy() e desc() do Apache Spark para exibir os nomes mais comuns e suas contagens em ordem decrescente.

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Criar um DataFrame de subconjunto

Aprenda como criar um DataFrame de subconjunto com um DataFrame existente.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método filter do Apache Spark para criar um novo DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o método select() do Apache Spark para limitar as colunas. Usa também as funções orderBy() e desc() do Apache Spark para classificar o novo DataFrame por contagem.

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    
    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    
    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

o passo 5: Salvar o DataFrame

Saiba como salvar um DataFrame,. O senhor pode salvar seu DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.

Salvar o DataFrame em uma tabela

O Databricks usa o formato Delta Lake para todas as tabelas por padrão. Para salvar seu DataFrame, é necessário ter privilégio para CREATE tabela no catálogo e no esquema.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Este código salva o conteúdo do DataFrame em uma tabela usando a variável que você definiu no início deste tutorial.

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    
    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    
    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

A maioria dos aplicativos do Apache Spark trabalha com grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. O Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados conseguem ler esses diretórios de arquivos. A Databricks recomenda o uso de tabelas em vez de caminhos de arquivo para a maioria das aplicações.

Salvar o DataFrame em arquivos JSON

  1. Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código salva o DataFrame em um diretório de arquivos JSON.

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    
    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    
    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Ler o DataFrame de um arquivo JSON

Saiba como usar o método Apache Spark spark.read.format() para read.json dados de um diretório em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código exibe os arquivos JSON que o senhor salvou no exemplo anterior.

    display(spark.read.format("json").json("/tmp/json_data"))
    
    display(spark.read.format("json").json("/tmp/json_data"))
    
    display(read.json("/tmp/json_data"))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Outras tarefas: executar consultas de SQL no PySpark, Scala e R

O DataFrames do Apache Spark oferece as seguintes opções para combinar SQL com PySpark, Scala e R. Você pode executar o código a seguir no mesmo notebook que criou para este tutorial.

Especificar uma coluna como uma consulta SQL

Aprenda como usar o método selectExpr() do Apache Spark. Essa é uma variante do método select() que aceita expressões SQL e retorna um DataFrame atualizado. Este método permite usar uma expressão SQL, como upper.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método selectExpr() do Apache Spark e a expressão upper do SQL para converter uma coluna de strings em maiúsculas (e renomear a coluna).

    display(df.selectExpr("Count", "upper(County) as big_name"))
    
    display(df.selectExpr("Count", "upper(County) as big_name"))
    
    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Use expr() para usar a sintaxe SQL para uma coluna

Saiba como importar e usar a função expr() do Apache Spark para usar a sintaxe SQL em qualquer local onde haveria especificação de uma coluna.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função expr() e, em seguida, usa a função expr() do Apache Spark e a expressão lower do SQL para converter uma coluna de strings em minúsculas (e renomear a coluna).

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    
    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    
    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Executar uma consulta SQL arbitrária usando a função spark.sql()

Saiba como usar a função spark.sql() do Apache Spark para executar consultas SQL arbitrárias.

  1. Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa a função spark.sql() do Apache Spark para consultar uma tabela SQL usando a sintaxe SQL.

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    
    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    
    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Pressione Shift+Enter para executar a célula e passar para a próxima célula.

Notebooks do tutorial do DataFrame

Os notebooks a seguir incluem os exemplos de consultas deste tutorial.

Tutorial de DataFrames usando Python

Abra o bloco de anotações em outra guia

Tutorial de DataFrames usando Scala

Abra o bloco de anotações em outra guia

Tutorial de DataFrames usando o R

Abra o bloco de anotações em outra guia