Piper - Flexible, iterable pipeline engine with automatic batching


    use Piper;

    my $pipeline = Piper->new(
        first_process => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { ... } @$batch );
        second_processes => Piper->new(...),
        final_process => sub { ... },


    while ($pipeline->isnt_exhausted) {
        my $item = $pipeline->dequeue;


The software engineering concept known as a pipeline is a chain of processing segments, arranged such that the output of each segment is the input of the next.

[Piper](https://metacpan.org/pod/Piper) is a pipeline builder.  It composes arbitrary processing segments into a single pipeline instance with the following features:

- Pipeline instances are iterators, only processing data as needed.
- Data is automatically processed in batches for each segment (with configurable batch sizes).
- Built-in support exists for non-linear and/or recursive pipelines.
- Processing segments are pluggable and reusable.


## new(@segments)

Create a container pipeline segment (parent) from the provided child `@segments`.

Additionally, a single hashref of attributes for the container/parent segment may be included as an argument to the constructor (anywhere in the argument list). See the ["SEGMENT ATTRIBUTES"](#segment-attributes) section for a description of attributes available for both parent and child segments.

Accepted segment types are as follows:

- [Piper](https://metacpan.org/pod/Piper) object

    Creates a sub-container of pipeline segments.  There is no (explicit) limit to the number of nested containers a pipeline may contain.

- [Piper::Process](#process-handler) object

    See the ["PROCESS HANDLER"](#process-handler) section for a description of [Piper::Process](https://metacpan.org/pod/Piper::Process) objects.

- A coderef (which will be coerced into a [Piper::Process](https://metacpan.org/pod/Piper::Process) object).
- A hashref that can be coerced into a [Piper::Process](https://metacpan.org/pod/Piper::Process) object.

    In order to be considered a candidate for coercion, the hashref must contain (at a minimum) the 'handler' key.

- [Piper::Instance](#initialization) object

    In this case, the associated [Piper](https://metacpan.org/pod/Piper) or [Piper::Process](https://metacpan.org/pod/Piper::Process) object is extracted from the [Piper::Instance](https://metacpan.org/pod/Piper::Instance) object for use in the new pipeline segment.

    See ["INITIALIZATION"](#initialization) for a description of [Piper::Instance](https://metacpan.org/pod/Piper::Instance) objects.

- A `$label => $segment` pair

    For such pairs, the `$segment` can be any of the above segment types, and `$label` is a simple scalar which will be used as `$segment`'s label.

    If the `$segment` already has a label, `$label` will override it.

## Constructor Example

    my $pipe = Piper->new(
        subpipe_label => Piper->new(
            first_handler => Piper::Process->new(sub { ... }),
            second_handler => sub { ... },
            third_handler => {
                handler => sub { ... },
            another_subpipe => Piper->new(...),
            label => 'another_handler',
            handler => sub { ... },
        sub {
            # An un-labeled handler
            label => 'final_handler',
            handler => sub { ... },


Piper segments were designed to be easily reusable.  Prior to initialization, [Piper](https://metacpan.org/pod/Piper) and [Piper::Process](https://metacpan.org/pod/Piper::Process) objects do not process data; they simply contain the blueprint for creating the pipeline.  As such, blueprints for commonly-used pipeline segments can be stored in package libraries and imported wherever needed.

To create a functioning pipeline from one such blueprint, simply call the `init` method on the outermost segment.  The `init` method returns a [Piper::Instance](https://metacpan.org/pod/Piper::Instance) object of the outermost segment, which is the realization of the pipeline design, and which contains [Piper::Instance](https://metacpan.org/pod/Piper::Instance) objects created from all its contained segments.

Initialization fuses the pipeline segments together, establishes the relationships between the segments, and initializes the dataflow infrastructure.

The `init` method may be chained from the constructor if the blueprint object is not needed:

    my $instance = Piper->new(...)->init;

Any arguments passed to the `init` method will be cached and made available to each handler in the pipeline (see the ["PROCESS HANDLER"](#process-handler) section for full description of handlers).  This is a great way to share a resource (such as a database handle) among process handlers.

    my $pipe = Piper->new(
        query => sub {
            my ($instance, $batch, $dbh) = @_;
    my $instance = $pipe->init($dbh);

Instances are ready to accept data for processing:

    while ($instance->isnt_exhausted) {
        my $result = $instance->dequeue;


[Piper::Process](https://metacpan.org/pod/Piper::Process) objects have the same ["SEGMENT ATTRIBUTES"](#segment-attributes) as [Piper](https://metacpan.org/pod/Piper) objects, but have an additional required attribute known as its `handler`.

A process `handler` is the data-processing subroutine for the segment.

In its simplest form, the process handler takes input from the previous pipeline segment, processes it, and passes it on to the next segment; but handlers also have built-in support for non-linear and recursive dataflow (see ["FLOW CONTROL"](#flow-control)).

The arguments provided to the `handler` subroutine are:

- `$instance`

    The instance (a [Piper::Instance](https://metacpan.org/pod/Piper::Instance) object) corresponding to the segment.

- `$batch`

    An arrayref of data items to process.

- `@args`

    Any arguments provided to the `init` method during the ["INITIALIZATION"](#initialization) of the pipeline.

After processing a batch of data, the `handler` may pass the results to the next segment using the `emit` method called from the handler's `$instance`.

## Example:

    sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { ... } @$batch );


Since [Piper](https://metacpan.org/pod/Piper) has built-in support for non-linear and/or recursive pipelines, a ["PROCESS HANDLER"](#process-handler) may send data to any other segment in the pipeline, including itself.

The following methods may be called from the `$instance` object passed as the first argument to a `handler`:

## emit(@data)

Send `@data` to the next segment in the pipeline.  If the instance is the last in the pipeline, emits to the drain, making the `@data` ready for `dequeue`.

## recycle(@data)

Re-queue `@data` to the top of the current segment in an order such that `dequeue(1)` would subsequently return `$data[0]` and so forth.

## injectAt($location, @data)

## injectAfter($location, @data)

Send `@data` to the segment _at_ or _after_ the specified `$location`.

For each of the above methods, `$location` must be the label of a segment in the pipeline or a path-like representation of an hierarchy of labels.

For example, in the following pipeline, a few possible `$location` values include `a`, `subpipe/b`, or `main/subpipe/c`.

    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            a => sub { ... },
            b => sub { ... },
            c => sub { ... },

If a label is unique within the pipeline, only the label is required.  For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.

For example, in the following pipeline, searching for `processA` from the handler of `processB` would find `main/pipeA/processA`, not `main/processA`.  So to reach `main/processA` from `processB`, the handler would need to search for `main/processA`.

    my $pipe = Piper->new(
        { label => 'main' },
        pipeA => Piper->new(
            processA => sub { ... },
            processB => sub { ... },
        processA => sub { ... },

## inject(@data)

If the segment has a parent, enqueues `@data` to its parent.  Otherwise, enqueues `@data` to itself.

## eject(@data)

If the segment has a parent, send `@data` to the drain of its parent.  Otherwise, enqueues `@data` to the segment's drain.


All of the following attributes are available for both container ([Piper](https://metacpan.org/pod/Piper)) and processor ([Piper::Process](https://metacpan.org/pod/Piper::Process)) segment types.

Each attribute is equipped with an accessor of the same name.

A star (\*) indicates that the attribute is writable, and can be modified at runtime by passing a value as an argument to the method of the same name.

All attributes (except `label`) have an associated predicate method called `has_$attribute` which returns a boolean indicating whether the attribute has been set for the segment.

All writable attributes (indicated by \*) can be cleared by passing an explicit `undef` to the writer method or by calling the appropriate clearer method called `clear_$attribute`.

All accessors, writers, predicates, and clearers are available for each segment before and after ["INITIALIZATION"](#initialization).

## allow

A coderef which can be used to subset the items which are _allowed_ to be processed by the segment.

The coderef executes on each item attempting to queue to the segment.  If it returns true, the item is queued.  Otherwise, the item skips the segment and proceeds to the next adjacent segment.

Each item is localized to `$_`, and is also passed in as the first argument.

These example `allow` subroutines are equivalent:

    # This segment only accepts digit inputs
    allow => sub { /^\d+$/ }
    allow => sub { $_ =~ /^\d+$/ }
    allow => sub { $_[0] =~ /^\d+$/ }

## \*batch\_size

The number of items to process at a time for the segment.

Once initialized (see ["INITIALIZATION"](#initialization)), a segment inherits the `batch_size` of any existing parent(s) if not provided.  If the segment has no parents, or if none of the parents have a `batch_size` defined, the default `batch_size` will be used.  The default `batch_size` is 200, but can be configured in the import statement (see the ["GLOBAL CONFIGURATION"](#global-configuration) section).

## \*debug

The debug level for the segment.

Once initialized (see ["INITIALIZATION"](#initialization)), a segment inherits the debug level of any existing parent(s) if not specified.  The default level is 0, but can be globally overridden by the environment variable `PIPER_DEBUG`.

See the ["LOGGING AND DEBUGGING"](#logging-and-debugging) section for specifics about debug and verbosity levels.

## \*enabled

A boolean indicating that the segment is enabled and can accept items for processing.

Once initialized (see ["INITIALIZATION"](#initialization)), a segment inherits this attribute from any existing parent(s).  The default is true.

If a segment is disabled (`enabled = 0`), all items attempting to queue to the segment are forwarded to the next adjacent segment.

## label

A label for the segment.  If no label is provided, a globally unique ID will be used.

Labels are necessary for certain types of ["FLOW CONTROL"](#flow-control) (for example, [injectAt](https://metacpan.org/pod/injectAt) or [injectAfter](https://metacpan.org/pod/injectAfter)).  For pipelines that do not utilize ["FLOW CONTROL"](#flow-control) features, labels are primarily useful for ["LOGGING AND DEBUGGING"](#logging-and-debugging).

## \*verbose

The verbosity level for the segment.

Once initialized (see ["INITIALIZATION"](#initialization)), a segment inherits the verbosity level of any existing parent(s) if not specified.  The default level is 0, but can be globally overridden by the environment variable `PIPER_VERBOSE`.

See the ["LOGGING AND DEBUGGING"](#logging-and-debugging) section for specifics about debug and verbosity levels.


The following attributes have read-only accessors (of the same name).

### children

For container instances (made from [Piper](https://metacpan.org/pod/Piper) objects, not [Piper::Process](https://metacpan.org/pod/Piper::Process) objects), holds an arrayref of the contained instance objects.

### main

For any instance in the pipeline, this attribute holds a reference to the outermost container instance.

### parent

For all instances in the pipeline except the outermost container (`main`), this attribute holds a reference to the instance's immediate container segment.

### path

The full path to the instance, built as the concatenation of all the parent(s) labels and the instance's label, joined by `/`.  Instances stringify to this attribute.


Methods marked with a (\*) should only be called from the outermost instance.

### \*dequeue(\[$num\])

Remove at most `$num` (default 1) processed items from the end of the pipeline.

### \*enqueue(@data)

Queue `@data` for processing by the pipeline.

### find\_segment($location)

Find and return the segment instance according to `$location`, which can be a label or a path-like hierarchy of labels.  See [injectAfter](#injectafter-location-data) for a detailed description of `$location`.

### \*flush

Process batches until there are no more items pending.

### has\_children

A boolean indicating whether the instance has any children.

### has\_parent

A boolean indicating whether the instance has a parent.

### has\_pending

Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.

### \*is\_exhausted

Returns a boolean indicating whether there are any items left to process or dequeue.

### \*isnt\_exhausted

Returns the opposite of `is_exhausted`.

### next\_segment

Returns the next adjacent segment from the calling segment.  Returns undef for the outermost container.

### pending

Returns the number of items that are queued at some level of the pipeline segment but have not completed processing.

### \*prepare(\[$num\])

Process batches while data is still `pending` until at least `$num` (default 1) items are `ready` for `dequeue`.

### ready

Returns the number of items that have finished processing and are ready for `dequeue` from the pipeline segment.


The following global attributes are configurable from the Piper import statement.

    # Change the default batch_size to 50
    use Piper batch_size => 50;

## batch\_size

The default batch size used by pipeline segments which do not have a locally defined `batch_size` and do not have a parent segment with a defined `batch_size`.

The `batch_size` attribute must be a positive integer.

The default `batch_size` is 200.


Logging and debugging facilities are available upon ["INITIALIZATION"](#initialization) of a pipeline.

Warnings and errors are issued regardless of debug and verbosity levels via `carp` and `croak` from the [Carp](https://metacpan.org/pod/Carp) module, and are therefore configurable with any of [Carp](https://metacpan.org/pod/Carp)'s global options or environment variables.

Debugging and/or informational messages are printed to STDERR if debug and/verbosity levels have been set.  There are three levels used by [Piper](https://metacpan.org/pod/Piper) for each of `debug`/`verbose`: 0, 1, or 2.  The default is 0 (off).

## Levels

Levels can be set by any of the following mechanisms: at construction of the [Piper](https://metacpan.org/pod/Piper)/[Piper::Process](https://metacpan.org/pod/Piper::Process) objects, dynamically via the `debug` and `verbose` methods of segments, or with the environment variables `PIPER_DEBUG` and `PIPER_VERBOSE`.

Levels can be set local to specific segments.  The default levels of a sub-segment are inherited from its parent.

        # main                 verbose => 0 (default)
        # main/subpipe         verbose => 1
        # main/subpipe/normal  verbose => 1 (inherited)
        # main/subpipe/loud    verbose => 2
        # main/subpipe/quiet   verbose => 0

        my $pipe = Piper->new(
            { label => 'main' },
            subpipe => Piper->new(
                { verbose => 1 },
                normal => sub {...},
                loud => {
                    verbose => 2,
                    handler => sub {...},
                quiet => {
                    verbose => 0,
                    handler => sub {...},

Levels set via the environment variables `PIPER_DEBUG` and `PIPER_VERBOSE` are global.  If set, these environment variables override any and all settings defined in the source code.

## Messages

All messages include information about the segment which called the logger.

Existing informational (`verbose` or `debug` > 0) messages describe data processing steps, such as noting when items are queueing or being processed by specific segments.  Increasing level(s)  1> simply adds more detail to the printed messages.

Existing debug messages describe the decision actions of the pipeline engine itself.  Examples include logging its search steps when locating a named segment or explaining how it chooses which batch to process.  Increasing the debug level > 1 simply adds more detail to the printed messages.

## Custom messaging

User-defined errors, warnings, and debug or informational messages can use the same logging system as [Piper](https://metacpan.org/pod/Piper) itself.

The first argument passed to a ["PROCESS HANDLER"](#process-handler) is the [Piper::Instance](https://metacpan.org/pod/Piper::Instance) object associated with that segment, which has the below-described methods available for logging, debugging, warning, or throwing errors.

In each of the below methods, the `@items` are optional and only printed if the verbosity level for the segment is > 1.  They can be used to pass additional context or detail about the data being processed or which caused the message to print (for conditional messages).

The built-in messaging only uses debug/verbosity levels 1 and 2, but there are no explicit rules enforced on maximum debug/verbosity levels, so users may explicitly require higher levels for custom messages to heighten the required levels for any custom message.

### ERROR($message, \[@items\])

Throws an error with `$message` via `croak`.

### WARN($message, \[@items\])

Issues a warning with `$message` via `carp`.

### INFO($message, \[@items\])

Prints an informational `$message` to STDERR if either the debug or verbosity level for the segment is > 0.

### DEBUG($message, \[@items\])

Prints a debug `$message` to STDERR if the debug level for the segment is > 0.

### Example:

    my $pipe = Piper->new(
        messenger => sub {
            my ($instance, $batch) = @_;
            for my $data (@$batch) {
                if ($data->is_bad) {
                    $instance->ERROR("Data <$data> is bad!");
            # User-heightened verbosity level
            $instance->INFO('Data all good!', @$batch)
                if $instance->verbose > 2;


Much of the concept and API for this project was inspired by the work of [Nathaniel Pierce](mailto:nwpierce@gmail.com).

Special thanks to [Tim Heaney](http://oylenshpeegul.typepad.com) for his encouragement and mentorship.


version 0.05


Mary Ehlers <ehlers@cpan.org>


Tim Heaney <oylenshpeegul@gmail.com>


This software is Copyright (c) 2017 by Mary Ehlers.

This is free software, licensed under:

    The Apache License, Version 2.0, January 2004