openclean.pipeline module

Data pipeline for processing datasets as streams of rows.

class openclean.pipeline.DataPipeline(source: Union[pandas.core.frame.DataFrame, str, histore.document.base.Document], columns: Optional[List[Union[str, histore.document.schema.Column]]] = None, pipeline: Optional[openclean.operator.stream.processor.StreamProcessor] = None)

Bases: histore.document.base.DefaultDocument

The data pipeline allows iterating over the rows that result from streaming an input data set through a pipeline of stream operators.

The class implements the context manager interface.

append(op: openclean.operator.stream.processor.StreamProcessor, columns: Optional[List[Union[str, histore.document.schema.Column]]] = None) openclean.pipeline.DataPipeline

Return a modified stream processor with the given operator appended to the stream pipeline.

Parameters
  • op (openclean.operator.stream.processor.StreamProcessor) – Stream operator that is appended to the pipeline of the returned stream processor.

  • columns (list of string, default=None) – Optional (modified) list of column names for the schema of the data stream rows.

Return type

openclean.pipeline.DataPipeline

close()

Close the associated document.

cluster(clusterer: openclean.cluster.base.Clusterer) List[openclean.cluster.base.Cluster]

Cluster values in a data stream.

This operator collects the set of distinct values in the data stream rows. The collected values are then passed on to the given cluster algorithm.

Parameters

clusterer (openclean.cluster.base.Clusterer) – Cluster algorithm for distinct values in the data stream.

Return type

list of openclean.cluster.base.Cluster

count() int

Count the number of rows in a data stream.

Return type

int

delete(predicate: openclean.function.eval.base.EvalFunction) openclean.pipeline.DataPipeline

Remove rows from the data stream that satisfy a given condition.

Parameters

predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to delete rows.

Return type

openclean.pipeline.DataPipeline

distinct(columns: Optional[Union[int, str, List[Union[str, int]]]] = None) collections.Counter

Get counts for all distinct values over all columns in the associated data stream. Allows the user to specify the list of columns for which they want to count values.

Parameters

columns (int, str, or list of int or string, default=None) – References to the column(s) for which unique values are counted.

Return type

collections.Counter
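The counting semantics can be pictured in plain Python: stream the rows once and tally values with collections.Counter. This is a standalone sketch of the idea, not openclean's actual implementation; the `count_distinct` helper and its `columns` list of index positions are assumptions for illustration.

```python
from collections import Counter

def count_distinct(rows, columns=None):
    """Tally distinct values over a stream of rows.

    rows: iterable of value lists (one list per row).
    columns: optional list of column index positions; if None,
    the full row (as a tuple for multi-column rows) is counted.
    """
    counts = Counter()
    for row in rows:
        if columns is None:
            key = tuple(row) if len(row) > 1 else row[0]
        else:
            vals = [row[i] for i in columns]
            key = tuple(vals) if len(vals) > 1 else vals[0]
        counts[key] += 1
    return counts

rows = [['NY', 10], ['NY', 10], ['LA', 5]]
print(count_distinct(rows, columns=[0]))  # Counter({'NY': 2, 'LA': 1})
```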

distinct_values(columns: Optional[Union[int, str, List[Union[str, int]]]] = None) List[Union[int, float, str, datetime.datetime, Tuple[Union[int, float, str, datetime.datetime]]]]

Get a list of all distinct values over all columns in the associated data stream.

Provides the option to specify the list of columns for which distinct values are collected.

Parameters

columns (int, str, or list of int or string, default=None) – References to the column(s) for which distinct values are collected.

Return type

list

filter(predicate: openclean.function.eval.base.EvalFunction, limit: Optional[int] = None) openclean.pipeline.DataPipeline

Filter rows in the data stream that satisfy a given condition. The optional limit argument restricts the number of rows in the returned data stream.

Parameters
  • predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to filter rows.

  • limit (int, default=None) – Limit the number of rows in the filtered data stream.

Return type

openclean.pipeline.DataPipeline

head(count: Optional[int] = 10) pandas.core.frame.DataFrame

Return the first n rows in the data stream as a pandas data frame. This is a short-cut for using a pipeline of .limit() and .to_df().

Parameters

count (int, default=10) – Defines the maximum number of rows in the returned data frame.

Return type

pd.DataFrame
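The short-cut can be pictured with itertools.islice: take at most n rows from a stream and materialize them. This is a stdlib sketch of the limit-then-collect pattern; openclean materializes into a pandas data frame rather than the plain list used here.

```python
from itertools import islice

def head(rows, count=10):
    """Materialize at most the first `count` rows of a row stream."""
    return list(islice(rows, count))

stream = iter(range(100))  # stands in for a stream of rows
print(head(stream, count=3))  # [0, 1, 2]
```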

insert(names: List[Union[str, histore.document.schema.Column]], pos: Optional[int] = None, values: Optional[Union[Callable, openclean.function.eval.base.EvalFunction, List, int, float, str, datetime.datetime, Tuple]] = None) openclean.pipeline.DataPipeline

Insert one or more columns into the rows in the data stream.

Parameters
  • names (string, or list of string) – Names of the inserted columns.

  • pos (int, default=None) – Insert position for the new columns. If None the columns will be appended.

  • values (scalar, list, callable, or openclean.function.eval.base.EvalFunction, default=None) – Single value, list of constant values, callable that accepts a data frame row as its only argument and returns a (list of) value(s) matching the number of inserted columns, or an evaluation function that returns a matching number of values.

iterrows()

Simulate the iterrows() function of a pandas DataFrame as it is used in openclean. Returns an iterator that yields pairs of row identifier and value list for each row in the streamed data frame.
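The yielded pairs can be sketched with a small generator. This is an illustration only: openclean derives row identifiers from the underlying document, whereas the sketch below simply assumes sequential positions.

```python
def iterrows(rows):
    """Yield (row identifier, value list) pairs for a streamed
    document, mimicking pandas.DataFrame.iterrows."""
    for rowid, values in enumerate(rows):
        yield rowid, values

pairs = list(iterrows([['a', 1], ['b', 2]]))
print(pairs)  # [(0, ['a', 1]), (1, ['b', 2])]
```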

limit(count: int) openclean.pipeline.DataPipeline

Return a data stream for the data frame that will yield at most the first n rows passed to it from an associated producer.

Parameters

count (int) – Maximum number of rows in the returned data frame.

Return type

openclean.pipeline.DataPipeline

match(matcher: openclean.function.matching.base.StringMatcher, include_vocab: Optional[bool] = False) openclean.data.mapping.Mapping

Generate a mapping of best matches between a given vocabulary and the values in one (or more) column(s) of the data stream. For each row (i.e., single value) the best matches with the given vocabulary are computed and added to the returned mapping.

For rows that contain multiple columns an error will be raised.

If the include_vocab flag is False the resulting mapping will contain a mapping only for those values that do not occur in the vocabulary, i.e., the unknown values with respect to the vocabulary.

Parameters
  • matcher (openclean.function.matching.base.StringMatcher) – Matcher to compute matches for the terms in a controlled vocabulary.

  • include_vocab (bool, default=False) – If this flag is False the resulting mapping will only contain matches for terms that are not in the vocabulary that is associated with the given matcher.

Return type

openclean.data.mapping.Mapping
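The matching behavior, including the include_vocab flag, can be sketched with the stdlib. Here difflib.get_close_matches stands in for an openclean StringMatcher, and a plain dict stands in for openclean.data.mapping.Mapping; the `match_stream` helper is a hypothetical name for illustration.

```python
from difflib import get_close_matches

def match_stream(values, vocabulary, include_vocab=False):
    """Map each distinct streamed value to its best vocabulary matches.

    difflib.get_close_matches stands in for an openclean
    StringMatcher; a plain dict stands in for Mapping.
    """
    mapping = {}
    for value in set(values):
        if not include_vocab and value in vocabulary:
            continue  # known values are skipped unless requested
        mapping[value] = get_close_matches(value, vocabulary, n=1)
    return mapping

vocab = ['Brooklyn', 'Queens', 'Bronx']
print(match_stream(['Brooklin', 'Queens'], vocab))  # {'Brooklin': ['Brooklyn']}
```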

move(columns: Union[int, str, List[Union[str, int]]], pos: int) openclean.pipeline.DataPipeline

Move one or more columns in a data stream schema.

Parameters
  • columns (int, string, or list(int or string)) – Single column or list of column index positions or column names.

  • pos (int) – Insert position for the moved columns.

Return type

openclean.pipeline.DataPipeline

open() openclean.pipeline.DataPipeline

Return reference to self when the pipeline is opened.

Return type

openclean.pipeline.DataPipeline

persist(filename: Optional[str] = None) openclean.pipeline.DataPipeline

Persist the results of the current stream for future processing. The data is either written to disk or persisted in an in-memory data frame (depending on whether a filename is specified).

The persist operator is currently not lazily evaluated.

Parameters

filename (string, default=None) – Path to a file on disk for storing the pipeline result. If None, the data is persisted in-memory as a pandas data frame.

Return type

openclean.pipeline.DataPipeline

profile(profilers: Optional[Union[int, str, Tuple[Union[int, str], openclean.profiling.base.DataProfiler]]] = None, default_profiler: Optional[Type] = None) List[Dict]

Profile one or more columns in the data stream. Returns a list of profiler results for each profiled column.

By default, each column in the data stream is profiled independently using the default stream profiler. The optional list of profilers allows overriding the default behavior by providing a list of column references (each with an optional profiler function). If only a column reference is given the default stream profiler is used for the referenced column.

Parameters
  • profilers (int, string, tuple, or list of tuples of column reference and openclean.profiling.base.DataProfiler, default=None) – Specifies the columns that are profiled and their profiling functions. If only a column reference is given (not a tuple) the default stream profiler is used for profiling the column.

  • default_profiler (class, default=None) – Class object that is instantiated as the profiler for columns that do not have a profiler instance specified for them.

Return type

list
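A minimal per-column stream profiler can be sketched as a single pass that accumulates counts. The statistics collected here (total, empty, and distinct counts) are an assumption for illustration; openclean's default stream profiler collects its own set of metrics.

```python
def profile_columns(rows, schema):
    """Collect simple per-column statistics over a row stream:
    total values, empty values, and the number of distinct values
    (a minimal stand-in for a default stream profiler)."""
    stats = [{'column': name, 'total': 0, 'empty': 0, 'distinct': set()}
             for name in schema]
    for row in rows:
        for i, value in enumerate(row):
            stats[i]['total'] += 1
            if value is None or value == '':
                stats[i]['empty'] += 1
            else:
                stats[i]['distinct'].add(value)
    # report distinct counts instead of the raw value sets
    return [{**s, 'distinct': len(s['distinct'])} for s in stats]

rows = [['NY', 10], ['', 10], ['LA', 5]]
print(profile_columns(rows, ['city', 'count']))
```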

rename(columns: Union[int, str, List[Union[str, int]]], names: List[Union[str, histore.document.schema.Column]]) openclean.pipeline.DataPipeline

Rename selected columns in the schema of the data stream rows.

Parameters
  • columns (int, str, or list of int or string) – References to renamed columns.

  • names (string or list of string) – New names for the selected columns.

Return type

openclean.pipeline.DataPipeline

run()

Stream all rows from the associated data file through the data pipeline that is associated with this processor.

The returned value is the result that the consumer that is generated for the pipeline returns when it is closed after processing the data stream.

Return type

any

sample(n: int, random_state: Optional[int] = None) openclean.pipeline.DataPipeline

Add operator for a random sample generator to the data stream.

Parameters
  • n (int) – Size of the collected random sample.

  • random_state (int, default=None) – Seed value for the random number generator (for reproducibility purposes).
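One-pass sampling over a stream is commonly done with reservoir sampling, sketched below. This illustrates how a fixed-size uniform sample can be drawn without knowing the stream length; openclean's actual sample operator may use a different algorithm.

```python
import random

def reservoir_sample(rows, n, random_state=None):
    """Collect a uniform random sample of size n from a row stream
    in a single pass (classic reservoir sampling)."""
    rng = random.Random(random_state)
    reservoir = []
    for i, row in enumerate(rows):
        if i < n:
            reservoir.append(row)  # fill the reservoir first
        else:
            j = rng.randint(0, i)  # replace with decreasing probability
            if j < n:
                reservoir[j] = row
    return reservoir

sample = reservoir_sample(range(1000), n=5, random_state=42)
print(len(sample))  # 5
```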

select(columns: Optional[Union[int, str, List[Union[str, int]]]] = None, names: Optional[List[Union[str, histore.document.schema.Column]]] = None) openclean.pipeline.DataPipeline

Select a given list of columns from the streamed data frame. Columns in the resulting data stream may also be renamed using the optional list of new column names.

Returns a new data stream with the column filter set to the columns that were in the argument list.

Parameters
  • columns (int, str, or list of int or string, default=None) – References to the selected columns.

  • names (string or list of string, default=None) – Optional renaming for the selected columns.

Return type

openclean.pipeline.DataPipeline

stream(op: openclean.operator.stream.processor.StreamProcessor)

Stream all rows from the associated data file to the data pipeline that is associated with this processor. The given operator is appended to the current pipeline before execution.

The returned value is the result that the consumer that is generated for the pipeline returns when it is closed after processing the data stream.

Parameters

op (openclean.operator.stream.processor.StreamProcessor) – Stream operator that is appended to the current pipeline for execution.

Return type

any

to_df() pandas.core.frame.DataFrame

Collect all rows in the stream that are yielded by the associated consumer into a pandas data frame.

Return type

pd.DataFrame

typecast(converter: Optional[openclean.profiling.datatype.convert.DatatypeConverter] = None) openclean.pipeline.DataPipeline

Typecast operator that converts cell values in data stream rows to different raw types that are represented by the given type converter.

Parameters

converter (openclean.profiling.datatype.convert.DatatypeConverter, default=None) – Datatype converter for values in the data stream. Uses the default converter if no converter is given.

Return type

openclean.pipeline.DataPipeline
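The conversion idea can be sketched without openclean's DatatypeConverter: try progressively narrower casts per cell and fall back to the original string. The cast order (int, then float) is an assumption for illustration, not the behavior of the default converter.

```python
def typecast_value(value):
    """Convert a string cell value to int or float where possible,
    falling back to the original value (a minimal stand-in for a
    DatatypeConverter)."""
    for cast in (int, float):
        try:
            return cast(value)
        except (TypeError, ValueError):
            continue
    return value

def typecast_rows(rows):
    """Apply the converter to every cell of every streamed row."""
    for row in rows:
        yield [typecast_value(v) for v in row]

print(list(typecast_rows([['1', '2.5', 'x']])))  # [[1, 2.5, 'x']]
```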

update(columns: Union[int, str, List[Union[str, int]]], func: Union[Callable, Dict, openclean.function.eval.base.EvalFunction, int, float, str, datetime.datetime, openclean.function.value.base.ValueFunction]) openclean.pipeline.DataPipeline

Update rows in a data frame.

Raises a ValueError if not enough arguments (at least two) are given.

Parameters
  • columns (int, str, or list of int or string) – References to the updated columns.

  • func (scalar, dict, callable, openclean.function.value.base.ValueFunction, or openclean.function.eval.base.EvalFunction) – Specification of the (resulting) evaluation function that is used to generate the updated values for each row in the data frame.

Return type

openclean.pipeline.DataPipeline

where(predicate: openclean.function.eval.base.EvalFunction, limit: Optional[int] = None) openclean.pipeline.DataPipeline

Filter rows in the data stream that match a given condition. Returns a new data stream with a consumer that filters the rows. Currently expects an evaluation function as the row predicate.

The optional limit argument restricts the number of rows in the returned data stream.

This is a synonym for the filter() method.

Parameters
  • predicate (openclean.function.eval.base.EvalFunction) – Evaluation function used to filter rows.

  • limit (int, default=None) – Limit the number of rows in the filtered data stream.

Return type

openclean.pipeline.DataPipeline
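The filter-with-limit semantics shared by where() and filter() can be sketched lazily with a generator expression and itertools.islice. The `where` helper and its callable predicate are stand-ins for openclean's EvalFunction-based implementation.

```python
from itertools import islice

def where(rows, predicate, limit=None):
    """Yield rows that satisfy the predicate, optionally capped at
    `limit` rows; evaluation stays lazy either way."""
    matches = (row for row in rows if predicate(row))
    return islice(matches, limit) if limit is not None else matches

rows = [['NY', 10], ['LA', 5], ['SF', 8]]
big = where(rows, lambda r: r[1] > 6, limit=1)
print(list(big))  # [['NY', 10]]
```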

write(filename: str, delim: Optional[str] = None, compressed: Optional[bool] = None, none_as: Optional[str] = None, encoding: Optional[str] = None)

Write the rows in the data stream to a given CSV file.

Parameters
  • filename (string) – Path to the output CSV file on the local file system.

  • delim (string, default=None) – The column delimiter used for the written CSV file.

  • compressed (bool, default=None) – Flag indicating if the file contents of the created file are to be compressed using gzip.

  • none_as (string, default=None) – String that is used to encode None values in the output file. If given, all cell values that are None are substituted by the string.

  • encoding (string, default=None) – The CSV file encoding, e.g., utf-8 or utf-16.

class openclean.pipeline.PipelineIterator(stream: histore.document.base.DocumentIterator, consumer: Optional[openclean.operator.stream.consumer.StreamConsumer] = None)

Bases: histore.document.base.DocumentIterator

Iterator over rows in a data processing pipeline. Iterates over the rows in an input stream. Each row is processed by a stream consumer. If the consumer returns a value this value is returned as the next row. For consumers that only return a result at the end of the stream this iterator iterates over the rows that are returned when the consumer is closed.
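The iteration contract can be sketched as a generator: every input row goes through the consumer, None results are skipped, and any rows the consumer returns on close are yielded at the end. The `consume`/`close` callables below are assumed hooks standing in for a StreamConsumer.

```python
def pipeline_rows(rows, consume, close=lambda: []):
    """Iterate pipeline output: each input row is handed to the
    consumer; None results are skipped; rows returned when the
    consumer is closed are yielded after the input is exhausted."""
    for row in rows:
        result = consume(row)
        if result is not None:
            yield result
    for row in close():
        yield row

# a consumer that upper-cases strings and drops non-strings
out = list(pipeline_rows(['a', 1, 'b'],
                         lambda r: r.upper() if isinstance(r, str) else None))
print(out)  # ['A', 'B']
```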

close()

Release all resources that are held by the associated input stream and output consumer.

next() Tuple[int, Union[int, float, str, datetime.datetime, Tuple[Union[int, float, str, datetime.datetime], ...]], List[Union[int, float, str, datetime.datetime]]]

Read the next row in the pipeline.

The row is processed by the associated consumer. If the consumer returns a non-None result, that row is returned as the next row. If the consumer returns None, the next input row is processed. If the consumer returns a non-empty result when it is closed, we assume that this is a list of rows and iterate over them as well.

Return type

tuple of int, histore.document.base.RowIndex, histore.document.base.DataRow

openclean.pipeline.stream(filename: Union[str, pandas.core.frame.DataFrame], header: Optional[List[Union[str, histore.document.schema.Column]]] = None, delim: Optional[str] = None, compressed: Optional[bool] = None, none_is: Optional[str] = None, encoding: Optional[str] = None) openclean.pipeline.DataPipeline

Read a CSV file as a data stream. This is a helper method that is intended to read and filter large CSV files.

Parameters
  • filename (string) – Path to CSV file on the local file system.

  • header (list of string, default=None) – Optional header. If no header is given it is assumed that the first row in the CSV file contains the header information.

  • delim (string, default=None) – The column delimiter used in the CSV file.

  • compressed (bool, default=None) – Flag indicating if the file contents have been compressed using gzip.

  • none_is (string, default=None) – String that was used to encode None values in the input file. If given, all cell values that match the given string are substituted by None.

  • encoding (string, default=None) – The CSV file encoding, e.g., utf-8 or utf-16.

Return type

openclean.pipeline.DataPipeline
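The lazy read-and-filter pattern that stream() enables can be pictured with the stdlib csv module: the file is consumed row by row, so a large file never has to fit in memory. This is a sketch of the underlying idea only, not openclean's implementation; an in-memory buffer stands in for a file on disk.

```python
import csv
import io

# an in-memory CSV stands in for a large file on disk
data = io.StringIO("city,count\nNY,10\nLA,5\nSF,8\n")

reader = csv.reader(data)
header = next(reader)  # the first row holds the header, as stream() assumes
rows = (row for row in reader if int(row[1]) > 6)  # lazy filter, no full load
print(header, list(rows))  # ['city', 'count'] [['NY', '10'], ['SF', '8']]
```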