openclean.engine.base module

The openclean engine maintains a collection of datasets. Each dataset is identified by a unique name. Dataset snapshots are maintained by a datastore.

The idea of the engine is to provide a namespace for datasets that are maintained by a datastore which keeps track of changes to the data. The engine is associated with an object registry that maintains user-defined objects such as functions, lookup tables, etc..

openclean.engine.base.DB(basedir: Optional[str] = None, create: Optional[bool] = False, cached: Optional[bool] = True) openclean.engine.base.OpencleanEngine

Create an instance of the openclean engine. Uses a persistent engine if a base directory is given. This test implementation uses HISTORE as the underlying datastore for a persistent engine. If no base directory is given, a volatile archive manager will be used instead of a persistent one.

If the create flag is True all existing files in the base directory (if given) will be removed.

Parameters
  • basedir (string) – Path to directory on disk where archives are maintained.

  • create (bool, default=False) – Create a fresh instance of the archive manager if True. This will delete all files in the base directory.

  • cached (bool, default=True) – Flag indicating whether the all datastores that are created for existing archives are cached datastores or not.

Return type

openclean.engine.base.OpencleanEngine

class openclean.engine.base.OpencleanEngine(identifier: str, manager: histore.archive.manager.base.ArchiveManager, library: openclean.engine.library.ObjectLibrary, basedir: Optional[str] = None, cached: Optional[bool] = True)

Bases: object

The idea of the engine is to provide a namespace that manages a set of datasets that are identified by unique names. The engine is associated with an object repository that provides additional functionality to register objects like functions, lookup tables, etc..

Datasets that are created from files of data frames are maintained by an archive manager.

Each engine has a unique identifier allowing a user to use multiple engines if necessary.

apply(name: str, operations: Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp, List[Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp]]], validate: Optional[bool] = None) List[histore.archive.snapshot.Snapshot]

Apply a given operator or a sequence of operators on the specified archive.

The resulting snapshot(s) will directly be merged into the archive. This method allows to update data in an archive directly without the need to checkout the snapshot first and then commit the modified version(s).

Returns list of handles for the created snapshots.

Note that there are some limitations for this method. Most importantly, the order of rows cannot be modified and neither can it insert new rows at this point. Columns can be added, moved, renamed, and deleted.

Parameters
  • name (string) – Unique dataset name.

  • operations (openclean.engine.dataset.StreamOpPipeline) – Operator(s) that is/are used to update the rows in a dataset snapshot to create new snapshot(s) in this archive.

  • validate (bool, default=False) – Validate that the resulting archive is in proper order before committing the action.

Return type

histore.archive.snapshot.Snapshot

checkout(name: str, commit: Optional[bool] = False) pandas.core.frame.DataFrame

Checkout the latest version of a dataset. The dataset is identified by the unique name. If the dataset that is currently associated with the given name is a sample dataset it will be replace by the handle for the original dataset first. If the commit flag is True any uncommited changes for the sample dataset will be commited first.

Raises a KeyError if the given dataset name is unknown.

Parameters
  • name (string) – Unique dataset name.

  • commit (bool, default=False) – Apply all uncommited changes to the original database if True.

Return type

pd.DataFrame

Raises

KeyError

commit(name: str, source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], action: Optional[openclean.engine.action.OpHandle] = None) Union[pandas.core.frame.DataFrame, str, histore.document.base.Document]

Commit a modified data frame to the dataset archive.

The dataset is identified by its unique name. Raises a KeyError if the given dataset name is unknown.

Parameters
  • name (string) – Unique dataset name.

  • source (openclean.data.stream.base.Datasource) – Input data frame or stream containing the new dataset version that is being stored.

  • action (openclean.engine.action.OpHandle, default=None) – Operator that created the dataset snapshot.

Return type

openclean.data.stream.base.Datasource

Raises

KeyError

create(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], name: str, primary_key: Optional[Union[str, List[str]]] = None, cached: Optional[bool] = True) openclean.engine.dataset.DatasetHandle

Create an initial dataset archive that is idetified by the given name. The given data represents the first snapshot in the created archive.

Raises a ValueError if an archive with the given name already exists.

Parameters
  • source (pd.DataFrame, CSVFile, or string) – Data frame or file containing the first version of the archived dataset.

  • name (string) – Unique dataset name.

  • primary_key (string or list, default=None) – Column(s) that are used to generate identifier for rows in the archive.

  • cached (bool, default=True) – Flag indicating whether the last accessed dataset snapshot for the created dataset is cached for fast access.

Return type

openclean.engine.dataset.DatasetHandle

Raises

ValueError

dataset(name: str) openclean.engine.dataset.DatasetHandle

Get handle for a dataset. Depending on the type of the dataset this will either return a :class:FullDataset or :class:DataSample.

Parameters

name (string) – Unique dataset name.

Return type

openclean.engine.dataset.DatasetHandle

drop(name: str)

Delete the full history for the dataset with the given name. Raises a ValueError if the dataset name is unknonw.

Parameters

name (string) – Unique dataset name.

Raises

ValueError

load_dataset(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], name: str, primary_key: Optional[Union[str, List[str]]] = None, cached: Optional[bool] = True) openclean.engine.dataset.DatasetHandle

Create an initial dataset archive that is idetified by the given name. The given data frame represents the first snapshot in the created archive.

Raises a ValueError if an archive with the given name already exists.

This is a synonym for create() for backward compatibility.

Parameters
  • source (pd.DataFrame or string) – Data frame or file containing the first version of the archived dataset.

  • name (string) – Unique dataset name.

  • primary_key (string or list, default=None) – Column(s) that are used to generate identifier for rows in the archive.

  • cached (bool, default=True) – Flag indicating whether the last accessed dataset snapshot for the created dataset is cached for fast access.

Return type

openclean.engine.dataset.DatasetHandle

Raises

ValueError

metadata(name: str) openclean.data.metadata.base.MetadataStore

Get metadata that is associated with the current dataset version.

Raises a ValueError if the dataset is unknown.

Parameters

name (string) – Unique dataset name.

Return type

openclean.data.metadata.base.MetadataStore

property register: openclean.engine.library.ObjectLibrary

Synonym for accessing the library as a function registry.

Return type

openclean.engine.object.base.ObjectLibrary

rollback(name: str, version: str) pandas.core.frame.DataFrame

Rollback all changes including the given dataset version.

That is, we rollback all changes that occurred at and after the identified snapshot. This will make the respective snapshot of the previous version the new current (head) snapshot for the dataset history.

Returns the dataframe for the dataset snapshot that is at the new head of the dataset history.

Raises a KeyError if the dataset or the given version identifier are unknown.

Parameters
  • name (string) – Unique dataset name.

  • version (string) – Unique log entry version.

Return type

pd.DataFrame

sample(name: str, n: Optional[int] = None, random_state: Optional[Tuple[int, List]] = None) pandas.core.frame.DataFrame

Display the spreadsheet view for a given dataset. The dataset is identified by its unique name. Raises a ValueError if no dataset with the given name exists.

Creates a new data frame that contains a random sample of the rows in the last snapshot of the identified dataset. This sample is registered as a separate dataset with the engine. If neither n nor frac are specified a random sample of size 100 is generated.

Parameters
  • name (string) – Unique dataset name.

  • n (int, default=None) – Number of rows in the sample dataset.

  • random_state (int or list, default=None) – Seed for random number generator.

Return type

pd.DataFrame

Raises

KeyError

stream(name: str, version: Optional[int] = None) openclean.pipeline.DataPipeline

Get a data pipeline for a dataset snapshot.

Parameters
  • name (string) – Unique dataset name.

  • version (int, default=None) – Unique version identifier. By default the last version is used.

Return type

openclean.pipeline.DataPipeline