PySparkの基礎
この記事では、PySpark の使用方法を説明するために簡単な例を紹介します。 基本的なApache Spark 概念 を理解しており、Databricks コンピュート に接続された でコマンドを実行していることを前提としています。サンプル データを使用してDataFramesを作成し、このデータに対して行と列の操作を含む基本的な変換を実行し、複数のDataFramesを結合してこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。
データのアップロード
この記事の一部の例では Databricksが提供するサンプル データを使用して、 DataFrames を使用してデータを読み込み、変換、および保存する方法を示します。 まだ Databricks にない独自のデータを使用する場合は、最初にデータをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
Databricksサンプルデータについて
Databricks は、 samples
カタログと/databricks-datasets
ディレクトリにサンプル データを提供します。
samples
カタログのサンプル・データにアクセスするには、samples.<schema-name>.<table-name>
という形式を使用します。この記事では、架空のビジネスのデータを含むsamples.tpch
スキーマのテーブルを使用します。customer
テーブルには顧客に関する情報が含まれており、orders
それらの顧客による注文に関する情報が含まれています。dbutils.fs.ls
を使用して、/databricks-datasets
内のデータを探索します。ファイル パスを使用してこの場所のデータをクエリするには、Spark SQL または DataFrames を使用します。 Databricks が提供するサンプル データの詳細については、 「サンプル データセット」を参照してください。
データ型のインポート
多くの PySpark 操作では、SQL 関数を使用するか、ネイティブの Spark タイプと対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
インポートされた関数の中には Python の組み込み関数をオーバーライドするものがある可能性があるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選択します。 次の例は、Apache Spark コード例で使用される一般的なエイリアスを示しています。
import pyspark.sql.types as T
import pyspark.sql.functions as F
データ型の包括的なリストについては、 「Spark データ型」を参照してください。
PySpark SQL 関数の包括的なリストについては、 「Spark 関数」を参照してください。
DataFrameを作成する
DataFrame を作成する方法はいくつかあります。 通常、テーブルやファイルのコレクションなどのデータソースに対してDataFrameを定義します。 次に、Apache Spark の基本概念のセクションで説明されているように、 display
などのアクションを使用して、変換の実行をトリガーします。 display
メソッドはDataFramesを出力します。
指定された値を持つDataFrameを作成する
指定された値を持つ DataFrame を作成するには、行がタプルのリストとして表現されるcreateDataFrame
メソッドを使用します。
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
出力では、 df_children
の列のデータ型が自動的に推論されることに注意してください。 または、スキーマを追加してタイプを指定することもできます。 スキーマは、名前、データ型、および null 値が含まれているかどうかを示すBooleanフラグを指定する StructFields
で構成される StructType
を使用して定義されます。 データ型は pyspark.sql.types
からインポートする必要があります。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity Catalog のテーブルから DataFrame を作成する
Unity Catalog 内のテーブルから DataFrame を作成するには、形式<catalog-name>.<schema-name>.<table-name>
を使用してテーブルを識別するtable
メソッドを使用します。 左側のナビゲーションバーの「 カタログ」(Catalog ) をクリックし、「 カタログエクスプローラ」(Catalog Explorer ) を使用してテーブルに移動します。 それをクリックし、 「テーブル パスのコピー」を選択して、テーブル パスをノートブックに挿入します。
次の例では、テーブル samples.tpch.customer
をロードしますが、独自のテーブルへのパスを指定することもできます。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
アップロードされたファイルからDataFrameを作成する
Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、 read
プロパティを使用します。 このメソッドは、 DataFrameReader
を返し、これを使用して適切な形式を読み取ることができます。 左側の小さなサイドバーにあるカタログオプションをクリックし、カタログブラウザを使用してファイルを見つけます。 それを選択し、[ ボリュームファイルパスのコピー]をクリックします。
以下の例は *.csv
ファイルから読み取りますが、 DataFrameReader
は他の多くの形式でのファイルのアップロードをサポートしています。 「DataFrameReader メソッド」を参照してください。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity Catalogボリュームの詳細については、 Unity Catalogボリュームとは何ですか?」を参照してください。
JSONレスポンスからDataFrameを作成する
REST API によって返される JSON 応答ペイロードから DataFrame を作成するには、Python requests
パッケージを使用して応答をクエリおよび解析します。 使用するには、パッケージをインポートする必要があります。 この例では、米国食品医薬品局(FDA)の医薬品申請データベースのデータを使用します。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
で JSONやその他の半構造化データを操作する方法については、Databricks 「半構造化データのモデル化」 を参照してください。
JSONフィールドまたはオブジェクトを選択
変換された JSON から特定のフィールドまたはオブジェクトを選択するには、 []
表記を使用します。 たとえば、製品の配列であるproducts
フィールドを選択するには、次のようにします。
display(df_drugs.select(df_drugs["products"]))
また、メソッド呼び出しを連結して、複数のフィールドをトラバースすることもできます。 たとえば、医薬品申請の最初の製品のブランド名を出力するには、次のようにします。
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
ファイルからDataFrameを作成する
ファイルから DataFrame を作成する方法を示すために、この例では、 /databricks-datasets
ディレクトリに CSV データを読み込みます。
サンプル データセットに移動するには、 Databricks Utiltiesファイル システム コマンドを使用できます。 次の例では、 dbutils
を使用して/databricks-datasets
で使用可能なデータセットを一覧表示します。
display(dbutils.fs.ls('/databricks-datasets'))
または、次の例に示すように、 %fs
を使用してDatabricks CLI ファイル システム コマンドにアクセスすることもできます。
%fs ls '/databricks-datasets'
ファイルまたはファイルのディレクトリから DataFrame を作成するには、 load
メソッドでパスを指定します。
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
DataFramesでデータを変換する
DataFrames を使用すると、組み込みメソッドを使用してデータの並べ替え、フィルタリング、集計を行うことで、データを簡単に変換できます。 多くの変換は DataFrames のメソッドとして指定されていませんが、代わりにspark.sql.functions
パッケージで提供されています。 Databricks Spark SQL 関数を参照してください。
列操作
Spark は多くの基本的な列操作を提供します。
ヒント
DataFrame 内のすべての列を出力するには、 columns
を使用します (例: df_customer.columns
。
列の選択
select
と col
を使用して特定の列を選択できます。col
関数は pyspark.sql.functions
サブモジュールにあります。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
文字列として定義された式を受け取るexpr
を使用して列を参照することもできます。
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
SQL 式を受け入れるselectExpr
を使用することもできます。
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
文字列リテラルを使用して列を選択するには、次の手順を実行します。
df_customer.select(
"c_custkey",
"c_acctbal"
)
特定の DataFrame から列を明示的に選択するには、 []
演算子または.
演算子を使用できます。 ( .
演算子を使用して、整数で始まる列、またはスペースまたは特殊文字を含む列を選択することはできません。 これは、一部の列の名前が同じであるDataFramesを結合する場合に特に役立ちます。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
列の作成
新しい列を作成するには、 withColumn
メソッドを使用します。 次の例では、顧客アカウント残高 c_acctbal
が 1000
を超えているかどうかに基づいてBoolean値を含む新しい列を作成します。
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
列の名前を変更する
列の名前を変更するには、既存の列名と新しい列名を受け入れる withColumnRenamed
メソッドを使用します。
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
alias
メソッドは、集計の一部として列の名前を変更する場合に特に役立ちます。
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
行操作
Spark は多くの基本的な行操作を提供します。
行のフィルター
行をフィルタリングするには、DataFrame でfilter
またはwhere
メソッドを使用して、特定の行のみを返します。 フィルター処理する列を指定するには、 col
メソッドまたは列に評価される式を使用します。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
複数の条件でフィルター処理するには、論理演算子を使用します。 たとえば、 &
と |
では、それぞれ条件 AND
と OR
できます。 次の例では、 c_nationkey
が 20
で、 c_acctbal
が 1000
より大きい行をフィルター処理します。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
null 値を処理する
NULL 値を処理するには、 na.drop
メソッドを使用して NULL 値を含む行を削除します。 この方法では、 any
NULL 値を含む行をドロップするか、 all
NULL 値を含む行をドロップするかを指定できます。
null 値を削除するには、次のいずれかの例を使用します。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
代わりに、すべて null 値を含む行のみを除外する場合は、以下を使用します。
df_customer_no_nulls = df_customer.na.drop("all")
次に示すように、これを指定することで、列のサブセットにこれを適用できます。
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
欠損値を埋めるには、 fill
メソッドを使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 以下の例では、アカウント残高c_acctbal
が null 値であるアカウント残高は0
で埋められます。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
文字列を他の値に置き換えるには、 replace
メソッドを使用します。 以下の例では、空のアドレス文字列は単語UNKNOWN
に置き換えられます。
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
行の追加
行を追加するには、 union
メソッドを使用して新しい DataFrame を作成する必要があります。 次の例では、以前に作成した DataFrame df_that_one_customer
とdf_filtered_customer
が結合され、3 人の顧客を含む DataFrame が返されます。
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
注:
DataFrame をテーブルに書き込んでから新しい行を追加することで、 DataFramesを結合することもできます。 本番運用ワークロードの場合、データソースをターゲット テーブルに増分処理することで、データのサイズが増大してもレイテンシとコンピュート コストを大幅に削減できます。 「Databricks レイクハウスへのデータの取り込み」を参照してください。
行を並べ替える
重要
大規模なソートはコストがかかる可能性があり、ソートされたデータを保存して Spark でデータを再ロードすると、順序が保証されません。 並べ替えの使用は意図的に行ってください。
1 つ以上の列で行を並べ替えるには、 sort
または orderBy
メソッドを使用します。 デフォルトでは、これらのメソッドは昇順で並べ替えられます。
df_customer.orderBy(col("c_acctbal"))
降順でフィルタリングするには、 desc
を使用します。
df_customer.sort(col("c_custkey").desc())
次の例は、2 つの列で並べ替える方法を示しています。
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
DataFrame がソートされた後に返される行数を制限するには、 limit
メソッドを使用します。 次の例では、上位 10
の結果のみを表示します。
display(df_sorted.limit(10))
DataFramesを結合する
2 つ以上のDataFramesを結合するには、join
メソッドを使用します。 how
(結合タイプ) と on
(結合の基準となる列) 引数で、 DataFramesを結合する方法を指定できます。 一般的な結合タイプには、次のものがあります。
inner
: これは結合タイプ デフォルト であり、DataFrame 全体で 引数に一致する行のみを保持するon
DataFramesを返します。left
: 最初に指定した DataFrame のすべての行と、最初に指定した DataFrame と一致する 2 番目に指定した DataFrame の行のみが保持されます。outer
: 外部結合では、一致に関係なく、両方のDataFramesのすべての行が保持されます。
結合の詳細については、 Databricksでの結合の操作」を参照してください。 PySpark でサポートされている結合の一覧については、 「DataFrame 結合」を参照してください。
次の例では、 orders
DataFrame の各行がcustomers
DataFrame の対応する行と結合された単一の DataFrame を返します。 すべての注文が 1 人の顧客に対応することが予想されるため、内部結合が使用されます。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
複数の条件で結合するには、&
や |
などのBoolean演算子を使用して、それぞれ AND
と OR
を指定します。 次の例では、条件を追加し、 o_totalprice
が 500,000
より大きい行のみにフィルター処理します。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
データの集計
DataFrame 内のデータを集計するには、SQL のGROUP BY
と同様に、 groupBy
メソッドを使用してグループ化する列を指定し、 agg
メソッドを使用して集計を指定します。 avg
、 sum
、 max
、 min
などの一般的な集計を pyspark.sql.functions
からインポートします。次の例は、市場セグメント別の平均顧客残高を示しています。
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
一部の集計はアクションであり、計算をトリガーします。 この場合、結果を出力するために他のアクションを使用する必要はありません。
DataFrame 内の行をカウントするには、 count
メソッドを使用します。
df_customer.count()
呼び出しの連鎖
DataFrames を変換するメソッドは DataFrames を返しますが、Spark はアクションが呼び出されるまで変換を実行しません。 この 遅延評価 は、利便性と読みやすさのために複数のメソッドを連鎖させることができることを意味します。 次の例は、フィルター処理、集計、および順序付けを連鎖させる方法を示しています。
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
DataFrameを視覚化する
ノートブックで DataFrame を視覚化するには、DataFrame の左上にあるテーブルの横にある+記号をクリックし、 [視覚化]を選択して、DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、 「Databricks ノートブックの視覚化」を参照してください。
display(df_order)
追加の視覚化を実行するには、Databricks PandasAPI用のSpark の使用を推奨しています。.pandas_api()
PandasAPIを使用すると、 の対応するSparkDataFrame にキャストできます。詳細については、PandasAPI 上のSpark を参照してください。
データを保存する
データを変換したら、 DataFrameWriter
方法を使用してデータを保存できます。 これらのメソッドの完全な一覧は、 DataFrameWriter にあります。 次のセクションでは、DataFrame をテーブルおよびデータ ファイルのコレクションとして保存する方法を示します。
DataFrameをテーブルとして保存する
DataFrame を Unity Catalog のテーブルとして保存するには、 write.saveAsTable
メソッドを使用し、 <catalog-name>.<schema-name>.<table-name>
形式でパスを指定します。
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
DataFrameをCSVとして書き込む
DataFrame を*.csv
形式で書き込むには、形式とオプションを指定してwrite.csv
メソッドを使用します。 デフォルトでは、指定されたパスにデータが存在する場合、書き込み操作は失敗します。 次のいずれかのモードを指定して、別のアクションを実行できます。
overwrite
ターゲット パス内の既存のデータをすべて DataFrame の内容で上書きします。append
DataFrame の内容をターゲット パスのデータに追加します。ignore
ターゲット・パスにデータが存在する場合、書き込みは暗黙のうちに失敗します。
次の例は、DataFrame の内容を CSV ファイルとして上書きする方法を示しています。
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)