Work with joins on Databricks

Databricks supports ANSI standard join syntax. This article describes differences between joins with batch and stream processing and provides some recommendations for optimizing join performance.

Note

Databricks also supports standard syntax for the set operators UNION, INTERSECT, and EXCEPT. See Set operators.

Differences between streaming and batch joins

Joins on Databricks are either stateful or stateless.

All batch joins are stateless joins. Results process immediately and reflect data at the time the query runs. Each time the query executes, new results are calculated based on the specified source data. See Batch joins.

Joins between two streaming data sources are stateful. In stateful joins, Databricks tracks information about the data sources and the results and iteratively updates the results. Stateful joins can provide powerful solutions for online data processing, but can be difficult to implement effectively. They have complex operational semantics depending on the output mode, trigger interval, and watermark. See Stream-stream joins.

Stream-static joins are stateless, but provide a good option for joining an incremental data source (such as a fact table) with a static data source (such as a slowly changing dimension table). Rather than joining all records from both sides each time a query executes, only newly received records from the streaming source are joined with the current version of the static table. See Stream-static joins.

Batch joins

Databricks supports standard SQL join syntax, including inner, outer, semi, anti, and cross joins. See JOIN.

Stream-stream joins

Joining two streaming data sources can present significant challenges in managing state information and reasoning about results computation and output. Before implementing a stream-stream join, Databricks recommends developing a strong understanding of the operational semantics for stateful streaming, including how watermarks impact state management.

Databricks recommends specifying watermarks for both sides of all stream-stream joins. The following join types are supported:

  • Inner joins

  • Left outer joins

  • Right outer joins

  • Full outer joins

  • Left semi joins

See the Apache Spark Structured Streaming documentation on stream-stream joins.

Stream-static joins

Note

The described behavior for stream-static joins assumes that the static data is stored using Delta Lake.

A stream-static join joins the latest valid version of a Delta table (the static data) to a data stream using a stateless join.

When Databricks processes a micro-batch of data in a stream-static join, the latest valid version of data from the static Delta table joins with the records present in the current micro-batch. Because the join is stateless, you do not need to configure watermarking and can process results with low latency. The data in the static Delta table used in the join should be slowly-changing.

The following example demonstrates this pattern:

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

# Path where Structured Streaming stores checkpoint information; use your own location.
checkpoint_path = "/tmp/checkpoints/orders_with_customer_info"

query = (streamingDF
  .join(staticDF, streamingDF.customer_id == staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Optimize join performance

Compute with Photon enabled always selects the best join type. See What is Photon?.

Using a recent Databricks Runtime version with Photon enabled generally provides good join performance, but you should also consider the following recommendations:

  • Cross joins are very expensive. Remove cross joins from workloads and queries that require low latency or frequent recomputation.

  • Join order matters. When performing multiple joins, always join your smallest tables first and then join the result with larger tables.

  • The optimizer can struggle on queries with many joins and aggregations. Saving out intermediate results can accelerate query planning and computing results.

  • Keep statistics fresh to improve performance. Run the command ANALYZE TABLE table_name COMPUTE STATISTICS to update the statistics used by the query planner.

Note

In Databricks Runtime 14.3 LTS and above, you can modify the columns that Delta Lake collects stats on for data skipping and then recompute existing statistics in the Delta log. See Specify Delta statistics columns.

Join hints on Databricks

Apache Spark supports specifying join hints for range joins and skew joins. Hints for skew joins are not necessary, as Databricks optimizes these joins automatically. See Hints.

Hints for range joins can be useful if join performance is poor and you are performing inequality joins. Examples include joining on timestamp ranges or a range of clustering IDs. See Range join optimization.