openclean.engine.base module
The openclean engine maintains a collection of datasets. Each dataset is identified by a unique name. Dataset snapshots are maintained by a datastore.
The engine provides a namespace for datasets that are maintained by a datastore, which keeps track of changes to the data. The engine is also associated with an object registry that maintains user-defined objects such as functions and lookup tables.
- openclean.engine.base.DB(basedir: Optional[str] = None, create: Optional[bool] = False, cached: Optional[bool] = True) → openclean.engine.base.OpencleanEngine
Create an instance of the openclean engine. Uses a persistent engine if a base directory is given. This test implementation uses HISTORE as the underlying datastore for a persistent engine. If no base directory is given, a volatile archive manager will be used instead of a persistent one.
If the create flag is True, all existing files in the base directory (if given) will be removed.
- Parameters
basedir (string) – Path to directory on disk where archives are maintained.
create (bool, default=False) – Create a fresh instance of the archive manager if True. This will delete all files in the base directory.
cached (bool, default=True) – Flag indicating whether all datastores that are created for existing archives are cached datastores.
- Return type
openclean.engine.base.OpencleanEngine
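The choice between a persistent and a volatile engine, and the effect of the create flag, can be pictured with the following stdlib-only sketch. It does not use openclean: `choose_store` is a hypothetical helper that only mimics the branching described above and returns a label instead of an archive manager.

```python
import os
import shutil
import tempfile
from typing import Optional


def choose_store(basedir: Optional[str] = None, create: bool = False) -> str:
    """Hypothetical helper mirroring the dispatch described for DB().

    Returns "volatile" when no base directory is given. Otherwise it makes sure
    the directory exists, wiping its contents first if create=True, and
    returns "persistent".
    """
    if basedir is None:
        # No directory given: the archives would only live in memory.
        return "volatile"
    if create and os.path.isdir(basedir):
        # create=True removes all existing files in the base directory.
        shutil.rmtree(basedir)
    os.makedirs(basedir, exist_ok=True)
    return "persistent"


# Demo: an existing file survives a plain open but not create=True.
workdir = tempfile.mkdtemp()
open(os.path.join(workdir, "old.txt"), "w").close()
print(choose_store())
print(choose_store(workdir), os.listdir(workdir))
print(choose_store(workdir, create=True), os.listdir(workdir))
```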
- class openclean.engine.base.OpencleanEngine(identifier: str, manager: histore.archive.manager.base.ArchiveManager, library: openclean.engine.library.ObjectLibrary, basedir: Optional[str] = None, cached: Optional[bool] = True)
Bases: object
The engine provides a namespace that manages a set of datasets identified by unique names. It is associated with an object repository that provides additional functionality to register objects such as functions and lookup tables.
Datasets that are created from files or data frames are maintained by an archive manager.
Each engine has a unique identifier allowing a user to use multiple engines if necessary.
- apply(name: str, operations: Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp, List[Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp]]], validate: Optional[bool] = None) → List[histore.archive.snapshot.Snapshot]
Apply a given operator or a sequence of operators on the specified archive.
The resulting snapshot(s) will be merged directly into the archive. This method allows updating data in an archive directly, without the need to check out the snapshot first and then commit the modified version(s).
Returns a list of handles for the created snapshots.
Note that this method has some limitations. Most importantly, the order of rows cannot be modified, and new rows cannot be inserted at this point. Columns can be added, moved, renamed, and deleted.
- Parameters
name (string) – Unique dataset name.
operations (openclean.engine.dataset.StreamOpPipeline) – Operator(s) that is/are used to update the rows in a dataset snapshot to create new snapshot(s) in this archive.
validate (bool, default=False) – Validate that the resulting archive is in proper order before committing the action.
- Return type
List[histore.archive.snapshot.Snapshot]
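The row-level restrictions of apply() follow from processing one row at a time. The sketch below is a hypothetical pure-Python model: `apply_ops`, `add_upper`, and `drop_lower` are not openclean functions, and producing one snapshot per operator is an assumption. Each operator maps one row to one row, so values can change and columns can be added or dropped, but rows can be neither reordered nor inserted.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
RowOp = Callable[[Row], Row]


def apply_ops(snapshot: List[Row], ops: List[RowOp]) -> List[List[Row]]:
    """Hypothetical sketch: run each row operator over the previous snapshot.

    One new snapshot is produced per operator. Every operator sees exactly one
    row at a time and returns exactly one row, so rows keep their order and no
    rows can be inserted; columns, however, may be added or dropped.
    """
    snapshots: List[List[Row]] = []
    current = snapshot
    for op in ops:
        # Copy each row so that earlier snapshots are never mutated.
        current = [op(dict(row)) for row in current]
        snapshots.append(current)
    return snapshots


def add_upper(row: Row) -> Row:
    row["NAME"] = str(row["name"]).upper()  # add a column
    return row


def drop_lower(row: Row) -> Row:
    del row["name"]  # delete a column
    return row


base = [{"name": "ann", "age": 31}, {"name": "bob", "age": 45}]
for snap in apply_ops(base, [add_upper, drop_lower]):
    print(snap)
```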
- checkout(name: str, commit: Optional[bool] = False) → pandas.core.frame.DataFrame
Check out the latest version of a dataset. The dataset is identified by its unique name. If the dataset that is currently associated with the given name is a sample dataset, it will first be replaced by the handle for the original dataset. If the commit flag is True, any uncommitted changes for the sample dataset will be committed first.
Raises a KeyError if the given dataset name is unknown.
- Parameters
name (string) – Unique dataset name.
commit (bool, default=False) – Apply all uncommitted changes to the original dataset if True.
- Return type
pd.DataFrame
- Raises
KeyError – If the given dataset name is unknown.
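The interaction between samples and checkout(commit=True) can be sketched with a toy namespace. `ToyEngine` is hypothetical and far simpler than the real engine; it assumes that a sample records the operations applied to it, that committing re-applies those operations to the full dataset, and that uncommitted operations are dropped when commit is False.

```python
from typing import Callable, Dict, List

Snapshot = List[int]


class ToyEngine:
    """Hypothetical stand-in for the engine's dataset namespace (not openclean).

    history maps a dataset name to its list of snapshots; pending maps a name
    to the operations recorded on an active sample of that dataset.
    """

    def __init__(self) -> None:
        self.history: Dict[str, List[Snapshot]] = {}
        self.pending: Dict[str, List[Callable[[int], int]]] = {}

    def checkout(self, name: str, commit: bool = False) -> Snapshot:
        if name not in self.history:
            raise KeyError(name)
        # Replace any active sample by the original dataset.
        ops = self.pending.pop(name, [])
        if commit:
            # Re-apply the sample's operations to the full dataset,
            # creating one new snapshot per operation.
            for op in ops:
                head = self.history[name][-1]
                self.history[name].append([op(value) for value in head])
        # In this sketch, operations that are not committed are discarded.
        return self.history[name][-1]


engine = ToyEngine()
engine.history["prices"] = [[1, 2, 3]]
engine.pending["prices"] = [lambda v: v * 10]
print(engine.checkout("prices", commit=True))
```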
- commit(name: str, source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], action: Optional[openclean.engine.action.OpHandle] = None) → Union[pandas.core.frame.DataFrame, str, histore.document.base.Document]
Commit a modified data frame to the dataset archive.
The dataset is identified by its unique name. Raises a KeyError if the given dataset name is unknown.
- Parameters
name (string) – Unique dataset name.
source (openclean.data.stream.base.Datasource) – Input data frame or stream containing the new dataset version that is being stored.
action (openclean.engine.action.OpHandle, default=None) – Operator that created the dataset snapshot.
- Return type
openclean.data.stream.base.Datasource
- Raises
KeyError – If the given dataset name is unknown.
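As a rough illustration of what commit() records, the hypothetical sketch below models an archive as a plain dictionary that maps each dataset name to a list of (snapshot, action) pairs. A string label stands in for the OpHandle that the real method accepts; `commit` here is not the openclean method.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical archive layout: name -> list of (snapshot, action) pairs.
Archive = Dict[str, List[Tuple[Any, Optional[str]]]]


def commit(archive: Archive, name: str, source: Any, action: Optional[str] = None) -> Any:
    """Hypothetical sketch of commit(): append a version and record its origin."""
    if name not in archive:
        # Mirrors the documented behavior for unknown dataset names.
        raise KeyError(name)
    archive[name].append((source, action))
    # Like the documented method, hand the committed source back to the caller.
    return source


archive: Archive = {"people": [(["ann", "bob"], None)]}
commit(archive, "people", ["Ann", "Bob"], action="capitalize names")
for version, (snapshot, action) in enumerate(archive["people"]):
    print(version, snapshot, action)
```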
- create(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], name: str, primary_key: Optional[Union[str, List[str]]] = None, cached: Optional[bool] = True) → openclean.engine.dataset.DatasetHandle
Create an initial dataset archive that is identified by the given name. The given data represents the first snapshot in the created archive.
Raises a ValueError if an archive with the given name already exists.
- Parameters
source (pd.DataFrame, CSVFile, or string) – Data frame or file containing the first version of the archived dataset.
name (string) – Unique dataset name.
primary_key (string or list, default=None) – Column(s) that are used to generate identifiers for rows in the archive.
cached (bool, default=True) – Flag indicating whether the last accessed dataset snapshot for the created dataset is cached for fast access.
- Return type
openclean.engine.dataset.DatasetHandle
- Raises
ValueError – If an archive with the given name already exists.
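The primary_key parameter determines how rows can be matched across snapshots. In the hypothetical sketch below, `row_ids` is not an openclean function, and identifying rows by position when no key is given is an assumption made for illustration.

```python
from typing import Dict, List, Optional, Sequence, Union

Row = Dict[str, object]


def row_ids(rows: Sequence[Row], primary_key: Optional[Union[str, List[str]]] = None) -> list:
    """Hypothetical sketch of how row identifiers could be derived.

    Without a primary key, rows are identified by position (an assumption);
    with one, by the tuple of values in the key column(s).
    """
    if primary_key is None:
        return list(range(len(rows)))
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    return [tuple(row[k] for k in keys) for row in rows]


v1 = [{"id": 7, "city": "NYC"}, {"id": 9, "city": "LA"}]
v2 = [{"id": 9, "city": "LA"}, {"id": 7, "city": "New York"}]  # reordered and edited

# Positional ids pair row 0 of v1 with row 0 of v2, i.e. two different records.
print(list(zip(row_ids(v1), row_ids(v2))))
# Key-based ids follow each record across versions regardless of position.
print(sorted(set(row_ids(v1, "id")) & set(row_ids(v2, "id"))))
```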
- dataset(name: str) → openclean.engine.dataset.DatasetHandle
Get the handle for a dataset. Depending on the type of the dataset, this will return either a FullDataset or a DataSample.
- Parameters
name (string) – Unique dataset name.
- Return type
openclean.engine.dataset.DatasetHandle
- drop(name: str)
Delete the full history for the dataset with the given name. Raises a ValueError if the dataset name is unknown.
- Parameters
name (string) – Unique dataset name.
- Raises
ValueError – If the dataset name is unknown.
- load_dataset(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], name: str, primary_key: Optional[Union[str, List[str]]] = None, cached: Optional[bool] = True) → openclean.engine.dataset.DatasetHandle
Create an initial dataset archive that is identified by the given name. The given data frame represents the first snapshot in the created archive.
Raises a ValueError if an archive with the given name already exists.
This is a synonym for create() for backward compatibility.
- Parameters
source (pd.DataFrame or string) – Data frame or file containing the first version of the archived dataset.
name (string) – Unique dataset name.
primary_key (string or list, default=None) – Column(s) that are used to generate identifiers for rows in the archive.
cached (bool, default=True) – Flag indicating whether the last accessed dataset snapshot for the created dataset is cached for fast access.
- Return type
openclean.engine.dataset.DatasetHandle
- Raises
ValueError – If an archive with the given name already exists.
- metadata(name: str) → openclean.data.metadata.base.MetadataStore
Get metadata that is associated with the current dataset version.
Raises a ValueError if the dataset is unknown.
- Parameters
name (string) – Unique dataset name.
- Return type
openclean.data.metadata.base.MetadataStore
- property register: openclean.engine.library.ObjectLibrary
Synonym for accessing the library as a function registry.
- Return type
openclean.engine.library.ObjectLibrary
- rollback(name: str, version: str) → pandas.core.frame.DataFrame
Roll back all changes, including the given dataset version.
That is, all changes that occurred at and after the identified snapshot are rolled back. This makes the snapshot of the preceding version the new current (head) snapshot of the dataset history.
Returns the dataframe for the dataset snapshot that is at the new head of the dataset history.
Raises a KeyError if the dataset or the given version identifier are unknown.
- Parameters
name (string) – Unique dataset name.
version (string) – Unique log entry version.
- Return type
pd.DataFrame
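The rollback rule (discard the given version and every later one) amounts to truncating an ordered history. The sketch below is hypothetical: `rollback` here is not the openclean method, version labels such as "v0" are made up, and refusing to roll back the initial snapshot is a choice of this sketch, not documented behavior.

```python
from typing import List, Tuple

History = List[Tuple[str, list]]  # ordered (version, snapshot) pairs


def rollback(history: History, version: str) -> list:
    """Hypothetical sketch: drop the given version and everything after it.

    Returns the snapshot that becomes the new head of the history.
    """
    versions = [v for v, _ in history]
    if version not in versions:
        raise KeyError(version)
    pos = versions.index(version)
    if pos == 0:
        # Sketch choice: always keep at least the initial snapshot.
        raise ValueError("cannot roll back the initial snapshot")
    del history[pos:]
    return history[-1][1]


log: History = [("v0", [1, 2]), ("v1", [1, 2, 3]), ("v2", [3, 2, 1])]
print(rollback(log, "v1"))  # the snapshot of v0 is the new head
print([v for v, _ in log])
```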
- sample(name: str, n: Optional[int] = None, random_state: Optional[Tuple[int, List]] = None) → pandas.core.frame.DataFrame
Display the spreadsheet view for a given dataset. The dataset is identified by its unique name. Raises a ValueError if no dataset with the given name exists.
Creates a new data frame that contains a random sample of the rows in the last snapshot of the identified dataset. This sample is registered as a separate dataset with the engine. If n is not specified, a random sample of size 100 is generated.
- Parameters
name (string) – Unique dataset name.
n (int, default=None) – Number of rows in the sample dataset.
random_state (int or list, default=None) – Seed for random number generator.
- Return type
pd.DataFrame
- Raises
KeyError
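The default sample size and the role of random_state can be illustrated with Python's random module. `sample_rows` is a hypothetical stand-in for sample(): it works on a plain list instead of a data frame, and it caps n at the number of available rows, which is an assumption.

```python
import random
from typing import List, Optional, Sequence


def sample_rows(rows: Sequence, n: Optional[int] = None, random_state: Optional[int] = None) -> List:
    """Hypothetical sketch of sample(): n random rows, 100 by default.

    Capping n at the number of available rows is an assumption of this sketch.
    """
    size = 100 if n is None else n
    # A seeded generator makes the sample reproducible.
    rng = random.Random(random_state)
    return rng.sample(list(rows), min(size, len(rows)))


rows = list(range(1000))
first = sample_rows(rows, random_state=42)
again = sample_rows(rows, random_state=42)
print(len(first), first == again)
```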
- stream(name: str, version: Optional[int] = None) → openclean.pipeline.DataPipeline
Get a data pipeline for a dataset snapshot.
- Parameters
name (string) – Unique dataset name.
version (int, default=None) – Unique version identifier. By default, the last version is used.
- Return type
openclean.pipeline.DataPipeline
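Where checkout() returns a complete data frame, stream() returns a pipeline over one snapshot. Assuming that the pipeline reads rows lazily, the hypothetical generator below shows the idea, including the rule that version=None selects the latest snapshot; `stream_rows` and the integer version keys are illustrative only.

```python
from typing import Dict, Iterator, List, Optional

Row = Dict[str, object]


def stream_rows(versions: Dict[int, List[Row]], version: Optional[int] = None) -> Iterator[Row]:
    """Hypothetical sketch of stream(): lazily yield the rows of one snapshot.

    With version=None the most recent (highest-numbered) snapshot is used.
    """
    key = max(versions) if version is None else version
    for row in versions[key]:
        yield row


snapshots = {
    0: [{"name": "ann"}, {"name": "bob"}],
    1: [{"name": "Ann"}, {"name": "Bob"}, {"name": "Cy"}],
}
# Rows are produced one at a time, so downstream steps can filter them
# without materializing the whole snapshot first.
short = (r["name"] for r in stream_rows(snapshots) if len(str(r["name"])) <= 2)
print(list(short))
```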