openclean.operator.stream.consumer module

Abstract classes defining consumers in a data processing pipeline. Consumers operate on the rows in a data stream. A consumer may collect the rows in a stream (for transformation); collected rows are then processed or returned when the consumer is closed at the end of the stream. Alternatively, a consumer can manipulate the rows directly and pass them on to a connected downstream consumer. The latter type of consumer is referred to as a producing consumer. The result of a producing consumer (returned when the consumer is closed at the end of the stream) is the result returned by the connected downstream consumer.

class openclean.operator.stream.consumer.ProducingConsumer(columns: List[Union[str, histore.document.schema.Column]], consumer: Optional[openclean.operator.stream.consumer.StreamConsumer])

Bases: openclean.operator.stream.consumer.StreamConsumer

A producing consumer passes the processed row on to a downstream consumer. This consumer therefore acts as a consumer and a producer.

close() Any

Return the result of the associated consumer when the end of the data stream is reached.

Return type

any

consume(rowid: int, row: List[Union[int, float, str, datetime.datetime]]) List[Union[int, float, str, datetime.datetime]]

Consume the given row. Passes the processed row on to an associated downstream consumer and returns the processed row. If the result is None, this signals to a collector/iterator that the given row should not be part of the collected/yielded result.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

Return type

list

abstract handle(rowid: int, row: List[Union[int, float, str, datetime.datetime]]) List[Union[int, float, str, datetime.datetime]]

Process a given row. Return a modified row or None. In the latter case it is assumed that the row should not be passed on to any consumer downstream.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

Return type

list
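
The interplay between consume() and handle() described above might look roughly like the following. This is a minimal sketch using hypothetical stand-in classes (SketchConsumer, SketchProducer, DropEmpty, Upper), not the openclean implementation. The columns argument is omitted for brevity, and the forwarding logic in consume() and close() is an assumption based on the descriptions in this section.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional

Row = List[Any]


class SketchConsumer(ABC):
    """Hypothetical stand-in for StreamConsumer (not the openclean class)."""

    @abstractmethod
    def consume(self, rowid: int, row: Row) -> Optional[Row]:
        ...

    @abstractmethod
    def close(self) -> Any:
        ...


class SketchProducer(SketchConsumer):
    """Hypothetical stand-in for ProducingConsumer."""

    def __init__(self, consumer: Optional[SketchConsumer] = None):
        self.consumer = consumer

    @abstractmethod
    def handle(self, rowid: int, row: Row) -> Optional[Row]:
        ...

    def consume(self, rowid: int, row: Row) -> Optional[Row]:
        row = self.handle(rowid, row)
        # Assumption: only rows that handle() did not drop are forwarded.
        if row is not None and self.consumer is not None:
            return self.consumer.consume(rowid, row)
        return row

    def close(self) -> Any:
        # The producer's result is whatever the downstream consumer returns.
        return self.consumer.close() if self.consumer is not None else None


class DropEmpty(SketchProducer):
    """Drop rows in which every value is the empty string."""

    def handle(self, rowid: int, row: Row) -> Optional[Row]:
        return None if all(v == "" for v in row) else row


class Upper(SketchProducer):
    """Convert all string values in a row to upper case."""

    def handle(self, rowid: int, row: Row) -> Optional[Row]:
        return [v.upper() if isinstance(v, str) else v for v in row]


# Chain two producers: empty rows are dropped before upper-casing.
pipeline = DropEmpty(consumer=Upper())
kept = pipeline.consume(0, ["a", 1])
dropped = pipeline.consume(1, ["", ""])
```

In this sketch a row for which handle() returns None never reaches the downstream consumer, which matches the contract stated for handle().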

set_consumer(consumer: openclean.operator.stream.consumer.StreamConsumer) openclean.operator.stream.consumer.ProducingConsumer

Set the downstream consumer.

Parameters

consumer (openclean.operator.stream.consumer.StreamConsumer) – Downstream consumer for processed rows.

Return type

openclean.operator.stream.consumer.ProducingConsumer

class openclean.operator.stream.consumer.StreamConsumer(columns: List[Union[str, histore.document.schema.Column]])

Bases: object

Abstract class for consumers in a data stream processing pipeline. A consumer is the instantiation of a StreamProcessor that is prepared to process (consume) rows in a data stream.

Each consumer may be associated with an (optional) downstream consumer that will receive the processed row from this operator. Consumers that are connected to a downstream consumer are also referred to as producers. Consumers that are not connected to a downstream consumer are called collectors. There are separate modules for each type of consumer.
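
To illustrate the collector role, here is a minimal sketch of a terminal consumer that buffers every row and returns the buffer on close(). RowCollector is a hypothetical name and a plain Python class, not part of openclean; a real collector would presumably subclass StreamConsumer.

```python
from typing import Any, List

Row = List[Any]


class RowCollector:
    """Hypothetical collector: buffers rows and returns them on close()."""

    def __init__(self, columns: List[str]):
        self.columns = columns
        self.rows: List[Row] = []

    def consume(self, rowid: int, row: Row) -> Row:
        # A collector has no downstream consumer; it only stores the row.
        self.rows.append(row)
        return row

    def close(self) -> List[Row]:
        # The collected rows are the result at the end of the stream.
        return self.rows


collector = RowCollector(columns=["name", "age"])
collector.consume(0, ["alice", 30])
collector.consume(1, ["bob", 25])
result = collector.close()
```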

abstract close() Any

Signal that the end of the data stream has been reached. The return value is implementation dependent.

Return type

any

abstract consume(rowid: int, row: List[Union[int, float, str, datetime.datetime]]) List[Union[int, float, str, datetime.datetime]]

Consume the given row. Passes the processed row on to an associated downstream consumer and returns the processed row. If the result is None, this signals to a collector/iterator that the given row should not be part of the collected/yielded result.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

process(ds: histore.document.base.Document) Any

Consume a given data stream and return the computed result.

Parameters

ds (histore.document.base.Document) – Iterable stream of dataset rows.

Return type

any
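
One plausible shape for process() is a loop that feeds every row to consume() and then returns the result of close(). The sketch below assumes the stream can be iterated as (rowid, row) pairs and uses a plain list in place of a histore Document; how the real Document exposes its rows is not shown in this reference. CountingConsumer is a hypothetical class used only for illustration.

```python
from typing import Any, Iterable, List, Tuple

Row = List[Any]


class CountingConsumer:
    """Hypothetical consumer that counts the rows it sees."""

    def __init__(self) -> None:
        self.count = 0

    def consume(self, rowid: int, row: Row) -> Row:
        self.count += 1
        return row

    def close(self) -> int:
        return self.count

    def process(self, ds: Iterable[Tuple[int, Row]]) -> Any:
        # Assumption: the stream yields (rowid, row) pairs.
        for rowid, row in ds:
            self.consume(rowid, row)
        # The computed result is whatever close() returns.
        return self.close()


stream = [(0, ["alice", 30]), (1, ["bob", 25]), (2, ["carol", 41])]
total = CountingConsumer().process(stream)
```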

class openclean.operator.stream.consumer.StreamFunctionHandler(columns: List[Union[str, histore.document.schema.Column]], func: Callable[[List[Union[int, float, str, datetime.datetime]]], Union[int, float, str, datetime.datetime, Tuple[Union[int, float, str, datetime.datetime]]]], consumer: Optional[openclean.operator.stream.consumer.StreamConsumer] = None)

Bases: openclean.operator.stream.consumer.ProducingConsumer

The stream function handler is a producing consumer that uses an associated stream function to handle rows. A stream function should either return a modified row or None. Modified rows will be streamed to a connected consumer. If None is returned, the row will be ignored.

handle(rowid: int, row: List[Union[int, float, str, datetime.datetime]]) List[Union[int, float, str, datetime.datetime]]

Process a given row using the associated stream function.

Parameters
  • rowid (int) – Unique row identifier

  • row (list) – List of values in the row.

Return type

list
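
The behaviour described for the stream function handler can be sketched as a thin wrapper that delegates handle() to a user-supplied function. FunctionHandlerSketch and keep_adults are hypothetical names; the class is a stand-alone stand-in rather than a subclass of ProducingConsumer, the columns argument is omitted, and the forwarding logic in consume() is an assumption.

```python
from typing import Any, Callable, List, Optional

Row = List[Any]


class FunctionHandlerSketch:
    """Hypothetical stand-in for StreamFunctionHandler."""

    def __init__(
        self,
        func: Callable[[Row], Optional[Row]],
        consumer: Optional[Any] = None,
    ):
        self.func = func
        self.consumer = consumer

    def handle(self, rowid: int, row: Row) -> Optional[Row]:
        # The stream function receives only the row values, not the rowid.
        return self.func(row)

    def consume(self, rowid: int, row: Row) -> Optional[Row]:
        row = self.handle(rowid, row)
        # Rows for which the function returned None are ignored.
        if row is not None and self.consumer is not None:
            return self.consumer.consume(rowid, row)
        return row


def keep_adults(row: Row) -> Optional[Row]:
    """Example stream function: keep rows whose second value is >= 18."""
    return row if row[1] >= 18 else None


handler = FunctionHandlerSketch(func=keep_adults)
adult = handler.consume(0, ["alice", 30])
minor = handler.consume(1, ["bob", 12])
```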