openclean.operator.stream.collector module

Collectors are data stream consumers that are not connected to a downstream consumer. These consumers collect results until the close method of the data stream is called. At that point, the collector returns a final result.

This module contains implementations of the StreamConsumer and StreamProcessor interfaces for various basic collectors.

class openclean.operator.stream.collector.Collector

Bases: openclean.operator.stream.consumer.StreamConsumer, openclean.operator.stream.processor.StreamProcessor

The collector is intended primarily for test purposes. It simply collects the (rowid, row) pairs that are passed to it in a list.

close() List

Return the collected row buffer on close.

Return type

list

consume(rowid: int, row: List)

Add the given (rowid, row)-pair to the internal buffer. Returns the row.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

open(schema: List[Union[str, histore.document.schema.Column]]) openclean.operator.stream.consumer.StreamConsumer

Factory pattern for stream consumer. Returns an instance of the stream consumer for the row collector.

Parameters

schema (list of string) – List of column names in the data stream schema.

Return type

openclean.operator.stream.consumer.StreamConsumer
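
The open/consume/close life cycle described for the row collector can be illustrated with a minimal stand-in. This is a sketch, not the openclean implementation: `SketchCollector` and `run_stream` are hypothetical names, the class does not inherit from the real StreamConsumer or StreamProcessor, and having `open` return the object itself is an assumption.

```python
from typing import Any, Iterable, List, Tuple


class SketchCollector:
    """Hypothetical stand-in that mimics the documented Collector behavior."""

    def __init__(self) -> None:
        # Internal buffer of (rowid, row) pairs.
        self.rows: List[Tuple[int, List]] = []

    def open(self, schema: List[str]) -> "SketchCollector":
        # Assumption: the processor hands out itself as the consumer.
        return self

    def consume(self, rowid: int, row: List) -> List:
        # Buffer the pair and return the row, as documented for consume().
        self.rows.append((rowid, row))
        return row

    def close(self) -> List[Tuple[int, List]]:
        # The final result is the collected buffer.
        return self.rows


def run_stream(processor: Any, schema: List[str], rows: Iterable[List]) -> Any:
    """Hypothetical driver: open a consumer, feed it all rows, then close it."""
    consumer = processor.open(schema)
    for rowid, row in enumerate(rows):
        consumer.consume(rowid, row)
    return consumer.close()


collected = run_stream(
    SketchCollector(),
    schema=["name", "age"],
    rows=[["Alice", 23], ["Bob", 32]],
)
```

The result only becomes available from `close()`, which matches the module-level description of collectors as consumers without a downstream consumer.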

class openclean.operator.stream.collector.DataFrame(columns: Optional[List[Union[str, histore.document.schema.Column]]] = None)

Bases: openclean.operator.stream.consumer.StreamConsumer, openclean.operator.stream.processor.StreamProcessor

Row collector that generates a pandas data frame from the rows in a data stream. This consumer will not accept a downstream consumer as it would never send any rows to such a consumer.

close() pandas.core.frame.DataFrame

Closing the consumer yields the data frame with the collected rows.

Return type

pd.DataFrame

consume(rowid: int, row: List)

Add the row identifier and row values to the respective lists. Returns None so that the (empty) downstream consumer is not called.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

open(schema: List[Union[str, histore.document.schema.Column]]) openclean.operator.stream.consumer.StreamConsumer

Factory pattern for stream consumer. Returns an instance of the stream consumer for the data frame generator.

Parameters

schema (list of string) – List of column names in the data stream schema.

Return type

openclean.operator.stream.consumer.StreamConsumer
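
The bookkeeping behind the data frame collector can be sketched with the standard library alone. `SketchFrameCollector` is a hypothetical stand-in, not the openclean class. It assumes that `open` creates a fresh consumer bound to the stream schema. Here `close` returns the collected parts as a dictionary rather than a pandas data frame, to keep the example free of third-party dependencies.

```python
from typing import Dict, List, Optional


class SketchFrameCollector:
    """Hypothetical stand-in for the DataFrame collector (no pandas needed)."""

    def __init__(self, columns: Optional[List[str]] = None) -> None:
        # Processor mode: columns is None. Consumer mode: columns is the schema.
        self.columns = columns
        self.index: List[int] = []   # row identifiers
        self.data: List[List] = []   # row values

    def open(self, schema: List[str]) -> "SketchFrameCollector":
        # Assumption: the factory returns a new consumer that knows the schema.
        return SketchFrameCollector(columns=list(schema))

    def consume(self, rowid: int, row: List) -> None:
        # Append to the respective lists; returning None signals that no row
        # is passed on to a downstream consumer.
        self.index.append(rowid)
        self.data.append(row)
        return None

    def close(self) -> Dict[str, List]:
        # With pandas installed, the frame could be built from these parts via
        # pandas.DataFrame(data=self.data, index=self.index, columns=self.columns).
        return {"columns": self.columns, "index": self.index, "data": self.data}


frame_consumer = SketchFrameCollector().open(["name", "age"])
frame_consumer.consume(0, ["Alice", 23])
frame_consumer.consume(1, ["Bob", 32])
frame_parts = frame_consumer.close()
```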

class openclean.operator.stream.collector.Distinct(columns: Optional[Union[int, str, List[Union[str, int]]]] = None)

Bases: openclean.operator.stream.consumer.StreamConsumer, openclean.operator.stream.processor.StreamProcessor

Consumer that populates a counter with the frequency counts for distinct values (or value combinations) in the processed rows of the data stream.

close() collections.Counter

Closing the consumer returns the populated Counter object.

Return type

collections.Counter

consume(rowid: int, row: List)

Add the value combination for a given row to the counter. If the row has only one value, that value is used as the counter key. For rows with multiple values, the row is converted to a tuple that is used as the counter key.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

open(schema: List[Union[str, histore.document.schema.Column]]) openclean.operator.stream.consumer.StreamConsumer

Factory pattern for stream consumer. Returns an instance of the stream consumer for the distinct values collector.

Parameters

schema (list of string) – List of column names in the data stream schema.

Return type

openclean.operator.stream.consumer.StreamConsumer
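
The key rule documented for `consume` (a scalar key for single-value rows, a tuple key otherwise) can be sketched as follows. `SketchDistinct` is a hypothetical stand-in built on `collections.Counter`. It ignores the `columns` argument of the real class and assumes that rows arrive already restricted to the columns of interest.

```python
from collections import Counter
from typing import List


class SketchDistinct:
    """Hypothetical stand-in that mimics the documented Distinct behavior."""

    def __init__(self) -> None:
        self.counter: Counter = Counter()

    def open(self, schema: List[str]) -> "SketchDistinct":
        # Assumption: the processor hands out itself as the consumer.
        return self

    def consume(self, rowid: int, row: List) -> None:
        # Single-value rows use the value itself as the key; longer rows
        # are converted to a tuple so that they are hashable.
        key = row[0] if len(row) == 1 else tuple(row)
        self.counter[key] += 1

    def close(self) -> Counter:
        return self.counter


# Rows with a single value: keys are the plain values.
single = SketchDistinct().open(["city"])
for rowid, row in enumerate([["NYC"], ["Boston"], ["NYC"]]):
    single.consume(rowid, row)
single_counts = single.close()

# Rows with two values: keys are tuples.
pairs = SketchDistinct().open(["city", "state"])
for rowid, row in enumerate([["NYC", "NY"], ["NYC", "NY"], ["Boston", "MA"]]):
    pairs.consume(rowid, row)
pair_counts = pairs.close()
```

Converting multi-value rows to tuples is what makes them usable as counter keys, since lists are not hashable.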

class openclean.operator.stream.collector.RowCount

Bases: openclean.operator.stream.consumer.StreamConsumer, openclean.operator.stream.processor.StreamProcessor

The row counter is a simple counter for the number of (rowid, row) pairs that are passed on to the consumer.

close() int

Return the counter value.

Return type

int

consume(rowid: int, row: List)

Increment the counter value.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

open(schema: List[Union[str, histore.document.schema.Column]]) openclean.operator.stream.consumer.StreamConsumer

Factory pattern for stream consumer. Returns an instance of the stream consumer for the row counter.

Parameters

schema (list of string) – List of column names in the data stream schema.

Return type

openclean.operator.stream.consumer.StreamConsumer
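
A minimal sketch of the row counter, under the same caveats as for the other collectors: `SketchRowCount` is a hypothetical stand-in, not the openclean class, and returning the object itself from `open` is an assumption.

```python
from typing import List


class SketchRowCount:
    """Hypothetical stand-in that mimics the documented RowCount behavior."""

    def __init__(self) -> None:
        self.count = 0

    def open(self, schema: List[str]) -> "SketchRowCount":
        # Assumption: the processor hands out itself as the consumer.
        return self

    def consume(self, rowid: int, row: List) -> None:
        # The row content is irrelevant; only the number of calls matters.
        self.count += 1

    def close(self) -> int:
        return self.count


counter = SketchRowCount().open(["name", "age"])
for rowid, row in enumerate([["Alice", 23], ["Bob", 32], ["Claire", 41]]):
    counter.consume(rowid, row)
row_total = counter.close()
```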

class openclean.operator.stream.collector.Write(file: Optional[histore.document.csv.base.CSVFile] = None, writer: Optional[histore.document.csv.writer.CSVWriter] = None)

Bases: openclean.operator.stream.consumer.StreamConsumer, openclean.operator.stream.processor.StreamProcessor

Write data stream rows to an output file. This class either contains a reference to a CSV file (if instantiated as a processor) or a reference to a CSV writer (if instantiated as a consumer).

close()

Close the associated CSV writer when the end of the data stream is reached.

consume(rowid: int, row: List)

Write the row values to the output file.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

open(schema: List[Union[str, histore.document.schema.Column]]) openclean.operator.stream.consumer.StreamConsumer

Factory pattern for stream consumer. Returns an instance of the stream consumer with an open CSV writer.

Parameters

schema (list of string) – List of column names in the data stream schema.

Return type

openclean.operator.stream.consumer.StreamConsumer
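
The processor/consumer duality described for the writer (a file reference before `open`, an open writer afterwards) can be sketched with the standard `csv` module in place of the histore `CSVFile` and `CSVWriter` classes. `SketchWrite` and `demo_write` are hypothetical names. Writing the schema as a header row is an assumption of this sketch, not something the reference above states.

```python
import csv
import os
import tempfile
from typing import List, Optional, TextIO


class SketchWrite:
    """Hypothetical stand-in for Write, built on the standard csv module."""

    def __init__(self, path: Optional[str] = None, handle: Optional[TextIO] = None) -> None:
        # Processor mode: only `path` is set. Consumer mode: `handle` is set.
        self.path = path
        self.handle = handle
        self.writer = csv.writer(handle) if handle is not None else None

    def open(self, schema: List[str]) -> "SketchWrite":
        # Factory step: open the output file and return a consumer bound to it.
        handle = open(self.path, "w", newline="")
        consumer = SketchWrite(handle=handle)
        consumer.writer.writerow(schema)  # assumption: header row comes first
        return consumer

    def consume(self, rowid: int, row: List) -> None:
        # Only the row values are written; the row identifier is dropped.
        self.writer.writerow(row)

    def close(self) -> None:
        # End of stream: release the underlying file handle.
        self.handle.close()


def demo_write() -> List[List[str]]:
    """Write two rows to a temporary file and read them back."""
    fd, path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)
    try:
        consumer = SketchWrite(path=path).open(["name", "age"])
        consumer.consume(0, ["Alice", 23])
        consumer.consume(1, ["Bob", 32])
        consumer.close()
        with open(path, newline="") as f:
            return list(csv.reader(f))
    finally:
        os.remove(path)


written = demo_write()
```

Unlike the other collectors, `close()` returns nothing here. The result of the stream is the file on disk.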