Skip to content

Writing Delta Tables

For overwrites and appends, use write_deltalake. If the table does not already exist, it will be created. The data parameter will accept a Pandas DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches.

>>> from deltalake import write_deltalake
>>> df = pd.DataFrame({'x': [1, 2, 3]})
>>> write_deltalake('path/to/table', df)

Note: write_deltalake accepts a Pandas DataFrame, but will convert it to a Arrow table before writing. See caveats in pyarrow:python/pandas.

By default, writes create a new table and error if it already exists. This is controlled by the mode parameter, which mirrors the behavior of Spark's pyspark.sql.DataFrameWriter.saveAsTable DataFrame method. To overwrite pass in mode='overwrite' and to append pass in mode='append':

>>> write_deltalake('path/to/table', df, mode='overwrite')
>>> write_deltalake('path/to/table', df, mode='append')

write_deltalake will raise ValueError if the schema of the data passed to it differs from the existing table's schema. If you wish to alter the schema as part of an overwrite pass in schema_mode="overwrite" or schema_mode="merge". schema_mode="overwrite" will completely overwrite the schema, even if columns are dropped; merge will append the new columns and fill missing columns with null. schema_mode="merge" is also supported on append operations.

Overwriting a partition

You can overwrite a specific partition by using mode="overwrite" together with partition_filters. This will remove all files within the matching partition and insert your data as new files. This can only be done on one partition at a time. All the input data must belong to that partition or else the method will raise an error.

>>> from deltalake import write_deltalake
>>> df = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'a', 'b']})
>>> write_deltalake('path/to/table', df, partition_by=['y'])

>>> table = DeltaTable('path/to/table')
>>> df2 = pd.DataFrame({'x': [100], 'y': ['b']})
>>> write_deltalake(table, df2, partition_filters=[('y', '=', 'b')], mode="overwrite")

>>> table.to_pandas()
     x  y
0    1  a
1    2  a
2  100  b

This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent.

Overwriting part of the table data using a predicate

Note

This predicate is often called a replaceWhere predicate

When you don’t specify the predicate, the overwrite save mode will replace the entire table. Instead of replacing the entire table (which is costly!), you may want to overwrite only the specific parts of the table that should be changed. In this case, you can use a predicate to overwrite only the relevant records or partitions.

Note

Data written must conform to the same predicate, i.e. not contain any records that don't match the predicate condition, otherwise the operation will fail

replaceWhere

import pyarrow as pa
from deltalake import write_deltalake

# Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
table_path = "/tmp/my_table"
data = pa.table(
    {
        "id": pa.array(["1", "1"], pa.string()),
        "value": pa.array([11, 12], pa.int64()),
    }
)
write_deltalake(
    table_path,
    data,
    mode="overwrite",
    predicate="id = '1'",
    engine="rust",
)

replaceWhere

// Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use deltalake::datafusion::logical_expr::{col, lit};
use deltalake::protocol::SaveMode;
use deltalake::DeltaOps;

let schema = ArrowSchema::new(vec![
    Field::new("id", DataType::Utf8, true),
    Field::new("value", DataType::Int32, true),
]);

let data = RecordBatch::try_new(
    schema.into(),
    vec![
        Arc::new(arrow::array::StringArray::from(vec!["1", "1"])),
        Arc::new(arrow::array::Int32Array::from(vec![11, 12])),
    ],
)
.unwrap();

let table = deltalake::open_table("/tmp/my_table").await.unwrap();
let _table = DeltaOps(table)
    .write(vec![data])
    .with_save_mode(SaveMode::Overwrite)
    .with_replace_where(col("id").eq(lit("1")))
    .await
    .unwrap();

Using Writer Properties

You can customize the Rust Parquet writer by using the WriterProperties. Additionally, you can apply extra configurations through the BloomFilterProperties and ColumnProperties data classes.

Here's how you can do it:

from deltalake import BloomFilterProperties, ColumnProperties, WriterProperties, write_deltalake
import pyarrow as pa

wp = WriterProperties(
        statistics_truncate_length=200,
        default_column_properties=ColumnProperties(
            bloom_filter_properties=BloomFilterProperties(True, 0.2, 30)
        ),
        column_properties={
            "value_non_bloom": ColumnProperties(bloom_filter_properties=None),
        },
    )

table_path = "/tmp/my_table"

data = pa.table(
        {
            "id": pa.array(["1", "1"], pa.string()),
            "value": pa.array([11, 12], pa.int64()),
            "value_non_bloom": pa.array([11, 12], pa.int64()),
        }
    )

write_deltalake(table_path, data, writer_properties=wp)