openclean.engine.dataset module

The data engine is used to manipulate a dataset with insert and update operations that use functions from the command registry.

class openclean.engine.dataset.DataSample(df: pandas.core.frame.DataFrame, original: openclean.engine.dataset.DatasetHandle, n: int, random_state: Optional[Tuple[int, List]] = None)

Bases: openclean.engine.dataset.DatasetHandle

Handle for datasets that are samples of a larger dataset. Sample datasets are maintained entirely in main memory.

This class maintains a reference to the original sample and to the current modified version of the sample. If intermediate versions need to be accessed, they are recreated by re-applying the sequence of operations that generated them.

The class also has a reference to the handle for the full dataset.
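The replay strategy for intermediate versions can be sketched in plain Python. This is only an illustration under simplifying assumptions, not the openclean implementation: the names `replay`, `original`, and `log` are hypothetical, and rows are modelled as lists instead of a pandas data frame.

```python
from typing import Callable, List

Row = List[object]
Rows = List[Row]
Operation = Callable[[Rows], Rows]


def replay(original: Rows, log: List[Operation], version: int) -> Rows:
    """Recreate the sample as it looked after the first `version` operations."""
    if not 0 <= version <= len(log):
        raise KeyError(f"unknown version {version}")
    # Work on a copy so that the original sample is never modified.
    rows = [list(row) for row in original]
    for op in log[:version]:
        rows = op(rows)
    return rows


# Hypothetical sample with two logged operations.
original = [["alice", " NY "], ["bob", " LA "]]
log = [
    # Operation 1: strip whitespace from the second column.
    lambda rows: [[name, city.strip()] for name, city in rows],
    # Operation 2: convert the first column to upper case.
    lambda rows: [[name.upper(), city] for name, city in rows],
]

after_first = replay(original, log, version=1)
after_second = replay(original, log, version=2)
```

Only the original rows and the operation log are stored in this sketch; any intermediate state is recomputed on demand, which trades computation time for memory.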

apply()

Apply all actions in the current log to the underlying original dataset.

drop()

Delete all resources that are associated with the dataset history.

class openclean.engine.dataset.DatasetHandle(store: openclean.data.archive.base.ArchiveStore, is_sample: bool)

Bases: object

Handle for datasets that are managed by the openclean engine and whose snapshot history is maintained by an archive manager.

checkout(version: Optional[int] = None) pandas.core.frame.DataFrame

Checkout a dataset snapshot.

The optional identifier references a dataset snapshot via an operation log entry. If no identifier is given, the snapshot for the last version of the dataset will be returned.

Parameters

version (int, default=None) – Identifier for the operation log entry that represents the dataset version that is being checked out.

Return type

pd.DataFrame
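The default behavior of checkout (return the last version when no identifier is given) can be illustrated with a toy in-memory history. The class `ToyHistory` is hypothetical and stands in for the archive-backed handle; snapshots are lists of dictionaries instead of data frames, and raising a KeyError for unknown versions is an assumption of this sketch.

```python
from typing import Dict, List, Optional

Rows = List[Dict[str, object]]


class ToyHistory:
    """Hypothetical in-memory stand-in for a dataset snapshot history."""

    def __init__(self) -> None:
        self._snapshots: List[Rows] = []

    def commit(self, rows: Rows) -> int:
        """Store a copy of the rows and return the new version identifier."""
        self._snapshots.append([dict(row) for row in rows])
        return len(self._snapshots) - 1

    def checkout(self, version: Optional[int] = None) -> Rows:
        """Return a copy of a snapshot; default to the last version."""
        if not self._snapshots:
            raise KeyError("history is empty")
        if version is None:
            version = len(self._snapshots) - 1
        if not 0 <= version < len(self._snapshots):
            raise KeyError(f"unknown version {version}")
        return [dict(row) for row in self._snapshots[version]]


history = ToyHistory()
v0 = history.commit([{"city": "nyc"}])
v1 = history.commit([{"city": "NYC"}])

latest = history.checkout()      # no identifier: last version
first = history.checkout(v0)     # explicit identifier
```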

commit(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], action: Optional[openclean.engine.action.OpHandle] = None) Union[pandas.core.frame.DataFrame, str, histore.document.base.Document]

Add a new snapshot to the history of the dataset.

If no action is provided, a user commit action operator is used as the default. Returns the data frame for the snapshot.

Parameters
  • source (openclean.data.stream.base.Datasource) – Input data frame or stream containing the new dataset version that is being stored.

  • action (openclean.engine.action.OpHandle, default=None) – Operator that created the dataset snapshot.

Return type

openclean.data.stream.base.Datasource

abstract drop()

Delete all resources that are associated with the dataset history.

insert(names: Union[str, List[str]], pos: Optional[int] = None, values: Optional[Union[int, float, str, datetime.datetime, openclean.engine.object.function.FunctionHandle]] = None, args: Optional[Dict] = None, sources: Optional[Union[int, str, List[Union[str, int]]]] = None) pandas.core.frame.DataFrame

Insert one or more columns at a given position into the dataset. One column is inserted for each given column name. If the insert position is undefined, columns are appended. If the position does not reference a valid position (i.e., not between 0 and len(df.columns)) a ValueError is raised.

Values for the inserted columns are generated using a given constant value or function. If a function is given, it is expected to return exactly one value for each of the inserted columns (e.g., a tuple of length len(names)).

Parameters
  • names (string, or list(string)) – Names of the inserted columns.

  • pos (int, default=None) – Insert position for the new columns. If None, the columns will be appended.

  • values (scalar or openclean.engine.object.func.FunctionHandle, default=None) – Single value, tuple of values, or library function that is used to generate the values for the inserted column(s). If no value is specified, all columns will contain None.

  • args (dict, default=None) – Additional keyword arguments that are passed to the callable together with the column values that are extracted from each row.

  • sources (int, string, or list(int or string), default=None) – List of source columns from which the input values for the callable are extracted.

Return type

pd.DataFrame
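The position and value rules for insert can be sketched on a simple column list and row list. The function `insert_columns` below is a hypothetical helper, not part of openclean. As a simplification it passes the whole row to the value function and ignores the `args` and `sources` parameters.

```python
from typing import Callable, List, Optional, Tuple, Union

Row = List[object]


def insert_columns(
    columns: List[str],
    rows: List[Row],
    names: Union[str, List[str]],
    pos: Optional[int] = None,
    values: Union[object, Tuple, Callable[[Row], object]] = None,
) -> Tuple[List[str], List[Row]]:
    """Return new column names and rows with the given columns inserted."""
    names = [names] if isinstance(names, str) else list(names)
    if pos is None:
        pos = len(columns)  # undefined position: append
    elif not 0 <= pos <= len(columns):
        raise ValueError(f"invalid insert position {pos}")
    new_rows = []
    for row in rows:
        if callable(values):
            vals = values(row)
            if not isinstance(vals, tuple):
                vals = (vals,)
        elif isinstance(values, tuple):
            vals = values
        else:
            vals = (values,) * len(names)  # constant (or None) for every column
        if len(vals) != len(names):
            raise ValueError("expected one value per inserted column")
        new_rows.append(row[:pos] + list(vals) + row[pos:])
    return columns[:pos] + names + columns[pos:], new_rows


columns = ["first", "last"]
rows = [["Ada", "Lovelace"], ["Alan", "Turing"]]

# Insert a computed column at position 0.
cols_a, rows_a = insert_columns(
    columns, rows, "full", pos=0, values=lambda row: f"{row[0]} {row[1]}"
)
# Append two columns without values; they are filled with None.
cols_b, rows_b = insert_columns(columns, rows, ["x", "y"])
```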

log() List[openclean.engine.log.LogEntry]

Get the list of log entries for all dataset snapshots.

Return type

list of openclean.engine.log.LogEntry

metadata(version: Optional[int] = None) openclean.data.metadata.base.MetadataStore

Get metadata that is associated with a dataset version.

Parameters

version (int, default=None) – Identifier for the dataset version for which the metadata is being fetched.

Return type

openclean.data.metadata.base.MetadataStore

open(version: Optional[int] = None) histore.archive.reader.SnapshotReader

Get a stream reader for a dataset snapshot.

Parameters

version (int, default=None) – Unique version identifier. By default the last version is used.

Return type

openclean.data.archive.base.SnapshotReader

rollback(version: int) pandas.core.frame.DataFrame

Rollback all changes including the given dataset version.

That is, we rollback all changes that occurred at and after the identified snapshot. This will make the respective snapshot of the previous version the new current (head) snapshot for the dataset history.

Returns the dataframe for the dataset snapshot that is at the new head of the dataset history.

Raises a KeyError if the given log entry identifier is unknown. Raises a ValueError if the log entry references a snapshot that has already been committed.

Parameters

version (int) – Unique log entry version.

Return type

pd.DataFrame
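The rollback rules (remove the identified entry and everything after it, fail on unknown or committed entries) can be sketched on a plain list of log entries. The `Entry` class and the `rollback` function are hypothetical and only model the bookkeeping, not the snapshot data itself.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Entry:
    """Hypothetical log entry: a version identifier and a committed flag."""

    version: int
    committed: bool = False


def rollback(log: List[Entry], version: int) -> List[Entry]:
    """Return the log without the given entry and all entries after it."""
    for index, entry in enumerate(log):
        if entry.version == version:
            if entry.committed:
                raise ValueError(f"version {version} is already committed")
            # The entry before `index` becomes the new head of the history.
            return log[:index]
    raise KeyError(f"unknown version {version}")


log = [Entry(0, committed=True), Entry(1), Entry(2), Entry(3)]
remaining = rollback(log, 2)
head = remaining[-1]
```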

update(columns: Union[int, str, List[Union[str, int]]], func: openclean.engine.object.function.FunctionHandle, args: Optional[Dict] = None, sources: Optional[Union[int, str, List[Union[str, int]]]] = None) pandas.core.frame.DataFrame

Update a given column (or list of columns) by applying the given function.

The columns argument defines the dataset column(s) that are being updated. If the given function is an evaluation function, that function defines the columns from which the input values are retrieved. If the function is not an evaluation function, the input values for the update function come from the same column(s) that are being modified. This behavior can be changed by specifying a list of source columns: if func is a callable (not an evaluation function) and sources is given, row values from the column(s) specified by sources are used as the input to the update function.

Parameters
  • columns (int, string, or list(int or string)) – Single column or list of column index positions or column names.

  • func (openclean.engine.object.func.FunctionHandle) – Library function that is used to generate modified values for the updated column(s).

  • args (dict, default=None) – Additional keyword arguments that are passed to the callable together with the column values that are extracted from each row.

  • sources (int, string, or list(int or string), default=None) – List of source columns from which the input values for the callable are extracted.

Return type

pd.DataFrame
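The input selection for update with a plain callable can be sketched as follows. The helper `update_column` is hypothetical and covers only a single target column addressed by name; evaluation functions and the `args` parameter are left out.

```python
from typing import Callable, List, Optional

Row = List[object]


def update_column(
    columns: List[str],
    rows: List[Row],
    column: str,
    func: Callable[..., object],
    sources: Optional[List[str]] = None,
) -> List[Row]:
    """Return new rows in which `column` is replaced by the result of `func`."""
    target = columns.index(column)
    # Without sources, the function reads from the column that is updated.
    input_names = sources if sources is not None else [column]
    inputs = [columns.index(name) for name in input_names]
    result = []
    for row in rows:
        new_row = list(row)
        new_row[target] = func(*[row[i] for i in inputs])
        result.append(new_row)
    return result


columns = ["name", "city"]
rows = [["ada", "london"], ["alan", "wilmslow"]]

# Input comes from the updated column itself.
upper = update_column(columns, rows, "name", str.upper)
# Input comes from the explicitly listed source columns.
tagged = update_column(
    columns, rows, "name", lambda n, c: f"{n}@{c}", sources=["name", "city"]
)
```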

version() int

Get version identifier for the last snapshot of the dataset.

Return type

int

class openclean.engine.dataset.FullDataset(datastore: openclean.data.archive.base.ArchiveStore, manager: histore.archive.manager.base.ArchiveManager, identifier: str, pk: Optional[Union[str, List[str]]] = None)

Bases: openclean.engine.dataset.DatasetHandle

Handle for datasets that are managed by the openclean engine and whose history is maintained by an archive manager. All operations are applied directly on the full dataset in the underlying archive.

apply(operations: Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp, List[Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp]]], validate: Optional[bool] = None) List[histore.archive.snapshot.Snapshot]

Apply a given operator or a sequence of operators on a snapshot in the archive.

The resulting snapshot(s) are merged directly into the archive. This method allows updating data in an archive without first checking out the snapshot and then committing the modified version(s).

Returns a list of handles for the created snapshots.

Note that there are some limitations for this method. Most importantly, it currently cannot modify the order of rows or insert new rows. Columns can be added, moved, renamed, and deleted.

Parameters
  • operations (openclean.engine.base.StreamOpPipeline) – Operator(s) that is/are used to update the rows in a dataset snapshot to create new snapshot(s) in this archive.

  • validate (bool, default=False) – Validate that the resulting archive is in proper order before committing the action.

Return type

list of histore.archive.snapshot.Snapshot
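The row-wise nature of stream operators explains the limitations noted for apply. The sketch below is a rough illustration and not the histore or openclean API: `apply_stream` and the two operators are hypothetical, rows are dictionaries, and each operator maps exactly one input row to one output row, so rows can be neither reordered nor added.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]
RowOp = Callable[[Row], Row]


def apply_stream(rows: Iterable[Row], operators: List[RowOp]) -> Iterator[Row]:
    """Apply each operator to every row, one row at a time, in order."""
    for row in rows:
        for op in operators:
            row = op(dict(row))  # copy so that the input snapshot is untouched
        yield row


def rename_city(row: Row) -> Row:
    """Rename the column 'city' to 'town'."""
    row["town"] = row.pop("city")
    return row


def add_country(row: Row) -> Row:
    """Add a new column with a constant value."""
    row["country"] = "UK"
    return row


snapshot = [
    {"name": "ada", "city": "london"},
    {"name": "alan", "city": "wilmslow"},
]
updated = list(apply_stream(snapshot, [rename_city, add_country]))
```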

drop()

Delete all resources that are associated with the dataset history.