package IO::Lambda::HTTP::Client;
use vars qw(@ISA @EXPORT_OK $DEBUG);
@ISA = qw(Exporter);
@EXPORT_OK = qw(http_request);
our $DEBUG = $IO::Lambda::DEBUG{http} || 0;
use strict;
use warnings;
use Socket;
use Errno;
use Exporter;
use IO::Socket;
use HTTP::Response;
use IO::Lambda qw(:lambda :stream);
use IO::Lambda::Socket qw(connect);
use Time::HiRes qw(time);
# Lambda condition (see POD): the HTTP::Request (and any options) are taken
# from the current lambda context, a client lambda is built from them by
# new(), and that lambda is turned into a condition with the given callback.
sub http_request(&)
{
	my $callback = shift;
	my $client   = __PACKAGE__-> new(context);
	return $client-> condition( $callback, \&http_request, 'http_request');
}
# Create a client for a single HTTP::Request and return the lambda that
# performs it (following redirects and authentication as configured).
#
# $req      - HTTP::Request object; its headers may be completed here
# %options  - timeout (seconds) or deadline (epoch seconds), max_redirect
#             (default 7), cookie_jar (default {}), and any other key
#             (keep_alive, conn_cache, async_dns, auth, username, password,
#             preferred_auth, proxy, ...) which is stored into the object
#             verbatim.
# Returns the lambda produced by handle_redirect().
sub new
{
	my ( $class, $req, %options) = @_;
	my $self = bless {}, $class;

	$self-> {deadline} = $options{timeout} + time if defined $options{timeout};
	$self-> {deadline} = $options{deadline} if defined $options{deadline};
	$self-> {max_redirect} = defined($options{max_redirect}) ? $options{max_redirect} : 7;
	delete @options{qw(deadline timeout max_redirect)};
	$self-> {$_} = $options{$_} for keys %options;

	my %headers;
	$headers{'User-Agent'} = "perl/IO-Lambda-HTTP v$IO::Lambda::VERSION";

	if ( $self-> {keep_alive}) {
		unless ( $self-> {conn_cache}) {
			require LWP::ConnCache;
			$self-> {conn_cache} = LWP::ConnCache-> new;
		}
		unless ( $req-> protocol) {
			$req-> protocol('HTTP/1.1');
		}
		# Host must carry the port whenever it is not the scheme's
		# default one (RFC 7230, section 5.4)
		my $uri = $req-> uri;
		$headers{Host} = $uri-> host;
		$headers{Host} .= ':' . $uri-> port
			if $uri-> port != $uri-> default_port;
		$headers{Connection} = 'Keep-Alive';
		$headers{'Keep-Alive'} = 300;
	}

	require IO::Lambda::DNS if $self-> {async_dns};

	# fill in default headers, never overriding those set by the caller
	my $h = $req-> headers;
	while ( my ($k, $v) = each %headers) {
		$h-> header($k, $v) unless defined $h-> header($k);
	}

	# cookie_jar: unset means {} (a private jar); a hash ref is passed to
	# HTTP::Cookies->new; an object is used as is; a false value disables it
	if ( $options{cookie_jar} //= {} ) {
		if ( ref($options{cookie_jar}) eq 'HASH') {
			require HTTP::Cookies;
			$options{cookie_jar} = HTTP::Cookies->new(%{$options{cookie_jar}});
		}
		$self->{cookie_jar} = $options{cookie_jar};
	}

	return $self-> handle_redirect( $req);
}
# Last chance to adjust a response before it is handed back to the caller.
# The base implementation returns $response as is; the request is passed
# along too, so that an overriding subclass can relate the two.
sub finalize_response
{
	my ( undef, undef, $response) = @_;
	return $response;
}
# Resolve a (possibly relative) Location header value against the URI of
# the request being redirected; returns the resulting absolute URI object.
sub redirect
{
	my $self = shift;
	my ( $location, $req) = @_;
	my $base = $req-> uri;
	return URI-> new_abs( $location, $base);
}
# Reissue the request, if necessary, because of 30X or 401 responses.
# Returns a lambda that resolves to an HTTP::Response object on success, or
# to an error string otherwise.
sub handle_redirect
{
	my ( $self, $req) = @_;

	my $was_redirected  = 0;
	my $was_failed_auth = 0;
	my $auth = $self-> {auth};

	lambda {
		my $method;
		if ( $auth) {
			# create fake response for protocol initiation, -- but just once
			my $x = HTTP::Response-> new;
			$x-> headers-> header('WWW-Authenticate', split(',', $auth));
			$method = $self-> get_authenticator( $req, $x);
			undef $auth;
		}

		$self->{cookie_jar}->add_cookie_header($req) if
			$self->{cookie_jar} &&
			!$req->headers->header('Cookie');

		context $method || $self-> handle_connection( $req);
	tail {
		# request is finished
		my $response = shift;
		return $response unless ref($response);

		$response-> request($req);
		$self->{cookie_jar}->extract_cookies($response) if $self->{cookie_jar};

		my $code = $response-> code;
		if ( $code =~ /^30[12378]$/) {
			return $self-> finalize_response($req, $response)
				if $self->{max_redirect} == 0;

			$was_failed_auth = 0;
			return 'too many redirects'
				if ++$was_redirected > $self-> {max_redirect};

			my $location = $response-> header('Location');
			return $self-> finalize_response($req, $response)
				unless defined $location;

			my $uri = $self->redirect( $location, $req );
			return $self-> finalize_response($req, $response)
				if ($uri-> scheme // '') !~ /^https?$/;

			$req-> uri($uri);
			# Host must carry the port whenever it is not the default one
			my $host = $uri-> host;
			$host .= ':' . $uri-> port if $uri-> port != $uri-> default_port;
			$req-> headers-> header( Host => $host);
			$req-> remove_header('Cookie');

			# 307 and 308 require the method and the body to be kept
			# (RFC 7231 6.4.7, RFC 7538). For 301, 302 and 303 switch to a
			# body-less GET, as browsers do -- except HEAD, which stays HEAD
			if ( $code !~ /^30[78]$/ and $req-> method ne 'HEAD') {
				$req-> method('GET');
				$req-> content('');
				$req-> remove_header('Content-Length', 'Content-Type');
			}

			warn "redirect to " . $req-> uri . "\n" if $DEBUG;
			this-> start;
		} elsif (
			not($was_failed_auth) and
			$code eq '401' and
			defined($self-> {username}) and
			defined($self-> {password})
		) {
			$was_failed_auth++;
			$method = $self-> get_authenticator( $req, $response);
			context $method;
			return $method ? tail {
				my $r = shift;
				return $r unless $r;

				# start from beginning, from handle_connection;
				this-> start;
			} : $self-> finalize_response($req, $response);
		} else {
			return $self-> finalize_response($req, $response);
		}
	}};
}
# If the request needs authentication, and we can do something about it,
# create a lambda that handles the authentication.
#
# $response is a (possibly fake) 401 response whose WWW-Authenticate headers
# name the schemes offered by the remote. 'Basic' is handled here directly;
# any other scheme NAME is delegated to IO::Lambda::HTTP::Authen::NAME, if
# that module can be loaded. Schemes are tried in the order given by the
# preferred_auth option. Returns a lambda, or undef if no scheme applies.
sub get_authenticator
{
	my ( $self, $req, $response) = @_;

	# supports authentication?
	my %auth;
	for my $challenge ( $response-> header('WWW-Authenticate')) {
		my ( $scheme) = $challenge =~ /^\s*(\S+)/;
		next unless defined $scheme;
		# The scheme name comes from the remote, and is used below to build
		# a module path and a package name: accept plain tokens only, so
		# that e.g. "../../Foo" cannot make us load arbitrary modules
		next unless $scheme =~ /^[A-Za-z0-9_]+\z/;
		# scheme names are case-insensitive (RFC 7235)
		$scheme = 'Basic' if lc($scheme) eq 'basic';
		$auth{$scheme}++;
	}

	my %preferred = defined($self-> {preferred_auth}) ? (
		ref($self-> {preferred_auth}) ?
			%{ $self-> {preferred_auth} } :
			( $self-> {preferred_auth} => 1 )
	) : ();

	# highest weight first; negative weights exclude a scheme
	my @auth = sort {
		($preferred{$b} || 0) <=> ($preferred{$a} || 0)
	} grep {
		not exists($preferred{$_}) or $preferred{$_} >= 0;
	} keys %auth;

	my $compilation_errors = '';
	for my $auth ( @auth) {
		if ( $auth eq 'Basic') {
			# always
			warn "trying basic authentication\n" if $DEBUG;
			$req-> authorization_basic( $self-> {username}, $self-> {password});
			return $self-> handle_connection( $req);
		}

		eval { require "IO/Lambda/HTTP/Authen/$auth.pm" };
		if ( my $error = $@) {
			# a missing module just means the scheme is not supported;
			# anything else is a genuine error in an installed module
			$compilation_errors .= "$error\n"
				unless $error =~ m{^Can't locate IO/Lambda/HTTP/Authen/\Q$auth\E\.pm};
			next;
		}

		my $lambda = "IO::Lambda::HTTP::Authen::$auth"->
			authenticate( $self, $req, $response);
		warn "trying authentication with '$auth'\n" if $DEBUG and $lambda;
		return $lambda if $lambda;
	}

	# XXX Propagate compilation errors as http errors. Doubtful.
	return lambda { $compilation_errors } if length $compilation_errors;
	return undef;
}
# Select the transport for the request's URI scheme: plain sockets for http,
# or the IO::Lambda::HTTP::HTTPS reader/writer for https (that module is
# loaded on first use). Stores the choice into $self->{reader}/{writer}.
# Returns nothing on success, or an error string.
my $got_https;
sub prepare_transport
{
	my ( $self, $req) = @_;
	my $uri    = $req-> uri;
	my $scheme = $uri-> scheme;

	return "bad URI: " . $uri-> as_string unless defined $scheme;

	if ( $scheme eq 'http') {
		$self-> {reader} = undef;
		$self-> {writer} = undef;
		return;
	}

	return "bad URI scheme: $scheme" unless $scheme eq 'https';

	unless ( $got_https) {
		eval { require IO::Lambda::HTTP::HTTPS; };
		return "https not supported: $@" if $@;
		$got_https++;
	}
	$self-> {reader} = IO::Lambda::HTTP::HTTPS::https_reader();
	$self-> {writer} = \&IO::Lambda::HTTP::HTTPS::https_writer;
	warn "https enabled\n" if $DEBUG;
	return;
}
# Build the context list for reading from the current connection until
# $cond (see sysreader) is satisfied:
#   ( reader, socket, \ buffer, condition, deadline )
# The buffer is passed by reference, so reads accumulate in $self->{buf}.
sub http_read
{
	my $self = shift;
	my $cond = shift;
	return (
		$self-> {reader},
		$self-> {socket},
		\ $self-> {buf},
		$cond,
		$self-> {deadline},
	);
}
# Read from the socket until $cond (see sysreader) is satisfied, and let the
# current lambda finish with what that read produces.
# After this call no communication should happen.
sub http_tail
{
	my $self = shift;
	my $cond = shift;
	context $self-> http_read( $cond);
	# The &-call is deliberate: it bypasses tail()'s block prototype, so
	# tail() is called without any callback (presumably the reader's
	# results are then passed through as is -- see IO::Lambda docs)
	return &tail();
}
# Create a non-blocking TCP socket and start connecting it to $host:$port.
# Returns the socket on success (the connect may still be in progress), or
# ( undef, $error_message) on failure. The host name is resolved with
# inet_aton(), i.e. in a blocking manner, and IPv4 only.
my $tcp_proto;
sub socket
{
	my ( $self, $host, $port) = @_;

	$tcp_proto //= getprotobyname('tcp');
	return undef, "getprotobyname(tcp) error:$!" unless $tcp_proto;

	my $iaddr = inet_aton($host);
	return undef, "cannot resolve $host" unless $iaddr;
	my $paddr = sockaddr_in($port, $iaddr);

	# CORE:: is spelled out, as this very sub has the name of the socket()
	# builtin (and connect() is imported from IO::Lambda::Socket)
	my $sock;
	return undef, "error creating socket:$!"
		unless CORE::socket($sock, PF_INET, SOCK_STREAM, $tcp_proto);
	$sock-> blocking(0);

	# a non-blocking connect normally fails with "in progress"; some
	# platforms (e.g. Win32) report "would block" instead
	return undef, "connect($host,$port) error:$!"
		unless CORE::connect($sock, $paddr) or $!{EINPROGRESS} or $!{EWOULDBLOCK};

	return $sock;
}
# Decide where to connect to: the request's own host and port, or an HTTP
# proxy. Returns ( $host, $port).
#
# With the 'proxy' option: undef disables proxying, a scalar is a proxy host
# name, an array ref is [ host, port ]; a missing port defaults to the
# request's port. Without the option, the environment is consulted: the
# scheme-specific {scheme}_proxy, then all_proxy, either in lower or upper
# case, in "host[:port]" or URL form; hosts matching no_proxy go direct.
sub parse_proxy
{
	my ( $self, $req) = @_;
	my $uri = $req-> uri;

	if ( exists( $self-> {proxy})) {
		my $proxy = $self-> {proxy};
		my ( $host, $port);
		if ( !defined $proxy) {
			# nothing
			( $host, $port) = ( $uri-> host, $uri-> port);
		} elsif ( ref($proxy)) {
			Carp::confess("'proxy' option must be a non-empty array") if
				ref($proxy) ne 'ARRAY' or
				not @$proxy;
			( $host, $port) = @$proxy;
		} else {
			$host = $proxy;
		}
		$port ||= $uri-> port;
		return ( $host, $port);
	}

	my $scheme = $uri-> scheme;
	my ( $host, $port) = ( $uri-> host, $uri-> port);

	my %proxy;
	for my $id ( grep { defined } $scheme, qw(all no)) {
		for my $name ( "${id}_proxy", uc "${id}_proxy") {
			# Under CGI, HTTP_PROXY is filled from the client's "Proxy:"
			# request header ("httpoxy"), so it cannot be trusted there
			next if $name eq 'HTTP_PROXY' and $ENV{REQUEST_METHOD};
			# an empty variable means "not set"
			next unless defined $ENV{$name} and length $ENV{$name};
			$proxy{$id} = $ENV{$name};
			last;
		}
	}

	# the scheme-specific setting wins over the catch-all one
	my $proxy = defined($scheme) ? ( $proxy{$scheme} // $proxy{all}) : $proxy{all};
	return ( $host, $port) unless defined $proxy;
	return ( $host, $port) if _no_proxy_match( $host, $proxy{no} // '');

	# accept "host", "host:port" and URL forms ("http://user:pw@host:port/")
	if ( $proxy =~ m{
		^ (?: [a-z][a-z0-9+.-]* :// )?   # optional scheme
		  (?: [^\@/]* \@ )?              # optional credentials (unused)
		  ( [^:/]+ )                     # host
		  (?: : (\d+) )?                 # optional port
	}xi) {
		return ( $1, $2 || 80);
	}
	# unparsable: keep the old behavior and let the connect fail visibly,
	# rather than silently bypassing a configured proxy
	return ( $proxy, 80);
}

# True if $host is excluded from proxying by $no_proxy: a comma and/or space
# separated list of names, "*" for all hosts. An entry matches that host and
# all of its subdomains; a leading "." or "*." and a ":port" are ignored.
sub _no_proxy_match
{
	my ( $host, $no_proxy) = @_;
	return 0 unless defined $host and defined $no_proxy;
	$host = lc $host;
	for my $entry ( split /[\s,]+/, lc $no_proxy) {
		next unless length $entry;
		return 1 if $entry eq '*';
		$entry =~ s/^\*?\.//;
		$entry =~ s/:\d+$//;
		next unless length $entry;
		return 1 if $host eq $entry or $host =~ /\.\Q$entry\E\z/;
	}
	return 0;
}
# Connect to the remote (or to the proxy), wait for protocol to finish, and
# close the connection if needed. Returns a lambda that resolves to an
# HTTP::Response object on success, or to an error string otherwise.
sub handle_connection
{
	my ( $self, $req) = @_;
	require Scalar::Util;

	# Have a chance to load eventual modules early. This goes first, as it
	# also rejects URIs without a scheme or with an unsupported one, which
	# parse_proxy() cannot handle (such URIs may have no host at all)
	my $err = $self-> prepare_transport( $req);
	return lambda { $err } if defined $err;

	my ( $host, $port) = $self->parse_proxy($req);

	lambda {
		# resolve hostname
		if (
			$self-> {async_dns} and
			$host !~ /^(\d{1,3}\.){3}(\d{1,3})$/
		) {
			context $host,
				timeout => ($self-> {deadline} || $IO::Lambda::DNS::TIMEOUT);
			warn "resolving $host\n" if $DEBUG;
			return IO::Lambda::DNS::dns( sub {
				$host = shift;
				return $host unless $host =~ /^\d/; # error
				warn "resolved to $host\n" if $DEBUG;
				return this-> start; # restart the lambda with different $host
			});
		}

		delete $self-> {close_connection};
		$self-> {close_connection}++
			if ( $req-> header('Connection') || '') =~ /^close/i;

		# got cached socket?
		my ( $sock, $cached);
		my $cc = $self-> {conn_cache};
		if ( $cc) {
			$sock = $cc-> withdraw( __PACKAGE__, "$host:$port");
			if ( $sock) {
				my $bad = _socket_error( $sock);
				if ( $bad) {
					undef $sock;
				} else {
					$cached++;
				}
				warn "reused socket is ".($bad ? "bad" : "ok")."\n" if $DEBUG;
			}
		}

		# connect
		my $connect_error;
		warn "connecting\n" if $DEBUG and not($sock);
		( $sock, $connect_error) = $self-> socket( $host, $port) unless $sock;
		return $connect_error unless $sock;

		context( $sock, $self-> {deadline});
	connect {
		return shift if @_;

		# connected
		$self-> {socket} = $sock;
		$self-> {reader} = readbuf ( $self-> {reader});
		$self-> {writer} = $self-> {writer}-> ($cached, $host, $port) if $self-> {writer};
		$self-> {writer} = writebuf( $self-> {writer});
		context $self-> handle_request( $req);
	autocatch tail {
		my $response = shift;

		# Put back the connection, if possible -- but only after a complete
		# response: after an error (timeout, protocol error, exception) it
		# is unknown how much of the reply is still pending on the socket,
		# and reusing it would feed those leftovers to the next request
		if (
			$cc and
			not $self-> {close_connection} and
			Scalar::Util::blessed($response) and
			$response-> isa('HTTP::Response')
		) {
			my $bad = _socket_error( $sock);
			warn "deposited socket back\n" if $DEBUG and not($bad);
			$cc-> deposit( __PACKAGE__, "$host:$port", $sock)
				unless $bad;
		}

		warn "connection:close\n" if $DEBUG and $self-> {close_connection};
		delete @{$self}{qw(close_connection socket buf writer reader)};
		return $response;
	}}}
}

# Pending error on $sock as reported by SO_ERROR: 0 if there is none, the
# error number otherwise, or -1 if the option cannot be queried at all
# (e.g. the handle is not open any more). True means "do not reuse".
sub _socket_error
{
	my $sock   = shift;
	my $packed = getsockopt( $sock, SOL_SOCKET, SO_ERROR);
	return -1 unless defined $packed;
	return unpack( 'i', $packed) || 0;
}
# Execute single http request over an established connection.
# Returns a lambda that resolves either to a HTTP::Response object, or to an
# error string.
sub handle_request
{
	my ( $self, $req) = @_;

	return lambda {
		$self-> {buf} = '';
		context $self-> handle_request_in_buffer( $req);
		warn "request sent\n" if $DEBUG;
		warn $req-> as_string . "\n" if $DEBUG and $DEBUG > 1;

	tail {
		# readbuf-style results: ( result, error ) -- only the error matters
		my $error = $_[1];
		if ( $DEBUG) {
			warn "got response\n";
			warn( ( $error ? $error : $self-> {buf}) . "\n") if $DEBUG > 1;
		}
		return $error if defined $error;
		return $self-> parse( \ $self-> {buf} );
	}};
}
# Execute single http request over an established connection.
# Returns 2 parameters, readbuf-style, where actually only the 2nd matters,
# and signals error if defined. 2 parameters are there for readbuf() compatibility,
# so that the protocol handler can easily fall back to readbuf() itself.
# On success the raw response (headers and body) is left in $self->{buf}.
sub handle_request_in_buffer
{
	my ( $self, $req) = @_;
	my $method = $req-> method;

	# fixup path - otherwise LWP generates request as GET http://hostname/uri HTTP/1.1
	# which not all servers understand. Only on direct connections, though:
	# a proxy needs the absolute URI to know where to forward the request.
	# The proxy may come from the 'proxy' option or from the environment,
	# so ask parse_proxy() where the connection actually goes to.
	my ($req_line, $save_uri);
	my $uri = $req-> uri;
	my ( $peer_host, $peer_port) = $self-> parse_proxy( $req);
	my $direct =
		lc( $peer_host // '') eq lc( $uri-> host // '') &&
		( $peer_port // 0) == ( $uri-> port // 0);
	if ( $direct && ( $req-> protocol || '') =~ /http\/1\.\d/i) {
		$save_uri = $uri;
		my $fullpath = $save_uri-> path_query;
		$fullpath = "/$fullpath" unless $fullpath =~ m[^/];
		$req-> uri( $fullpath);
	}
	$req_line = $req-> as_string("\x0d\x0a");
	$req-> uri($save_uri) if defined $save_uri;

	lambda {
		# send request
		context
			$self-> {writer},
			$self-> {socket}, \ $req_line,
			undef, 0, $self-> {deadline};
	state write => tail {
		my ( $bytes_written, $error) = @_;
		return undef, $error if $error;

		context $self-> {socket}, $self-> {deadline};
	readable {
		# request sent, now wait for data
		return undef, 'timeout' unless shift;

		# read first line
		context $self-> http_read(qr/^.*?\n/);
	state head => tail {
		my $line = shift;
		unless ( defined $line) {
			my $error = shift;
			# remote closed connection and content is single-line HTTP/1.0
			return (undef, $error) if $error ne 'eof';
			return (undef, undef);
		}

		# no headers?
		return $self-> http_tail
			unless $line =~ /^HTTP\/[\.\d]+\s+\d{3}\s+/i;

		# got some headers
		context $self-> http_read( qr/^.*?\r?\n\r?\n/s);
	state body => tail {
		$line = shift;
		return undef, shift unless defined $line;

		my $headers = HTTP::Response-> parse( $line);

		# Connection: close
		my $c = lc( $headers-> header('Connection') || '');
		$self-> {close_connection} = $c =~ /^close\s*$/i;

		# responses to HEAD never have a body
		return 1 if $method eq 'HEAD';
		return $self-> http_read_body( length $line, $headers);
	}}}}}
}
# Having read the headers, read the body.
#
# $offset  - length of the header block at the start of $self->{buf}
# $headers - HTTP::Response parsed from that header block
# Meant to be called from within a lambda callback: it either sets up the
# reads for the body (returning what those return), or returns 1 when there
# is nothing more to read.
sub http_read_body
{
	my ( $self, $offset, $headers) = @_;

	# 204 and 304 responses never have a body, whatever their headers say
	# (RFC 7230, 3.3.3); a 304 may even carry the Content-Length of the
	# resource, which would make us wait for bytes that never come
	my $code = $headers-> code // '';
	return 1 if $code =~ /^(?:204|304)$/;

	# 'Transfer-Encoding: chunked' as the final coding overrides any
	# Content-Length (RFC 7230, 3.3.3) -- read the chunks
	my $te = lc( $headers-> header('Transfer-Encoding') || '');
	return $self-> http_read_chunked($offset)
		if $self-> {chunked} = $te =~ /(?:^|,)\s*chunked\s*$/;

	# have Content-Length? read that many bytes then
	my $l = $headers-> header('Content-Length');
	return $self-> http_tail( $1 + $offset )
		if defined ($l) and $l =~ /^(\d+)\s*$/;

	# just read as much as possible then -- however with considerations;
	# we can't do that if server keeps connection open, otherwise we'll hang
	# http/1.0 and less doesn't implement open connections
	return $self-> http_tail if
		( $headers-> protocol || '') =~ /^HTTP\/(\d+\.\d+)/ and
		$1 < 1.1;

	# server wants to close the connection
	return $self-> http_tail if
		$self-> {close_connection};

	# HTTP/1.1 keep-alive connection without a length: take what was read
	return 1;
}
# Read a body sent with the chunked transfer coding (RFC 7230, 4.1), that
# starts at $offset in $self->{buf}. Chunk-size lines and chunk CRLFs are cut
# out of the buffer as they are read, and so are the final trailer section
# and empty line, so that on success the buffer holds the headers followed
# by the decoded body only. Meant to be called from within a lambda
# callback; finishes with 1 on success, or with ( undef, $error).
sub http_read_chunked
{
	my ( $self, $offset) = @_;

	# read chunk size
	pos( $self-> {buf} ) = $offset;
	context $self-> http_read( qr/\G[^\r\n]+\r?\n/i);
	state size => tail {
		my $line = shift;
		return undef, shift unless defined $line; # got error

		# advance
		substr( $self-> {buf}, $offset, length($line), '');
		pos( $self-> {buf} ) = $offset;

		# chunk-size is hex digits of either case, optionally followed by
		# chunk extensions (";name=value"), which are ignored
		$line =~ s/\r?\n//;
		$line =~ s/[ \t]*;.*//s;
		$line =~ s/[ \t]+$//;
		return undef, "protocol error: chunk size error"
			unless $line =~ /^[0-9a-fA-F]+$/;
		my $size = hex $line;
		warn "reading chunk $size bytes\n" if $DEBUG;

		unless ( $size) {
			# Last chunk: also consume the optional trailer fields and the
			# final empty line. Otherwise they would end up appended to the
			# body, or be left unread on a connection kept for reuse.
			context $self-> http_read( qr/\G(?:[^\r\n]+\r?\n)*\r?\n/);
			return tail {
				my $trailer = shift;
				if ( defined $trailer) {
					substr( $self-> {buf}, $offset, length($trailer), '');
				} else {
					# no proper end of message (eof, timeout, ...): the body
					# is complete, but the connection state is unknown
					substr( $self-> {buf}, $offset) = ''
						if length( $self-> {buf}) > $offset;
					$self-> {close_connection} = 1;
				}
				return 1;
			};
		}

		$size += 2; # CRLF
		my $frame = restartable;

		# read the chunk itself
		context $self-> http_read( $offset + $size);
		state chunk => tail {
			return undef, shift unless shift;
			$offset += $size - 2;
			substr( $self->{buf}, $offset, 2, '' ); # remove CRLF
			pos( $self-> {buf} ) = $offset;
			warn "chunk $size bytes ok\n" if $DEBUG;
			again($frame);
		}};
}
# Turn the raw bytes read from the connection into an HTTP::Response.
# A buffer that starts with a status line is parsed as a full response;
# anything else becomes the content of a header-less response with status
# '000' (what an HTTP/0.9-style reply looks like).
sub parse
{
	my $self    = shift;
	my $buf_ptr = shift;
	unless ( $$buf_ptr =~ /^(HTTP\S+)\s+(\d{3})\s+/i) {
		return HTTP::Response-> new( '000', '', undef, $$buf_ptr);
	}
	return HTTP::Response-> parse( $$buf_ptr);
}
1;
__DATA__
=pod
=head1 NAME
IO::Lambda::HTTP::Client - http requests lambda style
=head1 DESCRIPTION
The module exports a single condition C<http_request> that accepts a
C<HTTP::Request> object and a set of options as parameters. The condition returns
either a C<HTTP::Response> on success, or an error string otherwise.
=head1 SYNOPSIS
use HTTP::Request;
use IO::Lambda qw(:all);
use IO::Lambda::HTTP::Client qw(http_request);
lambda {
context shift;
http_request {
my $result = shift;
if ( ref($result)) {
print "good: ", length($result-> content), " bytes\n";
} else {
print "bad: $result\n";
}
}
}-> wait(
HTTP::Request-> new( GET => "http://www.perl.com/")
);
=head1 API
=over
=item http_request $HTTP::Request -> $HTTP::Response
C<http_request> is a lambda condition that accepts C<HTTP::Request> object in
the context. Returns either a C<HTTP::Response> object on success, or error
string otherwise.
=item new $HTTP::Request :: () -> $HTTP::Response
Stores the C<HTTP::Request> object and returns a new lambda that will finish when
the associated request completes. The lambda will return either a
C<HTTP::Response> object on success, or an error string otherwise.
=back
=head1 OPTIONS
=over
=item async_dns BOOLEAN
If set, hostname will be resolved with L<IO::Lambda::DNS> using asynchronous
capabilities of L<Net::DNS>. Note that this method won't be able to account for
non-DNS (/etc/hosts, NIS) host names.
If unset (default), hostnames will be resolved in a blocking manner.
=item auth $AUTH
Normally, a request is sent without any authentication. If the request returns
status 401, then all available methods of authentication are tried. If the type
of authentication that shall be accepted by the remote is known in advance, the
non-authenticated request stage can be skipped altogether by explicitly setting
the C<auth> option:
username => 'user',
password => 'pass',
auth => 'Basic',
=item conn_cache $LWP::ConnCache = undef
The requestor can optionally use a C<LWP::ConnCache> object to reuse
connections on per-host per-port basis. Desired for HTTP/1.1. Required for the
NTLM/Negotiate authentication. See L<LWP::ConnCache> for details.
=item cookie_jar $HTTP::Cookies = {}
The requestor can optionally use a shared C<HTTP::Cookies> object to support cookies.
If not set, a local cookie jar is created and used for any redirects. To disable that,
set C<cookie_jar> to 0. See L<HTTP::Cookies> for details.
=item deadline SECONDS = undef
Aborts a request and returns C<'timeout'> string as an error if the request is
not finished before the deadline (in epoch seconds). If undef, timeout never
occurs.
=item keep_alive BOOLEAN
If set, all incoming request objects are silently converted to use HTTP/1.1, and
connections are automatically reused. Same as combination of the following:
$req-> protocol('HTTP/1.1');
$req-> headers-> header( Host => $req-> uri-> host);
new( $req, conn_cache => LWP::ConnCache-> new);
=item max_redirect NUM = 7
Maximum number of redirects to follow. If 0, no redirection attempts are made.
=item preferred_auth $AUTH|%AUTH
Sets the list of preferred authentication methods, used for selecting the
authentication method when the remote server supports several. When the value
is a string, then the given method is tried first, and only then all other
available methods. When it is a hash, the hash values are treated as weight
factors, such that the method with the greatest weight is tried first. Negative
values prevent the corresponding methods from being tried.
# try basic and whatever else
preferred_auth => 'Basic',
# try basic and never ntlm
preferred_auth => {
Basic => 1,
NTLM => -1,
},
Note that the current implementation does not provide re-trying of
authentication if a method, or combination of username and password fails.
When at least one method is declared by the remote as supported, and was tried,
and subsequently failed, no further authentication retries are made, and the
request is reported as failed.
=item proxy HOSTNAME | [ HOSTNAME, PORT ]
If set, HOSTNAME (or HOSTNAME and PORT tuple) is used as HTTP proxy.
If set to undef, proxy is not used.
If unset, the proxy is set automatically from the contents of the environment variables
C<all_proxy>, C<http_proxy>, C<https_proxy>, C<no_proxy>.
=item timeout SECONDS = undef
Maximum allowed time the request can take. If undef, no timeouts occur.
=back
=head1 BUGS
Non-blocking connects, and hence the module, don't work on win32 on perl5.8.X
due to under-implementation in ext/IO.xs. They do work on 5.10 however.
=head1 SEE ALSO
L<IO::Lambda>, L<HTTP::Request>, L<HTTP::Response>
=head1 AUTHOR
Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.
=cut