DeltaTable
deltalake.DeltaTable
dataclass
DeltaTable(table_uri: Union[str, Path, os.PathLike[str]], version: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, without_files: bool = False, log_buffer_size: Optional[int] = None)
Represents a Delta Table.
Create the Delta Table from a path with an optional version.
Multiple storage backends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URIs.
Depending on the storage backend used, you can provide backend-specific options via the storage_options parameter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_uri | Union[str, Path, PathLike[str]] | the path of the DeltaTable | required |
version | Optional[int] | version of the DeltaTable | None |
storage_options | Optional[Dict[str, str]] | a dictionary of the options to use for the storage backend | None |
without_files | bool | If True, load the table without tracking files. Some append-only applications have no need to track files, so the DeltaTable can be loaded with a significant memory reduction. | False |
log_buffer_size | Optional[int] | Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage API. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of CPUs. | None |
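A minimal construction sketch; the local path, bucket name, and storage option values below are placeholders:

```python
from deltalake import DeltaTable

# Load the latest version of a local table
dt = DeltaTable("./my_table")

# Load a specific version from S3, passing backend-specific options
dt_s3 = DeltaTable(
    "s3://my-bucket/my_table",          # placeholder URI
    version=3,
    storage_options={"AWS_REGION": "us-east-1"},
)
```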
alter
property
Namespace for all table alter related methods.
Returns:
Name | Type | Description |
---|---|---|
TableAlterer | TableAlterer | TableAlterer Object |
optimize
property
Namespace for all table optimize related methods.
Returns:
Name | Type | Description |
---|---|---|
TableOptimizer | TableOptimizer | TableOptimizer Object |
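A sketch of how these namespaces are typically used; the table path, constraint name, and expression are hypothetical:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")

# TableOptimizer: compact small files into larger ones
dt.optimize.compact()

# TableAlterer: add a check constraint (name and expression are examples)
dt.alter.add_constraint({"price_non_negative": "price >= 0"})
```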
cleanup_metadata
Delete expired log files before the current version from the table. The table log retention is based on the delta.logRetentionDuration value, 30 days by default.
create
classmethod
create(table_uri: Union[str, Path], schema: Union[pyarrow.Schema, DeltaSchema], mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error', partition_by: Optional[Union[List[str], str]] = None, name: Optional[str] = None, description: Optional[str] = None, configuration: Optional[Mapping[str, Optional[str]]] = None, storage_options: Optional[Dict[str, str]] = None, custom_metadata: Optional[Dict[str, str]] = None, raise_if_key_not_exists: bool = True) -> DeltaTable
CREATE or CREATE_OR_REPLACE a delta table given a table_uri.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_uri | Union[str, Path] | URI of a table | required |
schema | Union[pyarrow.Schema, DeltaSchema] | Table schema | required |
mode | Literal['error', 'append', 'overwrite', 'ignore'] | How to handle existing data. Default is to error if the table already exists. If 'append', returns a not-supported error if the table exists. If 'overwrite', will CREATE_OR_REPLACE the table. If 'ignore', will not do anything if the table already exists. | 'error' |
partition_by | Optional[Union[List[str], str]] | List of columns to partition the table by. | None |
name | Optional[str] | User-provided identifier for this table. | None |
description | Optional[str] | User-provided description for this table. | None |
configuration | Optional[Mapping[str, Optional[str]]] | A map containing configuration options for the metadata action. | None |
storage_options | Optional[Dict[str, str]] | Options passed to the object store crate. | None |
custom_metadata | Optional[Dict[str, str]] | Custom metadata that will be added to the transaction commit. | None |
raise_if_key_not_exists | bool | Whether to raise an error if the configuration uses keys that are not Delta keys | True |
Returns:
Name | Type | Description |
---|---|---|
DeltaTable | DeltaTable | created delta table |
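A minimal creation sketch with a pyarrow schema; the path, column names, and partition column are placeholders:

```python
import pyarrow as pa
from deltalake import DeltaTable

schema = pa.schema([("id", pa.int64()), ("value", pa.string())])
dt = DeltaTable.create(
    "./empty_table",        # placeholder path
    schema=schema,
    mode="error",           # fail if a table already exists here
    partition_by=["id"],
)
```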
delete
delete(predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]
Delete records from a Delta Table that satisfy a predicate.
When a predicate is not provided, all records are deleted from the Delta Table. Otherwise, a scan of the Delta table is performed to mark any files that contain records that satisfy the predicate. Once files are determined, they are rewritten without those records.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predicate | Optional[str] | a SQL where clause. If not passed, will delete all rows. | None |
writer_properties | Optional[WriterProperties] | Pass writer properties to the Rust parquet writer. | None |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
post_commithook_properties | Optional[PostCommitHookProperties] | properties for the post commit hook. If None, default values are used. | None |
commit_properties | Optional[CommitProperties] | properties of the transaction commit. If None, default values are used. | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | the metrics from delete. |
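A short usage sketch; the table path, predicate, and column name are hypothetical:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
# Delete only the rows matching the predicate and inspect the returned metrics
metrics = dt.delete(predicate="price > 100")
print(metrics)
```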
file_uris
Get the list of files as absolute URIs, including the scheme (e.g. "s3://").
Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)
Use the partition_filters parameter to retrieve a subset of files that match the given filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition_filters | Optional[FilterConjunctionType] | the partition filters that will be used for getting the matched files | None |
Returns:
Type | Description |
---|---|
List[str] | list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable |
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective, multi-partition predicate. Each tuple has format: (key, op, value) and compares the key with the value. The supported ops are =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use the empty string '' for a Null partition value.
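A filter sketch; the table path and the "year" partition column are hypothetical:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")   # assumed to be partitioned by "year"
all_uris = dt.file_uris()
uris_2020 = dt.file_uris(partition_filters=[("year", "=", "2020")])
```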
files
Get the .parquet files of the DeltaTable.
The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition_filters | Optional[List[Tuple[str, str, Any]]] | the partition filters that will be used for getting the matched files | None |
Returns:
Type | Description |
---|---|
List[str] | list of the .parquet files referenced for the current version of the DeltaTable |
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective, multi-partition predicate. Each tuple has format: (key, op, value) and compares the key with the value. The supported ops are =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use the empty string '' for a Null partition value.
files_by_partitions
Get the files for each partition
from_data_catalog
classmethod
from_data_catalog(data_catalog: DataCatalog, database_name: str, table_name: str, data_catalog_id: Optional[str] = None, version: Optional[int] = None, log_buffer_size: Optional[int] = None) -> DeltaTable
Create the Delta Table from a Data Catalog.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_catalog | DataCatalog | the Catalog to use for getting the storage location of the Delta Table | required |
database_name | str | the database name inside the Data Catalog | required |
table_name | str | the table name inside the Data Catalog | required |
data_catalog_id | Optional[str] | the identifier of the Data Catalog | None |
version | Optional[int] | version of the DeltaTable | None |
log_buffer_size | Optional[int] | Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage API. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of CPUs. | None |
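A sketch of loading a table registered in a catalog; the database and table names are placeholders, and the AWS member of the DataCatalog enum is an assumption here:

```python
from deltalake import DataCatalog, DeltaTable

dt = DeltaTable.from_data_catalog(
    data_catalog=DataCatalog.AWS,       # assumption: AWS Glue catalog variant
    database_name="my_database",        # placeholder
    table_name="my_table",              # placeholder
)
```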
get_add_actions
Return a dataframe with all current add actions.
Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flatten | bool | whether to flatten the schema. Partition values columns are given the prefix partition., statistics (null_count, min, and max) are given the prefix null_count., min., and max., and tags the prefix tags. | False |
Returns:
Type | Description |
---|---|
RecordBatch | a PyArrow RecordBatch containing the add action data. |
Example
from pprint import pprint
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data, partition_by=["x"])
dt = DeltaTable("tmp")
df = dt.get_add_actions().to_pandas()
df["path"].sort_values(ignore_index=True)
0 x=1/0
1 x=2/0
2 x=3/0
history
Run the history command on the DeltaTable. The operations are returned in reverse chronological order.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
limit | Optional[int] | the commit info limit to return | None |
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | list of the commit infos registered in the transaction log |
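A short sketch of reading the most recent commit info; the table path is a placeholder:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
last_commit = dt.history(limit=1)[0]   # newest entry first
print(last_commit)
```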
is_deltatable
staticmethod
Returns True if a Delta Table exists at specified path. Returns False otherwise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table_uri | str | the path of the DeltaTable | required |
storage_options | Optional[Dict[str, str]] | a dictionary of the options to use for the storage backend | None |
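A minimal sketch; the path is a placeholder:

```python
from deltalake import DeltaTable

if DeltaTable.is_deltatable("./my_table"):
    dt = DeltaTable("./my_table")
```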
load_as_version
Load/time travel a DeltaTable to a specified version number, or a timestamp version of the table. If a string is passed, it should be an RFC 3339 and ISO 8601 date-and-time string. If a datetime object without a timezone is passed, the UTC timezone will be assumed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
version | Union[int, str, datetime] | the identifier of the version of the DeltaTable to load | required |
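A time-travel sketch; the table path, version number, and timestamp are placeholders:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
dt.load_as_version(1)                         # by version number
dt.load_as_version("2023-01-01T00:00:00Z")    # by RFC 3339 / ISO 8601 timestamp
```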
merge
merge(source: Union[pyarrow.Table, pyarrow.RecordBatch, pyarrow.RecordBatchReader, ds.Dataset, pd.DataFrame], predicate: str, source_alias: Optional[str] = None, target_alias: Optional[str] = None, error_on_type_mismatch: bool = True, writer_properties: Optional[WriterProperties] = None, large_dtypes: Optional[bool] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> TableMerger
Pass the source data which you want to merge into the target Delta table, providing a predicate in SQL-like format. You can also specify what to do when the underlying data types do not match those of the table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | Union[Table, RecordBatch, RecordBatchReader, Dataset, DataFrame] | source data | required |
predicate | str | SQL-like predicate on how to merge | required |
source_alias | Optional[str] | Alias for the source table | None |
target_alias | Optional[str] | Alias for the target table | None |
error_on_type_mismatch | bool | whether merge will raise an error if data types are mismatching | True |
writer_properties | Optional[WriterProperties] | Pass writer properties to the Rust parquet writer | None |
large_dtypes | Optional[bool] | Deprecated, will be removed in 1.0 | None |
arrow_schema_conversion_mode | | Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched | required |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
post_commithook_properties | Optional[PostCommitHookProperties] | properties for the post commit hook. If None, default values are used. | None |
commit_properties | Optional[CommitProperties] | properties for the commit. If None, default values are used. | None |
Returns:
Name | Type | Description |
---|---|---|
TableMerger | TableMerger | TableMerger Object |
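A merge sketch using the TableMerger builder; the table path and column names are placeholders:

```python
import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
new_data = pa.table({"id": ["1", "4"], "price": [11.0, 40.0]})

(
    dt.merge(
        source=new_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()      # update rows whose id already exists
    .when_not_matched_insert_all()  # insert rows that are new
    .execute()
)
```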
metadata
Get the current metadata of the DeltaTable.
Returns:
Type | Description |
---|---|
Metadata | the current Metadata registered in the transaction log |
partitions
Returns the partitions as a list of dicts. Example: [{'month': '1', 'year': '2020', 'day': '1'}, ...]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition_filters | Optional[List[Tuple[str, str, Any]]] | The partition filters that will be used for getting the matched partitions, defaults to None. | None |
protocol
Get the reader and writer protocol versions of the DeltaTable.
Returns:
Type | Description |
---|---|
ProtocolVersions | the current ProtocolVersions registered in the transaction log |
repair
repair(dry_run: bool = False, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]
Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removing them. This can be useful when there are accidental deletions or corrupted files.
Active files are ones that have an add action in the log, but no corresponding remove action. This operation creates a new FSCK transaction containing a remove action for each of the missing or corrupted files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dry_run | bool | when activated, list only the files, otherwise add remove actions to the transaction log. Defaults to False. | False |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
post_commithook_properties | Optional[PostCommitHookProperties] | properties for the post commit hook. If None, default values are used. | None |
commit_properties | Optional[CommitProperties] | properties of the transaction commit. If None, default values are used. | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The metrics from repair (FSCK) action. |
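A dry-run sketch; the table path is a placeholder:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
metrics = dt.repair(dry_run=True)   # only report missing files, no commit
print(metrics)
```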
restore
restore(target: Union[int, datetime, str], *, ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, custom_metadata: Optional[Dict[str, str]] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]
Run the Restore command on the Delta Table: restore table to a given version or datetime.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target | Union[int, datetime, str] | the version to restore to, represented as an int, a date string, or a datetime. | required |
ignore_missing_files | bool | whether the operation carries on when some data files are missing. | False |
protocol_downgrade_allowed | bool | whether the operation is allowed to downgrade the protocol version. | False |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
commit_properties | Optional[CommitProperties] | properties of the transaction commit. If None, default values are used. | None |
Returns:
Type | Description |
---|---|
Dict[str, Any] | the metrics from restore. |
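A restore sketch; the table path, version number, and timestamp are placeholders:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
dt.restore(1)                        # restore to version 1
dt.restore("2023-01-01T00:00:00Z")   # or restore to a point in time
```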
schema
Get the current schema of the DeltaTable.
Returns:
Type | Description |
---|---|
Schema | the current Schema registered in the transaction log |
to_pandas
to_pandas(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[Union[FilterType, Expression]] = None) -> pd.DataFrame
Build a pandas dataframe using data from the DeltaTable.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partitions | Optional[List[Tuple[str, str, Any]]] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
columns | Optional[List[str]] | The columns to project. This can be a list of column names to include (order and duplicates will be preserved) | None |
filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
filters | Optional[Union[FilterType, Expression]] | A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass partitions. | None |
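A sketch of a filtered, column-pruned read; the table path, partition column, and column names are hypothetical:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
df = dt.to_pandas(
    partitions=[("year", "=", "2020")],   # hypothetical partition column
    columns=["id", "price"],              # hypothetical columns
)
```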
to_pyarrow_dataset
to_pyarrow_dataset(partitions: Optional[FilterConjunctionType] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, parquet_read_options: Optional[ParquetReadOptions] = None, schema: Optional[pyarrow.Schema] = None, as_large_types: bool = False) -> pyarrow.dataset.Dataset
Build a PyArrow Dataset using data from the DeltaTable.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partitions | Optional[FilterConjunctionType] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
parquet_read_options | Optional[ParquetReadOptions] | Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31 | None |
schema | Optional[Schema] | The schema to use for the dataset. If None, the schema of the DeltaTable will be used. This can be used to force reading of Parquet/Arrow datatypes that DeltaLake can't represent in its schema (e.g. LargeString). If you only need to read the schema with large types (e.g. for compatibility with Polars) you may want to use the as_large_types parameter instead. | None |
as_large_types | bool | get schema with all variable size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types. If schema is passed, it takes precedence over this option. | False |
More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html
Example
deltalake will work with any storage compliant with pyarrow.fs.FileSystem, however the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem into a pyarrow.fs.SubTreeFileSystem.
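A sketch of that wrapping; the bucket name, region, and table prefix are placeholders:

```python
import pyarrow.fs as pa_fs
from deltalake import DeltaTable

raw_fs = pa_fs.S3FileSystem(region="us-east-1")                        # placeholder region
table_fs = pa_fs.SubTreeFileSystem("my-bucket/path/to/table", raw_fs)  # root = table root

dt = DeltaTable("s3://my-bucket/path/to/table")
ds = dt.to_pyarrow_dataset(filesystem=table_fs)
```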
Returns:
Type | Description |
---|---|
Dataset | the PyArrow dataset built from the DeltaTable |
to_pyarrow_table
to_pyarrow_table(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[Union[FilterType, Expression]] = None) -> pyarrow.Table
Build a PyArrow Table using data from the DeltaTable.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partitions | Optional[List[Tuple[str, str, Any]]] | A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax | None |
columns | Optional[List[str]] | The columns to project. This can be a list of column names to include (order and duplicates will be preserved) | None |
filesystem | Optional[Union[str, FileSystem]] | A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem | None |
filters | Optional[Union[FilterType, Expression]] | A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass partitions. | None |
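A sketch of a filtered read into a PyArrow Table; the table path and column names are hypothetical:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
table = dt.to_pyarrow_table(
    columns=["id", "price"],           # hypothetical columns
    filters=[("price", ">", 10.0)],    # DNF row filter
)
```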
update
update(updates: Optional[Dict[str, str]] = None, new_values: Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]] = None, predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, error_on_type_mismatch: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]
UPDATE records in the Delta Table that match an optional predicate. Either updates or new_values needs to be passed for it to execute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
updates | Optional[Dict[str, str]] | a mapping of column name to update SQL expression. | None |
new_values | Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]] | a mapping of column name to python datatype. | None |
predicate | Optional[str] | a logical expression. | None |
writer_properties | Optional[WriterProperties] | Pass writer properties to the Rust parquet writer. | None |
error_on_type_mismatch | bool | whether update will raise an error if data types are mismatching | True |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
post_commithook_properties | Optional[PostCommitHookProperties] | properties for the post commit hook. If None, default values are used. | None |
commit_properties | Optional[CommitProperties] | properties of the transaction commit. If None, default values are used. | None |
Returns: the metrics from update
Example
Update some row values with SQL predicate
This is equivalent to UPDATE table SET deleted = true WHERE id = '3'
from deltalake import write_deltalake, DeltaTable
import pandas as pd
df = pd.DataFrame(
{"id": ["1", "2", "3"],
"deleted": [False, False, False],
"price": [10., 15., 20.]
})
write_deltalake("tmp", df)
dt = DeltaTable("tmp")
dt.update(predicate="id = '3'", updates = {"deleted": 'True'})
{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}
Update all row values
This is equivalent to UPDATE table SET deleted = true, id = concat(id, '_old').
dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"})
{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 3, 'num_copied_rows': 0, 'execution_time_ms': ..., 'scan_time_ms': ...}
Use Python objects instead of SQL strings
Use the new_values parameter instead of the updates parameter. For example, this is equivalent to UPDATE table SET price = 150.10 WHERE id = '1'.
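A sketch of that call, mirroring the SQL shown above:

```python
dt.update(predicate="id = '1'", new_values={"price": 150.10})
```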
update_incremental
Updates the DeltaTable to the latest version by incrementally applying newer versions.
vacuum
vacuum(retention_hours: Optional[int] = None, dry_run: bool = True, enforce_retention_duration: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> List[str]
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
retention_hours | Optional[int] | the retention threshold in hours; if None, the value from the table configuration delta.deletedFileRetentionDuration is used, or a default of one week otherwise. | None |
dry_run | bool | when activated, list only the files, delete otherwise | True |
enforce_retention_duration | bool | when disabled, accepts retention hours smaller than the value from delta.deletedFileRetentionDuration | True |
custom_metadata | Optional[Dict[str, str]] | Deprecated and will be removed in future versions. Use commit_properties instead. | None |
post_commithook_properties | Optional[PostCommitHookProperties] | properties for the post commit hook. If None, default values are used. | None |
commit_properties | Optional[CommitProperties] | properties of the transaction commit. If None, default values are used. | None |
Returns: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
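A vacuum sketch; the table path and retention value are placeholders:

```python
from deltalake import DeltaTable

dt = DeltaTable("./my_table")
# Dry run first: list the files that would be removed
candidates = dt.vacuum(retention_hours=168, dry_run=True)
# Then actually delete them
dt.vacuum(retention_hours=168, dry_run=False)
```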