openclean.engine.dataset module
The data engine is used to manipulate a dataset with insert and update operations that use functions from the command registry.
- class openclean.engine.dataset.DataSample(df: pandas.core.frame.DataFrame, original: openclean.engine.dataset.DatasetHandle, n: int, random_state: Optional[Tuple[int, List]] = None)
Bases:
openclean.engine.dataset.DatasetHandle
Handle for datasets that are samples of a larger dataset. Sample datasets are maintained entirely in main memory.
This class maintains a reference to the original sample and to the current modified version of the sample. If intermediate versions need to be accessed, they are recreated by re-applying the sequence of operations that generated them.
The class also has a reference to the handle for the full dataset.
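The replay behavior described above can be illustrated with a small pure-Python model. This is only a sketch under stated assumptions: `ReplaySample`, its attributes, and the helper functions are hypothetical names, rows are plain dictionaries rather than a pandas data frame, and none of it is openclean's actual implementation.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Operation = Callable[[List[Row]], List[Row]]


class ReplaySample:
    """Hypothetical stand-in for a sample handle (not openclean's class)."""

    def __init__(self, rows: List[Row]):
        self.original = [dict(r) for r in rows]  # never modified
        self.current = [dict(r) for r in rows]   # latest modified version
        self.log: List[Operation] = []           # applied operations, in order

    def commit(self, op: Operation) -> List[Row]:
        """Apply op to the current version and record it in the log."""
        self.current = op([dict(r) for r in self.current])
        self.log.append(op)
        return self.current

    def checkout(self, version: int) -> List[Row]:
        """Recreate a version by replaying the first `version` operations."""
        rows = [dict(r) for r in self.original]
        for op in self.log[:version]:
            rows = op(rows)
        return rows


def upper_names(rows: List[Row]) -> List[Row]:
    return [{**r, "name": str(r["name"]).upper()} for r in rows]


def add_flag(rows: List[Row]) -> List[Row]:
    return [{**r, "flag": True} for r in rows]


sample = ReplaySample([{"name": "alice"}, {"name": "bob"}])
sample.commit(upper_names)
sample.commit(add_flag)
v1 = sample.checkout(1)  # intermediate version, rebuilt from the original
```

Only the original and the current version are stored; the intermediate version `v1` is rebuilt on demand, which trades recomputation time for memory.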
- apply()
Apply all actions in the current log to the underlying original dataset.
- drop()
Delete all resources that are associated with the dataset history.
- class openclean.engine.dataset.DatasetHandle(store: openclean.data.archive.base.ArchiveStore, is_sample: bool)
Bases:
object
Handle for datasets that are managed by the openclean engine and whose snapshot history is maintained by an archive manager.
- checkout(version: Optional[int] = None) pandas.core.frame.DataFrame
Checkout a dataset snapshot.
The optional identifier references a dataset snapshot via an operation log entry. If no identifier is given, the snapshot for the last version of the dataset will be returned.
- Parameters
version (int, default=None) – Identifier for the operation log entry that represents the dataset version that is being checked out.
- Return type
pd.DataFrame
- commit(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], action: Optional[openclean.engine.action.OpHandle] = None) Union[pandas.core.frame.DataFrame, str, histore.document.base.Document]
Add a new snapshot to the history of the dataset.
If no action is provided a user commit action operator is used as the default. Returns the data frame for the snapshot.
- Parameters
source (openclean.data.stream.base.Datasource) – Input data frame or stream containing the new dataset version that is being stored.
action (openclean.engine.action.OpHandle, default=None) – Operator that created the dataset snapshot.
- Return type
openclean.data.stream.base.Datasource
- abstract drop()
Delete all resources that are associated with the dataset history.
- insert(names: Union[str, List[str]], pos: Optional[int] = None, values: Optional[Union[int, float, str, datetime.datetime, openclean.engine.object.function.FunctionHandle]] = None, args: Optional[Dict] = None, sources: Optional[Union[int, str, List[Union[str, int]]]] = None) pandas.core.frame.DataFrame
Insert one or more columns at a given position into the dataset. One column is inserted for each given column name. If the insert position is undefined, columns are appended. If the position does not reference a valid position (i.e., not between 0 and len(df.columns)) a ValueError is raised.
Values for the inserted columns are generated using a given constant value or function. If a function is given, it is expected to return exactly one value for each of the inserted columns (e.g., a tuple of length len(names)).
- Parameters
names (string, or list(string)) – Names of the inserted columns.
pos (int, default=None) – Insert position for the new columns. If None, the columns will be appended.
values (scalar or openclean.engine.object.function.FunctionHandle, default=None) – Single value, tuple of values, or library function that is used to generate the values for the inserted column(s). If no value is specified, all columns will contain None.
args (dict, default=None) – Additional keyword arguments that are passed to the callable together with the column values that are extracted from each row.
sources (int, string, or list(int or string), default=None) – List of source columns from which the input values for the callable are extracted.
- Return type
pd.DataFrame
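The insert semantics described above (append when pos is None, ValueError for an invalid position, one generated value per new column) can be sketched in plain Python. The function `insert_columns` below is a hypothetical model that works on lists of rows instead of a data frame; it assumes that position len(columns) is valid and means "append", and it does not call openclean.

```python
from typing import Any, List, Optional, Sequence, Tuple, Union


def insert_columns(
    columns: List[str],
    rows: List[List[Any]],
    names: Union[str, List[str]],
    pos: Optional[int] = None,
    values: Any = None,
    sources: Optional[Sequence[Union[int, str]]] = None,
) -> Tuple[List[str], List[List[Any]]]:
    """Return new (columns, rows) with one column inserted per given name."""
    names = [names] if isinstance(names, str) else list(names)
    if pos is None:
        pos = len(columns)  # undefined position: append
    if not 0 <= pos <= len(columns):
        raise ValueError(f"invalid insert position {pos}")
    # Resolve source columns given by index position or by name.
    src = [s if isinstance(s, int) else columns.index(s) for s in (sources or [])]
    result = []
    for row in rows:
        if callable(values):
            out = values(*[row[i] for i in src])
            new = list(out) if len(names) > 1 else [out]
        elif isinstance(values, tuple):
            new = list(values)
        else:
            new = [values] * len(names)  # constant (or None) for every column
        if len(new) != len(names):
            raise ValueError("expected one value per inserted column")
        result.append(row[:pos] + new + row[pos:])
    return columns[:pos] + names + columns[pos:], result


cols, data = insert_columns(
    ["first", "last"],
    [["Ada", "Lovelace"], ["Alan", "Turing"]],
    names="initials",
    pos=0,
    values=lambda first, last: first[0] + last[0],
    sources=["first", "last"],
)
```

Here `cols` becomes `["initials", "first", "last"]` and each row gains its computed initials in the first position.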
- log() List[openclean.engine.log.LogEntry]
Get the list of log entries for all dataset snapshots.
- Return type
list of openclean.engine.log.LogEntry
- metadata(version: Optional[int] = None) openclean.data.metadata.base.MetadataStore
Get metadata that is associated with the current dataset version.
- Parameters
version (int, default=None) – Identifier for the dataset version for which the metadata is being fetched.
- Return type
openclean.data.metadata.base.MetadataStore
- open(version: Optional[int] = None) histore.archive.reader.SnapshotReader
Get a stream reader for a dataset snapshot.
- Parameters
version (int, default=None) – Unique version identifier. By default the last version is used.
- Return type
openclean.data.archive.base.SnapshotReader
- rollback(version: int) pandas.core.frame.DataFrame
Rollback all changes including the given dataset version.
That is, we rollback all changes that occurred at and after the identified snapshot. This will make the respective snapshot of the previous version the new current (head) snapshot for the dataset history.
Returns the dataframe for the dataset snapshot that is at the new head of the dataset history.
Raises a KeyError if the given log entry identifier is unknown. Raises a ValueError if the log entry references a snapshot that has already been committed.
- Parameters
version (int) – Unique log entry version.
- Return type
pd.DataFrame
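The rollback rule above (discard the identified snapshot and everything after it, so that its predecessor becomes the head) can be modeled as follows. `SnapshotHistory` is a hypothetical class written for illustration, not part of openclean. The sketch covers the KeyError case only: the ValueError for already committed snapshots is not modeled, and refusing to roll back the initial snapshot is an assumption of this sketch.

```python
from typing import Any, Dict, List


class SnapshotHistory:
    """Hypothetical in-memory history of dataset snapshots."""

    def __init__(self, initial: Any):
        self._versions: List[int] = [0]
        self._data: Dict[int, Any] = {0: initial}
        self._next = 1

    def commit(self, data: Any) -> int:
        """Store a new snapshot and return its version identifier."""
        version = self._next
        self._next += 1
        self._versions.append(version)
        self._data[version] = data
        return version

    def head(self) -> Any:
        """Return the data of the current (last) snapshot."""
        return self._data[self._versions[-1]]

    def rollback(self, version: int) -> Any:
        """Remove the given version and all later ones; return the new head."""
        if version not in self._data:
            raise KeyError(f"unknown version {version}")
        idx = self._versions.index(version)
        if idx == 0:
            # Assumption of this sketch: the first snapshot has no predecessor.
            raise ValueError("cannot roll back the initial snapshot")
        for v in self._versions[idx:]:
            del self._data[v]
        self._versions = self._versions[:idx]
        return self.head()


history = SnapshotHistory(["a"])
v1 = history.commit(["a", "b"])
v2 = history.commit(["a", "b", "c"])
head = history.rollback(v1)  # drops v1 and v2; the initial snapshot is head
```

After the call, both `v1` and `v2` are gone, so a later `history.rollback(v2)` raises a KeyError.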
- update(columns: Union[int, str, List[Union[str, int]]], func: openclean.engine.object.function.FunctionHandle, args: Optional[Dict] = None, sources: Optional[Union[int, str, List[Union[str, int]]]] = None) pandas.core.frame.DataFrame
Update a given column (or list of columns) by applying the given function.
The columns argument defines the dataset column(s) that are being updated. If the given function is an evaluation function, that function defines the columns from which the input values are retrieved. If the function is not an evaluation function, the input values for the update function come from the same column(s) that are being modified. This behavior can be changed by specifying a list of source columns. If the function is a callable (not an evaluation function) and sources is given, row values from the column(s) specified by sources are used as the input to the update function.
- Parameters
columns (int, string, or list(int or string)) – Single column or list of column index positions or column names.
func (openclean.engine.object.function.FunctionHandle) – Library function that is used to generate modified values for the updated column(s).
args (dict, default=None) – Additional keyword arguments that are passed to the callable together with the column values that are extracted from each row.
sources (int, string, or list(int or string), default=None) – List of source columns from which the input values for the callable are extracted.
- Return type
pd.DataFrame
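The effect of the sources argument for a plain callable can be sketched in pure Python. `update_columns` is a hypothetical helper operating on lists of rows; the evaluation-function case is not modeled, and the code does not use openclean.

```python
from typing import Any, Callable, List, Optional, Union

ColumnRef = Union[int, str]


def update_columns(
    columns: List[str],
    rows: List[List[Any]],
    target: Union[ColumnRef, List[ColumnRef]],
    func: Callable[..., Any],
    sources: Optional[Union[ColumnRef, List[ColumnRef]]] = None,
) -> List[List[Any]]:
    """Return new rows with the target column(s) replaced by func's output."""

    def positions(spec: Union[ColumnRef, List[ColumnRef]]) -> List[int]:
        items = spec if isinstance(spec, list) else [spec]
        return [c if isinstance(c, int) else columns.index(c) for c in items]

    out_idx = positions(target)
    # Without sources, inputs come from the columns that are being updated.
    in_idx = positions(sources) if sources is not None else out_idx
    updated = []
    for row in rows:
        value = func(*[row[i] for i in in_idx])
        values = [value] if len(out_idx) == 1 else list(value)
        new_row = list(row)
        for i, v in zip(out_idx, values):
            new_row[i] = v
        updated.append(new_row)
    return updated


columns = ["city", "state"]
rows = [["new york", "ny"], ["boston", "ma"]]

# Input and output are the same column.
same_col = update_columns(columns, rows, "state", str.upper)

# Output is "city"; inputs are read from both source columns.
with_sources = update_columns(
    columns,
    rows,
    "city",
    lambda city, state: f"{city.title()}, {state.upper()}",
    sources=["city", "state"],
)
```

In the first call the function sees only the "state" value; in the second it receives both values per row while only "city" is overwritten.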
- version() int
Get version identifier for the last snapshot of the dataset.
- Return type
int
- class openclean.engine.dataset.FullDataset(datastore: openclean.data.archive.base.ArchiveStore, manager: histore.archive.manager.base.ArchiveManager, identifier: str, pk: Optional[Union[str, List[str]]] = None)
Bases:
openclean.engine.dataset.DatasetHandle
Handle for datasets that are managed by the openclean engine and that have their history being maintained by an archive manager. All operations are applied directly on the full dataset in the underlying archive.
- apply(operations: Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp, List[Union[openclean.operator.stream.processor.StreamProcessor, openclean.engine.operator.StreamOp]]], validate: Optional[bool] = None) List[histore.archive.snapshot.Snapshot]
Apply a given operator or a sequence of operators on a snapshot in the archive.
The resulting snapshot(s) are merged directly into the archive. This method allows data in an archive to be updated directly, without the need to check out the snapshot first and then commit the modified version(s).
Returns list of handles for the created snapshots.
Note that this method has some limitations. Most importantly, it can neither change the order of rows nor insert new rows at this point. Columns can be added, moved, renamed, and deleted.
- Parameters
operations (openclean.engine.base.StreamOpPipeline) – Operator(s) that is/are used to update the rows in a dataset snapshot to create new snapshot(s) in this archive.
validate (bool, default=False) – Validate that the resulting archive is in proper order before committing the action.
- Return type
list of histore.archive.snapshot.Snapshot
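The limitations noted above follow naturally from row-at-a-time processing. The generator below is a minimal sketch of that idea, assuming each operator maps one input row to one output row; `stream_apply` and the row operators are hypothetical and do not reflect how openclean or histore create snapshots.

```python
from typing import Any, Callable, Iterable, Iterator, List

RowOp = Callable[[List[Any]], List[Any]]


def stream_apply(rows: Iterable[List[Any]], operations: List[RowOp]) -> Iterator[List[Any]]:
    """Apply each operator to every row in turn, preserving row order and count."""
    for row in rows:
        for op in operations:
            row = op(row)
        yield row


def add_scaled(row: List[Any]) -> List[Any]:
    # Add a column: second value multiplied by ten.
    return row + [row[1] * 10]


def drop_second(row: List[Any]) -> List[Any]:
    # Delete the original second column.
    return [row[0], row[2]]


streamed = list(stream_apply([["a", 1], ["b", 2]], [add_scaled, drop_second]))
```

Because every row is transformed in isolation and emitted immediately, the operators can reshape columns freely but cannot reorder rows or add new ones.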
- drop()
Delete all resources that are associated with the dataset history.