Databricks SDK for Go
この記事では、 Databricks SDK for Go を使用して、Databricks アカウント、ワークスペース、および関連リソースでの操作を自動化する方法について説明します。 この記事では、Databricks SDK for Go の README、 API リファレンス、 例を補足します。
注
この機能は ベータ版 であり、本番環境で使用しても問題ありません。
ベータ期間中、Databricks では、プロジェクトの go.mod
ファイルなど、コードが依存する Databricks SDK for Go の特定のマイナー バージョンへの依存関係をピン留めすることをお勧めします。 依存関係の固定の詳細については、「 依存関係の管理」を参照してください。
Go 用 Databricks SDK の使用を開始する
Go が既にインストールされ、既存の Go コード プロジェクトが既に 作成 され 、Databricks 認証 が構成されている開発コンピューターで、 コマンドを実行して
go.mod
Go コードの依存関係を追跡する ファイルを作成します。go mod init
go mod init sample
go mod edit -require
コマンドを実行して Databricks SDK for Go パッケージへの依存関係を取得し、0.8.0
を CHANGELOG に記載されている最新バージョンの Databricks SDK for Go パッケージに置き換えます。go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
go.mod
ファイルは次のようになります。module sample go 1.18 require github.com/databricks/databricks-sdk-go v0.8.0
プロジェクト内で、Databricks SDK for Go をインポートする Go コード ファイルを作成します。 次の例では、次の内容の
main.go
という名前のファイルで、Databricks ワークスペース内のすべてのクラスターを一覧表示します。package main import ( "context" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/compute" ) func main() { w := databricks.Must(databricks.NewWorkspaceClient()) all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{}) if err != nil { panic(err) } for _, c := range all { println(c.ClusterName) } }
不足しているモジュールの依存関係を追加するには、
go mod tidy
コマンドを実行します。go mod tidy
注
エラー
go: warning: "all" matched no packages
が表示された場合は、Databricks SDK for Go をインポートする Go コード ファイルを追加するのを忘れています。main
モジュール内のパッケージのビルドとテストをサポートするために必要なすべてのパッケージのコピーを取得するには、go mod vendor
コマンドを実行します。go mod vendor
Databricks 認証用に開発用コンピューターを設定します。
go run
コマンドを実行して、main.go
という名前のファイルを想定して、Go コード ファイルを実行します。go run main.go
注
前の
w := databricks.Must(databricks.NewWorkspaceClient())
の呼び出しで引数として*databricks.Config
を設定しないことで、Databricks SDK for Go は既定のプロセスを使用して Databricks 認証を実行しようとします。この既定の動作をオーバーライドするには、「 Databricks アカウントまたはワークスペースで Databricks SDK for Go を認証する」を参照してください。
Go 用に Databricks SDK を更新する
変更ログに記載されている Databricks SDK for Go パッケージの 1 つを使用するように Go プロジェクトを更新するには、次の手順を実行します。
プロジェクトのルートから
go get
コマンドを実行し、更新を実行するための-u
フラグを指定し、Databricks SDK for Go パッケージの名前とターゲット バージョン番号を指定します。 たとえば、バージョン0.12.0
に更新するには、次のコマンドを実行します。go get -u github.com/databricks/databricks-sdk-go@v0.12.0
不足しているモジュールや古いモジュールの依存関係を追加および更新するには、
go mod tidy
コマンドを実行します。go mod tidy
go mod vendor
コマンドを実行して、main
モジュール内のパッケージのビルドとテストをサポートするために必要なすべての新規および更新されたパッケージのコピーを取得します。go mod vendor
Databricks アカウントまたはワークスペースを使用して Go 用の Databricks SDK を認証する
Databricks SDK for Go は、 Databricks クライアント統合認証 標準を実装しており、認証に対するアーキテクチャとプログラムによる統合アプローチと一貫性のあるアプローチです。 このアプローチにより、Databricks による認証の設定と自動化をより一元化し、予測可能にすることができます。 これにより、Databricks 認証を一度構成すると、認証構成をさらに変更することなく、その構成を複数の Databricks ツールや SDK で使用できます。 Go のより完全なコード例など、詳細については、「クライアント統合認証Databricks」を参照してください。
Databricks SDK for Go を使用して Databricks 認証を初期化するために使用できるコーディング パターンには、次のようなものがあります。
Databricks のデフォルト認証を使用するには、次のいずれかを実行します。
ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、
DATABRICKS_CONFIG_PROFILE
環境変数をカスタム構成プロファイルの名前に設定します。ターゲットの Databricks 認証の種類に必要な環境変数を設定します。
次に、たとえば、次のように Databricks デフォルト認証を使用して
WorkspaceClient
オブジェクトをインスタンス化します。import ( "github.com/databricks/databricks-sdk-go" ) // ... w := databricks.Must(databricks.NewWorkspaceClient())
必須フィールドのハードコーディングはサポートされていますが、Databricks personal アクセストークンなどの機密情報がコード内で公開されるリスクがあるため、お勧めしません。 次の例では、Databricks トークン認証のために Databricks ホストとアクセストークンの値をハードコーディングします。
import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/config" ) // ... w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{ Host: "https://...", Token: "...", }))
Databricks SDK for Go README の 「認証 」も参照してください。
例
次のコード例は、Databricks SDK for Go を使用して、クラスターの作成と削除、ジョブの実行、アカウント ユーザーの一覧表示を行う方法を示しています。 これらのコード例では、 Databricks SDK for Go の既定の Databricks 認証 プロセスを使用します。
その他のコード例については、GitHub の Databricks SDK for Go リポジトリにある サンプル フォルダーを参照してください。
クラスターを作成する
このコード例では、利用可能な最新の Databricks Runtime 長期サポート (LTS) バージョンと、ローカル ディスクを持つ使用可能な最小のクラスター ノード タイプを使用してクラスターを作成します。 このクラスターにはワーカーが 1 つあり、クラスターは 15 分のアイドル時間が経過すると自動的に終了します。 CreateAndWait
メソッドを呼び出すと、新しいクラスターがワークスペースで実行されるまでコードが停止します。
package main
import (
"context"
"fmt"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/compute"
)
func main() {
const clusterName = "my-cluster"
const autoTerminationMinutes = 15
const numWorkers = 1
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
// Get the full list of available Spark versions to choose from.
sparkVersions, err := w.Clusters.SparkVersions(ctx)
if err != nil {
panic(err)
}
// Choose the latest Long Term Support (LTS) version.
latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
Latest: true,
LongTermSupport: true,
})
if err != nil {
panic(err)
}
// Get the list of available cluster node types to choose from.
nodeTypes, err := w.Clusters.ListNodeTypes(ctx)
if err != nil {
panic(err)
}
// Choose the smallest available cluster node type.
smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
LocalDisk: true,
})
if err != nil {
panic(err)
}
fmt.Println("Now attempting to create the cluster, please wait...")
runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
ClusterName: clusterName,
SparkVersion: latestLTS,
NodeTypeId: smallestWithLocalDisk,
AutoterminationMinutes: autoTerminationMinutes,
NumWorkers: numWorkers,
})
if err != nil {
panic(err)
}
switch runningCluster.State {
case compute.StateRunning:
fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
w.Config.Host,
runningCluster.ClusterId,
)
default:
fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
}
// Output:
//
// Now attempting to create the cluster, please wait...
// The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}
クラスターを完全に削除する
このコード例では、指定したクラスター ID を持つクラスターをワークスペースから完全に削除します。
package main
import (
"context"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/clusters"
)
func main() {
// Replace with your cluster's ID.
const clusterId = "1234-567890-ab123cd4"
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
ClusterId: clusterId,
})
if err != nil {
panic(err)
}
}
ジョブの実行
このコード例では、指定したクラスターで指定したノートブックを実行する Databricks ジョブを作成します。 コードを実行すると、既存のノートブックのパス、既存のクラスター ID、および関連するジョブ設定がターミナルのユーザーから取得されます。 RunNowAndWait
メソッドを呼び出すと、ワークスペースで新しいジョブの実行が完了するまでコードが停止します。
package main
import (
"bufio"
"context"
"fmt"
"os"
"strings"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
func main() {
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
nt := jobs.NotebookTask{
NotebookPath: askFor("Workspace path of the notebook to run:"),
}
jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
Name: askFor("Some short name for the job:"),
Tasks: []jobs.JobTaskSettings{
{
Description: askFor("Some short description for the job:"),
TaskKey: askFor("Some key to apply to the job's tasks:"),
ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
NotebookTask: &nt,
},
},
})
if err != nil {
panic(err)
}
fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
w.Config.Host,
jobToRun.JobId,
)
runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
JobId: jobToRun.JobId,
})
if err != nil {
panic(err)
}
jobRun, err := runningJob.Get()
if err != nil {
panic(err)
}
fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
w.Config.Host,
jobRun.JobId,
jobRun.RunId,
)
// Output:
//
// Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
// View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}
// Get job settings from the user.
func askFor(prompt string) string {
var s string
r := bufio.NewReader(os.Stdin)
for {
fmt.Fprint(os.Stdout, prompt+" ")
s, _ = r.ReadString('\n')
if s != "" {
break
}
}
return strings.TrimSpace(s)
}
Unity Catalogボリューム内のファイルを管理する
このコード例は、Unity Catalogボリュームにアクセスするために、 WorkspaceClient
内のfiles
機能へのさまざまな呼び出しを示しています。
package main
import (
"context"
"io"
"os"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/files"
)
func main() {
w := databricks.Must(databricks.NewWorkspaceClient())
catalog := "main"
schema := "default"
volume := "my-volume"
volumePath := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
volumeFolder := "my-folder"
volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
volumeFile := "data.csv"
volumeFilePath := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
uploadFilePath := "./data.csv"
// Create an empty folder in a volume.
err := w.Files.CreateDirectory(
context.Background(),
files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
)
if err != nil {
panic(err)
}
// Upload a file to a volume.
fileUpload, err := os.Open(uploadFilePath)
if err != nil {
panic(err)
}
defer fileUpload.Close()
w.Files.Upload(
context.Background(),
files.UploadRequest{
Contents: fileUpload,
FilePath: volumeFilePath,
Overwrite: true,
},
)
// List the contents of a volume.
items := w.Files.ListDirectoryContents(
context.Background(),
files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
)
for {
if items.HasNext(context.Background()) {
item, err := items.Next(context.Background())
if err != nil {
break
}
println(item.Path)
} else {
break
}
}
// List the contents of a folder in a volume.
itemsFolder := w.Files.ListDirectoryContents(
context.Background(),
files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
)
for {
if itemsFolder.HasNext(context.Background()) {
item, err := itemsFolder.Next(context.Background())
if err != nil {
break
}
println(item.Path)
} else {
break
}
}
// Print the contents of a file in a volume.
file, err := w.Files.DownloadByFilePath(
context.Background(),
volumeFilePath,
)
if err != nil {
panic(err)
}
bufDownload := make([]byte, file.ContentLength)
for {
file, err := file.Contents.Read(bufDownload)
if err != nil && err != io.EOF {
panic(err)
}
if file == 0 {
break
}
println(string(bufDownload[:file]))
}
// Delete a file from a volume.
w.Files.DeleteByFilePath(
context.Background(),
volumeFilePath,
)
// Delete a folder from a volume.
w.Files.DeleteDirectory(
context.Background(),
files.DeleteDirectoryRequest{
DirectoryPath: volumeFolderPath,
},
)
}
アカウントユーザーを一覧表示する
このコード例では、Databricks アカウント内で使用可能なユーザーを一覧表示します。
package main
import (
"context"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/iam"
)
func main() {
a := databricks.Must(databricks.NewAccountClient())
all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
if err != nil {
panic(err)
}
for _, u := range all {
println(u.UserName)
}
}