Joins are often one of the most expensive parts of a query. Once tables get large, the join can heavily impact both runtime and memory usage, especially when the engine has to build a large hash table as part of execution.
If the join keys are already sorted, Polars can now take a cheaper path: a streaming sort-merge join. This post explains how that algorithm works, how to make use of it, how to verify from the physical plan that Polars chose the merge join, and when this path is worth using. At the end, we show that it can deliver up to an 18x speedup.
## Why sort-merge joins matter
A sort-merge join becomes attractive when the expensive part, sorting, has already been done or is unnecessary because the data already arrives ordered: think of taxi ride logs, ticker data, or sensor readings.
In that situation, it has three useful properties:
- it does not require any intermediate data structures
- it has low additional memory pressure
- it has sequential access patterns that fit streaming execution well
However, if your data is not sorted, a hash join is often still the better default. You can view it as “sorted data unlocks a cheaper join strategy”.
## The basic sort-merge algorithm
At the core of the algorithm are two sorted sequences and one linear scan.
### The naive algorithm
The naive approach requires join keys that are both sorted and unique. In pseudocode:

```
result = []
while not left.done and not right.done:
    if left < right:
        left.next()
    elif left > right:
        right.next()
    else:
        result.append(left + right)
        left.next()
        right.next()
return result
```
Let’s take the following sequences and see how this plays out:
Left = [1, 3, 4] and Right = [2, 3, 4].
Step 1:
```
Left : [1] 3 4
Right: [2] 3 4
Output: []
```
1 < 2, so Left advances.
Step 2:
```
Left : 1 [3] 4
Right: [2] 3 4
Output: []
```
3 > 2, so Right advances.
Step 3:
```
Left : 1 [3] 4
Right: 2 [3] 4
Output: [(3, 3)]
```
Now the keys match, so we emit one joined row and advance both sequences.
Step 4:
```
Left : 1 3 [4]
Right: 2 3 [4]
Output: [(3, 3), (4, 4)]
```
Again, the keys match, so we emit and advance.
Step 5:
```
Left : 1 3 4 [done]
Right: 2 3 4 [done]
Output: [(3, 3), (4, 4)]
```
At least one sequence is done, so we’re finished.
That is the whole idea: each side moves forward once, and we never need to jump backwards. Two pointers advance in lockstep.
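For readers who want to run this, here is a minimal executable version of the naive scan, using list indices in place of the iterator-style pseudocode above (`naive_merge_join` is our own name for illustration):

```python
def naive_merge_join(left, right):
    """Inner-join two sorted sequences of unique keys with one linear scan."""
    result = []
    i, j = 0, 0  # one cursor per side; both only ever move forward
    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            i += 1  # left key is too small, advance left
        elif left[i] > right[j]:
            j += 1  # right key is too small, advance right
        else:
            result.append((left[i], right[j]))  # match: emit a joined row
            i += 1
            j += 1
    return result

print(naive_merge_join([1, 3, 4], [2, 3, 4]))  # [(3, 3), (4, 4)]
```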
### The problem: duplicate keys
However, real data has repeated keys. What happens when the sequences we want to join are Left = [2, 2] and Right = [2, 2]? At first, the naive scan looks fine:
Frame 1:
```
Left : [2] 2
Right: [2] 2
Output: [(L[0], R[0])]
```
Frame 2:
```
Left : 2 [2]
Right: 2 [2]
Output: [(L[0], R[0]), (L[1], R[1])]
```
Then the sequences run out:
Frame 3:
```
Left : 2 2 [done]
Right: 2 2 [done]
Output: [(L[0], R[0]), (L[1], R[1])]
```
We still owe (L[0], R[1]) and (L[1], R[0]), but the scan has no way to return to the start of the matching run and emit the full Cartesian product of matches.
We need to be able to rewind.
### The fix: mark
`mark` saves where the run of matching keys started in Right. When Left advances to another row with the same key, we rewind Right to `mark` and advance only Left, instead of advancing both Left and Right.
This is the pseudocode for an inner join:
```
result = []
mark = None
while True:
    if left.done:
        break
    if right.done and not mark:
        break
    if not mark:
        while left < right:
            left.next()
            if left.done: break
        while left > right:
            right.next()
            if right.done: break
        mark = right
    if not right.done and left == right:
        result.append(left + right)
        right.next()
    else:
        right = mark
        left.next()
        mark = None
return result
```
Step 1: save mark when the first match is found.
```
Left : [2] 2
Right: [2] 2
Mark :  ^
Output: []
```
Steps 2 & 3: consume the whole matching run for L0.
```
Left : [2] 2
Right: 2 [2]
Mark : ^
Output: [(L[0], R[0]), (L[0], R[1])]
```
Step 4: rewind Right to mark, then advance Left.
```
Left : 2 [2]
Right: [2] 2
Mark :  ^
Output: [(L[0], R[0]), (L[0], R[1])]
```
Steps 5 & 6: consume the same right-hand run again, now for L1.
```
Left : 2 [2]
Right: 2 [2]
Mark : ^
Output: [(L[0], R[0]), (L[0], R[1]), (L[1], R[0]), (L[1], R[1])]
```
Step 7: we rewind Right to mark, but then run out of elements on the Left side when we advance it.

```
Left : 2 2 [done]
Right: [2] 2
Mark :  ^
Output: [(L[0], R[0]), (L[0], R[1]), (L[1], R[0]), (L[1], R[1])]
```

Left is exhausted, so we are finished, having emitted the full Cartesian product of the matching run: (L[0], R[0]), (L[0], R[1]), (L[1], R[0]), (L[1], R[1]).
That is the purpose of mark: turn one contiguous run of equal keys on the right into a reusable range for every matching row on the left.
The important detail is that we do not need to remember all previous rows. We only need to remember where the current run of equal keys started. As long as the left key stays the same, that one bookmark is enough.
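To make the rewind concrete, here is a runnable sketch of the mark-based inner join over plain sorted lists. The function name and cursor bookkeeping are our own, and it deliberately ignores the nulls, outer joins, and composite keys that a production implementation must handle:

```python
def merge_join(left, right):
    """Inner-join two sorted sequences, handling duplicate keys via a mark."""
    result = []
    i, j = 0, 0
    mark = None  # index in right where the current run of equal keys starts
    while i < len(left):
        if mark is None:
            # Advance past non-matching keys on either side.
            while i < len(left) and j < len(right) and left[i] < right[j]:
                i += 1
            while i < len(left) and j < len(right) and left[i] > right[j]:
                j += 1
            if i == len(left) or j == len(right):
                break
            mark = j  # remember where the matching run starts
        if j < len(right) and left[i] == right[j]:
            result.append((left[i], right[j]))
            j += 1  # consume the run for the current left row
        else:
            j = mark     # rewind to the start of the run...
            i += 1       # ...advance only the left side...
            mark = None  # ...and look for the next run
    return result

print(merge_join([2, 2], [2, 2]))  # [(2, 2), (2, 2), (2, 2), (2, 2)]
```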
Duplicate keys are one of the complications, but they are not the only one. Null semantics, outer-join behavior, composite keys, and very large duplicate runs all add extra edge cases and bookkeeping. The core scan stays the same, but the full production algorithm has to account for those cases as well.
## When Polars can use it
The query planner takes your optimized query plan and determines which algorithms will execute it most efficiently. (You can read more on how Polars executes its queries [here](https://pola.rs/posts/polars_birds_eye_view/).) Polars can take the sort-merge path for supported equi-joins and range joins when the query planner knows that both join keys are sorted.
This requires three conditions:
- the query runs on the streaming engine
- both join columns are known to be ordered on the join key
- the join is an equality join or a range join on those ordered keys
If those conditions are not met, Polars can still execute the join, but it will choose a different strategy.
The important practical point is that “known to be sorted” means known to the optimizer, not just known to you. If the plan does not contain sortedness information, Polars has to assume the safe thing and fall back to another join strategy.
## Using the streaming sort-merge join
The simplest way to make that possible is to preserve or declare sortedness in your lazy plan and run the query on the streaming engine:
```python
import polars as pl

# Set the streaming engine as default
pl.Config.set_engine_affinity("streaming")

# These input files have sorted "key" columns
left = pl.scan_parquet("left.parquet").set_sorted("key")
right = pl.scan_parquet("right.parquet").set_sorted("key")

result = (
    left
    .join(
        right,
        on="key",
        how="inner",
    )
    .collect()
)
```
This gives the optimizer the ingredients it needs to pick a sort-merge join.
There is one important caveat: `set_sorted` is not a sort. It is a trust-me declaration to the optimizer. If the column is not actually sorted, Polars will not sort it for you or raise an error here. You would be telling the engine it can rely on an ordering that does not exist, which can lead to incorrect results. If you are not absolutely sure the input is already ordered, apply `.sort()` explicitly instead. `set_sorted` is for cases where the data source or an upstream step already guarantees the ordering and you want Polars to retain that information.
## How to verify that Polars chose it
After building your lazy query, inspect the physical plan before collecting:
```python
query = left.join(right, on="key", how="inner")
query.show_graph(plan_stage="physical")
```
That is worth checking every time you rely on sortedness.
It closes the loop between “I think this data is sorted” and “the optimizer actually used that fact”.
If you want to reproduce this locally, run the snippet above on a query where both sides are truly sorted, then compare the physical plan before and after removing set_sorted or the explicit sort step.
## When to use it
If you would need to sort both sides purely to make one join eligible for sort-merge, the hash join is often still cheaper overall. The sort-merge path becomes much more compelling when one of these is true:
- the inputs are already sorted
- you need the data sorted anyway for later steps
## Microbenchmark
We benchmarked on synthetic NumPy data with 100_000_000 rows per side, unique integer keys, and one payload column on each side, running on a MacBook M4 Pro with 14 cores.
| Query shape | Engine path | Time |
|---|---|---|
| No sortedness metadata | Streaming hash join | 1.333s |
| `.set_sorted("key")` on both sides | Streaming merge join | 0.074s |
| **Speedup** | | **18.0x** |
Note that this benchmark uses unique keys, so it is close to a best-case scenario for sort-merge joins. Workloads with many duplicate keys and nulls will see a smaller gap because the merge-join path has to do more rewind work.
The code for the benchmark is included as an appendix below so you can reproduce it yourself.
## Real-world benchmark: NYC taxi
The synthetic benchmark above uses uniquely-keyed, perfectly pre-sorted data, which is a best case. Below is what the same comparison looks like on real data.
We use the NYC Yellow Taxi Trip Records, a public dataset of 38 million taxi trips recorded in New York City during 2023. To create two tables of similar size, we split the trips into two groups by pickup zone (even-numbered and odd-numbered zone IDs), representing two separate event streams that each arrive sorted by time. Both tables are pre-sorted by pickup timestamp and written to Parquet before the benchmark starts, like they would be in a pipeline where time-ordering is required for downstream operations.
The join finds all trip pairs from the two zone groups that started at the same second.
| Query shape | Engine path | Time |
|---|---|---|
| No sortedness metadata | Streaming hash join | 0.347s |
| `.set_sorted("pickup_ts")` on both sides | Streaming merge join | 0.099s |
| **Speedup** | | **3.5x** |
The speedup is smaller than in the synthetic case (3.5x vs 18.0x) because the real data averages about two pickups per second, so the merge join does more rewind work through the duplicate key groups. That is exactly the tradeoff described in the note above.
The code for this benchmark is in the appendix as well.
## Conclusion
The core idea of a sort-merge join is: if both sides are ordered, a join can become one linear scan instead of a large hash-table build and probe.
We also went over duplicate keys as a complication, and how mark handles them by turning one matching run on the right into a reusable range.
In Polars, this becomes practical when the streaming engine knows the join keys are sorted.
Once sortedness information is present in the plan, you can verify the result in the physical graph, where merge-join shows up directly.
Sortedness is not just a logical property of your domain or data; the optimizer and the engine can exploit it by choosing different algorithms. When your keys are already ordered, or when ordering is useful elsewhere in the query, Polars can turn that into a cheaper join.
## Appendix A: microbenchmark code
The following script creates two best-case datasets and times a hash join and a merge join.
```python
import timeit

import numpy as np
import polars as pl

n_rows = 100_000_000

pl.Config.set_engine_affinity("streaming")

key = np.arange(n_rows, dtype=np.uint32)
left_payload = (key % 97).astype(np.uint16)
right_payload = (key % 89).astype(np.uint16)

left = pl.LazyFrame(
    {
        "key": key,
        "left_payload": left_payload,
    }
)
right = pl.LazyFrame(
    {
        "key": key,
        "right_payload": right_payload,
    }
)

hash_query = left.join(right, on="key", how="inner")
merge_query = left.set_sorted("key").join(
    right.set_sorted("key"),
    on="key",
    how="inner",
)

hash_time = timeit.timeit(lambda: hash_query.collect(), number=1)
merge_time = timeit.timeit(lambda: merge_query.collect(), number=1)
speedup = hash_time / merge_time

print(f"hash join:  {hash_time:.3f}s")
print(f"merge join: {merge_time:.3f}s")
print(f"speedup:    {speedup:.2f}x")
```
Run it like this:
```shell
uv run --isolated --with polars,numpy python benchmark.py
```
If you want the benchmark to run longer on a larger machine, increase n_rows.
If you do that, make sure you have enough RAM for two input columns plus the joined output.
## Appendix B: NYC taxi benchmark code
Download the 2023 yellow taxi Parquet files into a dedicated `taxi` directory using the following commands:

```shell
mkdir -p taxi && cd taxi
for m in $(seq -w 1 12); do
  curl -O "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-${m}.parquet"
done
```
Then run this benchmark script in that directory. This script preprocesses the dataset by sorting trips into two zone-split Parquet files, then benchmarks hash join vs merge join on those pre-sorted tables:
```python
import timeit

import polars as pl

pl.Config.set_engine_affinity("streaming")


def prepare() -> None:
    """Sort trips by pickup timestamp and write two zone-split tables."""
    all_trips = (
        pl.scan_parquet(
            "yellow_tripdata_2023-*.parquet",
            # PULocationID changed from Int32 to Int64 across monthly files
            cast_options=pl.ScanCastOptions(integer_cast="upcast"),
            # Airport_fee column was added mid-year and is absent in earlier files
            extra_columns="ignore",
        )
        .filter(pl.col("tpep_pickup_datetime").dt.year() == 2023)
        .with_columns(
            pl.col("tpep_pickup_datetime").dt.truncate("1s").alias("pickup_ts"),
        )
        .select(["pickup_ts", "PULocationID", "fare_amount", "trip_distance"])
        .sort("pickup_ts")
        .collect()
    )
    all_trips.filter(pl.col("PULocationID") % 2 == 0).write_parquet("yellow_even.parquet")
    all_trips.filter(pl.col("PULocationID") % 2 == 1).write_parquet("yellow_odd.parquet")


prepare()

even = pl.scan_parquet("yellow_even.parquet")
odd = pl.scan_parquet("yellow_odd.parquet")

hash_query = even.join(odd, on="pickup_ts", how="inner", suffix="_odd")
merge_query = even.set_sorted("pickup_ts").join(
    odd.set_sorted("pickup_ts"),
    on="pickup_ts",
    how="inner",
    suffix="_odd",
)

hash_time = timeit.timeit(lambda: hash_query.collect(), number=1)
merge_time = timeit.timeit(lambda: merge_query.collect(), number=1)
speedup = hash_time / merge_time

print(f"hash join:  {hash_time:.3f}s")
print(f"merge join: {merge_time:.3f}s")
print(f"speedup:    {speedup:.1f}x")
```
You can run the benchmark script with:
```shell
uv run --isolated --with polars python taxi_benchmark.py
```