1. The DataFrame scale gap
When I started working on Polars, I was surprised how much DataFrame implementations differed from SQL and databases. SQL could run anywhere¹. It could run embedded, in a client-server model, or on full OLAP data warehouses.
DataFrames, in contrast, had a different API per use case, and their performance lagged drastically behind SQL solutions. Locally, pandas was dominant; remotely/distributed, it was PySpark. For end users, pandas was very easy to get up and running, but it seems to have ignored what databases have learned over decades: there was no query optimization, a poor data type implementation, many needless materializations, memory handling offloaded to NumPy, and a few other design decisions that led to poor scaling and inconsistent behavior. PySpark was much closer to databases: it follows the relational model, has query optimization and a distributed query engine, and it scaled properly. However, PySpark is written in Scala, requires the JVM to run locally, has very poor, unpythonic UX (Java backtraces, for one), and is very sensitive to OOMs. It was also designed for commodity hardware a decade ago, and row-based OLAP execution has proven to be suboptimal.
With Polars we want to unify those two worlds under one flexible DataFrame API, backed by highly performant compute. Our initial (and achieved) goal was offering an alternative to pandas with a flexible API that enables query optimization and parallel streaming execution. Second, we want to make running DataFrame code remotely easy. Just like SQL, a Polars LazyFrame is a description of a query, and it can be sent to a server to be executed remotely. This should be dead easy. In the cloud-dominant era, you should not be limited to the boundaries of your laptop.
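A minimal sketch of that idea in open-source Polars today (the file name is hypothetical):

import polars as pl

# A LazyFrame is only a description of a query; nothing executes yet.
lf = pl.scan_csv("data.csv").filter(pl.col("x") > 0)
print(lf.explain())  # inspect the optimized query plan
df = lf.collect()    # execution happens only here

Because the query is just a description until collected, the same description can in principle be executed locally or shipped to a server.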
2. Run Polars everywhere
Our goal is to enable scalable data processing with all the flexibility and expressiveness of Polars’ API. We are working on two things: Polars Cloud and a completely novel streaming engine design. We will explain more about the streaming engine in later posts; today we want to share what we are building with Polars Cloud.
The features we will offer:
- Distributed Polars: one API for all high-performance DataFrame needs;
- Serverless compute;
- Configurable hardware, both GPU and CPU;
- Diagonal scaling: scaling both horizontally and vertically;
- Bring your own cloud: AWS, Azure, and GCP;
- On-premise licensing;
- Fault tolerance;
- Data lineage;
- Observability.
It will be seamless to spin up hardware and run Polars queries remotely, either in batch mode for production ETL jobs or interactively for data exploration. In the rest of this post, we explore this through a few code examples.
3. A remote query
It’s important for us that starting a remote query feels native and seamless for the end user. Running a query remotely will be available from within Polars’ native API.
Note that we are agnostic about where you call this code. You can start a remote query from a notebook on your machine, an Airflow DAG, an AWS Lambda, your server, etc. The compute needed for data processing is often much higher than the compute needed for orchestration in Airflow or Prefect. By not constraining you to a platform where you need to run your queries, we give you the flexibility to embed Polars Cloud in any environment.
Below we start our first remote query.
import polars as pl
import polars_cloud as pc
from datetime import date

query = (
    pl.scan_parquet("s3://my-dataset/")
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
    .group_by("l_returnflag", "l_linestatus")
    .agg(
        avg_price=pl.mean("l_extendedprice"),
        avg_disc=pl.mean("l_discount"),
        count_order=pl.len(),
    )
)

in_progress = (
    query
    .remote(pc.ComputeContext(cpus=16, memory=64))
    .write_parquet("s3://my-dst/")
)
We create a LazyFrame and, instead of collecting it locally, we call .remote(), which tells Polars to run this query remotely with the given pc.ComputeContext. The ComputeContext tells us what kind of hardware to spin up, and the write_parquet call fires the query. We get back an InProgressQueryRemote object indicating that our query runs remotely. In the meantime we can asynchronously work on other things, or we can block and await the result. Finally, the InProgressQueryRemote can be turned into a LazyFrame again to continue working on the result of the remote query. Let’s do that.
result = in_progress.await_result()
print(result)
new_lf: pl.LazyFrame = result.lazy()
shape: (4, 5)
┌──────────────┬──────────────┬──────────────┬──────────┬─────────────┐
│ l_returnflag ┆ l_linestatus ┆ avg_price ┆ avg_disc ┆ count_order │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 ┆ f64 ┆ u32 │
╞══════════════╪══════════════╪══════════════╪══════════╪═════════════╡
│ A ┆ F ┆ 38273.129735 ┆ 0.049985 ┆ 1478493 │
│ N ┆ O ┆ 38249.117989 ┆ 0.049997 ┆ 2920374 │
│ R ┆ F ┆ 38250.854626 ┆ 0.050009 ┆ 1478870 │
│ N ┆ F ┆ 38284.467761 ┆ 0.050093 ┆ 38854 │
└──────────────┴──────────────┴──────────────┴──────────┴─────────────┘
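Since result.lazy() gives us a LazyFrame again, we can keep composing locally. A small sketch (the sort key is just illustrative):

# Continue locally on the result of the remote query.
top = new_lf.sort("avg_price", descending=True).head(2).collect()
print(top)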
4. Scaling strategies
Polars runs these queries remotely differently than you might expect from a Python library. We don’t require serialization via pickle, because we build a DSL tree natively in Rust. This DSL is sent to the server to be analyzed and run. On the cluster side, we have built a distributed scheduler-worker architecture in Rust that analyzes these queries, runs an adapted Polars optimizer and, depending on the query, comes up with a physical plan that scales horizontally or vertically.

Polars is very strong in vertical scaling, and we recognize that many queries will be most cost effective on a single machine. However, it is also very common that queries start with a set of operations that reduce the dataset size (think of group-bys and filters). When querying large data sizes from cloud storage, you are bound by the IO limit of a single node. With horizontal scaling we can drastically increase those download limits and, after the size reductions, finish off on a single machine. We are building an architecture that shines in both horizontal and vertical scaling, dubbed diagonal scaling, choosing the optimal strategy dynamically.
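As a small illustration of the first point, open-source Polars can already serialize a LazyFrame’s logical plan without pickling any Python objects. A sketch, assuming a recent Polars version where LazyFrame.serialize/deserialize are available:

import io
import polars as pl

lf = pl.scan_parquet("s3://my-dataset/").filter(pl.col("l_discount") > 0.05)

# The logical plan is built natively in Rust; no Python objects are pickled.
plan = lf.serialize()                               # binary plan representation
lf_again = pl.LazyFrame.deserialize(io.BytesIO(plan))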
5. Engines (CPU and GPU)
Besides multiple scaling strategies, we are committed to running open-source Polars as the engine on the worker nodes. This ensures our incentives are aligned and that the semantics of Polars Cloud will not deviate. Polars Cloud will allow you to run all engines, which means there will also be GPU support. You will be able to spin up a machine with a high-end GPU and connect locally in interactive mode. Our new streaming engine has an out-of-core design and will be able to spill to disk in an efficient manner. Together with distributed queries, this will truly scale Polars to any dataset. We already have the first preliminary results on the PDS-H benchmark² and they look very promising. We already beat our in-memory engine by a factor of ~3 (and there are still a lot of performance opportunities), and it goes without saying that the memory characteristics are much better.
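To give an idea of what engine selection looks like, open-source Polars already exposes it on collect. A sketch, assuming a recent release where collect takes an engine argument (the GPU engine requires the optional GPU package):

# Run the same lazy query on a specific engine (illustrative).
df_streaming = query.collect(engine="streaming")  # out-of-core streaming engine
df_gpu = query.collect(engine="gpu")              # GPU engine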
6. Distribution strategies
Distributed
As mentioned, you can spin up a cluster of machines. This allows you to run a query in distributed mode.
lf: pl.LazyFrame

result = (
    lf.remote()
    .distributed()
    .write_parquet()
    .await_result()
)
The semantics of a distributed query don’t change; it only tells Polars Cloud that it can use multiple nodes to finish the query if it needs to. Not every Polars operation is supported yet, but this will still be beneficial, as distributed queries can reduce the dataset size until a single node can finish it on the streaming engine. Currently supported are all streamable operations, such as filter, explode, and map, and partitionable operations, such as group-by and equi-joins. An example of a query shape that benefits is sketched below.
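A hypothetical query shape (dataset and column names invented): the scan and filter can fan out over the cluster, the group-by partitions on its key, and the reduced result can be finished on a single node:

import polars as pl

lf = (
    pl.scan_parquet("s3://events/")      # IO fans out across nodes
    .filter(pl.col("status") == "ok")    # streamable; reduces the data early
    .group_by("user_id")                 # partitionable on the key
    .agg(n_events=pl.len())
)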
Partitioned
Furthermore, we have partitioned queries, which partition the query on a given key over the available nodes in the cluster. This semantically changes the query, as you get a result for every unique key, meaning that a single query will have n_unique(key) results:
lf: pl.LazyFrame

result = (
    lf.remote(pc.ComputeContext(cpus=16, memory=64, cluster_size=32))
    .partition_by("day")
    .write_parquet()
    .await_result()
)
This can be very useful for time series where you want to run a query per time interval, e.g. daily, weekly, or monthly. A partition key like that can be derived directly in the query, as sketched below.
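For example, a daily key could be derived from a timestamp column before calling partition_by (a sketch; the path and column names are hypothetical):

import polars as pl

# Derive a "day" column to partition the query on.
lf = pl.scan_parquet("s3://ticks/").with_columns(
    day=pl.col("timestamp").dt.truncate("1d")
)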
Spawn many queries in parallel
Finally, we make it easy to run many queries remotely. We provide a function spawn_many, which takes a list of LazyFrames that will run on the cluster.
import polars as pl
import polars_cloud as pc

lazy_frames: list[pl.LazyFrame]
results = pc.spawn_many(lazy_frames, partition_by="day", dst="s3://result_dst/").await_result()
7. Fault tolerance
Once you start dealing with multiple workers and hardware, things will fail: disks will drop, machines will disconnect, etc. This has to be completely hidden from the user, and this complexity will be handled by us. We will reschedule tasks if workers fail and ensure a query finishes independently of hardware failures.
Apply for early access
At the end of this month we are onboarding our first clients. Soon after, we want to scale up and invite individuals that have their cloud stack on AWS. After that we will work on other cloud vendors and Kubernetes. Do you want early access? Reach out to us!
Footnotes
1. I don’t mean that you can swap backends easily. Different solutions have slightly different semantics, in ordering, null handling, supported functions, data types, etc. ↩
2. This is a benchmark derived from TPC-H, but these results are not comparable to TPC-H results. See more details here. ↩