R で DataFrames とテーブルを操作する
重要
Databricks の SparkR は、Databricks Runtime 16.0 以降では 非推奨とされています 。 Databricks 代わりに Sparklyr を使用することをお勧めします。
この記事では、 SparkR、 sparklyr、 dplyr などの R パッケージを使用して、R data.frame
、 Spark DataFrames、およびメモリ内テーブルを操作する方法について説明します。
SparkR、sparklyr、dplyr を操作すると、これらすべてのパッケージで特定の操作を完了でき、最も使いやすいパッケージを使用できる場合があります。 たとえば、クエリーを実行するには、 SparkR::sql
、 sparklyr::sdf_sql
、 dplyr::select
などの関数を呼び出します。 また、これらのパッケージの 1 つまたは 2 つだけで操作を完了できる場合もあり、選択する操作は使用シナリオによって異なります。 たとえば、 sparklyr::sdf_quantile
を呼び出す方法は、両方の関数が分位数を計算する場合でも、 dplyr::percentile_approx
を呼び出す方法とは少し異なります。
SQL を SparkR と sparklyr の間のブリッジとして使用できます。 たとえば、 SparkR::sql
を使用して、sparklyr で作成したテーブルをクエリーできます。 sparklyr::sdf_sql
を使用して、SparkR で作成したテーブルをクエリーできます。また dplyr
コードは実行前に常にメモリ内のSQLに変換されます。 API の相互運用性 と SQL 変換も参照してください。
SparkR、sparklyr、dplyr をロードする
SparkR、sparklyr、dplyr の各パッケージは、Databricks クラスターにインストールされている Databricks Runtime に含まれています。 したがって、これらのパッケージの呼び出しを開始する前に、通常の install.package
を呼び出す必要はありません。 ただし、これらのパッケージを最初に library
で読み込む必要があります。 たとえば、Databricks ワークスペースの R ノートブック内から、 ノートブック セルで次のコードを実行して、SparkR、sparklyr、dplyr を読み込みます。
library(SparkR)
library(sparklyr)
library(dplyr)
sparklyr をクラスターに接続する
sparklyr を読み込んだ後、 databricks
接続方法を指定して、 sparklyr::spark_connect
を呼び出してクラスターに接続する必要があります。たとえば、ノートブック セルで次のコードを実行して、ノートブックをホストするクラスターに接続します。
sc <- spark_connect(method = "databricks")
これに対し、Databricks ノートブックでは、SparkR で使用するためのクラスター上に SparkSession
が既に確立されているため、SparkR の呼び出しを開始する前に SparkR::sparkR.session
を呼び出す必要はありません。
JSON データファイルをワークスペースにアップロードする
この記事のコード例の多くは、Databricks ワークスペース内の特定の場所にある、特定の列名とデータ型のデータに基づいています。 このコード例のデータは、GitHub 内の book.json
という名前の JSON ファイルに由来します。 このファイルを取得してワークスペースにアップロードするには:
GitHub の books.json ファイルにアクセスし、テキストエディターを使用して、その内容をローカルマシン上のどこかにある
books.json
という名前のファイルにコピーします。Databricks ワークスペースのサイドバーで、[ カタログ] をクリックします。
[ テーブルの作成] をクリックします。
[ファイルのアップロード] タブで、
books.json
ファイルをローカル コンピューターから [アップロードするファイルのドロップ] ボックスにドロップします。または、[クリックして参照] を選択し、ローカルコンピューターからbooks.json
ファイルを参照します。
デフォルトでは、Databricks はローカルの books.json
ファイルをワークスペース内の DBFS の場所にパス /FileStore/tables/books.json
でアップロードします。
[ UI を使用して テーブルを作成] または [ノートブックにテーブルを作成] をクリックしないでください。 この記事のコード例では、この DBFS の場所にアップロードされた books.json
ファイルのデータを使用します。
JSON データを DataFrame に読み込む
sparklyr::spark_read_json
を使用して、アップロードされた JSON ファイルを DataFrameに読み込み、接続、JSON ファイルへのパス、およびデータの内部テーブル表現の名前を指定します。 この例では、 book.json
ファイルに複数行が含まれるように指定する必要があります。 ここでの列のスキーマの指定はオプションです。 それ以外の場合、sparklyr は default によって列のスキーマを推論します。 たとえば、ノートブックのセルで次のコードを実行して、アップロードされた JSON ファイルのデータを jsonDF
という名前の DataFrame に読み込みます。
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
DataFrameの最初の数行を印刷する
SparkR::head
、SparkR::show
、または sparklyr::collect
を使用して、 DataFrameの最初の行を印刷できます。 デフォルトでは、 head
は最初の 6 行をデフォルトで出力します。 show
と collect
は、最初の 10 行を印刷します。 たとえば、ノートブックのセルで次のコードを実行して、 jsonDF
という名前の DataFrame の最初の行を出力します。
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
SQL クエリーを実行し、テーブルへの書き込みとテーブルからの読み取りを行う
dplyr 関数を使用して、 DataFrameで SQL クエリーを実行できます。 たとえば、ノートブックのセルで次のコードを実行して、 dplyr::group_by
と dployr::count
を使用して、 jsonDF
という名前の DataFrame から作成者別のカウントを取得します。 dplyr::arrange
と dplyr::desc
を使用して、結果をカウントの降順に並べ替えます。次に、デフォルトで最初の10行を印刷します。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
その後、 sparklyr::spark_write_table
を使用して、Databricks のテーブルに結果を書き込むことができます。 たとえば、ノートブックのセルで次のコードを実行してクエリーを再実行し、結果を json_books_agg
という名前のテーブルに書き込みます。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
テーブルが作成されたことを確認するには、 sparklyr::sdf_sql
を SparkR::showDF
と共に使用して、テーブルのデータを表示します。 たとえば、ノートブックのセルで次のコードを実行してテーブルを DataFrame にクエリーし、 sparklyr::collect
を使用して DataFrame の最初の 10 行を デフォルト で出力します。
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
sparklyr::spark_read_table
を使用して同様のことを行うこともできます。たとえば、ノートブックのセルで次のコードを実行して、 jsonDF
という名前の前の DataFrame を DataFrame にクエリし、 sparklyr::collect
を使用して DataFrame の最初の 10 行を デフォルト によって印刷します。
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
DataFrame に列とコンピュートの列値を追加する
dplyr 関数を使用して、 DataFrames にカラムを追加したり、カラムの値を計算することができます。
たとえば、ノートブックのセルで次のコードを実行して、 jsonDF
という名前の DataFrame の内容を取得します。 dplyr::mutate
を使用して today
という名前の列を追加し、この新しい列に現在のタイムスタンプを入力します。次に、これらの内容を withDate
という名前の新しい DataFrame に書き込み、 dplyr::collect
を使用して新しい DataFrameの最初の10行をデフォルトで印刷します。
注
dplyr::mutate
Hive の組み込み関数 (UDF とも呼ばれます) と組み込み集計関数 (UDAF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「 Hive 関数」を参照してください。 このセクションの日付関連の関数に関する情報については、「 日付関数」を参照してください。
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
次に、 dplyr::mutate
を使用して、 withDate
DataFrameの内容にさらに2つの列を追加します。 新しい month
列と year
列には、 today
列の月と年の数値が含まれます。 次に、これらの内容を withMMyyyy
という名前の新しいデータフレームに書き込み、 dplyr::select
を dplyr::collect
と共に使用して、新しいデータフレームの最初の 10 行の author
、 title
、 month
、および year
列を次のように出力します。
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
次に、 dplyr::mutate
を使用して、 withMMyyyy
DataFrameの内容にさらに2つの列を追加します。 新しい formatted_date
列には today
列の yyyy-MM-dd
部分が含まれ、新しい day
列には新しい formatted_date
列の数値日が含まれます。次に、これらの内容を withUnixTimestamp
という名前の新しいデータフレームに書き込み、 dplyr::select
を dplyr::collect
と共に使用して、新しいデータフレームの最初の 10 行の title
、 formatted_date
、および day
列を次のように出力します。
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
一時ビューを作成する
既存の DataFramesに基づく名前付き一時ビューをメモリ内に作成できます。 たとえば、ノートブックのセルで次のコードを実行して、 SparkR::createOrReplaceTempView
を使用して jsonTable
という名前の前の DataFrame の内容を取得し、 timestampTable
という名前の一時的なビューを作成します。 次に、 sparklyr::spark_read_table
を使用して一時ビューの内容を読み取ります。 デフォルトで一時テーブルの最初の 10 行を印刷するには、 sparklyr::collect
を使用します。
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
DataFrame の統計分析を実行する
統計分析には、dplyrと一緒に sparklyr を使用できます。
たとえば、統計を実行する DataFrame を作成します。 これを行うには、ノートブック セルで次のコードを実行し、 sparklyr::sdf_copy_to
を使用して、R に組み込まれている iris
データセットの内容を iris
という名前の DataFrame に書き込みます。 デフォルトで一時テーブルの最初の 10 行を印刷するには、 sparklyr::sdf_collect
を使用します。
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
次に、 dplyr::group_by
を使用して、 Species
列で行をグループ化します。 dplyr::summarize
を dplyr::percentile_approx
と共に使用して、 Sepal_Length
列の 25 番目、50 番目、75 番目、および 100 番目の分位数による要約統計量を Species
単位で計算します。sparklyr::collect
を使用して結果を印刷します。
注
dplyr::summarize
Hive の組み込み関数 (UDF とも呼ばれます) と組み込み集計関数 (UDAF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「 Hive 関数」を参照してください。 percentile_approx
に関する情報については、 組み込み集計関数 (UDAF)を参照してください。
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
同様の結果は、たとえば、次の sparklyr::sdf_quantile
を使用して計算できます。
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9