IO::Stream - ease non-blocking I/O streams based on EV


    This document describes IO::Stream version v2.0.3


        use EV;
        use IO::Stream;
            host        => '',
            port        => 80,
            cb          => \&client,
            wait_for    => SENT|EOF,
            in_buf_limit=> 102400,
            out_buf     => "GET / HTTP/1.0\nHost:\n\n",
        $EV::DIED = sub { warn $@; EV::unloop };
        sub client {
            my ($io, $e, $err) = @_;
            if ($err) {
                die $err;
            if ($e & SENT) {
                print "request sent, waiting for reply...\n";
            if ($e & EOF) {
                print "server reply:\n", $io->{in_buf};
                EV::unloop;         # ALL DONE


    Non-blocking event-based low-level I/O is hard to get right. Code
    usually error-prone and complex... and it very similar in all
    applications. Things become much worse when you need to alter I/O
    stream in some way - use proxies, encryption, SSL, etc.

    This module designed to give user ability to work with I/O streams on
    higher level, using input/output buffers (just scalars) and high-level
    events like CONNECTED, SENT or EOF. As same time it doesn't hide
    low-level things, and user still able to work on low-level without any


    Architecture of this module make it ease to write plugins, which will
    alter I/O stream in any way - route it through proxies, encrypt, log,

    Here are few available plugins, you may find more on CPAN:
    IO::Stream::Crypt::RC4, IO::Stream::Proxy::HTTPS,
    IO::Stream::MatrixSSL::Client, IO::Stream::MatrixSSL::Server.

    If you interested in writing own plugin, check source for "skeleton"
    plugins: IO::Stream::Noop and IO::Stream::NoopAlias.


    This modules doesn't export any functions/methods/variables, but it
    exports a lot of constants. There two groups of constants: events and
    errors (which can be imported using tags ':Event' and ':Error'). By
    default all constants are exported.





    Errors are similar to $! - they're dualvars, having both textual and
    numeric values.

    NOTE: Since v2.0.0 ETORESOLVE, EDNSNXDOMAIN and EDNSNODATA are not used
    anymore (EDNS is used instead), but they're still exported for


    You can create IO::Stream object using any "stream" fh (file, TTY, UNIX
    socket, TCP socket, pipe, FIFO). Or, if you need TCP socket, you can
    create IO::Stream object using host+port instead of fh (in this case
    IO::Stream will do non-blocking host resolving, create TCP socket and
    do non-blocking connect).

    After you created IO::Stream object, it will handle read/write on this
    fh, and deliver only high-level events you asked for into your
    callback, where you will be able to operate with in/out buffers instead
    of doing sysread()/syswrite() manually.

    There no limitations on what you can do with fh after you've created
    IO::Stream object - you can even do sysread()/syswrite() (but there no
    reasons for you to do this anymore).

    IMPORTANT! When you want to close this fh, you MUST use $io->close()
    method for closing fh instead of doing close($fh). This is because
    IO::Stream doesn't require from you to keep object returned by new(),
    and without call to $io->close() IO::Stream object will continue to
    exists and may receive/generate some events, which is not what you
    expect after closing fh. Also, if you keep object returned by
    IO::Stream->new() somewhere in your variables, you should either undef
    all such variables after you called $io->close(), or you should use
    Scalar::Util::weaken() on these variables after storing IO::Stream
    object. (The same is applicable for all plugin objects too.)



      If you created IO::Stream object using {host}+{port} instead of {fh},
      this event will be generated after resolving {host}. Resolved IP
      address will be stored in {ip}.


      If you created IO::Stream object using {host}+{port} instead of {fh},
      this event will be generated after connecting socket to {ip}:{port}.


      Generated after each successful read. IO::Stream may execute several
      sysread() at once before generating IN event for optimization. Read
      data will be stored in {in_buf}, and {in_bytes} counter will be
      incremented by amount of bytes read.


      Generated only ONCE when EOF reached (sysread() return 0). Also will
      set {is_eof} to true.


      Generated when some data from {out_buf} was written. Written bytes
      either removed from {out_buf} or just increment {out_pos} by amount
      of bytes written (see documentation about these fields below for more
      details). Also increment {out_bytes} counter by amount of bytes

      Here 'written' may be somewhat virtual, while {out_buf}/{out_pos}
      changes, the real data still can be in plugin buffers (if you use
      plugins) and real syswrite() may not be called yet. To detect when
      all data is really written you should use SENT event, not OUT.


      Generated when all data from {out_buf} was written. It's usual and
      safe to call $io->close() on SENT event.


    IO::Stream has 30-second timeouts for connect and write, to timeout DNS
    resolve it use default AnyEvent::DNS timeout. If you need to timeout
    other operations, you have to create own timers using EV::timer().

    Current version doesn't allow you to change these timeouts.


    If you need to run TCP/UNIX-server socket, then you should handle that
    socket manually. But you can create IO::Stream object for accept()'ed

        my ($host, $port) = ('', 1234);
        socket  my $srv_sock, AF_INET, SOCK_STREAM, 0;
        setsockopt $srv_sock, SOL_SOCKET, SO_REUSEADDR, 1;
        bind       $srv_sock, sockaddr_in($port, inet_aton($host));
        listen     $srv_sock, SOMAXCONN;
        fcntl      $srv_sock, F_SETFL, O_NONBLOCK;
        $srv_w = EV::io($srv_sock, EV::READ, sub {
            if (accept my $sock, $srv_sock) {
                    fh          => $sock,
                    cb          => \&server,
                    wait_for    => IN,
            elsif ($! != EAGAIN) {
                die "accept: $!";


    IO::Stream provide only three public methods: new(), write() and
    close(). new() will create new object, close() will destroy it and
    write() must be called when you want to modify (or just modified)
    output buffer.

    All other operations are done using IO::Stream object fields - for
    simplicity and performance reasons. Moreover, you can keep your own
    data in it. There convention on field names, to avoid conflicts:


      Fields with names started with underscore are for internal use by
      IO::Stream, you shouldn't touch them or create your own field with
      such names.


      Fields with names started with lower-case letter are part of
      IO::Stream public interface - you allowed to read/write these fields,
      but you should not store incorrect values in these fields. Check
      "PUBLIC FIELDS" below for description of available fields and their


      You can store your own data in IO::Stream object using field names
      started with upper-case letter. IO::Stream will not touch these

    When some event arise which you're waited for, your callback will be
    called with 3 parameters: IO::Stream object, event mask, and error (if

        sub callback {
            my ($io, $e, $err) = @_;



        IO::Stream->new( \%opt );

    Create and return IO::Stream object. You may not keep returned object -
    you will get it in your callback (in first parameter) when some
    interesting for your event happens, and will exists until to call
    method close(). See OVERVIEW for more details.

    Fields of %opt become fields of created IO::Stream object. There only
    few fields required, but you can set any other fields too, and can also
    set your custom fields (with names starting from upper-case letter).

    Only required fields in %opt are {cb} and either {fh} or {host}+{port}.
    The {wait_for} field also highly recommended to set when creating

    If {out_buf} will be set, then new() will automatically call write()
    after creating object.

            fh          => \*STDIN,
            cb          => \&console,
            wait_for    => IN,



    Method write() MUST be called after any modifications of {out_buf}
    field, to ensure data in {out_buf} will be written to {fh} as soon as
    it will be possible.

    If {fh} available for writing when calling write(), then it will write
    (may be partially) {out_buf} and may immediately call your callback
    function delivering OUT|SENT events there. So, if you call write() from
    that callback (as it usually happens), keep in mind it may be called
    again while executing write(), and object state may significantly
    change (it even may be close()'d) after it return from write() into
    your callback.

    The write($data) is just a shortcut for:

        $io->{out_buf} .= $data;



    Method close() will close {fh} and destroy IO::Stream object. See
    OVERVIEW for more details.


    If field marked *RO* that mean field is read-only and shouldn't be

    Some field have default values (shown after equal sign).

    Some field modified on events.


    method ='IO'

      User callback which will be called when some listed in {wait_for}
      events arise or error happens.

      Field {cb} should be either CODE ref or object or class name. In last
      two cases method named {method} will be called. Field {method} should
      be string.


      Bitmask of events interesting for user. Can be changed at any time.
      For example:

          $io->{wait_for} = RESOLVED|CONNECTED|IN|EOF|OUT|SENT;

      When some data will be read from {fh}, {wait_for} must contain IN
      and/or EOF, or error EREQINEOF will be generated. So, it's better to
      always have IN and/or EOF in {wait_for}.

      If {wait_for} contain EOF and doesn't contain IN then {in_buf_limit}
      must be defined or error EREQINBUFLIMIT will be generated.

    fh *RO*

      File handle for doing I/O. It's either provided by user to new(), or
      created by new() (when user provided {host}+{port} instead).

    host *RO*

    port *RO*

      If user doesn't provide {fh} to new(), he should provide {host} and
      {port} instead. This way new() will create new TCP socket in {fh} and
      resolve {host} and connect this {fh} to resolved {ip} and {port}.
      Both resolving and connecting happens in non-blocking way, and will
      result in delivering RESOLVED and CONNECTED events into user callback
      (if user {wait_for} these events).

    in_buf_limit =undef

      Used to avoid DoS attach when user doesn't handle IN events and want
      his callback called only on EOF event. Must be defined if user have
      EOF without IN in {wait_for}.

      Any value >0 will defined amount of bytes which can be read into
      {in_buf} before EOF happens. When size of {in_buf} become larger than
      {in_buf_limit}, error EINBUFLIMIT will be delivered to user callback.
      In this case user can either remove some data from {in_buf} to make
      it smaller than {in_buf_limit} or increase {in_buf_limit}, and
      continue reading data.

      NOT RECOMMENDED! Value 0 will switch off DoS protection, so there
      will be no limit on amount of data to read into {in_buf} until EOF

    out_buf =q{} # modified on: OUT

    out_pos =undef # modified on: OUT

      Data from {out_buf} will be written to {fh}.

      If {out_pos} not defined, then data will be written from beginning of
      {out_buf}, and after successful write written bytes will be removed
      from beginning of {out_buf}.

      If {out_pos} defined, it should be >= 0. In this case data will be
      written from {out_pos} position in {out_buf}, and after successful
      write {out_pos} will be incremented by amount of bytes written.
      {out_buf} will not be changed!

    out_bytes =0 # modified on: OUT

      Each successful write will increment {out_bytes} by amount of written
      bytes. You can change {out_bytes} in any way, but it should always be
      a number.

    in_buf =q{} # modified on: IN

      Each successful read will concatenate read bytes to {in_buf}. You can
      change {in_buf} in any way, but it should always be a string.

    in_bytes =0 # modified on: IN

      Each successful read will increment {in_bytes} by amount of read
      bytes. You can change {in_bytes} in any way, but it should always be
      a number.

    ip *RO* =undef # modified on: RESOLVED

      When you call new() with {host}+{port} instead of {fh} then IP
      address resolved from {host} will be stored in {ip}, and event
      RESOLVED will be generated.

    is_eof *RO* =undef # modified on: EOF

      When EOF event happens {is_eof} will be set to true value. This allow
      you to detect is EOF already happens at any time, even if you doesn't
      have EOF in {wait_for}.

    plugin *RO* ={}

      Allow you to set list of plugins when creating object with new(), and
      later access these plugins.

      This field is somewhat special, because when you call new() you
      should set plugin to ARRAY ref, but in IO::Stream object {plugin} is
      HASH ref:

          my $io = IO::Stream->new({
              host        => '',
              port        => 443,
              cb          => \&google,
              wait_for    => EOF,
              in_buf_limit=> 102400,
              out_buf     => "GET / HTTP/1.0\nHost:\n\n",
              plugin      => [    # <------ it's ARRAY, but looks like HASH
                  ssl         => IO::Stream::MatrixSSL::Client->new(),
                  proxy       => IO::Stream::Proxy::HTTPS->new({
                      host        => '',
                      port        => 3218,
                      user        => 'me',
                      pass        => 'my pass',
              MyField1    => 'my data1',
              MyField2    => \%mydata2,
          # access the "proxy" plugin:

      This is because when calling new() it's important to keep plugins in
      order, but later it's easier to access them using names.


    Exceptions may be thrown only in new(). All other errors will be
    delivered to user's callback in last parameter.

    usage: IO::Stream->new({ cb=>, wait_for=>, [fh=>, | host=>, port=>,]
    ... })

      You called new() with wrong parameters.

    socket: %s

    fcntl: %s

      Error happens while creating new socket. Usually this happens because
      you run out of file descriptors.

    can't get file descriptor

      Failed to get fileno() for your fh. Either fh doesn't open, or this
      fh type is not supported (directory handle), or fh is not file handle
      at all.

    can't create second object for same fh

      You can't have more than one IO::Stream object for same fh.

      IO::Stream keep all objects created by new() until $io->close() will
      be called. Probably you've closed fh in some way without calling
      $io->close(), then new fh was created with same file descriptor
      number, and you've tried to create IO::Stream object using new fh.




 Bugs / Feature Requests

    Please report any bugs or feature requests through the issue tracker at You will be notified
    automatically of any progress on your issue.

 Source Code

    This is open source software. The code repository is available for
    public review and contribution under the terms of the license. Feel
    free to fork the repository and submit pull requests.

        git clone


      * MetaCPAN Search

      * CPAN Ratings

      * AnnoCPAN: Annotated CPAN documentation

      * CPAN Testers Matrix

      * CPANTS: A CPAN Testing Service (Kwalitee)


    Alex Efros <>


    This software is Copyright (c) 2008- by Alex Efros <>.

    This is free software, licensed under:

      The MIT (X11) License