Log::Parallel - cluster computing framework


 % bin/process_logs -c config_file

 use Log::Parallel;
 use Log::Parallel::ConfigCheck qw(validate_config);
 use Proc::JobQueue::DependencyQueue;





 $files_by_recnum{$_->{recnum}} = get_files_by_srec($_, $config->{hostsinfo}) 
        for @{$config->{sources}};

 my $dependency_graph = make_dependency_graph(make_task_list($opts, $config, %files_by_recnum));

 my $job_queue = Proc::JobQueue::DependencyQueue->new(dependency_graph => $dependency_graph, hosts => [], hold_all => 1);

 setup_slave_hosts($config, $job_queue);


This is the main driver module at the heart of a cluster computing framework used for batch log processing. It sets things up, figures out what jobs can run and in what order, and queues them up to run.

Everything it does is driven from the configuration data, probably parsed by Config::YAMLMacros and validated by Log::Parallel::ConfigCheck.

Only one program, process_logs, is expected to use this module. API documentation for the module itself is therefore not very relevant, but this is as good a place as any to document the overall system.

process_logs is the driver script for processing data logs through a series of jobs specified in a configuration file.

Each job consists of a set of steps to process input files and create an output file (possibly bucketized). This is very much like a map-reduce framework. The steps are:

1. Parse

The first step is to parse the input files. The input files can come from multiple places/steps and be in multiple formats. They must all be sorted on the same fields so that they can be joined together in an ordered stream.

2. Filter

As items are read in, the filter code is executed. Items are dropped unless the filter code returns a true value.

3. Group

The items that make it past the filter can optionally be grouped together so that they're passed to the next stage as an array of items.

4. Transform

The transform step consumes items and generates items. It consumes items one-by-one (or one group at a time), but it can produce zero or many items for each one it consumes. It can take events and squish them together into a session; or it can take a session and break it apart into events; or it can take sessions and produce a single aggregated result once it has consumed all the input.

5. Bucketize

As new resultant items are generated, they can be bucketized into many buckets and split across a cluster.

6. Write

The resultant items are written in the format specified. Since the next step may run things through unix sort, the output format may need to be squished onto one line.

7. Sort

The output files get sorted according to fields defined in the resultant items.

8. Post-Sort Transform

If the writer had to encode the output for unix sort, it gets a chance to un-encode it after sorting so that it's in its desired format.
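
The middle steps above (filter, group, transform, bucketize, write) can be sketched in plain Perl. This is only an illustration of the data flow, not the code Log::Parallel::Task runs: the item fields, the transform() sub and the character-sum bucketing are all made up for the example, and parsing and sorting are left out.

```perl
use strict;
use warnings;

# Hypothetical, already-parsed input items, sorted by user then time.
my @items = (
    { user => 'alice', time => 1, url => '/a' },
    { user => 'alice', time => 2, url => '/b' },
    { user => 'bob',   time => 1, url => '/robots.txt' },
    { user => 'bob',   time => 3, url => '/c' },
    { user => 'carol', time => 2, url => '/d' },
);

# Filter: keep an item only when the filter code returns a true value.
my @kept = grep { $_->{url} ne '/robots.txt' } @items;

# Group: collect consecutive items that share a key into arrays.
# This relies on the input already being sorted on that key.
my @groups;
for my $item (@kept) {
    if (@groups && $groups[-1][0]{user} eq $item->{user}) {
        push @{ $groups[-1] }, $item;
    } else {
        push @groups, [ $item ];
    }
}

# Transform: consume one group, produce zero or more items.
# Here each group of events is squished into a single session.
sub transform {
    my ($group) = @_;
    return {
        user  => $group->[0]{user},
        start => $group->[0]{time},
        hits  => scalar @$group,
    };
}
my @sessions = map { transform($_) } @groups;

# Bucketize: assign each resultant item to one of N buckets.
# The character sum is a stand-in hash, chosen only because it is deterministic.
my $bucket_count = 2;
my @buckets = map { [] } 1 .. $bucket_count;
for my $session (@sessions) {
    my $sum = 0;
    $sum += ord($_) for split //, $session->{user};
    push @{ $buckets[ $sum % $bucket_count ] }, $session;
}

# Write: one line per item, here as tab-separated values.
for my $n (0 .. $#buckets) {
    for my $session (@{ $buckets[$n] }) {
        print join("\t", $n, @{$session}{qw(user start hits)}), "\n";
    }
}
```

Grouping by comparing each item with the previous one is the reason the inputs must arrive sorted on the grouping fields: items with the same key are then guaranteed to be adjacent.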


The configuration file is in YAML format and is preprocessed with Config::YAMLMacros which provides some macro directives (include and define).

It is post-processed with Config::Checker which allows for some flexibility (sloppiness) on the part of configuration writers. Single items will be automatically turned into lists when needed.
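
The single-item-to-list behaviour can be pictured with a few lines of plain Perl. This is a sketch of the idea only, not Config::Checker's API: as_list() and the source key are hypothetical names used for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a lone value in an array reference and
# leave existing array references alone.
sub as_list {
    my ($value) = @_;
    return [] unless defined $value;
    return ref($value) eq 'ARRAY' ? $value : [ $value ];
}

# Hypothetical job entries, as they might look after YAML parsing.
my %job_a = ( source => 'raw_logs' );
my %job_b = ( source => [ 'raw_logs', 'sessions' ] );

for my $job (\%job_a, \%job_b) {
    $job->{source} = as_list($job->{source});
    print join(', ', @{ $job->{source} }), "\n";
}
```

After this kind of normalization, code that reads the configuration can always iterate over a list, whichever form the configuration writer used.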

The configuration file has several sections. The main section is the one that defines the jobs that process_logs runs.

The exact details of each section are described in Log::Parallel::ConfigCheck.


The current version of Log::Parallel can efficiently utilize more than 100 CPU cores for doing parallel work. A single process handles starting new jobs and receives all STDOUT & STDERR from all jobs. The author expects that this setup will bottleneck at around 400 CPU cores. Larger jobs with less output will allow larger-scale processing.

All jobs in Log::Parallel are currently date-based. All the rules and the input data are understood to cover certain date ranges. Log::Parallel figures that the output time range of a job is the same as the input time range for that job. You can have daily jobs with daily output. You can combine daily inputs together to run a weekly job. In the current system, there is no way to break up that weekly job's output back into daily time ranges. This presents two problems: first, there is no good solution to sessions that cross a time boundary; second, there is no easy way to go backwards in time (for example, to filter out spammers).

One potential improvement would be to allow jobs that have the same input to run together so that the parsing/combining step is only done once.

Another improvement would be to use unix sort -m instead of Sort::MergeSort.

Currently all files are read and written using ssh. Reading using a remote file system (NFS automounts) would improve performance.


The input files for Log::Parallel need to be named with a date stamp. The exact naming convention is flexible because the input files can be on multiple hosts and matched with globs. Normal syslog filenames will not work, so some other process must name the files by date.
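
As a rough illustration of why the date stamp matters, the sketch below indexes a list of file names by the date embedded in each name. The paths and the YYYY-MM-DD pattern are assumptions made for the example; the actual convention is whatever the configuration's globs describe.

```perl
use strict;
use warnings;

# Hypothetical file names, as a glob on a log host might return them.
my @files = (
    '/logs/web1/access.2010-03-01.gz',
    '/logs/web2/access.2010-03-01.gz',
    '/logs/web1/access.2010-03-02.gz',
    '/logs/web1/messages.1',            # syslog-style rotation: no date
);

# Index the files by the date embedded in their names.
my %files_by_date;
for my $file (@files) {
    if ($file =~ /(\d{4})-(\d{2})-(\d{2})/) {
        push @{ $files_by_date{"$1-$2-$3"} }, $file;
    } else {
        warn "skipping $file: no date stamp in name\n";
    }
}

for my $date (sort keys %files_by_date) {
    printf "%s: %d file(s)\n", $date, scalar @{ $files_by_date{$date} };
}
```

A file such as messages.1 carries no date at all, so there is nothing to tie it to a date range.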


This is used by process_logs. It reads configurations from Log::Parallel::ConfigCheck. It uses a Proc::JobQueue::DependencyQueue to queue the jobs that need to run. The jobs it runs are farmed out to remote systems using RPC::ToWorker. On the remote system, the code that runs the jobs is Log::Parallel::Task. The inputs to the jobs are parsed using a parser found by Log::Parallel::Parsers and the outputs are written using a writer invoked by Log::Parallel::Writers. The main writer is Log::Parallel::TSV. The time formats that describe when jobs should run are parsed by Log::Parallel::Durations. This module has support modules: Log::Parallel::Paths, Log::Parallel::Metadata, Log::Parallel::Misc.

Some modules that are handy for writing jobs are: Log::Parallel::Sql, Stream::Aggregate, Log::Parallel::Geo::IP.


This package may be used and redistributed under the terms of either the Artistic 2.0 or LGPL 2.1 license.