Polars in Aggregate: Faster I/O, inequality joins, and much more

Wed, 4 Dec 2024

Polars in Aggregate is a summary of what we have been working on in recent weeks. This can be seen as a cherry-picked changelog with a bit more context. This Polars in Aggregate covers the releases of Polars 1.3.0 to Polars 1.15.0.

In this article we will cover faster I/O in general, and many performance improvements to Parquet in particular, the addition of inequality joins, a major documentation rework, added support for scanning directly from bytes and other buffers, the new bitwise operations and aggregations, work on the support for array and list arithmetic, the new function pl.concat_arr, and an experimental credential provider to make it easier to work with cloud providers.

Even faster I/O

Use two-pass algorithm for csv to ensure correctness and SIMDize more ~17% (#19088)

Despite being a fairly plain format, reading a CSV file in parallel is not trivial. If you jump to an arbitrary newline in the middle of the file, you risk landing inside a quoted field that contains newline characters. Before this improvement, Polars would fall back to a single-threaded algorithm if it found one of these edge cases while reading a CSV file.

Now, Polars uses a two-pass algorithm that starts by scanning the CSV file to split the file into chunks that can be parsed in parallel, even if those chunks contain quoted fields with newlines. While the two-pass algorithm requires a bit more work, the increased usage of SIMD operations and the fact that we could remove the single-threaded fallback means this change was a net gain in terms of performance.
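
To make the two-pass idea more concrete, here is a minimal pure-Python sketch. It is not the Polars implementation (which is written in Rust and relies on SIMD); the function names are hypothetical, the header row is left out for brevity, and threads are used only to illustrate that the chunks are independent.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def row_start_offsets(text: str) -> list[int]:
    """Pass 1: offsets where a record starts, ignoring newlines inside quotes."""
    offsets = [0]
    in_quotes = False
    for i, ch in enumerate(text):
        if ch == '"':
            # An escaped quote ("") toggles the state twice, so it cancels out.
            in_quotes = not in_quotes
        elif ch == "\n" and not in_quotes and i + 1 < len(text):
            offsets.append(i + 1)
    return offsets


def split_into_chunks(text: str, n_chunks: int) -> list[str]:
    """Cut the text only at record boundaries found in the first pass."""
    starts = row_start_offsets(text)
    rows_per_chunk = max(1, -(-len(starts) // n_chunks))  # ceiling division
    bounds = starts[::rows_per_chunk] + [len(text)]
    return [text[a:b] for a, b in zip(bounds, bounds[1:])]


def parse_chunk(chunk: str) -> list[list[str]]:
    """Pass 2: every chunk holds whole records, so it can be parsed on its own."""
    return list(csv.reader(io.StringIO(chunk, newline="")))


body = 'A,"line one\nline two"\nB,plain\nC,"say ""hi"""\nD,"x\ny\nz"\n'
chunks = split_into_chunks(body, n_chunks=2)
with ThreadPoolExecutor() as pool:
    rows = [row for part in pool.map(parse_chunk, chunks) for row in part]

for row in rows:
    print(row)
```

The first pass is cheap because it only tracks whether the cursor is inside quotes; all the expensive field parsing happens in the second pass, where the chunks no longer depend on each other.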

The 17% performance gain indicated in the PR title was measured by the PR author while reading the New York City taxi data.

Better cloud scan performance (#19728)

In line with other improvements made to cloud interoperability, we improved the performance of the scan_ family of functions when operating on cloud files. With this change, we have better control over the number and size of the requests we make, so that they’re more evenly distributed and have a more reasonable chunk size. In a series of different queries to a file with 12,000 columns and 24,000 rows hosted in an S3 bucket, we measured a speedup of roughly an order of magnitude:

query                                                   time (1.12.0, seconds)   time (after PR)   speedup
pl.read_parquet(path)                                   29.020907                2.771221          10.5x
sp.select(cs.by_index(range(6_000))).collect()          14.717146                1.75916           8.4x
sp.select(cs.by_index(range(0, 12_000, 5))).collect()   28.716227                2.414139          11.9x
sp.select(cs.by_index(range(0, 12_000, 3))).collect()   28.815121                2.515088          11.5x
sp.collect(new_streaming=True)                          28.939848                3.001616          9.6x

In the queries above, sp = pl.scan_parquet(path). The PR linked and the issue linked within provide details if you want to replicate the benchmark.

Parquet file format improvements

Going through the release notes from Polars 1.3.0 to Polars 1.15.0, you will notice a recurring theme in the “🚀 Performance improvements” sections: Parquet I/O has improved a lot over the last few releases. This article covers 13 minor releases, and every single one of them improved Parquet I/O, through dozens of performance improvements, bug fixes, and enhancements.

For example, the PRs #18030 and #19256 sped up the writing of integer data. This was achieved by speeding up the bottleneck of determining whether it is worth using a dictionary encoding for integer data or not.

The plot below shows how much faster Parquet reading is with respect to versions 0.20.16 (when significant work started on Parquet performance) and 1.3.0, the base version for this post. The plot includes benchmarks across a number of classical datasets that show that Parquet reading is roughly 53% faster when compared to version 1.3.0 and 139% faster when compared to version 0.20.16:

Another noteworthy improvement to Parquet file scanning is the addition of the strategy parallel="prefiltered". According to the documentation on polars.scan_parquet, the strategy parallel="prefiltered" starts by evaluating the pushed-down predicates in parallel to determine which rows need to be read. Then, it parallelizes over the columns and row groups while skipping the rows that do not need to be read. This strategy tends to be good when you have a large dataset and one of three things applies:

  1. You have a very complex predicate that cannot be easily reasoned about with the Parquet statistics.
  2. You have a very strict predicate that selects relatively few rows from your dataset.
  3. Your predicate filters a range of a sorted column (possibly amongst other filters).

At the time of writing, the default behaviour of scan_parquet lets Polars automatically select a parallelization strategy, which means parallel="prefiltered" might be turned on by Polars.

Inequality joins (#18365)

Polars introduced the dataframe method join_where, which joins two dataframes based on one or more inequalities:

import polars as pl

east = pl.DataFrame(
    {
        "id": [100, 101, 102],
        "dur": [120, 140, 160],
        "rev": [12, 14, 16],
        "cores": [2, 8, 4],
    }
)
west = pl.DataFrame(
    {
        "t_id": [404, 498, 676, 742],
        "time": [90, 130, 150, 170],
        "cost": [9, 13, 15, 16],
        "cores": [4, 2, 1, 4],
    }
)

print(
    east.join_where(
        west,
        pl.col("dur") < pl.col("time"),
        pl.col("rev") < pl.col("cost"),
    )
)
shape: (5, 8)
┌─────┬─────┬─────┬───────┬──────┬──────┬──────┬─────────────┐
│ id  ┆ dur ┆ rev ┆ cores ┆ t_id ┆ time ┆ cost ┆ cores_right │
│ --- ┆ --- ┆ --- ┆ ---   ┆ ---  ┆ ---  ┆ ---  ┆ ---         │
│ i64 ┆ i64 ┆ i64 ┆ i64   ┆ i64  ┆ i64  ┆ i64  ┆ i64         │
╞═════╪═════╪═════╪═══════╪══════╪══════╪══════╪═════════════╡
│ 100 ┆ 120 ┆ 12  ┆ 2     ┆ 498  ┆ 130  ┆ 13   ┆ 2           │
│ 100 ┆ 120 ┆ 12  ┆ 2     ┆ 676  ┆ 150  ┆ 15   ┆ 1           │
│ 100 ┆ 120 ┆ 12  ┆ 2     ┆ 742  ┆ 170  ┆ 16   ┆ 4           │
│ 101 ┆ 140 ┆ 14  ┆ 8     ┆ 676  ┆ 150  ┆ 15   ┆ 1           │
│ 101 ┆ 140 ┆ 14  ┆ 8     ┆ 742  ┆ 170  ┆ 16   ┆ 4           │
└─────┴─────┴─────┴───────┴──────┴──────┴──────┴─────────────┘
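
To make the semantics of an inequality join explicit, the sketch below reproduces the same result with a naive nested loop in plain Python. The helper naive_join_where is hypothetical and only serves as a reference for what the join computes; it says nothing about how Polars implements join_where internally, which is considerably more efficient than checking every pair of rows.

```python
from itertools import product

east = [
    {"id": 100, "dur": 120, "rev": 12, "cores": 2},
    {"id": 101, "dur": 140, "rev": 14, "cores": 8},
    {"id": 102, "dur": 160, "rev": 16, "cores": 4},
]
west = [
    {"t_id": 404, "time": 90, "cost": 9, "cores": 4},
    {"t_id": 498, "time": 130, "cost": 13, "cores": 2},
    {"t_id": 676, "time": 150, "cost": 15, "cores": 1},
    {"t_id": 742, "time": 170, "cost": 16, "cores": 4},
]


def naive_join_where(left, right, predicate, suffix="_right"):
    """Keep every (left, right) pair of rows for which the predicate holds."""
    joined = []
    for l_row, r_row in product(left, right):
        if predicate(l_row, r_row):
            row = dict(l_row)
            for key, value in r_row.items():
                # Clashing column names from the right side get a suffix.
                row[key + suffix if key in l_row else key] = value
            joined.append(row)
    return joined


rows = naive_join_where(
    east,
    west,
    lambda e, w: e["dur"] < w["time"] and e["rev"] < w["cost"],
)
for row in rows:
    print(row)
```

The row with id 102 does not appear in the result: its dur of 160 is below the time 170, but its rev of 16 is not strictly below the cost 16, and all predicates must hold at once.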

This feature is still marked as experimental, as we play around with the API to figure out the most ergonomic way to expose this type of operation to users. After the initial PR, more work has been done on this. For example, #19547 improved the error messages you would see if you tried using an invalid predicate to join on. The same PR also allowed the usage of the expression is_between in a join_where predicate.

Consider the two dataframes below:

from datetime import date

df1 = pl.DataFrame({
    "id": ["aa", "bb", "cc"],
    "start": [date(2020,1,1), date(2022,10,10), date(2024,7,5)],
    "end": [date(2022,12,31), date(2024,10,1), date(2024,12,31)],
})
df2 = pl.DataFrame({
    "id": ["aa", "cc", "bb"],
    "dt": [date(2022,12,31), date(2024,2,21), date(2024,8,8)],
    "price": [100, 200, 300],
})

Before #19547, if you wanted to join the two dataframes where the date of the second was between the start and end dates of the first, you would have to write that explicitly:

df1.join_where(
    df2,
    pl.col("id") == pl.col("id_right"),
    pl.col("dt") >= pl.col("start"),
    pl.col("dt") <= pl.col("end")
)

Now, you can express the same logic with the expression is_between, making it more ergonomic to do “range joins”:

df1.join_where(
    df2,
    pl.col("id") == pl.col("id_right"),
    pl.col("dt").is_between("start","end")
)

Revise and rework user-guide/expressions (#19360)

Documentation is one of the most important assets of an open-source library and at Polars we want our user guide to be as clear, helpful, and pedagogical as possible. That’s why we did an extensive rework of the “Getting Started”, “Concepts”, and “Expressions” sections. The rework included making sure the reading experience is coherent and that the examples are educational and clear.

We also added new content to the user guide under those sections, and in particular there is now an entire page dedicated to expression expansion, a feature that we want our users to be able to make the most of. Making good use of expression expansion is part of what makes Polars queries so performant, so we believe our users will benefit a lot from this new page. After you go through it, be sure to join us on Discord and to share your thoughts with us.

Add support for IO[bytes] and bytes in scan_{...} functions (#18532)

The scan_ family of functions can now work with bytes, io.BytesIO and io.StringIO objects:

in_memory_csv = bytes("name,age\nA,37\nB,54\n", "utf8")
print(
    pl
    .scan_csv(in_memory_csv)
    .filter(pl.col("age") > 50)
    .collect()
)
shape: (1, 2)
┌──────┬─────┐
│ name ┆ age │
│ ---  ┆ --- │
│ str  ┆ i64 │
╞══════╪═════╡
│ B    ┆ 54  │
└──────┴─────┘

from io import StringIO
string_io = StringIO("name,age\nA,37\nB,54\n")
print(
    pl
    .scan_csv(string_io)
    .select(pl.col("name"))
    .collect()
)
shape: (2, 1)
┌──────┐
│ name │
│ ---  │
│ str  │
╞══════╡
│ A    │
│ B    │
└──────┘

The read_ family of functions already had this functionality. Extending it to the scan_ family gives you more flexibility in the way you provide the data for your queries when using the lazy API.

Bitwise operations / aggregations (#18994)

Polars now supports six bitwise operations, namely bitwise_count_ones, bitwise_leading_ones, and bitwise_trailing_ones, and the respective operations for zeros instead of ones. We also added support for three bitwise aggregations, namely bitwise_and, bitwise_or, and bitwise_xor.

These nine functions work on Booleans, integers, and floating point numbers, and correspond to a pretty popular feature request.

Here is an example of bitwise_trailing_zeros in action:

print(
    (2 ** pl.arange(0, 10, eager=True))  # Some powers of 2.
    .bitwise_trailing_zeros()
)
shape: (10,)
Series: 'literal' [u32]
[
        0
        1
        2
        3
        4
        5
        6
        7
        8
        9
]

Array and list arithmetic support

Over the past few releases we have seen numerous PRs addressing array arithmetic (#19682, #19837) and list arithmetic (#17823, #19162). At their core, these PRs let you perform arithmetic computations with List or Array columns. This may involve a single number and a nested data type:

df = pl.DataFrame({"nums": [[1, 2], [3], [], [4, 5, 6]]})
print(
    df
    .select(pl.col("nums") * 10)
)
shape: (4, 1)
┌──────────────┐
│ nums         │
│ ---          │
│ list[i64]    │
╞══════════════╡
│ [10, 20]     │
│ [30]         │
│ []           │
│ [40, 50, 60] │
└──────────────┘

In this case, we say that the literal 10 was broadcast over the lists.

Polars also supports element-wise arithmetic between columns or series of compatible data types (which may require some form of shape compatibility as well):

df = pl.DataFrame(
    {
        "a": [[1, 2], [3, 4], [5, 6]],
        "b": [[10, 0], [100, 0], [1000, 0]],
    },
    schema={"a": pl.Array(int, (2,)), "b": pl.Array(int, (2,))},
)
print(
    df
    .select(pl.col("a") * pl.col("b"))
)
shape: (3, 1)
┌───────────────┐
│ a             │
│ ---           │
│ array[i64, 2] │
╞═══════════════╡
│ [10, 0]       │
│ [300, 0]      │
│ [5000, 0]     │
└───────────────┘

Add pl.concat_arr to concatenate columns into an Array column (#19881)

Still along the lines of adding new features to nested data types, you can now use pl.concat_arr to concatenate columns horizontally into a single array column. This feature is still experimental as we try to figure out the best API for users, but we share an example here of what it looks like with Polars 1.15.0:

df = pl.DataFrame(
    {
        "a": pl.Series([[1, 2], [3, 4], None], dtype=pl.Array(pl.Int64, (2,))),
        "b": pl.Series([[3], [None], [5]], dtype=pl.Array(pl.Int64, (1,))),
        "c": pl.Series([None, 5, 6], dtype=pl.Int64),
    }
)
with pl.Config(set_fmt_table_cell_list_len=4):  # Show the 4 elements of the resulting arrays.
    print(
        df
        .with_columns(
            pl.concat_arr("a", "b", "c").alias("concat_arr(a, b, c)"),
        )
    )
shape: (3, 4)
┌───────────────┬───────────────┬──────┬─────────────────────┐
│ a             ┆ b             ┆ c    ┆ concat_arr(a, b, c) │
│ ---           ┆ ---           ┆ ---  ┆ ---                 │
│ array[i64, 2] ┆ array[i64, 1] ┆ i64  ┆ array[i64, 4]       │
╞═══════════════╪═══════════════╪══════╪═════════════════════╡
│ [1, 2]        ┆ [3]           ┆ null ┆ [1, 2, 3, null]     │
│ [3, 4]        ┆ [null]        ┆ 5    ┆ [3, 4, null, 5]     │
│ null          ┆ [5]           ┆ 6    ┆ null                │
└───────────────┴───────────────┴──────┴─────────────────────┘

Experimental credential_provider argument for scan_parquet (#19271)

This feature allows users to provide custom credential provisioning logic for their cloud I/O scans. The parameter credential_provider expects a callable that returns the credentials and an optional expiry time, which allows us to call the function again only when the credentials have expired. An example for AWS follows:

def get_credentials():
    import boto3

    session = boto3.Session(profile_name="profile2")
    creds = session.get_credentials()

    return {
        "aws_access_key_id": creds.access_key,
        "aws_secret_access_key": creds.secret_key,
        "aws_session_token": creds.token,
    }, None


lf = pl.scan_parquet(
    "s3://...",
    credential_provider=get_credentials,
)
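
The example above returns None as the expiry time. To illustrate what returning an expiry enables, here is a small pure-Python sketch of a cache that calls the provider again only once the credentials have expired. The class CachedCredentials is hypothetical and is not part of Polars, the credential values are placeholders, and the sketch assumes the expiry is a Unix timestamp in seconds. A fake clock keeps the example deterministic.

```python
import time
from typing import Callable, Optional

Credentials = dict[str, str]
Provider = Callable[[], tuple[Credentials, Optional[int]]]


class CachedCredentials:
    """Hypothetical helper: re-invokes the provider only after the expiry passes."""

    def __init__(self, provider: Provider, clock: Callable[[], float] = time.time):
        self._provider = provider
        self._clock = clock
        self._credentials: Optional[Credentials] = None
        self._expiry: Optional[int] = None
        self.calls = 0  # how many times the provider was actually invoked

    def get(self) -> Credentials:
        expired = self._expiry is not None and self._clock() >= self._expiry
        if self._credentials is None or expired:
            self._credentials, self._expiry = self._provider()
            self.calls += 1
        return self._credentials


now = [1_000.0]  # fake clock, in seconds


def short_lived_provider():
    # Placeholder credentials that are valid for 60 seconds.
    creds = {
        "aws_access_key_id": "placeholder",
        "aws_secret_access_key": "placeholder",
    }
    return creds, int(now[0]) + 60


cache = CachedCredentials(short_lived_provider, clock=lambda: now[0])
cache.get()
cache.get()  # still valid, so the cached credentials are reused
calls_before_expiry = cache.calls

now[0] += 61  # move the fake clock past the expiry time
cache.get()
calls_after_expiry = cache.calls

print(calls_before_expiry, calls_after_expiry)
```

With an expiry of None, the condition `expired` is never true, so in this sketch the provider would be called exactly once.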

As a follow-up, Polars added credential provider utility classes for AWS and GCP (#19297), to simplify the usage of the parameter credential_provider when working with AWS or GCP. With the utility class CredentialProviderAWS, the example above becomes much shorter:

lf = pl.scan_parquet(
    "s3://...",
    credential_provider=pl.CredentialProviderAWS(profile_name="profile2"),
)

The PR also introduced the utility class CredentialProviderGCP for users working with GCP.

Join the Polars community

Did you know that Polars has an active community on Discord? It’s a great place to meet other Polars users, ask questions and help others by answering their questions. Join the Discord today by clicking here!

We are hiring

Are you excited about the work we do at Polars? We are hiring Rust software engineers to work on realizing the new era of DataFrames.

Rust backend engineer

As a founding Backend Engineer, you’ll shape the design of Polars Cloud in order to create a seamless experience for our users. Together with our product manager and full stack engineers, you’ll turn requirements into technical designs and then into implementations.

Database engineer

As a database engineer, you actively work on our DataFrame query engine. Together with the core team and a vibrant open source community, you shape the future of our API and its implementation. In your work, you’ll identify important areas with the highest community impact and start designing and implementing.
