Funções definidas pelo usuário no Databricks Connect for Scala

Observação

Este artigo aborda o Databricks Connect para Databricks Runtime 14.1 e acima.

Este artigo descreve como executar funções definidas pelo usuário com Databricks Connect for Scala. O Databricks Connect permite conectar IDEs populares, servidores Notebook e aplicativos personalizados a clusters Databricks. Para a versão Python destes artigos, consulte Funções definidas pelo utilizador em Databricks Connect for Python.

Observação

Antes de começar a usar o Databricks Connect, você deve configurar o cliente Databricks Connect.

Para o Databricks Runtime 14.1 e acima, o Databricks Connect for Scala dá suporte à execução de funções definidas pelo usuário (UDFs).

Para executar uma UDF, a classe compilada e os JARs exigidos pela UDF devem ser carregados nos clusters. A API addCompiledArtifacts() pode ser usada para especificar a classe compilada e os arquivos JAR que devem ser upload.

Observação

O Scala usado pelo cliente deve corresponder à versão do Scala nos clusters do Databricks. Para verificar a versão Scala dos clusters , consulte a seção “Ambiente do Sistema” para a versão do Databricks Runtime dos clustersem Databricks Runtime notas sobre a versão versões e compatibilidade.

O programa Scala a seguir configura uma UDF simples que eleva ao quadrado os valores em uma coluna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

No exemplo anterior, como a UDF está totalmente contida em Main, apenas o artefato compilado de Main é adicionado. Se a UDF se espalhar por outras classes ou usar biblioteca externa (ou seja, JARs), todas essas bibliotecas também deverão ser incluídas.

Quando a sessão do Spark já estiver inicializada, outras classes e JARs compilados poderão ser upload usando a API spark.addArtifact().

Observação

Ao fazer upload de JARs, todos os JARs de dependência transitiva devem ser incluídos para upload. As APIs não realizam nenhuma detecção automática de dependências transitivas.

APIs de conjunto de dados digitados

O mesmo mecanismo descrito na seção anterior para UDFs também se aplica a APIs dataset digitados.

APIs de dataset digitados permitem transformações de execução, como mapa, filtro e agregações no dataset resultante. Eles também são executados de forma semelhante aos UDFs nos clusters Databricks.

O aplicativo Scala a seguir usa a API map() para modificar um número em uma coluna de resultados para strings prefixadas.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Embora este exemplo use a API map(), isso também se aplica a outras APIs dataset digitados, como filter(), mapPartitions() etc.