TableMerger
deltalake.table.TableMerger
API for various table MERGE
commands.
execute
Executes MERGE
with the previously provided settings in Rust, using the Apache DataFusion query engine.
Returns:
Name | Type | Description |
---|---|---|
Dict |
Dict[str, Any]
|
metrics |
when_matched_delete
Delete a matched row from the table only if the given `predicate` (if specified) is
true for the matched row. If no predicate is specified, all matched rows are deleted.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predicate |
(str | None, Optional)
|
SQL like predicate on when to delete. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
Example
Delete on a predicate
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3], "deleted": [False, True]})
(
dt.merge(
source=new_data,
predicate='target.x = source.x',
source_alias='source',
target_alias='target')
.when_matched_delete(
predicate="source.deleted = true")
.execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 2, 'num_output_rows': 2, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
Delete all records that were matched
dt = DeltaTable("tmp")
(
dt.merge(
source=new_data,
predicate='target.x = source.x',
source_alias='source',
target_alias='target')
.when_matched_delete()
.execute()
)
{'num_source_rows': 2, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 1, 'num_target_rows_copied': 1, 'num_output_rows': 1, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas()
x y
0 1 4
when_matched_update
Update a matched table row based on the rules defined by `updates`.
If a `predicate` is specified, then it must evaluate to true for the row to be updated.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
updates |
Dict[str, str]
|
a mapping of column name to update SQL expression. |
required |
predicate |
Optional[str]
|
SQL like predicate on when to update. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "1y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "1y": [7]})
(
dt.merge(
source=new_data,
predicate="target.x = source.x",
source_alias="source",
target_alias="target")
.when_matched_update(updates={"x": "source.x", "`1y`": "source.`1y`"})
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas()
x 1y
0 1 7
1 2 5
2 3 6
when_matched_update_all
when_matched_update_all(predicate: Optional[str] = None, except_cols: Optional[List[str]] = None) -> TableMerger
Update all target fields of a matched row with the source fields of the same name. Source and target are required to have the same field names.
If a `predicate` is specified, then it must evaluate to true for the row to be updated.
If `except_cols` is specified, then the columns in the exclude list will not be updated.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predicate |
Optional[str]
|
SQL like predicate on when to update all columns. |
None
|
except_cols |
Optional[List[str]]
|
List of columns to exclude from update. |
None
|
Returns: TableMerger: TableMerger Object
Example
**Update all columns**
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [1], "y": [7]})
(
dt.merge(
source=new_data,
predicate="target.x = source.x",
source_alias="source",
target_alias="target")
.when_matched_update_all()
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas()
x y
0 1 7
1 2 5
2 3 6
**Update all columns except `bar`**
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"foo": [1, 2, 3], "bar": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"foo": [1], "bar": [7]})
(
dt.merge(
source=new_data,
predicate="target.foo = source.foo",
source_alias="source",
target_alias="target")
.when_matched_update_all(except_cols=["bar"])
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas()
foo bar
0 1 4
1 2 5
2 3 6
when_not_matched_by_source_delete
Delete a target row that has no matches in the source from the table only if the given
`predicate` (if specified) is true for the target row.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predicate |
Optional[str]
|
SQL like predicate on when to delete when not matched by source. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
when_not_matched_by_source_update
when_not_matched_by_source_update(updates: Dict[str, str], predicate: Optional[str] = None) -> TableMerger
Update a target row that has no matches in the source based on the rules defined by `updates`.
If a `predicate` is specified, then it must evaluate to true for the row to be updated.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
updates |
Dict[str, str]
|
a mapping of column name to update SQL expression. |
required |
predicate |
Optional[str]
|
SQL like predicate on when to update. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [2, 3, 4]})
(
dt.merge(
source=new_data,
predicate='target.x = source.x',
source_alias='source',
target_alias='target')
.when_not_matched_by_source_update(
predicate = "y > 3",
updates = {"y": "0"})
.execute()
)
{'num_source_rows': 3, 'num_target_rows_inserted': 0, 'num_target_rows_updated': 1, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 2, 'num_output_rows': 3, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 0
1 2 5
2 3 6
when_not_matched_insert
Insert a new row into the target table based on the rules defined by `updates`. If a
`predicate` is specified, then it must evaluate to true for the new row to be inserted.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
updates |
dict
|
a mapping of column name to insert SQL expression. |
required |
predicate |
(str | None, Optional)
|
SQL like predicate on when to insert. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
Example
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})
(
dt.merge(
source=new_data,
predicate="target.x = source.x",
source_alias="source",
target_alias="target",)
.when_not_matched_insert(
updates={
"x": "source.x",
"y": "source.y",
})
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
2 3 6
3 4 7
when_not_matched_insert_all
when_not_matched_insert_all(predicate: Optional[str] = None, except_cols: Optional[List[str]] = None) -> TableMerger
Insert a new row into the target table, copying every source field into the target field of the same name. Source and target are
required to have the same field names. If a `predicate` is specified, then it must evaluate to true for
the new row to be inserted. If `except_cols` is specified, then the columns in the exclude list will not be inserted.
Note
Column names with special characters, such as numbers or spaces, should be encapsulated
in backticks: "target.`123column`" or "target.`my column`"
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predicate |
Optional[str]
|
SQL like predicate on when to insert. |
None
|
except_cols |
Optional[List[str]]
|
List of columns to exclude from insert. |
None
|
Returns:
Name | Type | Description |
---|---|---|
TableMerger |
TableMerger
|
TableMerger Object |
Example
**Insert all columns**
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"x": [4], "y": [7]})
(
dt.merge(
source=new_data,
predicate='target.x = source.x',
source_alias='source',
target_alias='target')
.when_not_matched_insert_all()
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas().sort_values("x", ignore_index=True)
x y
0 1 4
1 2 5
2 3 6
3 4 7
**Insert all columns except `bar`**
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"foo": [1, 2, 3], "bar": [4, 5, 6]})
write_deltalake("tmp", data)
dt = DeltaTable("tmp")
new_data = pa.table({"foo": [4], "bar": [7]})
(
dt.merge(
source=new_data,
predicate='target.foo = source.foo',
source_alias='source',
target_alias='target')
.when_not_matched_insert_all(except_cols=["bar"])
.execute()
)
{'num_source_rows': 1, 'num_target_rows_inserted': 1, 'num_target_rows_updated': 0, 'num_target_rows_deleted': 0, 'num_target_rows_copied': 3, 'num_output_rows': 4, 'num_target_files_added': 1, 'num_target_files_removed': 1, 'execution_time_ms': ..., 'scan_time_ms': ..., 'rewrite_time_ms': ...}
dt.to_pandas().sort_values("foo", ignore_index=True)
foo bar
0 1 4.0
1 2 5.0
2 3 6.0
3 4 NaN