DeltaTable

deltalake.DeltaTable dataclass

DeltaTable(table_uri: Union[str, Path, os.PathLike[str]], version: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, without_files: bool = False, log_buffer_size: Optional[int] = None)

Represents a Delta Table

Create the Delta Table from a path with an optional version. Multiple storage backends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS), and local URIs. Depending on the storage backend used, you can provide backend-specific options via the storage_options parameter.

Parameters:

Name Type Description Default
table_uri Union[str, Path, PathLike[str]]

the path of the DeltaTable

required
version Optional[int]

version of the DeltaTable

None
storage_options Optional[Dict[str, str]]

a dictionary of the options to use for the storage backend

None
without_files bool

If True, load the table without tracking files. Some append-only applications do not need to track files, so loading the DeltaTable this way significantly reduces memory usage.

False
log_buffer_size Optional[int]

Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.

None
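
Example

A minimal sketch of opening tables locally and on S3. The bucket name, table paths, and credential values below are placeholders; the storage_options keys shown are the common S3 credential keys accepted by the object store backend.

from deltalake import DeltaTable

# Open a local table at its latest version
dt = DeltaTable("path/to/local_table")

# Open a specific version of a table on S3, passing credentials via storage_options
dt_s3 = DeltaTable(
    "s3://my-bucket/my-table",
    version=3,
    storage_options={
        "AWS_ACCESS_KEY_ID": "<access key>",
        "AWS_SECRET_ACCESS_KEY": "<secret key>",
        "AWS_REGION": "us-east-1",
    },
)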

alter property

alter: TableAlterer

Namespace for all table alter related methods.

Returns:

Name Type Description
TableAlterer TableAlterer

TableAlterer Object
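
Example

A minimal sketch, assuming the TableAlterer exposes add_constraint as in recent deltalake releases; the table path, constraint name, and expression are illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Add a CHECK constraint enforced on future writes
dt.alter.add_constraint({"price_non_negative": "price >= 0"})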

optimize property

optimize: TableOptimizer

Namespace for all table optimize related methods.

Returns:

Name Type Description
TableOptimizer TableOptimizer

TableOptimizer Object
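
Example

A minimal sketch using the compact and z_order methods of TableOptimizer; the table path and column names are illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Compact small files into larger ones and return the optimize metrics
metrics = dt.optimize.compact()
# Or cluster the data files by the given columns
metrics = dt.optimize.z_order(["country", "date"])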

cleanup_metadata

cleanup_metadata() -> None

Delete expired log files earlier than the current version from the table. Log retention is based on the delta.logRetentionDuration configuration value, which defaults to 30 days.

create classmethod

create(table_uri: Union[str, Path], schema: Union[pyarrow.Schema, DeltaSchema], mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error', partition_by: Optional[Union[List[str], str]] = None, name: Optional[str] = None, description: Optional[str] = None, configuration: Optional[Mapping[str, Optional[str]]] = None, storage_options: Optional[Dict[str, str]] = None, custom_metadata: Optional[Dict[str, str]] = None, raise_if_key_not_exists: bool = True) -> DeltaTable

CREATE or CREATE_OR_REPLACE a delta table given a table_uri.

Parameters:

Name Type Description Default
table_uri Union[str, Path]

URI of a table

required
schema Union[pyarrow.Schema, DeltaSchema]

Table schema

required
mode Literal['error', 'append', 'overwrite', 'ignore']

How to handle existing data. The default is to raise an error if the table already exists. If 'append', a not-supported error is raised if the table exists. If 'overwrite', the table is created or replaced (CREATE_OR_REPLACE). If 'ignore', nothing is done if the table already exists. Defaults to "error".

'error'
partition_by Optional[Union[List[str], str]]

List of columns to partition the table by.

None
name Optional[str]

User-provided identifier for this table.

None
description Optional[str]

User-provided description for this table.

None
configuration Optional[Mapping[str, Optional[str]]]

A map containing configuration options for the metadata action.

None
storage_options Optional[Dict[str, str]]

Options passed to the object store crate.

None
custom_metadata Optional[Dict[str, str]]

Custom metadata that will be added to the transaction commit.

None
raise_if_key_not_exists bool

Whether to raise an error if the configuration uses keys that are not Delta keys

True

Returns:

Name Type Description
DeltaTable DeltaTable

created delta table

Example
import pyarrow as pa

from deltalake import DeltaTable

dt = DeltaTable.create(
    table_uri="my_local_table",
    schema=pa.schema(
        [pa.field("foo", pa.string()), pa.field("bar", pa.string())]
    ),
    mode="error",
    partition_by="bar",
)

delete

delete(predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]

Delete records from a Delta Table that satisfy a predicate.

When a predicate is not provided, all records are deleted from the Delta Table. Otherwise, a scan of the Delta table is performed to mark any files that contain records satisfying the predicate. Once the files are determined, they are rewritten without those records.

Parameters:

Name Type Description Default
predicate Optional[str]

a SQL where clause. If not passed, will delete all rows.

None
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer.

None
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
post_commithook_properties Optional[PostCommitHookProperties]

properties for the post commit hook. If None, default values are used.

None
commit_properties Optional[CommitProperties]

properties of the transaction commit. If None, default values are used.

None

Returns:

Type Description
Dict[str, Any]

the metrics from delete.
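
Example

A minimal sketch; the table path and predicate are illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Delete the rows matching a SQL predicate and inspect the returned metrics
metrics = dt.delete(predicate="id = '3'")
# Calling delete() without a predicate removes all rows
metrics = dt.delete()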

file_uris

file_uris(partition_filters: Optional[FilterConjunctionType] = None) -> List[str]

Get the list of files as absolute URIs, including the scheme (e.g. "s3://").

Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)

Use the partition_filters parameter to retrieve a subset of files that match the given filters.

Parameters:

Name Type Description Default
partition_filters Optional[FilterConjunctionType]

the partition filters that will be used for getting the matched files

None

Returns:

Type Description
List[str]

list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable

Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective compound partition predicate. Each tuple has the format (key, op, value) and compares the key with the value. The supported ops are: =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set, or a tuple. The supported type for value is str. Use the empty string '' for a null partition value.

Example
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])

files

files(partition_filters: Optional[List[Tuple[str, str, Any]]] = None) -> List[str]

Get the .parquet files of the DeltaTable.

The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.

Parameters:

Name Type Description Default
partition_filters Optional[List[Tuple[str, str, Any]]]

the partition filters that will be used for getting the matched files

None

Returns:

Type Description
List[str]

list of the .parquet files referenced for the current version of the DeltaTable

Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective compound partition predicate. Each tuple has the format (key, op, value) and compares the key with the value. The supported ops are: =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set, or a tuple. The supported type for value is str. Use the empty string '' for a null partition value.

Example
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])

files_by_partitions

files_by_partitions(partition_filters: PartitionFilterType) -> List[str]

Get the files for each partition

from_data_catalog classmethod

from_data_catalog(data_catalog: DataCatalog, database_name: str, table_name: str, data_catalog_id: Optional[str] = None, version: Optional[int] = None, log_buffer_size: Optional[int] = None) -> DeltaTable

Create the Delta Table from a Data Catalog.

Parameters:

Name Type Description Default
data_catalog DataCatalog

the Catalog to use for getting the storage location of the Delta Table

required
database_name str

the database name inside the Data Catalog

required
table_name str

the table name inside the Data Catalog

required
data_catalog_id Optional[str]

the identifier of the Data Catalog

None
version Optional[int]

version of the DeltaTable

None
log_buffer_size Optional[int]

Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.

None
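
Example

A sketch using the AWS Glue Data Catalog via the DataCatalog enum exported by deltalake; the database and table names are placeholders.

from deltalake import DataCatalog, DeltaTable

dt = DeltaTable.from_data_catalog(
    data_catalog=DataCatalog.AWS,  # AWS Glue Data Catalog
    database_name="my_database",
    table_name="my_table",
)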

get_add_actions

get_add_actions(flatten: bool = False) -> pyarrow.RecordBatch

Return a dataframe with all current add actions.

Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.

Parameters:

Name Type Description Default
flatten bool

whether to flatten the schema. Partition value columns are given the prefix 'partition.', statistics (null_count, min, and max) are given the prefixes 'null_count.', 'min.', and 'max.', and tags the prefix 'tags.'. Nested field names are concatenated with '.'.

False

Returns:

Type Description
RecordBatch

a PyArrow RecordBatch containing the add action data.

Example
from pprint import pprint
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data, partition_by=["x"])
dt = DeltaTable("tmp")
df = dt.get_add_actions().to_pandas()
df["path"].sort_values(ignore_index=True)
# 0    x=1/0
# 1    x=2/0
# 2    x=3/0
df = dt.get_add_actions(flatten=True).to_pandas()
df["partition.x"].sort_values(ignore_index=True)
# 0    1
# 1    2
# 2    3

history

history(limit: Optional[int] = None) -> List[Dict[str, Any]]

Run the history command on the DeltaTable. The operations are returned in reverse chronological order.

Parameters:

Name Type Description Default
limit Optional[int]

the maximum number of commit infos to return

None

Returns:

Type Description
List[Dict[str, Any]]

list of the commit infos registered in the transaction log
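
Example

For example, printing the most recent commits; operation and timestamp are standard commit info fields, and the table path is illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Commits are returned newest first
for commit in dt.history(limit=3):
    print(commit.get("timestamp"), commit.get("operation"))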

is_deltatable staticmethod

is_deltatable(table_uri: str, storage_options: Optional[Dict[str, str]] = None) -> bool

Returns True if a Delta Table exists at specified path. Returns False otherwise.

Parameters:

Name Type Description Default
table_uri str

the path of the DeltaTable

required
storage_options Optional[Dict[str, str]]

a dictionary of the options to use for the storage backend

None
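
Example

A minimal sketch; the table path is illustrative.

from deltalake import DeltaTable

# Only open the table if a Delta log is present at the path
if DeltaTable.is_deltatable("path/to/table"):
    dt = DeltaTable("path/to/table")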

load_as_version

load_as_version(version: Union[int, str, datetime]) -> None

Load/time travel a DeltaTable to a specified version number or timestamp. If a string is passed, it should be an RFC 3339 / ISO 8601 date-time string. If a datetime object without a timezone is passed, the UTC timezone is assumed.

Parameters:

Name Type Description Default
version Union[int, str, datetime]

the identifier of the version of the DeltaTable to load

required
Example

Use a version number

dt = DeltaTable("test_table")
dt.load_as_version(1)

Use a datetime object

dt.load_as_version(datetime(2023, 1, 1))
dt.load_as_version(datetime(2023, 1, 1, tzinfo=timezone.utc))

Use a datetime in string format

dt.load_as_version("2018-01-26T18:30:09Z")
dt.load_as_version("2018-12-19T16:39:57-08:00")
dt.load_as_version("2018-01-26T18:30:09.453+00:00")

merge

merge(source: Union[pyarrow.Table, pyarrow.RecordBatch, pyarrow.RecordBatchReader, ds.Dataset, pd.DataFrame], predicate: str, source_alias: Optional[str] = None, target_alias: Optional[str] = None, error_on_type_mismatch: bool = True, writer_properties: Optional[WriterProperties] = None, large_dtypes: Optional[bool] = None, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> TableMerger

Pass the source data that you want to merge into the target Delta table, providing a predicate in SQL-like format. You can also specify what to do when the underlying data types do not match those of the target table.

Parameters:

Name Type Description Default
source Union[Table, RecordBatch, RecordBatchReader, Dataset, DataFrame]

source data

required
predicate str

SQL-like predicate on how to merge

required
source_alias Optional[str]

Alias for the source table

None
target_alias Optional[str]

Alias for the target table

None
error_on_type_mismatch bool

whether merge should raise an error if the data types do not match. Defaults to True.

True
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer

None
large_dtypes Optional[bool]

Deprecated, will be removed in 1.0. When True, all source schema types are converted to their large Arrow variants; when False (passthrough), string/binary/list types are left untouched.

None
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
post_commithook_properties Optional[PostCommitHookProperties]

properties for the post commit hook. If None, default values are used.

None
commit_properties Optional[CommitProperties]

properties for the commit. If None, default values are used.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object
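
Example

A minimal sketch chaining the returned TableMerger; the table path, aliases, and columns are illustrative.

import pyarrow as pa
from deltalake import DeltaTable

source = pa.table({"id": ["1", "4"], "price": [11.0, 40.0]})
dt = DeltaTable("path/to/table")
(
    dt.merge(
        source=source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)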

metadata

metadata() -> Metadata

Get the current metadata of the DeltaTable.

Returns:

Type Description
Metadata

the current Metadata registered in the transaction log
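
Example

For example; the attributes shown (name, partition_columns, configuration) are the usual fields of the Metadata object, and the table path is illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
meta = dt.metadata()
print(meta.name, meta.partition_columns, meta.configuration)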

partitions

partitions(partition_filters: Optional[List[Tuple[str, str, Any]]] = None) -> List[Dict[str, str]]

Returns the partitions as a list of dicts. Example: [{'month': '1', 'year': '2020', 'day': '1'}, ...]

Parameters:

Name Type Description Default
partition_filters Optional[List[Tuple[str, str, Any]]]

The partition filters that will be used for getting the matched partitions, defaults to None (no filtering).

None

protocol

protocol() -> ProtocolVersions

Get the reader and writer protocol versions of the DeltaTable.

Returns:

Type Description
ProtocolVersions

the current ProtocolVersions registered in the transaction log
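
Example

For example; min_reader_version and min_writer_version are the fields of ProtocolVersions, and the table path is illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
proto = dt.protocol()
print(proto.min_reader_version, proto.min_writer_version)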

repair

repair(dry_run: bool = False, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]

Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removing them. This can be useful when there are accidental deletions or corrupted files.

Active files are ones that have an add action in the log, but no corresponding remove action. This operation creates a new FSCK transaction containing a remove action for each of the missing or corrupted files.

Parameters:

Name Type Description Default
dry_run bool

when enabled, only list the files; otherwise, add remove actions to the transaction log. Defaults to False.

False
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
post_commithook_properties Optional[PostCommitHookProperties]

properties for the post commit hook. If None, default values are used.

None
commit_properties Optional[CommitProperties]

properties of the transaction commit. If None, default values are used.

None

Returns:

Type Description
Dict[str, Any]

The metrics from repair (FSCK) action.

Example

from deltalake import DeltaTable
dt = DeltaTable('TEST')
dt.repair(dry_run=False)
This results in:
{'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}

restore

restore(target: Union[int, datetime, str], *, ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, custom_metadata: Optional[Dict[str, str]] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]

Run the Restore command on the Delta Table: restore the table to a given version or datetime.

Parameters:

Name Type Description Default
target Union[int, datetime, str]

the version to restore to, given as an int, a date/datetime string, or a datetime object.

required
ignore_missing_files bool

whether the operation should carry on when some data files are missing.

False
protocol_downgrade_allowed bool

whether the operation is allowed to downgrade the table's protocol version.

False
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
commit_properties Optional[CommitProperties]

properties of the transaction commit. If None, default values are used.

None

Returns:

Type Description
Dict[str, Any]

the metrics from restore.
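
Example

A minimal sketch; the table path and restore targets are illustrative.

from datetime import datetime, timezone
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Restore to a specific version number
metrics = dt.restore(1)
# Or restore to the table state at a point in time
metrics = dt.restore(datetime(2023, 1, 1, tzinfo=timezone.utc))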

schema

schema() -> DeltaSchema

Get the current schema of the DeltaTable.

Returns:

Type Description
Schema

the current Schema registered in the transaction log
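
Example

For example, converting the Delta schema to a PyArrow schema via to_pyarrow; the table path is illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
delta_schema = dt.schema()
arrow_schema = delta_schema.to_pyarrow()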

to_pandas

to_pandas(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[Union[FilterType, Expression]] = None) -> pd.DataFrame

Build a pandas dataframe using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[List[Tuple[str, str, Any]]]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
columns Optional[List[str]]

The columns to project. This can be a list of column names to include (order and duplicates will be preserved)

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
filters Optional[Union[FilterType, Expression]]

A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass partitions

None
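
Example

A minimal sketch; the table path, partition column, and column names are illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# Read selected columns from a subset of partitions into pandas
df = dt.to_pandas(
    partitions=[("year", "=", "2021")],
    columns=["id", "price"],
)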

to_pyarrow_dataset

to_pyarrow_dataset(partitions: Optional[FilterConjunctionType] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, parquet_read_options: Optional[ParquetReadOptions] = None, schema: Optional[pyarrow.Schema] = None, as_large_types: bool = False) -> pyarrow.dataset.Dataset

Build a PyArrow Dataset using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[FilterConjunctionType]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
parquet_read_options Optional[ParquetReadOptions]

Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31

None
schema Optional[Schema]

The schema to use for the dataset. If None, the schema of the DeltaTable will be used. This can be used to force reading of Parquet/Arrow datatypes that DeltaLake can't represent in its schema (e.g. LargeString). If you only need to read the schema with large types (e.g. for compatibility with Polars), you may want to use the as_large_types parameter instead.

None
as_large_types bool

get schema with all variable size types (list, binary, string) as large variants (with int64 indices). This is for compatibility with systems like Polars that only support the large versions of Arrow types. If schema is passed it takes precedence over this option.

False

More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html

Example

deltalake will work with any storage compliant with pyarrow.fs.FileSystem, however the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem in a pyarrow.fs.SubTreeFileSystem.

import pyarrow.fs as fs
from deltalake import DeltaTable

table_uri = "s3://<bucket>/<path>"
raw_fs, normalized_path = fs.FileSystem.from_uri(table_uri)
filesystem = fs.SubTreeFileSystem(normalized_path, raw_fs)

dt = DeltaTable(table_uri)
ds = dt.to_pyarrow_dataset(filesystem=filesystem)

Returns:

Type Description
Dataset

the PyArrow Dataset built from the DeltaTable data
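
Once you have the dataset, you can filter and project it with the regular PyArrow dataset API, for example (column names are illustrative):

import pyarrow.dataset as pads
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
dataset = dt.to_pyarrow_dataset()
# Push a row filter and a column projection down into the scan
table = dataset.to_table(
    filter=pads.field("year") == 2021,
    columns=["id", "price"],
)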

to_pyarrow_table

to_pyarrow_table(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[Union[FilterType, Expression]] = None) -> pyarrow.Table

Build a PyArrow Table using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[List[Tuple[str, str, Any]]]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
columns Optional[List[str]]

The columns to project. This can be a list of column names to include (order and duplicates will be preserved)

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
filters Optional[Union[FilterType, Expression]]

A disjunctive normal form (DNF) predicate for filtering rows, or directly a pyarrow.dataset.Expression. If you pass a filter you do not need to pass partitions

None

update

update(updates: Optional[Dict[str, str]] = None, new_values: Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]] = None, predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, error_on_type_mismatch: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> Dict[str, Any]

UPDATE records in the Delta Table that match an optional predicate. Either updates or new_values needs to be passed for it to execute.

Parameters:

Name Type Description Default
updates Optional[Dict[str, str]]

a mapping of column name to update SQL expression.

None
new_values Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]]

a mapping of column name to python datatype.

None
predicate Optional[str]

a logical expression.

None
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer.

None
error_on_type_mismatch bool

whether update should raise an error if the data types do not match. Defaults to True.

True
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
post_commithook_properties Optional[PostCommitHookProperties]

properties for the post commit hook. If None, default values are used.

None
commit_properties Optional[CommitProperties]

properties of the transaction commit. If None, default values are used.

None

Returns:

Type Description
Dict[str, Any]

the metrics from update

Example

Update some row values with SQL predicate

This is equivalent to UPDATE table SET deleted = true WHERE id = '3'

from deltalake import write_deltalake, DeltaTable
import pandas as pd
df = pd.DataFrame(
    {"id": ["1", "2", "3"],
    "deleted": [False, False, False],
    "price": [10., 15., 20.]
    })
write_deltalake("tmp", df)
dt = DeltaTable("tmp")
dt.update(predicate="id = '3'", updates = {"deleted": 'True'})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}

Update all row values

This is equivalent to UPDATE table SET deleted = true, id = concat(id, '_old').

dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 3, 'num_copied_rows': 0, 'execution_time_ms': ..., 'scan_time_ms': ...}

Use Python objects instead of SQL strings

Use the new_values parameter instead of the updates parameter. For example, this is equivalent to UPDATE table SET price = 150.10 WHERE id = '1'

dt.update(predicate="id = '1_old'", new_values = {"price": 150.10})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}

update_incremental

update_incremental() -> None

Updates the DeltaTable to the latest version by incrementally applying newer versions.

vacuum

vacuum(retention_hours: Optional[int] = None, dry_run: bool = True, enforce_retention_duration: bool = True, custom_metadata: Optional[Dict[str, str]] = None, post_commithook_properties: Optional[PostCommitHookProperties] = None, commit_properties: Optional[CommitProperties] = None) -> List[str]

Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.

Parameters:

Name Type Description Default
retention_hours Optional[int]

the retention threshold in hours; if None, the value from delta.deletedFileRetentionDuration is used, or a default of 1 week otherwise.

None
dry_run bool

when enabled, only list the files to be deleted; otherwise, delete them.

True
enforce_retention_duration bool

when disabled, accepts retention hours smaller than the value from delta.deletedFileRetentionDuration.

True
custom_metadata Optional[Dict[str, str]]

Deprecated and will be removed in future versions. Use commit_properties instead.

None
post_commithook_properties Optional[PostCommitHookProperties]

properties for the post commit hook. If None, default values are used.

None
commit_properties Optional[CommitProperties]

properties of the transaction commit. If None, default values are used.

None

Returns:

Type Description
List[str]

the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
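
Example

A minimal sketch; the table path and retention value are illustrative.

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")
# List the files that would be removed (dry_run=True is the default)
to_remove = dt.vacuum(retention_hours=168)
# Actually delete them
removed = dt.vacuum(retention_hours=168, dry_run=False)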

version

version() -> int

Get the version of the DeltaTable.

Returns:

Type Description
int

The current version of the DeltaTable