Writing Delta Tables
For overwrites and appends, use write_deltalake. If the table does not already exist, it will be created.
The data parameter accepts a Pandas DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches.
>>> import pandas as pd
>>> from deltalake import write_deltalake
>>> df = pd.DataFrame({'x': [1, 2, 3]})
>>> write_deltalake('path/to/table', df)
Note: write_deltalake accepts a Pandas DataFrame, but will convert it to an Arrow table before writing. See caveats in pyarrow:python/pandas.
By default, writes create a new table and error if it already exists.
This is controlled by the mode parameter, which mirrors the behavior of Spark's pyspark.sql.DataFrameWriter.saveAsTable method. To overwrite, pass in mode='overwrite'; to append, pass in mode='append':
>>> write_deltalake('path/to/table', df, mode='overwrite')
>>> write_deltalake('path/to/table', df, mode='append')
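The three behaviours of the mode parameter described above can be pictured with a small in-memory model. This is only an illustrative sketch, not how deltalake is implemented: the write_rows helper and the dictionary used as storage are hypothetical, and FileExistsError is a stand-in for whatever exception the library actually raises.

```python
# Toy model of the `mode` parameter; `write_rows` and `store` are hypothetical
# names used for illustration and are not part of the deltalake API.
def write_rows(store, path, rows, mode="error"):
    """Write `rows` (a list of dicts) to `store[path]` according to `mode`."""
    if mode == "error":
        # Default: create the table, refuse to touch an existing one.
        if path in store:
            raise FileExistsError(f"table already exists at {path!r}")
        store[path] = list(rows)
    elif mode == "append":
        # Create the table on first write, otherwise add rows to it.
        store.setdefault(path, []).extend(rows)
    elif mode == "overwrite":
        # Replace whatever was there before.
        store[path] = list(rows)
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    return store[path]


store = {}
rows = [{"x": 1}, {"x": 2}, {"x": 3}]
write_rows(store, "path/to/table", rows)                    # creates the table
write_rows(store, "path/to/table", rows, mode="append")     # table now holds 6 rows
write_rows(store, "path/to/table", rows, mode="overwrite")  # table holds 3 rows again
```

Calling write_rows a second time with the default mode raises, which is the model's analogue of the "error if it already exists" rule.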
write_deltalake will raise ValueError if the schema of the data passed to it differs from the existing table's schema. If you wish to alter the schema as part of an overwrite, pass in schema_mode="overwrite" or schema_mode="merge".
schema_mode="overwrite" will completely overwrite the schema, even if columns are dropped; schema_mode="merge" will append the new columns and fill missing columns with null. schema_mode="merge" is also supported on append operations.
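The schema_mode rules above can be sketched with plain Python data. This is a simplified model under stated assumptions, not the library's implementation: a table is represented as a dict with "columns" and "rows", column types are ignored, and the helper names (merge_columns, project, write_with_schema_mode) are hypothetical. The model also assumes that schema_mode="overwrite" is only valid together with mode="overwrite".

```python
# Toy model of schema_mode; all names here are hypothetical and illustrative.
def merge_columns(existing, incoming):
    """Union of column names: existing columns first, then new ones in order."""
    return existing + [c for c in incoming if c not in existing]


def project(rows, columns):
    """Reshape rows to `columns`, filling absent values with None (null)."""
    return [{c: row.get(c) for c in columns} for row in rows]


def write_with_schema_mode(table, rows, mode, schema_mode=None):
    if mode not in ("append", "overwrite"):
        raise ValueError(f"unsupported mode in this sketch: {mode!r}")
    incoming = list(rows[0]) if rows else list(table["columns"])
    columns = table["columns"]
    if incoming != columns:
        if schema_mode is None:
            raise ValueError("schema of data differs from the table schema")
        if schema_mode == "overwrite":
            if mode != "overwrite":
                raise ValueError('schema_mode="overwrite" needs mode="overwrite"')
            columns = incoming  # old columns may be dropped
        elif schema_mode == "merge":
            columns = merge_columns(columns, incoming)
        else:
            raise ValueError(f"unknown schema_mode: {schema_mode!r}")
    kept = table["rows"] if mode == "append" else []
    table["columns"] = columns
    table["rows"] = project(kept + list(rows), columns)
    return table


table = {"columns": ["x", "y"], "rows": [{"x": 1, "y": "a"}]}
# Appending data with a new column "z": the schemas are merged and gaps become None.
write_with_schema_mode(table, [{"x": 2, "z": 0.5}], mode="append", schema_mode="merge")
# Overwriting data and schema: only column "z" survives.
write_with_schema_mode(table, [{"z": 1.5}], mode="overwrite", schema_mode="overwrite")
```

Without a schema_mode, the first call would raise ValueError, mirroring the schema check described above.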
Overwriting a partition
You can overwrite a specific partition by using mode="overwrite" together with partition_filters. This will remove all files within the matching partition and insert your data as new files. This can only be done on one partition at a time. All of the input data must belong to that partition, or else the method will raise an error.
>>> import pandas as pd
>>> from deltalake import DeltaTable, write_deltalake
>>> df = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'a', 'b']})
>>> write_deltalake('path/to/table', df, partition_by=['y'])
>>> table = DeltaTable('path/to/table')
>>> df2 = pd.DataFrame({'x': [100], 'y': ['b']})
>>> write_deltalake(table, df2, partition_filters=[('y', '=', 'b')], mode="overwrite")
>>> table.to_pandas()
     x  y
0    1  a
1    2  a
2  100  b
This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent.
Overwriting part of the table data using a predicate
Note: This predicate is often called a replaceWhere predicate.
When you don’t specify the predicate, the overwrite save mode will replace the entire table.
Instead of replacing the entire table (which is costly!), you may want to overwrite only the specific parts of the table that should be changed.
In this case, you can use a predicate to overwrite only the relevant records or partitions.
Note: Data written must conform to the same predicate, i.e. it must not contain any records that don't match the predicate condition; otherwise the operation will fail.
import pyarrow as pa
from deltalake import write_deltalake

# Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
table_path = "/tmp/my_table"
data = pa.table(
    {
        "id": pa.array(["1", "1"], pa.string()),
        "value": pa.array([11, 12], pa.int64()),
    }
)
write_deltalake(
    table_path,
    data,
    mode="overwrite",
    predicate="id = '1'",
    engine="rust",
)
// Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use deltalake::datafusion::logical_expr::{col, lit};
use deltalake::protocol::SaveMode;
use deltalake::DeltaOps;

let schema = ArrowSchema::new(vec![
    Field::new("id", DataType::Utf8, true),
    Field::new("value", DataType::Int32, true),
]);
// Every record in the batch satisfies the predicate `id = '1'`
let data = RecordBatch::try_new(
    schema.into(),
    vec![
        Arc::new(StringArray::from(vec!["1", "1"])),
        Arc::new(Int32Array::from(vec![11, 12])),
    ],
)
.unwrap();
let table = deltalake::open_table("/tmp/my_table").await.unwrap();
let _table = DeltaOps(table)
    .write(vec![data])
    .with_save_mode(SaveMode::Overwrite)
    .with_replace_where(col("id").eq(lit("1")))
    .await
    .unwrap();
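The effect of a predicate overwrite, including the rule that incoming data must itself match the predicate, can be modelled in a few lines of plain Python. This is a sketch of the semantics only, not of the deltalake internals: the overwrite_where helper is hypothetical, rows are plain dicts, and the predicate is a Python function rather than a SQL string.

```python
# Toy model of a replaceWhere-style overwrite; `overwrite_where` is a
# hypothetical helper, not part of the deltalake API.
def overwrite_where(rows, new_rows, predicate):
    """Replace the rows matching `predicate` with `new_rows`."""
    # Incoming data must conform to the predicate, otherwise the write fails.
    bad = [r for r in new_rows if not predicate(r)]
    if bad:
        raise ValueError(f"{len(bad)} incoming row(s) do not match the predicate")
    # Rows outside the predicate are left untouched.
    kept = [r for r in rows if not predicate(r)]
    return kept + list(new_rows)


def id_is_1(row):
    return row["id"] == "1"


table = [{"id": "1", "value": 1}, {"id": "2", "value": 2}]
new = [{"id": "1", "value": 11}, {"id": "1", "value": 12}]

table = overwrite_where(table, new, id_is_1)  # the row with id '2' is preserved
again = overwrite_where(table, new, id_is_1)  # repeating the write changes nothing
```

Because matching rows are removed before the new ones are inserted, the same call also works when no rows match yet, and repeating it leaves the table unchanged.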
Using Writer Properties
You can customize the Rust Parquet writer by using the WriterProperties class. Additionally, you can apply extra configurations through the BloomFilterProperties and ColumnProperties data classes.
Here's how you can do it:
import pyarrow as pa
from deltalake import BloomFilterProperties, ColumnProperties, WriterProperties, write_deltalake

wp = WriterProperties(
    statistics_truncate_length=200,
    default_column_properties=ColumnProperties(
        bloom_filter_properties=BloomFilterProperties(True, 0.2, 30)
    ),
    column_properties={
        "value_non_bloom": ColumnProperties(bloom_filter_properties=None),
    },
)
table_path = "/tmp/my_table"
data = pa.table(
    {
        "id": pa.array(["1", "1"], pa.string()),
        "value": pa.array([11, 12], pa.int64()),
        "value_non_bloom": pa.array([11, 12], pa.int64()),
    }
)
write_deltalake(table_path, data, writer_properties=wp)