
DeltaTable

deltalake.DeltaTable dataclass

DeltaTable(table_uri: Union[str, Path, os.PathLike[str]], version: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, without_files: bool = False, log_buffer_size: Optional[int] = None)

Represents a Delta Table

Create the Delta Table from a path with an optional version. Multiple storage backends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS), and local URIs. Depending on the storage backend used, you can provide configuration values through the storage_options parameter.

Parameters:

Name Type Description Default
table_uri Union[str, Path, PathLike[str]]

the path of the DeltaTable

required
version Optional[int]

version of the DeltaTable

None
storage_options Optional[Dict[str, str]]

a dictionary of the options to use for the storage backend

None
without_files bool

If True, load the table without tracking files. Some append-only applications have no need to track files, so the DeltaTable can be loaded with a significant memory reduction.

False
log_buffer_size Optional[int]

Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.

None
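Example

For instance, a table stored on S3 can be opened by passing credentials through storage_options. This is a minimal sketch; the bucket URI and credential values are placeholders:

from deltalake import DeltaTable

dt = DeltaTable(
    "s3://my-bucket/my-table",  # placeholder table URI
    storage_options={
        "AWS_REGION": "us-east-1",
        "AWS_ACCESS_KEY_ID": "<access key>",
        "AWS_SECRET_ACCESS_KEY": "<secret key>",
    },
)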

alter property

alter: TableAlterer

Namespace for all table alter related methods.

Returns:

Name Type Description
TableAlterer TableAlterer

TableAlterer Object
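Example

A minimal sketch of using the alter namespace, assuming the TableAlterer exposes add_constraint as in recent deltalake releases; the table path and constraint are placeholders:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
dt.alter.add_constraint({"id_gt_0": "id > 0"})  # add a CHECK constraint to the table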

optimize property

optimize: TableOptimizer

Namespace for all table optimize related methods.

Returns:

Name Type Description
TableOptimizer TableOptimizer

TableOptimizer Object
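Example

A minimal sketch of using the optimize namespace, assuming the TableOptimizer's compact and z_order methods documented on that object; the table path and column name are placeholders:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
dt.optimize.compact()        # bin-pack small files into larger ones
dt.optimize.z_order(["id"])  # reorder data files by the given columns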

cleanup_metadata

cleanup_metadata() -> None

Delete expired log files before the current version from the table. The table log retention is based on the configuration.logRetentionDuration value, 30 days by default.

create classmethod

create(table_uri: Union[str, Path], schema: Union[pyarrow.Schema, DeltaSchema], mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error', partition_by: Optional[Union[List[str], str]] = None, name: Optional[str] = None, description: Optional[str] = None, configuration: Optional[Mapping[str, Optional[str]]] = None, storage_options: Optional[Dict[str, str]] = None, custom_metadata: Optional[Dict[str, str]] = None) -> DeltaTable

CREATE or CREATE_OR_REPLACE a delta table given a table_uri.

Parameters:

Name Type Description Default
table_uri Union[str, Path]

URI of a table

required
schema Union[pyarrow.Schema, DeltaSchema]

Table schema

required
mode Literal['error', 'append', 'overwrite', 'ignore']

How to handle existing data. Default is to raise an error if the table already exists. If 'append', a not-supported error is raised if the table exists. If 'overwrite', the table is created or replaced (CREATE_OR_REPLACE). If 'ignore', nothing is done if the table already exists. Defaults to "error".

'error'
partition_by Optional[Union[List[str], str]]

List of columns to partition the table by.

None
name Optional[str]

User-provided identifier for this table.

None
description Optional[str]

User-provided description for this table.

None
configuration Optional[Mapping[str, Optional[str]]]

A map containing configuration options for the metadata action.

None
storage_options Optional[Dict[str, str]]

options passed to the object store crate.

None
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns:

Name Type Description
DeltaTable DeltaTable

created delta table

Example
import pyarrow as pa

from deltalake import DeltaTable

dt = DeltaTable.create(
    table_uri="my_local_table",
    schema=pa.schema(
        [pa.field("foo", pa.string()), pa.field("bar", pa.string())]
    ),
    mode="error",
    partition_by="bar",
)

delete

delete(predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, custom_metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]

Delete records from a Delta Table that satisfy a predicate.

When a predicate is not provided, all records are deleted from the Delta Table. Otherwise, a scan of the table is performed to mark any files that contain records satisfying the predicate. Once those files are determined, they are rewritten without the matching records.

Parameters:

Name Type Description Default
predicate Optional[str]

a SQL where clause. If not passed, will delete all rows.

None
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer.

None
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns:

Type Description
Dict[str, Any]

the metrics from delete.
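Example

A minimal sketch; the table path and predicate are placeholders:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
metrics = dt.delete(predicate="id = '3'")  # delete only the matching rows
# dt.delete()                              # without a predicate, all rows are deleted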

file_uris

file_uris(partition_filters: Optional[List[Tuple[str, str, Any]]] = None) -> List[str]

Get the list of files as absolute URIs, including the scheme (e.g. "s3://").

Local files will be just plain absolute paths, without a scheme. (That is, no 'file://' prefix.)

Use the partition_filters parameter to retrieve a subset of files that match the given filters.

Parameters:

Name Type Description Default
partition_filters Optional[List[Tuple[str, str, Any]]]

the partition filters that will be used for getting the matched files

None

Returns:

Type Description
List[str]

list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable

Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective predicate over multiple partition columns. Each tuple has the format (key, op, value) and compares the key with the value. The supported ops are =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set, or a tuple. The supported type for value is str. Use the empty string '' for a Null partition value.

Example
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])

files

files(partition_filters: Optional[List[Tuple[str, str, Any]]] = None) -> List[str]

Get the .parquet files of the DeltaTable.

The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs.

Parameters:

Name Type Description Default
partition_filters Optional[List[Tuple[str, str, Any]]]

the partition filters that will be used for getting the matched files

None

Returns:

Type Description
List[str]

list of the .parquet files referenced for the current version of the DeltaTable

Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective predicate over multiple partition columns. Each tuple has the format (key, op, value) and compares the key with the value. The supported ops are =, !=, in, and not in. If the op is in or not in, the value must be a collection such as a list, a set, or a tuple. The supported type for value is str. Use the empty string '' for a Null partition value.

Example
("x", "=", "a")
("x", "!=", "a")
("y", "in", ["a", "b", "c"])
("z", "not in", ["a","b"])

from_data_catalog classmethod

from_data_catalog(data_catalog: DataCatalog, database_name: str, table_name: str, data_catalog_id: Optional[str] = None, version: Optional[int] = None, log_buffer_size: Optional[int] = None) -> DeltaTable

Create the Delta Table from a Data Catalog.

Parameters:

Name Type Description Default
data_catalog DataCatalog

the Catalog to use for getting the storage location of the Delta Table

required
database_name str

the database name inside the Data Catalog

required
table_name str

the table name inside the Data Catalog

required
data_catalog_id Optional[str]

the identifier of the Data Catalog

None
version Optional[int]

version of the DeltaTable

None
log_buffer_size Optional[int]

Number of files to buffer when reading the commit log. A positive integer. Setting a value greater than 1 results in concurrent calls to the storage api. This can decrease latency if there are many files in the log since the last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should also be considered for optimal performance. Defaults to 4 * number of cpus.

None
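Example

A minimal sketch for a table registered in AWS Glue; the database and table names are placeholders:

from deltalake import DataCatalog, DeltaTable

dt = DeltaTable.from_data_catalog(
    data_catalog=DataCatalog.AWS,
    database_name="my_database",
    table_name="my_table",
)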

get_add_actions

get_add_actions(flatten: bool = False) -> pyarrow.RecordBatch

Return a dataframe with all current add actions.

Add actions represent the files that currently make up the table. This data is a low-level representation parsed from the transaction log.

Parameters:

Name Type Description Default
flatten bool

whether to flatten the schema. Partition value columns are given the prefix 'partition.', statistics columns (null_count, min, and max) are given the prefixes 'null_count.', 'min.', and 'max.', and tags the prefix 'tags.'. Nested field names are joined with '.'.

False

Returns:

Type Description
RecordBatch

a PyArrow RecordBatch containing the add action data.

Example
from pprint import pprint
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data, partition_by=["x"])
dt = DeltaTable("tmp")
df = dt.get_add_actions().to_pandas()
df["path"].sort_values(ignore_index=True)
0    x=1/0
1    x=2/0
2    x=3/0
df = dt.get_add_actions(flatten=True).to_pandas()
df["partition.x"].sort_values(ignore_index=True)
0    1
1    2
2    3

history

history(limit: Optional[int] = None) -> List[Dict[str, Any]]

Run the history command on the DeltaTable. The operations are returned in reverse chronological order.

Parameters:

Name Type Description Default
limit Optional[int]

the commit info limit to return

None

Returns:

Type Description
List[Dict[str, Any]]

list of the commit infos registered in the transaction log
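Example

A minimal sketch; the table path is a placeholder:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
commits = dt.history(limit=3)  # three most recent commits, newest first
commits[0]["operation"]        # e.g. 'WRITE' or 'DELETE'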

load_as_version

load_as_version(version: Union[int, str, datetime]) -> None

Load/time travel a DeltaTable to a specified version number, or to the latest version written at or before a given timestamp. If a string is passed, it should be an RFC 3339 / ISO 8601 date and time string. If a datetime object without a timezone is passed, the UTC timezone is assumed.

Parameters:

Name Type Description Default
version Union[int, str, datetime]

the identifier of the version of the DeltaTable to load

required
Example

Use a version number

dt = DeltaTable("test_table")
dt.load_as_version(1)

Use a datetime object

dt.load_as_version(datetime(2023, 1, 1))
dt.load_as_version(datetime(2023, 1, 1, tzinfo=timezone.utc))

Use a datetime in string format

dt.load_as_version("2018-01-26T18:30:09Z")
dt.load_as_version("2018-12-19T16:39:57-08:00")
dt.load_as_version("2018-01-26T18:30:09.453+00:00")

load_version

load_version(version: int) -> None

Load a DeltaTable with a specified version.

Deprecated

load_version and load_with_datetime have been combined into DeltaTable.load_as_version.

Parameters:

Name Type Description Default
version int

the identifier of the version of the DeltaTable to load

required

load_with_datetime

load_with_datetime(datetime_string: str) -> None

Time travel the Delta table to the latest version that was created at or before the provided datetime_string. The datetime_string argument should be an RFC 3339 / ISO 8601 date and time string.

Deprecated

load_version and load_with_datetime have been combined into DeltaTable.load_as_version.

Parameters:

Name Type Description Default
datetime_string str

the identifier of the datetime point of the DeltaTable to load

required
Example
"2018-01-26T18:30:09Z"
"2018-12-19T16:39:57-08:00"
"2018-01-26T18:30:09.453+00:00"

merge

merge(source: Union[pyarrow.Table, pyarrow.RecordBatch, pyarrow.RecordBatchReader, ds.Dataset, pd.DataFrame], predicate: str, source_alias: Optional[str] = None, target_alias: Optional[str] = None, error_on_type_mismatch: bool = True, writer_properties: Optional[WriterProperties] = None, large_dtypes: bool = True, custom_metadata: Optional[Dict[str, str]] = None) -> TableMerger

Pass the source data that you want to merge into the target Delta table, along with a predicate in SQL-like format. You can also specify what to do when the source data types do not match those of the target table.

Parameters:

Name Type Description Default
source Union[Table, RecordBatch, RecordBatchReader, Dataset, DataFrame]

source data

required
predicate str

SQL like predicate on how to merge

required
source_alias Optional[str]

Alias for the source table

None
target_alias Optional[str]

Alias for the target table

None
error_on_type_mismatch bool

whether merge should raise an error if data types mismatch. Defaults to True.

True
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer

None
large_dtypes bool

If True, the data schema is kept in large_dtypes.

True
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns:

Name Type Description
TableMerger TableMerger

TableMerger Object
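Example

A minimal sketch of an upsert, using the TableMerger methods when_matched_update_all and when_not_matched_insert_all documented on the TableMerger object; the table path and source data are placeholders:

import pyarrow as pa
from deltalake import DeltaTable

dt = DeltaTable("my_table")
source = pa.table({"id": ["1", "4"], "price": [11.0, 40.0]})

(
    dt.merge(
        source=source,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()      # update rows whose id matches
    .when_not_matched_insert_all()  # insert rows with no match
    .execute()
)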

metadata

metadata() -> Metadata

Get the current metadata of the DeltaTable.

Returns:

Type Description
Metadata

the current Metadata registered in the transaction log

protocol

protocol() -> ProtocolVersions

Get the reader and writer protocol versions of the DeltaTable.

Returns:

Type Description
ProtocolVersions

the current ProtocolVersions registered in the transaction log

repair

repair(dry_run: bool = False, custom_metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]

Repair the Delta Table by auditing active files that do not exist in the underlying filesystem and removing them. This can be useful when there are accidental deletions or corrupted files.

Active files are ones that have an add action in the log, but no corresponding remove action. This operation creates a new FSCK transaction containing a remove action for each of the missing or corrupted files.

Parameters:

Name Type Description Default
dry_run bool

when enabled, only list the affected files; otherwise, add remove actions to the transaction log. Defaults to False.

False
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns: The metrics from repair (FSCK) action.

Example

from deltalake import DeltaTable
dt = DeltaTable('TEST')
dt.repair(dry_run=False)
Results in
{'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}

restore

restore(target: Union[int, datetime, str], *, ignore_missing_files: bool = False, protocol_downgrade_allowed: bool = False, custom_metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]

Run the Restore command on the Delta Table: restore the table to a given version or datetime.

Parameters:

Name Type Description Default
target Union[int, datetime, str]

the version to restore to, expressed as an int version number, a date/datetime string, or a datetime object.

required
ignore_missing_files bool

whether the operation should proceed when some data files are missing.

False
protocol_downgrade_allowed bool

whether the operation is allowed to downgrade the table's protocol version.

False
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns:

Type Description
Dict[str, Any]

the metrics from restore.
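Example

A minimal sketch; the table path and restore targets are placeholders:

from datetime import datetime
from deltalake import DeltaTable

dt = DeltaTable("my_table")
dt.restore(1)                     # restore to version 1
dt.restore(datetime(2023, 1, 1))  # or to the version active at a point in time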

schema

schema() -> DeltaSchema

Get the current schema of the DeltaTable.

Returns:

Type Description
Schema

the current Schema registered in the transaction log
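Example

A minimal sketch; converting to PyArrow assumes the Schema object's to_pyarrow helper:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
dt.schema()               # deltalake Schema object
dt.schema().to_pyarrow()  # converted to a pyarrow.Schema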

to_pandas

to_pandas(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[FilterType] = None) -> pd.DataFrame

Build a pandas dataframe using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[List[Tuple[str, str, Any]]]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
columns Optional[List[str]]

The columns to project. This can be a list of column names to include (order and duplicates will be preserved)

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
filters Optional[FilterType]

A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass partitions

None
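Example

A minimal sketch, assuming hypothetical columns id and price and a partition column year:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
df = dt.to_pandas(
    columns=["id", "price"],          # project only these columns
    filters=[("year", "=", "2021")],  # DNF row filter
)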

to_pyarrow_dataset

to_pyarrow_dataset(partitions: Optional[List[Tuple[str, str, Any]]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, parquet_read_options: Optional[ParquetReadOptions] = None) -> pyarrow.dataset.Dataset

Build a PyArrow Dataset using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[List[Tuple[str, str, Any]]]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
parquet_read_options Optional[ParquetReadOptions]

Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31

None

More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html

Example

deltalake will work with any storage compliant with pyarrow.fs.FileSystem; however, the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem in a pyarrow.fs.SubTreeFileSystem.

import pyarrow.fs as fs
from deltalake import DeltaTable

table_uri = "s3://<bucket>/<path>"
raw_fs, normalized_path = fs.FileSystem.from_uri(table_uri)
filesystem = fs.SubTreeFileSystem(normalized_path, raw_fs)

dt = DeltaTable(table_uri)
ds = dt.to_pyarrow_dataset(filesystem=filesystem)

Returns:

Type Description
Dataset

the PyArrow Dataset built from the DeltaTable
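The returned dataset can then be filtered and projected lazily with standard PyArrow APIs, for example (the table path and column names are placeholders):

import pyarrow.dataset as ds
from deltalake import DeltaTable

dt = DeltaTable("my_table")
dataset = dt.to_pyarrow_dataset()
table = dataset.to_table(filter=ds.field("price") > 10, columns=["id", "price"])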

to_pyarrow_table

to_pyarrow_table(partitions: Optional[List[Tuple[str, str, Any]]] = None, columns: Optional[List[str]] = None, filesystem: Optional[Union[str, pa_fs.FileSystem]] = None, filters: Optional[FilterType] = None) -> pyarrow.Table

Build a PyArrow Table using data from the DeltaTable.

Parameters:

Name Type Description Default
partitions Optional[List[Tuple[str, str, Any]]]

A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax

None
columns Optional[List[str]]

The columns to project. This can be a list of column names to include (order and duplicates will be preserved)

None
filesystem Optional[Union[str, FileSystem]]

A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem

None
filters Optional[FilterType]

A disjunctive normal form (DNF) predicate for filtering rows. If you pass a filter you do not need to pass partitions

None
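Example

A minimal sketch, assuming a hypothetical partition column year and columns id and price:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
table = dt.to_pyarrow_table(
    partitions=[("year", "=", "2021")],  # prune files by partition
    columns=["id", "price"],             # project only these columns
)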

update

update(updates: Optional[Dict[str, str]] = None, new_values: Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]] = None, predicate: Optional[str] = None, writer_properties: Optional[WriterProperties] = None, error_on_type_mismatch: bool = True, custom_metadata: Optional[Dict[str, str]] = None) -> Dict[str, Any]

UPDATE records in the Delta Table that match an optional predicate. Either updates or new_values needs to be passed for it to execute.

Parameters:

Name Type Description Default
updates Optional[Dict[str, str]]

a mapping of column name to update SQL expression.

None
new_values Optional[Dict[str, Union[int, float, str, datetime, bool, List[Any]]]]

a mapping of column name to python datatype.

None
predicate Optional[str]

a logical expression.

None
writer_properties Optional[WriterProperties]

Pass writer properties to the Rust parquet writer.

None
error_on_type_mismatch bool

whether update should raise an error if data types mismatch. Defaults to True.

True
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns: the metrics from update

Example

Update some row values with SQL predicate

This is equivalent to UPDATE table SET deleted = true WHERE id = '3'

from deltalake import write_deltalake, DeltaTable
import pandas as pd
df = pd.DataFrame(
    {"id": ["1", "2", "3"],
    "deleted": [False, False, False],
    "price": [10., 15., 20.]
    })
write_deltalake("tmp", df)
dt = DeltaTable("tmp")
dt.update(predicate="id = '3'", updates = {"deleted": 'True'})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}

Update all row values

This is equivalent to UPDATE table SET deleted = true, id = concat(id, '_old').

dt.update(updates = {"deleted": 'True', "id": "concat(id, '_old')"})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 3, 'num_copied_rows': 0, 'execution_time_ms': ..., 'scan_time_ms': ...}

Use Python objects instead of SQL strings

Use the new_values parameter instead of the updates parameter. For example, this is equivalent to UPDATE table SET price = 150.10 WHERE id = '1'

dt.update(predicate="id = '1_old'", new_values = {"price": 150.10})

{'num_added_files': 1, 'num_removed_files': 1, 'num_updated_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': ..., 'scan_time_ms': ...}

update_incremental

update_incremental() -> None

Updates the DeltaTable to the latest version by incrementally applying newer versions.

vacuum

vacuum(retention_hours: Optional[int] = None, dry_run: bool = True, enforce_retention_duration: bool = True, custom_metadata: Optional[Dict[str, str]] = None) -> List[str]

Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.

Parameters:

Name Type Description Default
retention_hours Optional[int]

the retention threshold in hours; if None, the value from configuration.deletedFileRetentionDuration is used, or a default of one week otherwise.

None
dry_run bool

when enabled, only list the files that would be deleted; otherwise, delete them.

True
enforce_retention_duration bool

when disabled, accepts retention hours smaller than the value from configuration.deletedFileRetentionDuration.

True
custom_metadata Optional[Dict[str, str]]

custom metadata that will be added to the transaction commit.

None

Returns: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
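Example

A minimal sketch; the table path is a placeholder:

from deltalake import DeltaTable

dt = DeltaTable("my_table")
dt.vacuum()                                    # dry run by default: list removable files
dt.vacuum(retention_hours=168, dry_run=False)  # actually delete files older than one week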

version

version() -> int

Get the version of the DeltaTable.

Returns:

Type Description
int

The current version of the DeltaTable