Skip to content

Querying Delta Tables

Delta tables can be queried in several ways. By loading as Arrow data or an Arrow dataset, they can be used by compatible engines such as Pandas and DuckDB. By passing on the list of files, they can be loaded into other engines such as Dask.

Delta tables are often larger than can fit into memory on a single computer, so this module provides ways to read only the parts of the data you need. Partition filters allow you to skip reading files that are part of irrelevant partitions. Only loading the columns required also saves memory. Finally, some methods allow reading tables batch-by-batch, allowing you to process the whole table while only having a portion loaded at any given time.

To load into Pandas or a PyArrow table use the DeltaTable.to_pandas and DeltaTable.to_pyarrow_table methods, respectively. Both of these support filtering partitions and selecting particular columns.

>>> from deltalake import DeltaTable
>>> dt = DeltaTable("../rust/tests/data/delta-0.8.0-partitioned")
>>> dt.schema().to_pyarrow()
value: string
year: string
month: string
day: string
>>> dt.to_pandas(partitions=[("year", "=", "2021")], columns=["value"])
      value
0     6
1     7
2     5
3     4
>>> dt.to_pyarrow_table(partitions=[("year", "=", "2021")], columns=["value"])
pyarrow.Table
value: string

Converting to a PyArrow Dataset allows you to filter on columns other than partition columns and load the result as a stream of batches rather than a single table. Convert to a dataset using DeltaTable.to_pyarrow_dataset. Filters applied to datasets will use the partition values and file statistics from the Delta transaction log and push down any other filters to the scanning operation.

>>> import pyarrow.dataset as ds
>>> dataset = dt.to_pyarrow_dataset()
>>> condition = (ds.field("year") == "2021") & (ds.field("value") > "4")
>>> dataset.to_table(filter=condition, columns=["value"]).to_pandas()
  value
0     6
1     7
2     5
>>> batch_iter = dataset.to_batches(filter=condition, columns=["value"], batch_size=2)
>>> for batch in batch_iter: print(batch.to_pandas())
  value
0     6
1     7
  value
0     5

PyArrow datasets may also be passed to compatible query engines, such as DuckDB

>>> import duckdb
>>> ex_data = duckdb.arrow(dataset)
>>> ex_data.filter("year = 2021 and value > 4").project("value")
---------------------
-- Expression Tree --
---------------------
Projection [value]
  Filter [year=2021 AND value>4]
    arrow_scan(140409099470144, 4828104688, 1000000)

---------------------
-- Result Columns  --
---------------------
- value (VARCHAR)

---------------------
-- Result Preview  --
---------------------
value
VARCHAR
[ Rows: 3]
6
7
5

Finally, you can always pass the list of file paths to an engine. For example, you can pass them to dask.dataframe.read_parquet:

>>> import dask.dataframe as dd
>>> df = dd.read_parquet(dt.file_uris())
>>> df
Dask DataFrame Structure:
                value             year            month              day
npartitions=6
               object  category[known]  category[known]  category[known]
                  ...              ...              ...              ...
...               ...              ...              ...              ...
                  ...              ...              ...              ...
                  ...              ...              ...              ...
Dask Name: read-parquet, 6 tasks
>>> df.compute()
  value  year month day
0     1  2020     1   1
0     2  2020     2   3
0     3  2020     2   5
0     4  2021     4   5
0     5  2021    12   4
0     6  2021    12  20
1     7  2021    12  20

When working with the Rust API, Apache Datafusion can be used to query data from a delta table.

let table = deltalake::open_table("../rust/tests/data/delta-0.8.0-partitioned").await?;
let ctx = SessionContext::new();
ctx.register_table("simple_table", Arc::new(table.clone()))?;
let df = ctx.sql("SELECT value FROM simple_table WHERE year = 2021").await?;
df.show().await?;

Apache Datafusion also supports a Dataframe interface than can be used instead of the SQL interface:

let table = deltalake::open_table("../rust/tests/data/delta-0.8.0-partitioned").await?;
let ctx = SessionContext::new();
let dataframe = ctx.read_table( Arc::new(table.clone()))?;
let df = dataframe.filter(col("year").eq(lit(2021)))?.select(vec![col("value")])?;
df.show().await?;