Skip to content

TableMerger

deltalake.table.TableMerger

TableMerger(table: DeltaTable, source: pyarrow.RecordBatchReader, predicate: str, source_alias: Optional[str] = None, target_alias: Optional[str] = None, safe_cast: bool = True, writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None)

API for various table MERGE commands.

execute

execute() -> Dict[str, Any]

Executes the MERGE with the previously provided settings. The operation runs in Rust, using the Apache DataFusion query engine.

Returns:

Name Type Description
Dict Dict[str, Any]

metrics

when_matched_delete

when_matched_delete(predicate: Optional[str] = None) -> TableMerger

Delete a matched row from the table only if the given predicate (if specified) is true for the matched row. If no predicate is specified, all matched rows are deleted.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to delete.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example

Delete on a predicate

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3], "deleted": [False, True]})

(
    dt.merge(
        source=new_data,
        predicate='target.x = source.x',
        source_alias='source',
        target_alias='target')
    .when_matched_delete(
        predicate="source.deleted = true")
    .execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 2, 'num_output_rows': 2, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5

Delete all records that were matched

dt = DeltaTable("tmp")
(
    dt.merge(
        source=new_data,
        predicate='target.x = source.x',
        source_alias='source',
        target_alias='target')
    .when_matched_delete()
    .execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 1, 'num_output_rows': 1, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  y
0  1  4

when_matched_update

when_matched_update(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Update a matched table row based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates Dict[str, str]

a mapping of column name to update SQL expression.

required
predicate Optional[str]

SQL like predicate on when to update.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "1y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "1y": [7]})

(
     dt.merge(
         source=new_data,
         predicate="target.x = source.x",
         source_alias="source",
         target_alias="target")
     .when_matched_update(updates={"x": "source.x", "`1y`": "source.`1y`"})
     .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  1y
0  1   7
1  2   5
2  3   6

when_matched_update_all

when_matched_update_all(predicate: Optional[str] = None) -> TableMerger

Update all target fields of a matched row with the values of the corresponding source fields. Source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the row to be updated.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to update all columns.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "y": [7]})

(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target")
    .when_matched_update_all()
    .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  y
0  1  7
1  2  5
2  3  6

when_not_matched_by_source_delete

when_not_matched_by_source_delete(predicate: Optional[str] = None) -> TableMerger

Delete a target row that has no matches in the source from the table only if the given predicate (if specified) is true for the target row.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to delete when not matched by source.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

when_not_matched_by_source_update

when_not_matched_by_source_update(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Update a target row that has no matches in the source based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates Dict[str, str]

a mapping of column name to update SQL expression.

required
predicate Optional[str]

SQL like predicate on when to update.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3, 4]})

(
   dt.merge(
       source=new_data,
       predicate='target.x = source.x',
       source_alias='source',
       target_alias='target')
   .when_not_matched_by_source_update(
       predicate = "y > 3",
       updates = {"y": "0"})
   .execute()
)
{'num_source_rows': 3, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  0
1  2  5
2  3  6

when_not_matched_insert

when_not_matched_insert(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Insert a new row to the target table based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the new row to be inserted.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates Dict[str, str]

a mapping of column name to insert SQL expression.

required
predicate Optional[str]

SQL like predicate on when to insert.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})

(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target",)
    .when_not_matched_insert(
        updates={
            "x": "source.x",
            "y": "source.y",
        })
    .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5
2  3  6
3  4  7

when_not_matched_insert_all

when_not_matched_insert_all(predicate: Optional[str] = None) -> TableMerger

Insert a new row into the target table, populating every target field from the corresponding source field. Source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the new row to be inserted.

Note

Column names with special characters, such as numbers or spaces, should be enclosed in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to insert.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})

(
   dt.merge(
       source=new_data,
       predicate='target.x = source.x',
       source_alias='source',
       target_alias='target')
   .when_not_matched_insert_all()
   .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5
2  3  6
3  4  7

with_writer_properties

with_writer_properties(data_page_size_limit: Optional[int] = None, dictionary_page_size_limit: Optional[int] = None, data_page_row_count_limit: Optional[int] = None, write_batch_size: Optional[int] = None, max_row_group_size: Optional[int] = None) -> TableMerger

Deprecated

Use .merge(writer_properties = WriterProperties()) instead

Pass writer properties to the Rust parquet writer. For the available options, see https://arrow.apache.org/rust/parquet/file/properties/struct.WriterProperties.html

Parameters:

Name Type Description Default
data_page_size_limit Optional[int]

Limit DataPage size to this in bytes.

None
dictionary_page_size_limit Optional[int]

Limit the size of each DataPage to store dicts to this amount in bytes.

None
data_page_row_count_limit Optional[int]

Limit the number of rows in each DataPage.

None
write_batch_size Optional[int]

Splits internally to smaller batch size.

None
max_row_group_size Optional[int]

Max number of rows in row group.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object