=pod

=head1 NAME

POE::Component::MessageQueue::Manual::Clustering - Instructions for setting up clustered MQ's with fail-over

=head1 DESCRIPTION

PoCo::MQ now supports running multiple MQs (possibly on different machines) which share the
same DBI back-store.  If you put these MQs behind a reverse-proxy/load-balancer and write your
clients to reconnect when a connection is broken, you will have automatic fail-over if one of
the MQs goes down!

=head1 EXAMPLE

If you are using mq.pl, this is one possible way to start your MQs.

  # Start the first MQ (ex. on machine mq1.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn DBI:mysql:database=pocomq;hostname=db.example.com \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq1 \
        --data-dir /tmp/perl_mq

  # Start the second MQ (ex. on machine mq2.example.com)
  mq.pl --storage dbi \
        --front-store none \
        --dbi-dsn DBI:mysql:database=pocomq;hostname=db.example.com \
        --dbi-username pocomq \
        --dbi-password pocomq \
        --pump-freq 1 \
        --mq-id mq2 \
        --data-dir /tmp/perl_mq

There are a couple important things to note above:

=over 4

=item *

--front-store none

The memory based front-stores Memory and BigMemory are local to each MQ.  That means that messages
delivered to one, won't be received by clients on the other, until after messages are moved into the
back-store.

You don't have to disable the front-store (and, in fact, it might improve performance when clients
are connected directly to the queue receiving the messages) but its affects might not be expected.

There is also L<POE::Component::MessageQueue::Storage::Remote> front-store that the MQs can share
but it isn't currently accessible from mq.pl.

=item *

--pump-freq 1

Periodically, an internal queue (ie. /queue/example) is "pumped", which means that the MQ goes
through the messaes on that queue and distributes them to available subscribers.

Normally, a "pump" is triggered on an internal queue only after messages are received on it.  But
in the case of a clustered MQ, messages might have been recieved on one of its sister MQs and so
it doesn't know when to pump.

Setting --pump-freq tells the MQ to pump all queues regardless of whether anything has changed on
them every X seconds.  In our example, it pumps all queues every 1 second.  If you want the messages
to arrive quickly, small values like 1 or 2 are recommended.

=item *

--mq-id mq1 / mq2

This sets a unique string id for each MQ.  Every MQ sharing the same DBI database, B<MUST> have a
different --mq-id.

Since they are sharing the database, they need to prefix some data in it, so that the other MQ doesn't
mess up its state.

=back

=head1 REVERSE PROXY / LOAD BALANCER

=head2 balance

B<balance> is a very simple reverse-proxy and load-balancer that has packages for many distributions.

L<http://www.inlab.de/balance.html>

Executing it on the load balancing machine (per our EXAMPLE above):

   balance -f 61613 mq1.example.com:61613 mq2.example.com:61613

And that's it!

=head2 LVS (Linux Virtual Server)

LVS is a load-balancing solution built into the Linux Kernel.  I recommend the HOWTO for learning
more about LVS:

L<http://www.austintek.com/LVS/LVS-HOWTO/>

You will have to do a number of things (described that the documentation above) to get your system
setup to do LVS, but once its ready, run the following commands as root to configure the 
load-balancer (per our EXAMPLE above):

  ipvsadm -A -t external.example.com:61613 -s wlc
  ipvsadm -a -t external.example.com:61613 -r mq1.example.com:61613 -m
  ipvsadm -a -t external.example.com:61613 -r mq2.example.com:61613 -m

Here is a link to the LVS project homepage:

L<http://www.linuxvirtualserver.org/>

=head1 CLIENTS

Writting clients that are robust to MQ failure and simply reconnect and retry, is a little more
challenging than it seems at first.

The main problem, is detecting failure correctly.  Different systems (with different OSs, Perl
versions) and also over different network configurations (through firewalls, routers, etc) will
act differently when the connection to the MQ is broken.

Here are some example snippets for scripts that correctly handle failure when all programs
involved (MQs, load-balancer, clients) are running on the same machine.  These were tested
on Debian Lenny with Linux 2.6.26 and Perl 5.10.

Your mileage may vary when running on your systems on your network.  Please test appropriately!

=head2 CONSUMER

  use Net::Stomp;
  use strict;

  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';

  sub main
  {
    my $stomp;
 
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
          $stomp->subscribe({
            destination => $QUEUE_NAME,
            ack         => 'client',
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();

    my $receive = sub {
      my $frame;
      while (1) {
        eval {
          $frame = $stomp->receive_frame;
        };
        if (!$@) {
          # If no Perl exception was thrown, return our $frame!
          return $frame;
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      }
    };
 
    while (1) {
      my $frame = $receive->();

      # TODO: Do something useful with this frame!!
      print "received:". $frame->body ."\n";

      $stomp->ack({ frame => $frame });
    }
 
    $stomp->disconnect();
  }
  main;

=head2 PRODUCER

  use Net::Stomp;
  use Data::Random qw/rand_chars/;
  use strict;
 
  my $QUEUE_NAME = "/queue/example";
  my $HOSTNAME   = "localhost";
  my $PORT       = 61613;
  my $USERNAME   = 'producer';
  my $PASSWORD   = 'test';
 
  # If we try to send a message on a connection after the MQ on the other side has died,
  # we received SIGPIPE.  It would be awesome if instead of receiving a signal, we got
  # an exception from $stomp->send(), but this is unfortunately how it is...
  my $sigpipe = 0;
  $SIG{PIPE} = sub {
    print STDERR "Caught signal SIGPIPE\n";
    $sigpipe = 1;
  };
  
  sub main
  {
    my $stomp;
  
    my $connect = sub {
      while (1) {
        eval {
          $stomp = Net::Stomp->new({
            hostname => $HOSTNAME,
            port     => $PORT,
          });
          $stomp->connect({
            login    => $USERNAME,
            passcode => $PASSWORD,
          });
        };
        if (!$@) {
          # If no Perl exception was thrown, return!  We are connected. 
          return;
        }

        # Otherwise, wait a second and try again.
        print STDERR "Unable to connect to MQ!  Sleeping 1 second then trying again...\n";
        sleep 1;
      }
    };
    $connect->();
 
    my $send = sub {
      my ($message) = @_;

      # This requests a reciept from the server.  This is very important!  Without this, the
      # server won't let us know if the message was received or not.
      $message->{receipt} = join('', rand_chars(set => 'alpha', size => 10)),

      while (1) {
        $sigpipe = 0;
        eval {
          $stomp->send({ %$message });
        };
        if (!$@ && !$sigpipe) {
          # No Perl exceptions and no SIGPIPE, but we can't advance until we get a receipt!
          # Wait 5 seconds for one, if it doesn't come, then we need to reconnect and try again.

          my $receipt;
          eval {
            $receipt = $stomp->receive_frame(5);
          };
          if (!$@ && $receipt && $receipt->headers->{'receipt-id'} eq $message->{'receipt'}) {
            # No Perl exception, we got a receipt and it matchs the message sent!
            # "return" triumphantly!
            return;
          }
        }

        # Otherwise, wait a second, reconnect and then try again.
        print STDERR "Connection to MQ broken!  Sleeping 1 second then attempting to reconnect...\n";
        sleep 1;
        $connect->();
      };
    };
 
    # Send messages forever...
    while (1) {
      # TODO: construct a real message!
      my $data = "test";

      $send->({
        destination => $QUEUE_NAME,
        body        => $data,
        persistent  => 'true',
      });
    }
 
    $stomp->disconnect();
  }
  main;

=head1 AUTHORS

Copyright 2010 David Snopek (L<http://www.hackyourlife.org>)

=head1 LICENSE

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

=cut