Skip to content

TableMerger

deltalake.table.TableMerger

TableMerger(builder: PyMergeBuilder, table: RawDeltaTable)

API for various table MERGE commands.

execute

execute() -> Dict[str, Any]

Executes the MERGE with the previously provided settings, in Rust, using the Apache DataFusion query engine.

Returns:

Name Type Description
Dict Dict[str, Any]

metrics

when_matched_delete

when_matched_delete(predicate: Optional[str] = None) -> TableMerger

Delete a matched row from the table only if the given predicate (if specified) is true for the matched row. If not specified it deletes all matches.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate (str | None, Optional)

SQL like predicate on when to delete.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example

Delete on a predicate

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3], "deleted": [False, True]})

(
    dt.merge(
        source=new_data,
        predicate='target.x = source.x',
        source_alias='source',
        target_alias='target')
    .when_matched_delete(
        predicate="source.deleted = true")
    .execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 2, 'num_output_rows': 2, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5

Delete all records that were matched

dt = DeltaTable("tmp")
(
    dt.merge(
        source=new_data,
        predicate='target.x = source.x',
        source_alias='source',
        target_alias='target')
    .when_matched_delete()
    .execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 1, 'num_output_rows': 1, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  y
0  1  4

when_matched_update

when_matched_update(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Update a matched table row based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates Dict[str, str]

a mapping of column name to update SQL expression.

required
predicate Optional[str]

SQL like predicate on when to update.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "1y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "1y": [7]})

(
     dt.merge(
         source=new_data,
         predicate="target.x = source.x",
         source_alias="source",
         target_alias="target")
     .when_matched_update(updates={"x": "source.x", "`1y`": "source.`1y`"})
     .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  1y
0  1   7
1  2   5
2  3   6

when_matched_update_all

when_matched_update_all(predicate: Optional[str] = None, except_cols: Optional[List[str]] = None) -> TableMerger

Update all target fields with the values of the corresponding source fields; source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the row to be updated. If except_cols is specified, then the columns in that list will not be updated.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to update all columns.

None
except_cols Optional[List[str]]

List of columns to exclude from update.

None

Returns: TableMerger: TableMerger Object

Example

**Update all columns**

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "y": [7]})

(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target")
    .when_matched_update_all()
    .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   x  y
0  1  7
1  2  5
2  3  6

**Update all columns except bar**

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"foo": [1, 2, 3], "bar": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"foo": [1], "bar": [7]})

(
    dt.merge(
        source=new_data,
        predicate="target.foo = source.foo",
        source_alias="source",
        target_alias="target")
    .when_matched_update_all(except_cols=["bar"])
    .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas()
   foo  bar
0  1    4
1  2    5
2  3    6

when_not_matched_by_source_delete

when_not_matched_by_source_delete(predicate: Optional[str] = None) -> TableMerger

Delete a target row that has no matches in the source from the table only if the given predicate (if specified) is true for the target row.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to delete when not matched by source.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

when_not_matched_by_source_update

when_not_matched_by_source_update(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Update a target row that has no matches in the source based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the row to be updated.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates Dict[str, str]

a mapping of column name to update SQL expression.

required
predicate Optional[str]

SQL like predicate on when to update.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3, 4]})

(
   dt.merge(
       source=new_data,
       predicate='target.x = source.x',
       source_alias='source',
       target_alias='target')
   .when_not_matched_by_source_update(
       predicate = "y > 3",
       updates = {"y": "0"})
   .execute()
)
{'num_source_rows': 3, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  0
1  2  5
2  3  6

when_not_matched_insert

when_not_matched_insert(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger

Insert a new row to the target table based on the rules defined by updates. If a predicate is specified, then it must evaluate to true for the new row to be inserted.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
updates dict

a mapping of column name to insert SQL expression.

required
predicate (str | None, Optional)

SQL like predicate on when to insert.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})

(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target",)
    .when_not_matched_insert(
        updates={
            "x": "source.x",
            "y": "source.y",
        })
    .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5
2  3  6
3  4  7

when_not_matched_insert_all

when_not_matched_insert_all(predicate: Optional[str] = None, except_cols: Optional[List[str]] = None) -> TableMerger

Insert a new row to the target table, populating all target fields from the corresponding source fields. Source and target are required to have the same field names. If a predicate is specified, then it must evaluate to true for the new row to be inserted. If except_cols is specified, then the columns in that list will not be inserted.

Note

Column names with special characters, such as numbers or spaces, should be encapsulated in backticks: "target.`123column`" or "target.`my column`"

Parameters:

Name Type Description Default
predicate Optional[str]

SQL like predicate on when to insert.

None
except_cols Optional[List[str]]

List of columns to exclude from insert.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object

Example

**Insert all columns**

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})

(
   dt.merge(
       source=new_data,
       predicate='target.x = source.x',
       source_alias='source',
       target_alias='target')
   .when_not_matched_insert_all()
   .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("x", ignore_index=True)
   x  y
0  1  4
1  2  5
2  3  6
3  4  7

**Insert all columns except bar**

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

data = pa.table({"foo": [1, 2, 3], "bar": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"foo": [4], "bar": [7]})

(
   dt.merge(
       source=new_data,
       predicate='target.foo = source.foo',
       source_alias='source',
       target_alias='target')
   .when_not_matched_insert_all(except_cols=["bar"])
   .execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}

dt.to_pandas().sort_values("foo", ignore_index=True)
   foo  bar
0    1  4.0
1    2  5.0
2    3  6.0
3    4  NaN