openclean.pipeline module
Data pipeline for processing datasets as streams of rows.
- class openclean.pipeline.DataPipeline(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], columns: Optional[List[Union[str, histore.document.schema.Column]]] = None, pipeline: Optional[openclean.operator.stream.processor.StreamProcessor] = None)
Bases:
histore.document.base.DefaultDocument
The data pipeline allows iterating over the rows that result from streaming an input data set through a pipeline of stream operators.
The class implements the context manager interface.
- append(op: openclean.operator.stream.processor.StreamProcessor, columns: Optional[List[Union[str, histore.document.schema.Column]]] = None) openclean.pipeline.DataPipeline
Return a modified data pipeline with the given operator appended to the stream pipeline.
- Parameters
op (openclean.operator.stream.processor.StreamProcessor) – Stream operator that is appended to the pipeline of the returned stream processor.
columns (list of string, default=None) – Optional (modified) list of column names for the schema of the data stream rows.
- Return type
openclean.pipeline.DataPipeline
- close()
Close the associated document.
- cluster(clusterer: openclean.cluster.base.Clusterer) List[openclean.cluster.base.Cluster]
Cluster values in a data stream.
This operator collects the set of distinct values from the data stream rows. The collected values are then passed on to the given cluster algorithm.
- Parameters
clusterer (openclean.cluster.base.Clusterer) – Cluster algorithm for distinct values in the data stream.
- Return type
list of openclean.cluster.base.Cluster
- count() int
Count the number of rows in a data stream.
- Return type
int
- delete(predicate: openclean.function.eval.base.EvalFunction) openclean.pipeline.DataPipeline
Remove rows from the data stream that satisfy a given condition.
- Parameters
predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to delete rows.
- Return type
openclean.pipeline.DataPipeline
- distinct(columns: Optional[Union[int, str, List[Union[str, int]]]] = None) collections.Counter
Get counts for all distinct values over all columns in the associated data stream. Allows the user to specify the list of columns for which they want to count values.
- Parameters
columns (int, str, or list of int or string, default=None) – References to the column(s) for which unique values are counted.
- Return type
collections.Counter
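The returned Counter maps each value (or tuple of values, when multiple columns are referenced) to its frequency. A rough stdlib sketch of these semantics, not using openclean itself; the rows and column names are hypothetical:

```python
from collections import Counter

# Rows of a small hypothetical data stream with columns ('city', 'state').
rows = [
    ['Brooklyn', 'NY'],
    ['Brooklyn', 'NY'],
    ['Newark', 'NJ'],
]

# distinct() over all columns counts full row tuples.
all_columns = Counter(tuple(row) for row in rows)
# distinct() restricted to a single column counts single values.
city_only = Counter(row[0] for row in rows)

print(all_columns[('Brooklyn', 'NY')])  # 2
print(city_only['Newark'])              # 1
```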
- distinct_values(columns: Optional[Union[int, str, List[Union[str, int]]]] = None) List[Union[int, float, str, datetime.datetime, Tuple[Union[int, float, str, datetime.datetime]]]]
Get a list of all distinct values over all columns in the associated data stream.
Provides the option to specify the list of columns for which distinct values are collected.
- Parameters
columns (int, str, or list of int or string, default=None) – References to the column(s) for which distinct values are collected.
- Return type
list of scalar or tuple of scalar
- filter(predicate: openclean.function.eval.base.EvalFunction, limit: Optional[int] = None) openclean.pipeline.DataPipeline
Filter rows in the data stream that satisfy a given condition. Allows limiting the number of rows in the returned data stream.
- Parameters
predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to filter rows.
limit (int, default=None) – Limit the number of rows in the filtered data stream.
- Return type
openclean.pipeline.DataPipeline
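Conceptually, the operator composes a predicate with an optional row limit. A minimal stdlib sketch of that behavior (the real operator works on streamed rows, not in-memory lists, and takes an EvalFunction rather than a plain callable):

```python
from itertools import islice

def filter_stream(rows, predicate, limit=None):
    """Yield rows that satisfy predicate, stopping after limit rows."""
    matched = (row for row in rows if predicate(row))
    return islice(matched, limit) if limit is not None else matched

rows = [[1, 'a'], [2, 'b'], [3, 'a'], [4, 'a']]
result = list(filter_stream(rows, lambda r: r[1] == 'a', limit=2))
print(result)  # [[1, 'a'], [3, 'a']]
```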
- head(count: Optional[int] = 10) pandas.core.frame.DataFrame
Return the first n rows in the data stream as a pandas data frame. This is a short-cut for using a pipeline of .limit() and .to_df().
- Parameters
count (int, default=10) – Defines the maximum number of rows in the returned data frame.
- Return type
pd.DataFrame
- insert(names: List[Union[str, histore.document.schema.Column]], pos: Optional[int] = None, values: Optional[Union[Callable, openclean.function.eval.base.EvalFunction, List, int, float, str, datetime.datetime, Tuple]] = None) openclean.pipeline.DataPipeline
Insert one or more columns into the rows in the data stream.
- Parameters
names (string or list of string) – Names of the inserted columns.
pos (int, default=None) – Insert position for the new columns. If None, the columns will be appended.
values (scalar, list, callable, or openclean.function.eval.base.EvalFunction, default=None) – Single value, list of constant values, callable that accepts a data frame row as the only argument and returns a (list of) value(s) matching the number of inserted columns, or an evaluation function that returns a matching number of values.
- iterrows()
Simulate the iterrows() function of a pandas DataFrame as it is used in openclean. Returns an iterator that yields pairs of row identifier and value list for each row in the streamed data frame.
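The contract mirrors pandas' `DataFrame.iterrows()`: pairs of a row identifier and the row's value list. A minimal sketch of what the iterator yields, using plain lists in place of a streamed document:

```python
def iterrows(rows):
    """Yield (row identifier, value list) pairs, like the pipeline does."""
    for rowid, values in enumerate(rows):
        yield rowid, values

pairs = list(iterrows([['a', 1], ['b', 2]]))
print(pairs)  # [(0, ['a', 1]), (1, ['b', 2])]
```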
- limit(count: int) openclean.pipeline.DataPipeline
Return a data stream for the data frame that will yield at most the first n rows passed to it from an associated producer.
- Parameters
count (int) – Maximum number of rows in the returned data frame.
- Return type
openclean.pipeline.DataPipeline
- match(matcher: openclean.function.matching.base.StringMatcher, include_vocab: Optional[bool] = False) openclean.data.mapping.Mapping
Generate a mapping of best matches between a given vocabulary and the values in one (or more) column(s) of the data stream. For each row (i.e., single value) the best matches with the given vocabulary are computed and added to the returned mapping.
For rows that contain multiple columns an error will be raised.
If the include_vocab flag is False the resulting mapping will contain a mapping only for those values that do not occur in the vocabulary, i.e., the unknown values with respect to the vocabulary.
- Parameters
matcher (openclean.function.matching.base.StringMatcher) – Matcher to compute matches for the terms in a controlled vocabulary.
include_vocab (bool, default=False) – If this flag is False the resulting mapping will only contain matches for terms that are not in the vocabulary that is associated with the given matcher.
- Return type
openclean.data.mapping.Mapping
- move(columns: Union[int, str, List[Union[str, int]]], pos: int) openclean.pipeline.DataPipeline
Move one or more columns in a data stream schema.
- Parameters
columns (int, string, or list(int or string)) – Single column or list of column index positions or column names.
pos (int) – Insert position for the moved columns.
- Return type
openclean.pipeline.DataPipeline
- open() openclean.pipeline.DataPipeline
Return reference to self when the pipeline is opened.
- Return type
openclean.pipeline.DataPipeline
- persist(filename: Optional[str] = None) openclean.pipeline.DataPipeline
Persist the results of the current stream for future processing. The data is either written to disk or persisted in an in-memory data frame, depending on whether a filename is specified.
The persist operator is currently not lazily evaluated.
- Parameters
filename (string, default=None) – Path to a file on disk for storing the pipeline result. If None, the data is persisted in-memory as a pandas data frame.
- Return type
openclean.pipeline.DataPipeline
- profile(profilers: Optional[Union[int, str, Tuple[Union[int, str], openclean.profiling.base.DataProfiler]]] = None, default_profiler: Optional[Type] = None) List[Dict]
Profile one or more columns in the data stream. Returns a list of profiler results for each profiled column.
By default each column in the data stream is profiled independently using the default stream profiler. The optional list of profilers allows overriding the default behavior by providing a list of column references (each with an optional profiler function). If only a column reference is given the default stream profiler is used for the referenced column.
- Parameters
profilers (int, string, tuple, or list of tuples of column reference and openclean.profiling.base.DataProfiler, default=None) – Specify the list of columns that are profiled and the profiling function. If only a column reference is given (not a tuple) the default stream profiler is used for profiling the column.
default_profiler (class, default=None) – Class object that is instantiated as the profiler for columns that do not have a profiler instance specified for them.
- Return type
list
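A stream profiler consumes one column's values and produces a dictionary of statistics, and the method returns one such result per profiled column. A toy profiler illustrating the shape of that result list; the statistic names here are illustrative, not openclean's actual output keys:

```python
def profile_column(name, values):
    """Compute a few simple statistics for one column of a stream."""
    non_empty = [v for v in values if v not in (None, '')]
    return {
        'column': name,
        'total': len(values),
        'empty': len(values) - len(non_empty),
        'distinct': len(set(non_empty)),
    }

rows = [['NY', '10'], ['NJ', ''], ['NY', '7']]
columns = ['state', 'count']
results = [
    profile_column(col, [row[i] for row in rows])
    for i, col in enumerate(columns)
]
print(results[0])  # {'column': 'state', 'total': 3, 'empty': 0, 'distinct': 2}
```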
- rename(columns: Union[int, str, List[Union[str, int]]], names: List[Union[str, histore.document.schema.Column]]) openclean.pipeline.DataPipeline
Rename selected columns in the schema of the data stream rows.
- Parameters
columns (int, str, or list of int or string) – References to renamed columns.
names (string or list of string) – New names for the selected columns.
- Return type
openclean.pipeline.DataPipeline
- run()
Stream all rows from the associated data file through the pipeline that is associated with this processor.
The returned value is the result that is returned when the consumer that was generated for the pipeline is closed after processing the data stream.
- Return type
any
- sample(n: int, random_state: Optional[int] = None) openclean.pipeline.DataPipeline
Add operator for a random sample generator to the data stream.
- n: int
Size of the collected random sample.
- random_state: int, default=None
Seed value for the random number generator (for reproducibility purposes).
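Collecting a uniform fixed-size sample from a stream of unknown length is typically done with reservoir sampling; a sketch of what such a stream sample operator has to do (the actual openclean implementation may differ in detail, but the seed-for-reproducibility role of random_state is the same):

```python
import random

def reservoir_sample(rows, n, random_state=None):
    """Collect a uniform random sample of n rows from a row stream."""
    rng = random.Random(random_state)
    sample = []
    for i, row in enumerate(rows):
        if i < n:
            # Fill the reservoir with the first n rows.
            sample.append(row)
        else:
            # Replace a reservoir element with probability n / (i + 1).
            j = rng.randint(0, i)
            if j < n:
                sample[j] = row
    return sample

rows = [[i] for i in range(100)]
print(len(reservoir_sample(rows, n=10, random_state=42)))  # 10
```

Passing the same random_state twice yields the same sample, which is what makes sampled pipelines reproducible.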
- select(columns: Optional[Union[int, str, List[Union[str, int]]]] = None, names: Optional[List[Union[str, histore.document.schema.Column]]] = None) openclean.pipeline.DataPipeline
Select a given list of columns from the streamed data frame. Columns in the resulting data stream may also be renamed using the optional list of new column names.
Returns a new data stream with the column filter set to the columns that were in the argument list.
- Parameters
columns (int, str, or list of int or string, default=None) – References to the selected columns.
names (string or list of string, default=None) – Optional renaming for selected columns.
- Return type
openclean.pipeline.DataPipeline
- stream(op: openclean.operator.stream.processor.StreamProcessor)
Stream all rows from the associated data file to the data pipeline that is associated with this processor. The given operator is appended to the current pipeline before execution.
The returned value is the result that is returned when the consumer that was generated for the pipeline is closed after processing the data stream.
- Parameters
op (openclean.operator.stream.processor.StreamProcessor) – Stream operator that is appended to the current pipeline for execution.
- Return type
any
- to_df() pandas.core.frame.DataFrame
Collect all rows in the stream that are yielded by the associated consumer into a pandas data frame.
- Return type
pd.DataFrame
- typecast(converter: Optional[openclean.profiling.datatype.convert.DatatypeConverter] = None) openclean.pipeline.DataPipeline
Typecast operator that converts cell values in data stream rows to different raw types that are represented by the given type converter.
- Parameters
converter (openclean.profiling.datatype.convert.DatatypeConverter, default=None) – Datatype converter for values in the data stream. Uses the default converter if no converter is given.
- Return type
openclean.pipeline.DataPipeline
- update(columns: Union[int, str, List[Union[str, int]]], func: Union[Callable, Dict, openclean.function.eval.base.EvalFunction, int, float, str, datetime.datetime, openclean.function.value.base.ValueFunction]) openclean.pipeline.DataPipeline
Update rows in a data stream.
Raises a ValueError if not enough arguments (at least two) are given.
- Parameters
columns (int, str, or list of int or string) – References to the updated column(s).
func (scalar, dict, callable, openclean.function.value.base.ValueFunction, or openclean.function.eval.base.EvalFunction) – Specification of the (resulting) evaluation function that is used to generate the updated values for each row in the data stream.
- Return type
openclean.pipeline.DataPipeline
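Conceptually, the operator maps every row through the given function for the referenced columns. A stdlib sketch of the per-row semantics, assuming func is a plain callable over the current cell value (openclean also accepts scalars, dictionaries, and evaluation functions, which this sketch does not cover):

```python
def update_rows(rows, column_index, func):
    """Return rows with func applied to one column in every row."""
    updated = []
    for row in rows:
        new_row = list(row)
        new_row[column_index] = func(new_row[column_index])
        updated.append(new_row)
    return updated

rows = [['brooklyn', 'NY'], ['newark', 'NJ']]
print(update_rows(rows, 0, str.upper))
# [['BROOKLYN', 'NY'], ['NEWARK', 'NJ']]
```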
- where(predicate: openclean.function.eval.base.EvalFunction, limit: Optional[int] = None) openclean.pipeline.DataPipeline
Filter rows in the data stream that match a given condition. Returns a new data stream with a consumer that filters the rows. Currently expects an evaluation function as the row predicate.
The number of rows in the returned data stream can be limited.
This is a synonym for the filter() method.
- Parameters
predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to filter rows.
limit (int, default=None) – Limit the number of rows in the filtered data stream.
- Return type
openclean.pipeline.DataPipeline
- write(filename: str, delim: Optional[str] = None, compressed: Optional[bool] = None, none_as: Optional[str] = None, encoding: Optional[str] = None)
Write the rows in the data stream to a given CSV file.
- Parameters
filename (string) – Path to a CSV file output file on the local file system.
delim (string, default=None) – The column delimiter used for the written CSV file.
compressed (bool, default=None) – Flag indicating if the file contents of the created file are to be compressed using gzip.
none_as (string, default=None) – String that is used to encode None values in the output file. If given, all cell values that are None are substituted by the string.
encoding (string, default=None) – The CSV file encoding, e.g. utf-8, utf-16, etc.
- class openclean.pipeline.PipelineIterator(stream: histore.document.base.DocumentIterator, consumer: Optional[openclean.operator.stream.consumer.StreamConsumer] = None)
Bases:
histore.document.base.DocumentIterator
Iterator over rows in a data processing pipeline. Iterates over the rows in an input stream. Each row is processed by a stream consumer. If the consumer returns a value this value is returned as the next row. For consumers that only return a result at the end of the stream this iterator iterates over the rows that are returned when the consumer is closed.
- close()
Release all resources that are held by the associated input stream and output consumer.
- next() Tuple[int, Union[int, float, str, datetime.datetime, Tuple[Union[int, float, str, datetime.datetime], ...]], List[Union[int, float, str, datetime.datetime]]]
Read the next row in the pipeline.
The row is processed by the associated consumer. If the consumer returns a non-None result this row is returned as the next row. If the consumer returns None the next input row is processed. If the consumer returns a non-empty result when it is closed, we assume that this is a list of rows and iterate over them as well.
- Return type
tuple of int, histore.document.base.RowIndex, histore.document.base.DataRow
- openclean.pipeline.stream(filename: Union[str, pandas.core.frame.DataFrame], header: Optional[List[Union[str, histore.document.schema.Column]]] = None, delim: Optional[str] = None, compressed: Optional[bool] = None, none_is: Optional[str] = None, encoding: Optional[str] = None) openclean.pipeline.DataPipeline
Read a CSV file as a data stream. This is a helper method that is intended to read and filter large CSV files.
- Parameters
filename (string) – Path to CSV file on the local file system.
header (list of string, default=None) – Optional header. If no header is given it is assumed that the first row in the CSV file contains the header information.
delim (string, default=None) – The column delimiter used in the CSV file.
compressed (bool, default=None) – Flag indicating if the file contents have been compressed using gzip.
none_is (string, default=None) – String that was used to encode None values in the input file. If given, all cell values that match the given string are substituted by None.
encoding (string, default=None) – The CSV file encoding, e.g. utf-8, utf-16, etc.
- Return type
openclean.pipeline.DataPipeline
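The helper hides the CSV parsing concerns named above (delimiter, gzip compression, None encoding). A stdlib sketch of those decoding steps, not of openclean itself; the real function returns a lazy DataPipeline rather than materialized lists:

```python
import csv
import gzip
import os
import tempfile

def read_csv_rows(filename, delim=',', compressed=False, none_is=None):
    """Read CSV rows, decoding gzip and a None placeholder if requested."""
    opener = gzip.open if compressed else open
    with opener(filename, 'rt', encoding='utf-8', newline='') as f:
        reader = csv.reader(f, delimiter=delim)
        header = next(reader)
        rows = [
            [None if v == none_is else v for v in row]
            for row in reader
        ]
    return header, rows

# Demonstrate with a small temporary file.
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as tmp:
    tmp.write('city,count\nBrooklyn,2\nNewark,N/A\n')
    path = tmp.name
header, rows = read_csv_rows(path, none_is='N/A')
print(header)  # ['city', 'count']
print(rows)    # [['Brooklyn', '2'], ['Newark', None]]
os.remove(path)
```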