Delta Live Tables を使用したデータのロード
Delta Live Tables を使用して、 Databricks で Apache Spark でサポートされている任意の DATA からデータを読み込むことができます。データセット (テーブルとビュー) は、Spark DataFrame を返す任意のクエリー (ストリーミング DataFrame s や Spark DataFrames の Pandas など) に対して DeltaLive Tables で定義できます。データ取り込みタスクの場合、Databricks では、ほとんどのユース ケースでストリーミング テーブルを使用することをお勧めします。 ストリーミングテーブルは、 Auto Loader を使用してクラウドオブジェクトストレージから、またはKafkaなどのメッセージバスからデータを取り込むのに適しています。 次の例は、いくつかの一般的なパターンを示しています。
重要
すべてのデータソースがSQLをサポートしているわけではありません。 Delta Live Tables パイプラインで SQL ノートブックと Python ノートブックを混在させて、インジェスト以外のすべての操作に SQL を使用できます。
Delta Live Tables にデフォルトでパッケージ化されていないライブラリの操作の詳細については、 「Delta Live Tables パイプラインの Python 依存関係の管理」を参照してください。
クラウドオブジェクトストレージからファイルをロードする
Databricks では、クラウドオブジェクトストレージからのほとんどのデータ取り込みタスクに Auto Loader with Delta Live Tables を使用することをお勧めします。 Auto Loader と Delta Live Tables は、クラウドストレージに到着すると、増え続けるデータを増分的かつべき等にロードするように設計されています。 次の例では、 Auto Loader を使用して CSV ファイルと JSON ファイルからデータセットを作成します。
注
Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、 外部ロケーション を使用する必要があります。Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderとはと Auto Loader SQL構文を参照してください。
警告
ファイル通知で Auto Loader を使用し、パイプラインまたはストリーミングテーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用して、クリーンアップを実行できます。
メッセージバスからのデータのロード
Delta Live Tables パイプラインを構成して、ストリーミング テーブルを含むメッセージ バスからデータを取り込むことができます。 Databricks では、ストリーミングテーブルを連続実行と拡張オートスケールと組み合わせて、メッセージバスからの低レイテンシーロードのための最も効率的なインジェストを提供することをお勧めします。 拡張オートスケールによるDelta Live Tables パイプラインのクラスター使用率の最適化を参照してください。
たとえば、次のコードは、Kafka からデータを取り込むようにストリーミングテーブルを構成します。
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
次の例のように、純粋な SQL でダウンストリーム操作を記述して、このデータに対してストリーミング変換を実行できます。
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
イベント ハブの操作例については、「 Azure イベント ハブを Delta Live Tables データソースとして使用する」を参照してください。
「ストリーミングデータソースの構成」を参照してください。
外部システムからデータを読み込む
Delta Live Tables Databricksでサポートされているあらゆるデータソースからのデータの読み込みをサポートします。 「情報ソースへの接続」を参照してください。 サポートされているデータソースについては、レイクハウスフェデレーションを使用して外部データを読み込むこともできます。 レイクハウスフェデレーション にはDatabricks Runtime 13.3 LTS以上が必要であるため、レイクハウスフェデレーション を使用するには、プレビュー チャンネルを使用するように パイプラインを構成する必要があります。
一部のデータソースは、 SQLで同等のサポートを受けていません。 これらのデータソースのいずれかでレイクハウスフェデレーションを使用できない場合は、 Python ノートブックを使用してソースからデータを取り込むことができます。 Python と SQL のソース コードを同じ Delta Live Tables パイプラインに追加できます。 次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
クラウドオブジェクトストレージから小さなデータセットまたは静的データセットをロードする
Apache Spark の読み込み構文を使用して、小さなデータセットまたは静的なデータセットを読み込むことができます。 Delta Live Tables では、Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされています。 完全な一覧については、「 データ形式のオプション」を参照してください。
次の例は、JSON を読み込んで Delta Live Tables テーブルを作成する方法を示しています。
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
注
SELECT * FROM format.`path`;
SQL コンストラクトは、Databricks 上のすべての SQL 環境に共通です。これは、SQL と Delta Live Tablesを使用したファイルへの直接アクセスに推奨されるパターンです。
パイプライン内のシークレットを使用してストレージ認証情報に安全にアクセスする
Databricks シークレット を使用して、アクセス キーやパスワードなどの資格情報を格納できます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 Delta Live Tables パイプラインのコンピュートの設定を参照してください。
次の例では、シークレットを使用して、Azure Data Lake Storage Gen2 (ADLS Gen2) ストレージ アカウントから入力データを読み取るために必要なアクセス キーを Auto Loader格納します。これと同じ方法を使用して、パイプラインで必要なシークレット (AWS S3にアクセスするための キーや のパスワードなど)ApacheHive metastore を構成できます。
Azure Data Lake Storage Gen2 の操作の詳細については、「 Azure Data Lake Storage Gen2 と Blob Storage に接続する」を参照してください。
注
シークレット値を設定する spark_conf
コンフィギュレーション キーに spark.hadoop.
プレフィックスを追加する必要があります。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
取り替える
<storage-account-name>
を ADLS Gen2 ストレージ アカウント名に置き換えます。<scope-name>
を Databricks シークレットスコープ名に置き換えます。<secret-name>
を Azure ストレージ アカウントのアクセス キーを含むキーの名前に置き換えます。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
取り替える
<container-name>
は、入力データを格納する Azure ストレージ アカウント コンテナーの名前です。<storage-account-name>
を ADLS Gen2 ストレージ アカウント名に置き換えます。<path-to-input-dataset>
を入力データセットへのパスに置き換えます。
Azure Event Hubs からデータを読み込む
Azure Event Hubs は、Apache Kafka 互換インターフェースを提供するデータ ストリーミング サービスです。 Delta Live Tables ランタイムに含まれる構造化ストリーミング Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、 「 Azure Event Hubs をDelta Live Tablesデータ ソースとして使用する」を参照してください。