DatabricksによるAmazon Redshift へのクエリー

Databricks を使用して、Amazon Redshift からテーブルを読み書きできます。

注:

Redshift へのクエリを管理するには、レイクハウスフェデレーションを使用することをお勧めします。 「レイクハウスフェデレーションとは」をご覧ください。

Databricks Redshift データソースは、Amazon S3 を使用して Redshift との間でデータを効率的に転送し、JDBC を使用して Redshift で適切な COPY コマンドと UNLOAD コマンドを自動的にトリガーします。

注:

Databricks Runtime 11.3 LTS 以降では、Databricks Runtime には Redshift JDBC ドライバーが含まれており、format オプションの redshift キーワードを使用してアクセスできます。 「Databricks Runtime リリースノートのバージョンと、各 Databricks Runtime に含まれるドライバー バージョンの互換性」を参照してください。ユーザー提供のドライバーは引き続きサポートされ、バンドルされている JDBC ドライバーよりも優先されます。

Databricks Runtime 10.4 LTS 以下では、Redshift JDBC ドライバーを手動でインストールする必要があり、クエリーでは形式にドライバー (com.databricks.spark.redshift) を使用する必要があります。 「 Redshift ドライバーのインストール」を参照してください。

使用方法

次の例は、Redshift ドライバーとの接続を示しています。 PostgreSQL JDBC ドライバーを使用している場合は、 url パラメーター値を置き換えます。

AWS 認証情報を設定したら、Python、SQL、R、または Scala の Spark データソース API でデータソースを使用できます。

重要

Unity Catalog で定義されている外部ロケーション は、 tempdir の場所としてサポートされていません。

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

Databricks Runtime 10.4 LTS 以下で SQL を使用してデータを読み取ります。

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Databricks Runtime 11.3 LTS 以降で SQL を使用してデータを読み取る:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

SQLを使用してデータを書き込みます。

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

SQL API では、新しいテーブルの作成のみがサポートされ、上書きや追加はサポートされません。

Databricks Runtime 10.4 LTS 以下で R を使用してデータを読み取ります。

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Databricks Runtime 11.3 LTS 以降で R を使用してデータを読み取る:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Redshiftを使用するための推奨事項

クエリーの実行により、大量のデータが S3 に抽出される場合があります。 Redshift の同じデータに対して複数のクエリーを実行する予定の場合、Databricks では、 Delta Lake を使用して抽出されたデータを保存することをお勧めします。

設定

S3 と Redshift への認証

データソースには、次の図に示すように、複数のネットワーク接続が含まれます。

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

データソースは、Redshift との間でデータを転送するときに、S3 に対してデータの読み取りと書き込みを行います。 その結果、S3 バケットへの読み取りおよび書き込みアクセス権を持つ AWS 認証情報 ( tempdir 設定パラメーターを使用して指定) が必要です。

注:

データソースは、S3 で作成した一時ファイルをクリーンアップしません。 そのため、 オブジェクトライフサイクル設定 で専用の一時 S3 バケットを使用して、指定した有効期限が経過すると一時ファイルが自動的に削除されるようにすることをお勧めします。 これらのファイルを暗号化する方法については、このドキュメントの 「暗号化 」セクションを参照してください。 Unity Catalog で定義されている外部ロケーションtempdirの場所として使用することはできません。

次のセクションでは、各接続の認証構成オプションについて説明します。

Spark ドライバーから Redshift へ

Sparkドライバーは、ユーザー名とパスワードを使用してJDBC経由でRedshiftに接続します。 Redshift は、この接続を認証するための IAMロールの使用をサポートしていません。 デフォルトでは、この接続はSSL暗号化を使用します。詳細については、「 暗号化」を参照してください。

Spark から S3 へ

S3は、Redshiftからの読み取りまたはRedshiftへの書き込み時に大量のデータを保存する仲介役として機能します。 Spark は、Hadoop FileSystem インターフェイスと Amazon Java SDK の S3 クライアントの両方を使用して S3 に接続します。

注:

DBFS マウントを使用して Redshift の S3 へのアクセスを設定することはできません。

  • Hadoop confでキーを設定します。 AWS キーは、 Hadoop 設定プロパティを使用して指定できます。 tempdir構成がs3a://ファイルシステムを指している場合は、Hadoop XML構成ファイルでfs.s3a.access.keyおよびfs.s3a.secret.keyプロパティを設定するか、sc.hadoopConfiguration.set()を呼び出してSparkのグローバルHadoop構成を構成できます。s3n://ファイルシステムを使用する場合は、次の例に示すように、従来の構成キーを指定できます。

    たとえば、 s3a ファイルシステムを使用している場合は、次のように追加します。

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    レガシー s3n ファイルシステムの場合は、以下を追加します。

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    

    次のコマンドは、一部の Spark 内部に依存していますが、すべての PySpark バージョンで動作するはずであり、将来変更される可能性は低いです。

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift から S3 へ

forward_spark_s3_credentialsオプションを true に設定すると、Spark が JDBC 経由で S3 に接続するために使用している AWS キー認証情報が Redshift に自動的に転送されます。JDBC クエリーにはこれらの資格情報が埋め込まれるため、Databricks では JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。

暗号化

  • JDBC の保護: JDBC URL に SSL 関連の設定が存在しない限り、データソースはデフォルトで SSL 暗号化を有効にし、Redshift サーバーが信頼できる (つまり、 sslmode=verify-full) であることも検証します。 そのために、サーバー証明書は、最初に必要になったときにAmazonサーバーから自動的にダウンロードされます。 それが失敗した場合は、事前にバンドルされた証明書ファイルがフォールバックとして使用されます。 これは、RedshiftとPostgreSQLの両方のJDBCドライバーに当てはまります。

    この機能に問題がある場合、または単にSSLを無効にしたい場合は、DataFrameReaderまたはDataFrameWriter.option("autoenablessl", "false")を呼び出すことができます。

    カスタムSSL関連設定を指定する場合は、Redshiftのドキュメントの指示に従うことができます。 Java および JDBC ドライバ構成オプションでの SSL 証明書とサーバー証明書の使用 JDBC に存在する SSL 関連のオプション url データソースで使用されます (つまり、自動構成はトリガーされません)。

  • S3に保存されているUNLOADデータ(Redshiftからの読み取り時に保存されるデータ)の暗号化:S3へのデータのアンロードに関するRedshiftのドキュメントによると、「UNLOADはAmazon S3サーバー側の暗号化(SSE-S3)を使用してデータファイルを自動的に暗号化します」。

    Redshiftはカスタムキーを使用したクライアント側の暗号化もサポートしていますが( 「暗号化されたデータファイルのアンロード」を参照)、データソースには必要な対称キーを指定する機能がありません。

  • S3に保存されたCOPYデータ(Redshiftへの書き込み時に保存されるデータ)の暗号化: Amazon S3からの暗号化されたデータファイルのロードに関するRedshiftのドキュメントによると、

COPY コマンドを使用して、AWS が管理する暗号化キー (SSE-S3 または SSE-KMS) によるサーバー側の暗号化、クライアント側の暗号化、またはその両方を使用して Amazon S3 にアップロードされたデータファイルをロードできます。COPY は、顧客指定のキー (SSE-C) を使用した Amazon S3 サーバー側の暗号化をサポートしていません。

パラメーター

Spark SQLで提供されるパラメーター・マップまたはOPTIONSは、次の設定をサポートしています。

パラメーター

*必須

デフォルト

説明

dbtable

はい (クエリーが指定されていない場合)。

なし

Redshift で作成または読み取りを行うテーブル。 このパラメーターは、データを Redshift に保存し直すときに必要です。

query

はい (dbtable が指定されていない場合)。

なし

Redshiftで読み取るクエリー。

user

なし

なし

Redshift ユーザー名。 パスワードオプションと組み合わせて使用する必要があります。 ユーザーとパスワードが URL で渡されていない場合にのみ使用でき、両方を渡すとエラーになります。 このパラメーターは、ユーザー名にエスケープする必要がある特殊文字が含まれている場合に使用します。

password

なし

なし

Redshift のパスワード。 userオプションと組み合わせて使用する必要があります。ユーザーとパスワードが URL で渡されない場合にのみ使用できます。両方を渡すとエラーになります。 このパラメーターは、パスワードにエスケープする必要がある特殊文字が含まれている場合に使用します。

url

あり

なし

JDBC URL (次の形式)

jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol は、ロードしたJDBCドライバに応じて、 postgresql または redshiftにすることができます。 Redshift 互換ドライバーが 1 つクラスパス上にあり、この URL と一致する必要があります。 hostport はRedshiftマスターノードを指す必要があるため、ドライバーアプリケーションからのアクセスを許可するようにセキュリティグループやVPCを設定する必要があります。 database はRedshiftデータベース名を識別し、 userpassword はデータベースにアクセスするための認証情報であり、JDBCのこのURLに埋め込む必要があり、ユーザーアカウントには参照されるテーブルに必要な権限が必要です。

search_path

なし

なし

Redshiftでスキーマ検索パスを設定します。 SET search_path toコマンドを使用して設定します。テーブルを検索するスキーマ名のコンマ区切りリストである必要があります。 search_path の Redshift ドキュメントを参照してください。

aws_iam_role

IAMロールを使用して承認する場合のみ。

なし

Redshift クラスターにアタッチされた IAM Redshift COPY/UNLOAD オペレーションロール の完全指定の ARN (例: arn:aws:iam::123456789000:role/<redshift-iam-role>)。

forward_spark_s3_credentials

なし

false

true場合、データソースは Spark が S3 への接続に使用している認証情報を自動的に検出し、それらの認証情報を JDBC 経由で Redshift に転送します。これらの資格情報は JDBC クエリーの一部として送信されるため、このオプションを使用する場合は、JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。

temporary_aws_access_key_id

なし

なし

AWS アクセスキーには、S3 バケットへの書き込み権限が必要です。

temporary_aws_secret_access_key

なし

なし

提供されたアクセスキーに対応するAWSシークレットアクセスキー。

temporary_aws_session_token

なし

なし

提供されたアクセスキーに対応するAWSセッショントークン。

tempdir

あり

なし

Amazon S3 の書き込み可能な場所で、読み取り時にアンロードされたデータと、書き込み時に Redshift にロードされる Avro データに使用されます。 通常の ETL パイプラインの一部として Spark の Redshift データソースを使用している場合は、バケットに ライフサイクルポリシー を設定し、それをこのデータの一時的な場所として使用すると便利です。

Unity Catalog で定義されている外部ロケーションtempdir場所として使用することはできません。

jdbcdriver

なし

JDBC URL のサブプロトコルによって決定されます。

使用するJDBCドライバのクラス名。 このクラスはクラスパス上に存在する必要があります。 ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。

diststyle

なし

EVEN

テーブルの作成時に使用するRedshift 分散スタイルEVENKEYALLのいずれかになります(Redshiftのドキュメントを参照)。KEYを使用する場合は、distkey オプションを使用して分散キーも設定する必要があります。

distkey

いいえ、 DISTSTYLE KEY

なし

表の作成時に分散キーとして使用する表の列の名前。

sortkeyspec

なし

なし

完全な Redshiftソートキー 定義。 たとえば、次のようになります。

  • SORTKEY(my_sort_column)

  • COMPOUND SORTKEY(sort_col_1, sort_col_2)

  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)

usestagingtable (非推奨)

なし

true

この非推奨のオプションを false に設定すると、上書き操作の宛先テーブルが書き込みの開始時にすぐに削除され、上書き操作が非アトミックになり、宛先テーブルの可用性が低下します。 これにより、上書きに必要な一時ディスク容量が減る可能性があります。

usestagingtable=false操作を設定すると、データの損失や使用不能のリスクがあるため、非推奨となり、宛先テーブルを手動で削除する必要があります。

description

なし

なし

テーブルの説明。 SQL COMMENTコマンドを使用して設定され、ほとんどのクエリーツールに表示されます。 個々の列に説明を設定するには、 description メタデータも参照してください。

preactions

なし

なし

コマンドをロードする前に実行される SQL コマンドの ; 区切りリスト COPY 。 新しいデータを読み込む前に、いくつかの DELETE コマンドなどをここで実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に書式設定されます (ステージング テーブルを使用している場合)。

これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、事前アクションが失敗すると、変更が元に戻され、バックアップ テーブルが復元されます。

postactions

なし

なし

データのロード時にCOPYが成功した後に実行される SQL コマンドの;区切りリスト。新しいデータをロードするときに、いくつかの GRANT コマンドなどをここで実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に書式設定されます (ステージング テーブルを使用している場合)。

これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、ポスト アクションが失敗すると、変更が元に戻され、バックアップ テーブルが復元されます。

extracopyoptions

なし

なし

データをロードするときに Redshift COPY コマンドに追加する追加オプションのリスト ( TRUNCATECOLUMNSMAXERROR n など) (他のオプションについては、 Redshift のドキュメント を参照してください)。

これらのオプションは COPY コマンドの末尾に追加されるため、コマンドの末尾で意味のあるオプションのみを使用できますが、考えられるほとんどのユースケースをカバーする必要があります。

tempformat

なし

AVRO

Redshift への書き込み時に S3 に一時ファイルを保存する形式。 デフォルトは AVROです。その他の許容値は、CSV と gzip 圧縮された CSV の CSVCSV GZIP です。

Redshift は Avro ファイルの読み込み時よりも CSV の読み込み時に大幅に高速であるため、その tempformat を使用すると、Redshift への書き込み時にパフォーマンスが大幅に向上する可能性があります。

csvnullstring

なし

@NULL@

CSV tempformat を使用するときに null に書き込む文字列値。 これは、実際のデータには表示されない値にする必要があります。

csvseparator

なし

,

tempformat を CSV または CSV GZIPに設定して一時ファイルを書き込むときに使用する区切り文字。 これは、有効なASCII文字(「,」や「|」など)である必要があります。

csvignoreleadingwhitespace

なし

true

true に設定すると、 tempformatCSV または CSV GZIPに設定されている場合に、書き込み中に値から先頭の空白が削除されます。 それ以外の場合、空白は保持されます。

csvignoretrailingwhitespace

なし

true

true に設定すると、 tempformatCSV または CSV GZIPに設定されている場合に、書き込み中に値から末尾の空白が削除されます。 それ以外の場合、空白は保持されます。

infer_timestamp_ntz_type

なし

false

trueの場合、Redshift TIMESTAMP型の値は、読み取り時に TimestampNTZType (タイムゾーンなしのタイムスタンプ) として解釈されます。それ以外の場合、すべてのタイムスタンプは、基になる Redshift テーブルのタイプに関係なく、 TimestampType として解釈されます。

その他の構成オプション

文字列列の最大サイズの構成

Redshift テーブルを作成する場合、デフォルトの動作では、文字列の列に TEXT 列が作成されます。 Redshift は TEXT 列を VARCHAR(256)として保存するため、これらの列の最大サイズは 256 文字です (ソース)。

より大きな列をサポートするには、 maxlength 列メタデータ フィールドを使用して、個々の文字列列の最大長を指定します。 これは、デフォルトよりも小さい最大長の列を宣言することで、スペースを節約するパフォーマンスの最適化を実装する場合にも役立ちます。

注:

Spark の制限により、SQL および R 言語 APIs では、列のメタデータの変更はサポートされていません。

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

以下は、SparkのScala APIを使用して複数の列のメタデータフィールドを更新する例です。

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

カスタム列タイプを設定する

列の種類を手動で設定する必要がある場合は、 redshift_type 列メタデータを使用できます。 たとえば、 Spark SQL Schema -> Redshift SQL 型マッチャーをオーバーライドしてユーザー定義の列型を割り当てる場合は、次の操作を実行できます。

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

列のエンコードを構成する

テーブルを作成するときは、 encoding 列メタデータフィールドを使用して、各列の圧縮エンコードを指定します (使用可能なエンコードについては、 Amazon ドキュメント を参照してください)。

の説明の設定

Redshiftでは、ほとんどのクエリーツール( COMMENT コマンドを使用)に表示される説明を列に添付できます。 description列メタデータ フィールドを設定して、個々の列の説明を指定できます。

Redshiftへのクエリープッシュダウン

Sparkオプティマイザーは、次の演算子をRedshiftにプッシュダウンします。

  • Filter

  • Project

  • Sort

  • Limit

  • Aggregation

  • Join

ProjectFilter内では、次の式がサポートされています。

  • ほとんどの Boolean 論理演算子

  • 比較

  • 基本的な算術演算

  • 数値キャストと文字列キャスト

  • ほとんどの文字列関数

  • スカラーサブクエリ (Redshift に完全にプッシュダウンできる場合)。

注:

このプッシュダウンでは、日付とタイムスタンプを操作する式はサポートされていません。

Aggregation内では、次の集計関数がサポートされています。

  • AVG

  • COUNT

  • MAX

  • MIN

  • SUM

  • STDDEV_SAMP

  • STDDEV_POP

  • VAR_SAMP

  • VAR_POP

該当する場合は、 DISTINCT 句と組み合わせます。

Join内では、次のタイプの結合がサポートされています。

  • INNER JOIN

  • LEFT OUTER JOIN

  • RIGHT OUTER JOIN

  • LEFT SEMI JOIN

  • LEFT ANTI JOIN

  • オプティマイザによって Join に書き換えられるサブクエリ。 WHERE EXISTSWHERE NOT EXISTS

注:

結合プッシュダウンは FULL OUTER JOINをサポートしていません。

プッシュダウンは、 LIMITを使用したクエリーで最も有益かもしれません。 SELECT * FROM large_redshift_table LIMIT 10 などのクエリーは、まずテーブル全体が中間の結果として S3 にアンロードされるため、非常に時間がかかる可能性があります。プッシュダウンでは、 LIMIT はRedshiftで実行されます。 集計を使用したクエリーでは、集計をRedshiftにプッシュダウンすることで、転送する必要のあるデータの量を減らすこともできます。

クエリー Redshift へのプッシュダウンはデフォルトで有効になっています。 spark.databricks.redshift.pushdownfalseに設定することで無効にできます。無効になっている場合でも、Sparkはフィルターをプッシュダウンし、列の削除をRedshiftに実行します。

Redshiftドライバーのインストール

Redshift データソースには、Redshift 互換の JDBC ドライバーも必要です。 Redshift は PostgreSQL データベース システムに基づいているため、Databricks Runtime に含まれる PostgreSQL JDBC ドライバーまたは Amazon 推奨の Redshift JDBC ドライバーを使用できます。 PostgreSQL JDBCドライバを使用するためにインストールする必要はありません。 各 Databricks Runtime リリースに含まれる PostgreSQL JDBC ドライバーのバージョンは、Databricks Runtime リリースノートに記載されています。

Redshift JDBCドライバーを手動でインストールするには:

  1. Amazonからドライバーをダウンロードします。

  2. ドライバーを Databricks ワークスペースにアップロードします。 「ライブラリ」を参照してください。

  3. クラスターにライブラリをインストールします。

注:

Databricks では、最新バージョンの Redshift JDBC ドライバーを使用することをお勧めします。 Redshift JDBC ドライバーのバージョンが 1.2.41 より前のバージョンには、次の制限があります。

  • バージョン 1.2.16 のドライバは、SQL クエリーで where 句を使用すると空のデータを返します。

  • 1.2.41 より前のバージョンのドライバーでは、列の null 値の許容が "Unknown" ではなく "Not Nullable" と誤って報告されるため、無効な結果が返される場合があります。

トランザクションの保証

このセクションでは、Spark の Redshift データソースのトランザクション保証について説明します。

RedshiftとS3のプロパティに関する一般的な背景

Redshift トランザクション保証の一般的な情報については、Redshift ドキュメントの「 並列書き込み操作の管理 」の章を参照してください。 一言で言えば、Redshiftは、Redshift BEGIN コマンドのドキュメントに従って 、シリアル化可能な分離 を提供します。

4 つのトランザクション分離レベルのいずれかを使用できますが、Amazon Redshift はすべての分離レベルをシリアル化可能として処理します。

Redshiftのドキュメントによると:

Amazon Redshift は、個別に実行される各 SQL コマンドが個別にコミットするデフォルトの 自動コミット 動作をサポートしています。

したがって、 COPYUNLOAD などの個々のコマンドは原子的でトランザクション的ですが、明示的な BEGINEND は、複数のコマンドやクエリーの原子性を強制するためにのみ必要です。

Redshift からの読み取りと Redshift への書き込みの場合、データソースは S3 でデータの読み取りと書き込みを行います。 Spark と Redshift はどちらも、パーティション分割された出力を生成し、S3 の複数のファイルに保存します。 Amazon S3 データ整合性モデルのドキュメントによると、 S3 バケットのリスティングオペレーションは結果整合性があるため、この結果整合性のソースによるデータの欠落や不完全さを避けるために、ファイルを特別な長さにする必要があります。

Spark の Redshift データソースの保証

既存のテーブルへの追加

Redshift に行を挿入する場合、データソースは COPY コマンドを使用し、特定の結果整合性のある S3 オペレーションから保護するための マニフェスト を指定します。 その結果、既存のテーブルへの spark-redshift 追加は、通常の Redshift COPY コマンドと同じアトミックおよびトランザクションプロパティを持ちます。

新しいテーブルを作成する (SaveMode.CreateIfNotExists)

新しいテーブルの作成は 2 段階のプロセスで、 CREATE TABLE コマンドとそれに続く COPY コマンドで行の初期セットを追加します。 どちらの操作も同じトランザクションで実行されます。

既存のテーブルを上書きする

デフォルトでは、データソースはトランザクションを使用して上書きを実行し、宛先テーブルを削除し、新しい空のテーブルを作成し、それに行を追加することによって実装されます。

非推奨の usestagingtable 設定が falseに設定されている場合、データソースは新しいテーブルに行を追加する前に DELETE TABLE コマンドをコミットし、上書き操作の原子性を犠牲にしますが、上書き中に Redshift が必要とするステージングスペースの量を減らします。

Redshiftテーブルへのクエリー

クエリー Redshift UNLOAD コマンドを使用してクエリーを実行し、その結果を S3 に保存し、 マニフェスト を使用して特定の結果整合性のある S3 オペレーションから保護します。 その結果、Spark の Redshift データソースからのクエリーは、通常の Redshift クエリーと同じ整合性プロパティを持つ必要があります。

一般的な問題と解決策

S3 バケットと Redshift クラスターが異なる AWS リージョンにある

デフォルトでは、S3 バケットと Redshift クラスターが異なる AWS リージョンにある場合、S3 <-> Redshift コピーは機能しません。

S3 バケットが別のリージョンにあるときに Redshift テーブルを読み取ろうとすると、次のようなエラーが表示されることがあります。

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

同様に、別のリージョンの S3 バケットを使用して Redshift に書き込もうとすると、次のエラーが発生する可能性があります。

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • 書き込みます:Redshift の COPY コマンドは S3 バケットリージョンの明示的な指定をサポートしているため、extracopyoptions設定に region 'the-region-name'を追加することで、このような場合に Redshift への書き込みを適切に機能させることができます。たとえば、米国東部 (バージニア) リージョンのバケットと Scala API では、次を使用します。

    .option("extracopyoptions", "region 'us-east-1'")
    

    または、 awsregion 設定を使用することもできます。

    .option("awsregion", "us-east-1")
    
  • 読み取り: Redshift UNLOAD コマンドは、S3 バケットリージョンの明示的な指定もサポートしています。 読み取りを正しく機能させるには、 awsregion 設定に領域を追加します。

    .option("awsregion", "us-east-1")
    

JDBC URLに特殊文字を含むパスワードを使用すると、認証エラーが発生する

JDBC URL の一部としてユーザー名とパスワードを指定し、パスワードに ;?&などの特殊文字が含まれている場合は、次の例外が表示されることがあります。

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

これは、ユーザー名またはパスワードの特殊文字がJDBCドライバによって正しくエスケープされていないことが原因です。 対応する DataFrame オプション userpasswordを使用して、ユーザー名とパスワードを指定してください。 詳細については、「 パラメーター」を参照してください。

実行時間の長い Spark クエリーは、対応する Redshift 操作が完了しても無期限にハングします

Redshift との間で大量のデータを読み書きしている場合、AWS Redshift モニタリングページに、対応する LOAD または UNLOAD 操作が完了し、クラスターがアイドル状態であることが示されていても、Spark クエリーが無期限にハングすることがあります。 これは、Redshift と Spark の間の接続がタイムアウトしたことが原因です。 これを回避するには、 tcpKeepAlive JDBC フラグが有効になっていて、 TCPKeepAliveMinutes が低い値 (1 など) に設定されていることを確認してください。

詳細については、「 Amazon Redshift JDBC ドライバーの設定」を参照してください。

タイム・スタンプとタイム・ゾーン・セマンティクス

データを読み取ると、Redshift TIMESTAMPTIMESTAMPTZ の両方のデータ型が Spark TimestampTypeにマッピングされ、値が協定世界時 (UTC) に変換され、UTC タイムスタンプとして格納されます。 Redshift TIMESTAMPの場合、値にタイムゾーン情報がないため、ローカルタイムゾーンが想定されます。 Redshift テーブルにデータを書き込むと、Spark TimestampType が Redshift TIMESTAMP データ型にマップされます。

移行ガイド

データソースでは、Spark S3 認証情報が Redshift に転送される前に、 forward_spark_s3_credentials を明示的に設定する必要があります。 この変更は、 aws_iam_role または temporary_aws_* 認証メカニズムを使用する場合には影響しません。 ただし、以前のデフォルトの動作に依存していた場合は、以前の Redshift から S3 への認証メカニズムを引き続き使用するには、 forward_spark_s3_credentialstrue に明示的に設定する必要があります。 3 つの認証メカニズムとそのセキュリティのトレードオフについては、このドキュメントの「 S3 と Redshift への認証 」セクションを参照してください。