A good library abstracts away many complexities for its users. Polars is no different in this regard: its philosophy is that the queries you write should be performant by default, without you needing to know any of the internals. However, many users are interested in what happens under the hood, either as a learning experience or to squeeze that last bit of performance out of their queries. In this blog post we will provide a bird’s eye view of how Polars works, and in future posts we will dive deeper into each of its components.
High level overview
So, what is Polars? A short description would be “a query engine with a DataFrame frontend”. This is too high level even for a bird’s eye view. So let’s dive into the two elements, DataFrame and query engine, a bit more by looking at how a query gets executed. By taking a step-by-step journey through the execution of a query, we can observe each component in action and understand its role and purpose.
From a bird’s eye view, the execution of a query goes as follows. First we parse the query and validate it into a logical plan. The plan describes what the user intends to do, but not how to do it. Then our query optimizer traverses this plan (several times) to remove any unnecessary work and produces an optimized logical plan. Following this optimization phase, the query planner transforms the logical plan into a physical plan, which outlines how the query is to be executed. This finalized physical plan is the input for the actual execution of the query, which runs our compute kernels.
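Each of these stages can be observed from Python. Below is a minimal sketch, using a tiny lazy query against the zones file that appears later in this post (any CSV with these columns would do); explain prints the logical plan as text, before and after optimization:
import polars as pl

# Build a tiny lazy query; nothing is executed here, only a logical plan is constructed.
lf = (
    pl.scan_csv("taxi_zones.csv")
    .filter(pl.col("LocationID") > 10)
    .select("Zone")
)

# The plan as the user wrote it ...
print(lf.explain(optimized=False))
# ... and the plan after the optimizer has rewritten it.
print(lf.explain())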
Query
When you interact with Polars, you use our DataFrame API. This API is specifically designed with parallel execution and performance in mind. Writing a Polars query is, in that sense, writing a small program (or in this case, a query) in a domain-specific language (DSL) designed by Polars. This DSL has its own set of rules governing which queries are valid and which ones are not.
For this post, let’s use the famous NYC taxi dataset with taxi trips1. In the example below we calculate the average cost per minute for trips over 25 dollars, by zone. This case is simple enough to be easily understood while containing enough depth to showcase the purpose of the query engine.
import polars as pl

query = (
    pl.scan_parquet("yellow_tripdata_2023-01.parquet")
    .join(pl.scan_csv("taxi_zones.csv"), left_on="PULocationID", right_on="LocationID")
    .filter(pl.col("total_amount") > 25)
    .group_by("Zone")
    .agg(
        (
            pl.col("total_amount")
            / (pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime")).dt.total_minutes()
        )
        .mean()
        .alias("cost_per_minute")
    )
    .sort("cost_per_minute", descending=True)
)
The query above is of type LazyFrame. It returns instantly, even though the NYC taxi trips dataset contains over 3 million rows, so what has happened? The statement defines the query, but does not yet execute it. This concept is known as lazy evaluation and is one of the key strengths of Polars. If you look at the data structure on the Rust side, you will see it contains two elements: a logical_plan and configuration flags for the optimizer, opt_state.
pub struct LazyFrame {
    pub logical_plan: LogicalPlan,
    pub(crate) opt_state: OptState,
}
The logical plan is a tree with the data sources as leaves of the tree and the transformations as nodes. The plan describes the structure of a query and the expressions it contains.
pub enum LogicalPlan {
    /// Filter on a boolean mask
    Selection {
        input: Box<LogicalPlan>,
        predicate: Expr,
    },
    /// Column selection
    Projection {
        expr: Vec<Expr>,
        input: Box<LogicalPlan>,
        schema: SchemaRef,
        options: ProjectionOptions,
    },
    /// Join operation
    Join {
        input_left: Box<LogicalPlan>,
        input_right: Box<LogicalPlan>,
        schema: SchemaRef,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        options: Arc<JoinOptions>,
    },
    ...
}
One important step when converting your query into a logical plan is validation. Polars knows the schema of the data upfront and can validate whether the transformations are correct. This ensures you don’t run into errors halfway through executing a query. For instance, defining a query that selects a column which does not exist returns an error before execution:
pl.LazyFrame([]).select(pl.col("does_not_exist"))
polars.exceptions.ColumnNotFoundError: does_not_exist
Error originated just after this operation:
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
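Because the schema is resolved up front, you can also inspect the output schema of a lazy query without materializing any data. A small sketch (collect_schema is the method name in recent Polars versions):
# Resolve the schema of a lazy plan without executing it; only the CSV header
# (plus a small sample for type inference) is read.
print(pl.scan_csv("taxi_zones.csv").collect_schema())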
We can view the logical plan by calling show_graph on a LazyFrame:
query.show_graph(optimized=False)
Query Optimization
The goal of the query optimizer is to optimize the LogicalPlan for performance. It does this by traversing the tree structure and modifying, adding, or removing nodes. There are many types of optimizations that lead to faster execution, for instance changing the order of operations. Generally, you want filter operations to occur as early as possible, as this allows you to throw away unneeded data and avoid unnecessary work. In the example we can show our optimized logical plan with the same show_graph function:
query.show_graph()
At first glance, it might look like both plans (optimized vs non-optimized) are the same. However, two important optimizations have occurred: Projection pushdown and Predicate pushdown.
Polars has analyzed the query and noted that only a small set of columns is used: four columns from the trip data and two from the zone data. Reading in the entire dataset would be wasteful, as there is no need for the other columns. By analyzing your query, Projection Pushdown speeds up reading the data significantly. You can see the optimization in the leaf nodes under $\pi$ 4/19 and $\pi$ 2/4.
With Predicate pushdown, Polars filters data as close to the source as possible. This avoids reading in data that would be discarded at a later stage of the query. The filter node has been moved into the Parquet reader under $\sigma$, which indicates that our reader immediately removes rows that do not match the filter. The next join operation will be a lot faster, as there is less data coming in.
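To make the two pushdowns concrete: the optimized plan behaves roughly as if the scans had been written like this by hand (a sketch of an equivalent query, not what Polars literally generates internally):
# Projection pushdown: read only the four trip columns and two zone columns
# the query actually uses. Predicate pushdown: filter while scanning.
trips = (
    pl.scan_parquet("yellow_tripdata_2023-01.parquet")
    .select(
        "PULocationID",
        "total_amount",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
    )
    .filter(pl.col("total_amount") > 25)
)
zones = pl.scan_csv("taxi_zones.csv").select("LocationID", "Zone")
With the optimizer, you get this behaviour automatically, without rewriting the query.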
Polars supports a range of optimizations which can be viewed here.
Query Execution
Once the logical plan has been optimized, it is time for execution. The logical plan is a blueprint for what the user wants to execute, not how to execute it. This is where the physical plan comes into play. A naive solution would be to have one join algorithm and one sort algorithm; that way, you could execute the logical plan directly. However, this comes at a huge performance cost, because knowing the characteristics of your data and the environment you run in allows Polars to select more specialized algorithms. Thus there is not one join algorithm, but multiple, each with its own characteristics and performance trade-offs. The query planner converts the LogicalPlan into a PhysicalPlan and picks the best algorithms for the query. Then our compute engine performs the operations. This post will not go into much detail about the execution model of our engines or how they are able to be so fast. That is left for another time.
When we look at the performance difference between the two plans (optimized vs non-optimized), we see a 4x improvement. This is the power of lazy execution and of using a query engine instead of eagerly evaluating every expression in order: it allows the engine to optimize and avoid unnecessary work. This improvement comes at zero cost for the user, as all they have to do is write the query. All the complexity is hidden inside the query engine.
%%time
query.collect(no_optimization=True);
CPU times: user 2.45 s, sys: 1.18 s, total: 3.62 s
Wall time: 544 ms
%%time
query.collect();
CPU times: user 616 ms, sys: 54.2 ms, total: 670 ms
Wall time: 135 ms
Conclusion
In this post we covered the main components of Polars. Hopefully, by now you have a better understanding of how Polars works, from its API down to execution. The next posts will dive deeper into each component, so stay tuned!