Databricks レイクハウスにデータを取り込む

Databricks は、Delta Lake がサポートするレイクハウスにデータを取り込むためのさまざまな方法を提供します。 この記事では、サポートされているインジェスト ツールと、データソースや待機時間などの条件に基づいて使用する方法に関するガイダンスの一覧を示します。

インジェスト方法

次の方法を使用して、Databricks にデータを取り込むことができます。

  • 頻度の低い処理のための一連のデータ行のバッチ取り込み

  • 個々のデータ行またはデータ行のセットが到着したときのストリーミング取り込みで、リアルタイム処理を行います

取り込まれたデータは Delta テーブル に読み込まれ、ダウンストリーム データと AI のユース ケース全体で使用できます。 Databricksレイクハウスのアーキテクチャにより、ユースケース間でデータを複製する必要がなく、Unity Catalogを活用して、すべてのデータにわたって一元化されたアクセス制御、監査、リネージ、およびデータディスカバリーを行うことができます。

バッチ取り込み

バッチインジェストでは、データを行のセット(またはバッチ)として Databricks にロードします。多くの場合、スケジュール(毎日など)に基づいて、または手動でトリガーされます。 これは、従来の抽出、変換、ロード (ETL) のユースケースの「抽出」部分を表しています。 バッチ インジェストを使用して、次の場所からデータを読み込むことができます。

  • CSV などのローカル ファイル

  • Amazon S3、Azureデータレイクストレージ、Google Cloud Storage などのクラウドオブジェクトストレージ

バッチ取り込みでは、CSV、TSV、JSON、XML、Avro、ORC、Parquet、テキスト ファイルなど、さまざまなファイル ソース形式がサポートされています。

Databricks は、従来のバッチ インジェスト オプションと増分バッチ インジェスト オプションの両方をサポートしています。 従来のバッチ取り込みでは、実行されるたびにすべてのレコードが処理されますが、増分バッチ取り込みでは、データソース内の新しいレコードが自動的に検出され、すでに取り込まれたレコードは無視されます。 つまり、処理するデータが少なくて済むため、インジェスト ジョブの実行が速くなり、コンピュート リソースをより効率的に使用できます。

従来の (1 回限りの) バッチ取り込み

データ追加UIを使用して、ローカルデータファイルをアップロードしたり、公開URLからファイルをダウンロードしたりできます。 「ファイルのアップロード」を参照してください。

増分バッチ取り込み

このセクションでは、サポートされている増分バッチ取り込みツールについて説明します。

ストリーミング テーブル

CREATE STREAMING TABLE SQL コマンドを使用すると、クラウド・オブジェクト・ストレージからストリーミング・テーブルにデータを増分的にロードできます。CREATE STREAMING TABLEを参照してください。

例: ストリーミング テーブルを使用した増分バッチ取り込み

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
クラウドオブジェクトストレージコネクタ

Auto Loaderは、組み込み Cloud Object Storage Connector であり、新しいデータ ファイルが Amazon S3 (S3)、 Azure データレイク Storage Gen 2 (ALDS2)、または Google Cloud Storage (GCS) に到着したときに、段階的かつ効率的に処理できます。 Auto Loaderを参照してください。

例: Auto Loader を使用した増分バッチ取り込み

df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data")
  .schema("/databricks-datasets/retail-org/customers/schema")
  .load("/databricks-datasets/retail-org/customers/")

ストリーミング インジェスト

ストリーミング インジェストでは、データ行またはデータ行のバッチが生成されたときに継続的に読み込まれるため、ほぼリアルタイムで到着したときにクエリを実行できます。 ストリーミングインジェストを使用して、Apache Kafka、Amazon Kinesis、Google Pub/Sub、Apache Pulsar などのソースからストリーミングデータをロードできます。

Databricks では、組み込みコネクタを使用したストリーミング インジェストもサポートされています。 これらのコネクタを使用すると、ストリーミング ソースから到着する新しいデータを段階的かつ効率的に処理できます。 ストリーミングデータソースの設定を参照してください。

例: Kafka からのストリーミング取り込み

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "topic1")
    .option("startingOffsets", "latest")
    .load()

Delta Live Tables を使用したバッチとストリーミングのインジェストBatch and streaming ingestion with Delta Live Tables

Databricks では、Delta Live Tables を使用して、信頼性とスケーラビリティに優れたデータ処理パイプラインを構築することをお勧めします。 Delta Live Tables はバッチ取り込みとストリーミング取り込みの両方をサポートしており、 Auto Loaderでサポートされている任意のデータソースからデータを取り込むことができます。

例: Delta Live Tables を使用した増分バッチ取り込み

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

例: Delta Live Tables を使用した Kafka からのストリーミング インジェスト

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

インジェスト スケジュール

データは、1 回限りの操作として、定期的なスケジュールで、または継続的に取り込むことができます。

  • ほぼリアルタイムのストリーミングの使用例では、連続モードを使用します。

  • バッチ取得のユースケースでは、1 回だけ取り込むか、定期的なスケジュールを設定します。

「ジョブによるインジェスト」および「トリガー パイプライン モードと継続的パイプライン モード」を参照してください。

インジェストパートナー

多くのサードパーティ ツールは、Databricks へのバッチまたはストリーミング インジェストをサポートしています。 Databricks はさまざまなサードパーティ統合を検証していますが、ソースシステムへのアクセスを設定し、データを取り込む手順はツールによって異なります。 検証済みのツールの一覧については、「 インジェスト パートナー 」を参照してください。 一部の技術パートナーは、サードパーティ ツールをレイクハウス データに簡単に接続できる UI を提供する Databricks Partner Connectでも紹介されています。

DIYインジェスト

Databricks 一般的なコンピュートプラットフォームを提供します。 その結果、Python や Java など、Databricks でサポートされている任意のプログラミング言語を使用して、独自のインジェスト コネクタを作成できます。 また、データロードツール、Airbyte、Debeziumなどの一般的なオープンソースコネクタライブラリをインポートして活用することもできます。

データ取り込みの代替手段

Databricks では、大量のデータ、低レイテンシのクエリ、サードパーティ API の制限に対応するようにスケーリングできるため、ほとんどのユース ケースでインジェストを推奨しています。 インジェストでは、ソース システムから Databricks にデータがコピーされるため、重複するデータが発生し、時間の経過と共に古くなる可能性があります。 データをコピーしない場合は、次のツールを使用できます。

  • レイクハウスフェデレーション を使用すると、データを移動せずに外部データソースをクエリできます。

  • Delta Sharing を使用すると、プラットフォーム、クラウド、リージョン間でデータを安全に共有できます。

ただし、データをコピーしたくない場合は、レイクハウスフェデレーションまたは Delta Sharingを使用します。

Delta Sharing を使用する場合

Delta Sharing は、次のシナリオで選択します。

  • データ重複の制限

  • 可能な限り最新のデータのクエリ

レイクハウスフェデレーションを使用する場合

次のシナリオでは、レイクハウスフェデレーションを選択します。

  • ETL パイプラインのアドホック レポート作成または概念実証作業

Delta Lakeへのデータの移行

既存のデータを Delta Lake に移行する方法については、Delta Lake へのデータの移行を参照してください。

COPY INTO (レガシー)

CREATE STREAMING TABLE SQL コマンドは、クラウド オブジェクト ストレージからの増分インジェストに推奨される従来の COPY INTO SQL コマンドの代替手段です。 「COPY INTO」を参照してください。よりスケーラブルで堅牢なファイルインジェストエクスペリエンスを実現するために、DatabricksSQLユーザーはCOPY INTOではなくストリーミングテーブルを活用することをお勧めします。