チュートリアル:Apache Spark DataFramesを使用したデータの読み込みと変換

このチュートリアルでは、Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使用してデータを読み込み、変換する方法を説明します。

このチュートリアルの最後には、DataFrameとは何かを理解し、次のタスクに慣れることができます:

DataFrameとは

DataFrameは、潜在的に異なるタイプの列を持つ2次元のラベル付きデータ構造です。DataFrameは、スプレッドシート、SQLテーブル、または複数のオブジェクトから成る辞書のようなものと考えることができます。Apache Spark DataFramesは、一般的なデータ分析の問題を効率的に解決できる関数(列の選択、絞り込み、結合、集計)を多数提供します。

Apache Spark DataFramesは、Resilient Distributed Datasets(RDD)の上に構築された抽象化です。Spark DataFramesとSpark SQLでは、統合された計画および最適化エンジンが使用されているため、Databricksでサポートされているすべての言語(Python、SQL、Scala、およびR)でほぼ同じパフォーマンスを得ることができます。

要件

以下のチュートリアルを完了するには、以下の条件を満たす必要があります:

  • このチュートリアルの例を使用するには、ワークスペースでUnity Catalog が有効になっている必要があります。

  • このチュートリアルの例では、 Unity Catalogボリュームを使用してサンプル データを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、およびボリューム名を使用して、例で使用されるボリュームパスを設定します。

  • Unity Catalog では次の権限が必要です。

    • READ VOLUME このチュートリアルで使用するボリュームの場合はWRITE VOLUME 、またはALL PRIVILEGESです。

    • USE SCHEMA または、このチュートリアルで使用されるスキーマの場合はALL PRIVILEGES

    • USE CATALOG または、このチュートリアルで使用されるカタログの場合はALL PRIVILEGES

    これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。

ヒント

この記事の完成したノートブックについては、 DataFrameチュートリアル ノートブックを参照してください。

ステップ 1: 変数を定義してCSVファイルをロードする

このステップでは、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含むCSVファイルをhealth.data.ny.govからUnity Catalogボリュームにロードします。

  1. をクリックして新しいノートブックを開きます新しいアイコンアイコン。 Databricksノートブックの操作方法については、 Databricksノートブックのインターフェイスとコントロール」を参照してください。

  2. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 <catalog-name><schema-name>、および <volume-name> をUnity Catalogボリュームのカタログ名、スキーマ名、およびボリューム名に置き換えます。 <table_name> を任意のテーブル名に置き換えます。このチュートリアルの後半で、赤ちゃんの名前のデータをこのテーブルにロードします。

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    
    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    
    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Shift+Enterを押すとセルが実行され、新しい空白のセルが作成されます。

  4. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは 、Databricks dbutils rows.csvコマンドを使用して、 health.data.ny.gov からUnity Catalog ボリュームに ファイルをコピーします。

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    
    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    
    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ 2: DataFrameを作成する

このステップでは、テストデータを含むdf1 という名前のDataFrameを作成し、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、テストデータを使用してDataFrameを作成し、DataFrameの内容とスキーマを表示します。

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ 3: ファイルからデータを にロードしますDataFrameCSV

このステップでは、以前にUnity Catalog ボリュームにロードしたCSVファイルからdf_csvという名前のDataFrameを作成します。spark.read.csvを参照してください。

  1. 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、CSVファイルから赤ちゃんの名前データをDataFrame df_csvに読み込み、DataFrameの内容を表示します。

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    
    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    
    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

サポートされている様々なファイル形式からデータを読み込むことができます。

ステップ 4: DataFrameを表示して操作する

次の方法を使用して、赤ちゃんの名前のDataFramesを表示および操作します。

DataFrameの列名を変更する

DataFrameの列の名前を変更する方法を学びましょう。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、df1_csv DataFrame内の列の名前を、df1 DataFrame内の対応する列と一致するように変更します。このコードはApache Spark withColumnRenamed()メソッドを使用します。

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    
    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    
    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

DataFramesの結合

あるDataFrameの行を別のDataFrameに追加する新しいDataFrameを作成する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkのunion()メソッドを使用して、最初のDataFrame dfの内容を、CSVファイルから読み込んだ赤ちゃんの名前データを含むDataFrame df_csvと結合します。

    df = df1.union(df_csv)
    display(df)
    
    val df = df1.union(dfCsvRenamed)
    display(df)
    
    display(df <- union(df1, df_csv))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

DataFrame内の行をフィルタリングする

Apache Spark .filter() または.where() メソッドを使用して行をフィルタリングして、データセット内で最も人気のある赤ちゃんの名前を見つけてください。フィルタリングを使用して、DataFrame内で返したり変更したりする行のサブセットを選択します。以下の例のように、パフォーマンスや構文に違いはありません。

.filter()の使用方式

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark .filter()メソッドを使用して、DataFrame内の行数が50を超える行を表示します。

    display(df.filter(df["Count"] > 50))
    
    display(df.filter(df("Count") > 50))
    
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

.where()の使用方法

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark .where()メソッドを使用して、DataFrame内の行数が50を超える行を表示します。

    display(df.where(df["Count"] > 50))
    
    display(df.where(df("Count") > 50))
    
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

DataFrame から列を選択し、頻度順に並べ替える

select()メソッドを使用して、DataFrameから返す列を指定することで、赤ちゃんの名前の頻度について学習します。結果を並べ替えるには、Apache Sparkのorderbyおよびdesc関数を使用します。

のPySpark .sql モジュールは、Apache Spark SQL関数をサポートします。このチュートリアルで使用するこれらの関数には、Apache Spark orderBy()desc()、および expr() 関数があります。 これらの関数の使用を有効にするには、必要に応じてセッションにインポートします。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、desc()関数をインポートし、Apache Spark select()メソッドとApache Spark orderBy()およびdesc()関数を使用して、最も一般的な名前とその数を降順で表示します。

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

サブセットDataFrameを作成する

既存のDataFrameからサブセットDataFrameを作成する方法を学びましょう。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkのfilter メソッドを使用して、年、数、性別でデータを制限した新しいDataFrameを作成します。列を制限するためにApache Spark select()メソッドを使用します。また、Apache SparkのorderBy()およびdesc()関数を使用して、新しい DataFrameをカウントで並べ替えます。

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    
    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    
    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ 5: DataFrameを保存する

DataFrame を保存する方法について説明します。 DataFrame をテーブルに保存するか、DataFrame を 1 つまたは複数のファイルに書き込むことができます。

DataFrameをテーブルに保存する

Databricksでは、デフォルトですべてのテーブル向けにDelta Lakeを使用しています。DataFrameを保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、このチュートリアルの最初に定義した変数を使用して、DataFrameの内容を表に保存します。

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    
    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    
    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

Apache Sparkアプリケーションのほとんどは、大規模なデータセットを分散処理します。Apache Sparkは単一のファイルではなく、ファイルのディレクトリを書き出します。Delta LakeはParquetフォルダとファイルを分割します。多くのデータシステムは、これらのファイルのディレクトリを読むことができます。Databricksでは、ほとんどのアプリケーションでファイルパスよりもテーブルを優先して使用することを推奨しています。

DataFrameをJSON ファイルに保存する

  1. 次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    
    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    
    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

JSONファイルから DataFrameを読み取る

Apache Sparkspark.read.format()read.jsonメソッドを使用して、ディレクトリからDataFrame に .json データ 方法を学習します。

  1. 次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、前の例で保存した JSON ファイルを表示します。

    display(spark.read.format("json").json("/tmp/json_data"))
    
    display(spark.read.format("json").json("/tmp/json_data"))
    
    display(read.json("/tmp/json_data"))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

その他のタスク:PySpark、Scala、RでSQLクエリを実行する

Apache Spark DataFrames には、SQLをPySpark、Scala、Rと組み合わせるための次のオプションが用意されています。次のコードは、このチュートリアル用に作成したものと同じノートブックで実行できます。

列をSQLクエリとして指定する

Apache Spark selectExpr()メソッドの使用方法を学習します。これは、SQL 式を受け入れ、更新されたDataFrameを返すselect() メソッドのバリアントです。この方法では、upperなどのSQL式を使用できます。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark selectExpr()メソッドとSQL upper式を使用して、文字列の列を大文字に変換します(列の名前を変更します)。

    display(df.selectExpr("Count", "upper(County) as big_name"))
    
    display(df.selectExpr("Count", "upper(County) as big_name"))
    
    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

列にSQL構文を使用するにはexpr()を使用します

Apache Spark expr()関数をインポートして使用し、列が指定される任意の場所でSQL構文を使用する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードはexpr() 関数をインポートし、Apache Spark expr() 関数とSQL lower 式を使用して文字列を小文字に変換します(そして列の名前を変更します)。

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    
    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    
    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

spark.sql()を使用して任意のSQLクエリを実行する

Apache Spark spark.sql()関数を使用して任意のSQLクエリを実行する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Spark spark.sql() 関数を使用してSQL構文を使用してSQLテーブルにクエリを実行します。

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    
    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    
    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

DataFrameチュートリアルノートブック

次のノートブックには、このチュートリアルからのクエリーの例が含まれています。

Pythonを使用したDataFramesチュートリアル

ノートブックを新しいタブで開く

Scalaを使用したDataFramesチュートリアル

ノートブックを新しいタブで開く

R を使用した DataFrames チュートリアル

ノートブックを新しいタブで開く