NAME

OmniPITR::Tools::ParallelSystem - Class for running multiple shell commands in parallel.

SYNOPSIS

General usage is:

my $run = OmniPITR::Tools::ParallelSystem->new( 'max_jobs' => 2, ... );
$run->add_command( 'command' => [ 'ls', '-l', '/' ] );
$run->add_command( 'command' => [ 'ls', '-l', '/root' ] );
$run->run();
my $results = $run->results();

DESCRIPTION

ParallelSystem strives to make the task of running multiple commands in parallel simple and effective.

It lets you define any number of commands, set the maximum number of concurrent workers, set startup/finish hooks, and run the whole thing.

The final ->run() call is blocking. It temporarily sets the CHLD signal handler to its own code, and restores the original handler afterwards.

INTERNALS

new()

Object constructor. Takes one obligatory argument and two optional ones:

  • max_jobs - obligatory integer (>= 1) that defines how many workers to run at a time

  • on_start - coderef (an anonymous sub, for example) that will be called every time a new worker is spawned. It receives one argument: the job descriptor. More information about job descriptors can be found in the add_command() method docs.

  • on_finish - coderef (an anonymous sub, for example) that will be called every time a worker finishes. It receives one argument: the job descriptor. More information about job descriptors can be found in the add_command() method docs.

If there are problems with the arguments (max_jobs not given or invalid, or hooks given but not CODE refs), an exception will be raised using Carp::croak().

Arguments are passed as a hash - both a hash and a hashref are accepted, so you can use either:

my $run = OmniPITR::Tools::ParallelSystem->new(
  'max_jobs' => 2,
  'on_finish' => sub { call_logging( shift ) },
);

or

my $run = OmniPITR::Tools::ParallelSystem->new(
  {
    'max_jobs' => 2,
    'on_finish' => sub { call_logging( shift ) },
  }
);
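
The hooks receive the job descriptor as their only argument. As an illustrative sketch (assuming the descriptor keys described under add_command() below, and a plain printf for logging), the hooks could look like:

my $run = OmniPITR::Tools::ParallelSystem->new(
  'max_jobs' => 2,
  'on_start' => sub {
    my $job = shift;
    # 'pid' and 'started' are filled in by ParallelSystem itself
    printf "Started [%s] as pid %d\n", join( ' ', @{ $job->{ 'command' } } ), $job->{ 'pid' };
  },
  'on_finish' => sub {
    my $job = shift;
    # 'status' follows the $? / CHILD_ERROR conventions
    printf "Finished pid %d with status %d\n", $job->{ 'pid' }, $job->{ 'status' };
  },
);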

results()

Returns an arrayref with all job descriptors (check the add_command() method docs for details), after all the jobs have been run.
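
For example, a minimal sketch (assuming the descriptor keys described under add_command() below) that scans the results for failed jobs:

$run->run();
for my $job ( @{ $run->results() } ) {
  next if 0 == $job->{ 'status' };    # $?-style status, 0 means success
  printf "Command [%s] failed, stderr:\n%s\n", join( ' ', @{ $job->{ 'command' } } ), $job->{ 'stderr' };
}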

add_command()

Adds a new command to the queue of things to be run.

The given argument (both a hash and a hashref are accepted) is treated as a job descriptor.

To make the whole thing run, the only required key is "command" - an arrayref of the command and its arguments. The command must not require any shell parsing - it will be passed directly to exec().
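
For example, shell constructs such as pipes only work if a shell is invoked explicitly (hypothetical commands, shown purely for illustration):

# No shell involved - the arguments are handed to exec() as-is:
$run->add_command( 'command' => [ 'cp', '/source/file', '/target/file' ] );

# A pipe requires an explicit shell:
$run->add_command( 'command' => [ '/bin/sh', '-c', 'ls -l / | wc -l' ] );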

There can be more keys in the job descriptor; all of them will be stored, passed back in the on_start/on_finish hooks, and present in the ->results() data.

In addition, several keys will be added by OmniPITR::Tools::ParallelSystem itself:

  • Available in all three places - the on_start and on_finish hooks, and the final ->results():

    • started - exact time when the worker started, given as epoch time with microsecond precision.

    • pid - pid of the worker process.

    • stderr - name of the temporary file that holds stderr output (in the on_start hook), or the captured stderr output from the command (in the other cases)

    • stdout - name of the temporary file that holds stdout output (in the on_start hook), or the captured stdout output from the command (in the other cases)

  • Additional information available in both the on_finish hook and the final ->results():

    • ended - exact time when the worker ended, given as epoch time with microsecond precision.

    • status - numerical exit status of the worker. Rules for interpreting the value are described in perldoc perlvar under "CHILD_ERROR" (a.k.a. $?).

If the application provides more keys to add_command(), all of them will be preserved and passed back to the application in hook calls and in the results output.
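
As a sketch of how application-provided keys travel through the system (the 'label' key below is a hypothetical application-defined key, not something ParallelSystem itself uses):

$run->add_command(
  'command' => [ 'tar', 'cf', '/tmp/etc.tar', '/etc' ],
  'label'   => 'etc-backup',
);
# Later, in the on_finish hook or in ->results(), the same descriptor
# still carries 'label' => 'etc-backup', plus the keys added by
# ParallelSystem: 'started', 'ended', 'pid', 'status', 'stdout', 'stderr'.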

add_commands()

Convenience wrapper to simplify adding multiple commands.

Input should be an array (or arrayref) of hashrefs; each hashref should be a valid job descriptor, as described in the add_command() docs.
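
For example (a minimal sketch):

$run->add_commands(
  { 'command' => [ 'ls', '-l', '/' ] },
  { 'command' => [ 'ls', '-l', '/root' ] },
);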

run()

Main loop responsible for running commands and handling worker termination.

start_new_worker()

Internal method that does the actual starting of a new worker (if one can be started, and if there is work for it to do).

Calls on_start hook if needed.

handle_finished_workers()

Internal method which does the necessary work when a worker finishes: reads the stdout/stderr files, unlinks the temp files, and calls the on_finish hook.