use v5.10;
package REST::Neo4p::Query;
use REST::Neo4p::Path;
use REST::Neo4p::Exceptions;
use JSON::XS;
use REST::Neo4p::ParseStream;
use HOP::Stream qw/drop/;
use Tie::IxHash;
use File::Temp qw(:seekable);
use Carp qw(croak carp);
use strict;
use warnings;
no warnings qw(once);
BEGIN {
$REST::Neo4p::Query::VERSION = '0.4003';
}
our $BUFSIZE = 50000;
sub new {
my $class = shift;
my ($q_string, $params) = @_;
unless (defined $q_string and !ref $q_string) {
REST::Neo4p::LocalException->throw( "First argument must be the query string\n");
}
unless (!defined $params || ref($params) eq 'HASH') {
REST::Neo4p::LocalException->throw( "Second argment must be a hashref of query parameters\n" );
}
$q_string =~ s/\s/ /g;
($q_string) = $q_string =~ m/^\s*(.*)\s*$/;
bless { '_query' => $q_string,
'_params' => $params || {},
'_handle' => REST::Neo4p->handle, # current handle
'Statement' => $q_string,
'NUM_OF_PARAMS' => $params ? scalar keys %$params : 0,
# 'ParamValues' => $params,
'ResponseAsObjects' => 1,
'_tempfile' => ''
}, $class;
}
sub tmpf { shift->{_tempfile} }
sub _handle { shift->{_handle} }
sub execute {
my $self = shift;
my @params = @_;
my %params;
if (@params) {
%params = ref $params[0] ? %{$params[0]} : @params;
$self->{_params} = \%params;
}
# current handle
local $REST::Neo4p::HANDLE;
REST::Neo4p->set_handle($self->_handle);
REST::Neo4p::CommException->throw("Not connected\n") unless REST::Neo4p->connected;
my $agent = REST::Neo4p->agent;
if ($agent->batch_mode) {
REST::Neo4p::NotSuppException->throw("Query execution not supported in batch mode\n");
}
delete $self->{_error};
delete $self->{_error_list};
delete $self->{_decoded_resp};
delete $self->{NAME};
my $endpt = 'post_'.REST::Neo4p->q_endpoint;
$self->{_tempfile} = File::Temp->new;
unless ($self->tmpf) {
REST::Neo4p::LocalException->throw(
"Can't create query result tempfile : $!\n"
);
}
eval {
for ($endpt) {
/cypher/ && do {
$agent->$endpt(
[],
{ query => $self->query, params => $self->params },
{':content_file' => $self->tmpf->filename}
);
last;
};
/transaction/ && do {
# unfortunately, the order of 'statement' and 'parameters'
# is strict in the content (2.0.0-M06)
tie my %stmt, 'Tie::IxHash';
$stmt{statement} = $self->query;
$stmt{parameters} = $self->params;
$agent->$endpt(
[REST::Neo4p->_transaction],
{
statements => [ \%stmt ]
},
{':content_file' => $self->tmpf->filename}
);
last;
};
do {
REST::Neo4p::TxException->throw(
"Unknown query REST endpoint '".REST::Neo4p->q_endpoint."'\n"
);
}
}
};
if (my $e = REST::Neo4p::Neo4jException->caught ) {
$self->{_error} = $e;
$e->can('error_list') && ($self->{_error_list} = $e->error_list);
$e->rethrow if ($self->{RaiseError});
return;
}
elsif ($e = REST::Neo4p::Exception->caught()) {
$self->{_error} = $e;
$e->rethrow if ($self->{RaiseError});
return;
}
elsif ( $e = Exception::Class->caught) {
(ref $e && $e->can("rethrow")) ? $e->rethrow : die $e;
}
if ( ref(REST::Neo4p->agent) !~ /Neo4j::Driver/ ) {
$self->_parse_response;
}
else { # Neo4j::Driver
$self->_wrap_statement_result;
}
1;
}
sub fetchrow_arrayref {
my $self = shift;
unless ( defined $self->{_iterator} ) {
REST::Neo4p::LocalException->throw("Can't run fetch(), query not execute()'d yet\nCheck query object for error with err()/errstr()\n");
}
$self->{_iterator}->();
}
sub fetch { shift->fetchrow_arrayref(@_) }
sub column_names {
my $self = shift;
return $self->{_column_names} && @{$self->{_column_names}};
}
sub err {
my $self = shift;
return $self->{_error} && ($self->{_error}->code || 599);
}
sub errstr {
my $self = shift;
return $self->{_error} && ( $self->{_error}->message || $self->{_error}->neo4j_message );
}
sub errobj { shift->{_error} }
sub err_list {
my $self = shift;
return $self->{_error} && $self->{_error_list};
}
sub query { shift->{_query} }
sub params { shift->{_params} }
sub _wrap_statement_result {
my $self = shift;
my $result = REST::Neo4p->agent->last_result;
my $errors = REST::Neo4p->agent->last_errors;
$self->{NAME} = [$result->keys];
my $n = $self->{NUM_OF_FIELDS} = scalar @{$self->{NAME}};
$self->{_iterator} = sub {
my @row;
my $rec = $result->fetch;
return unless $rec;
eval {
my $as_object = $self->{ResponseAsObjects};
for (my $i=0;$i<$n;$i++) {
my $elt = $rec->get($i);
my $cvt = sub {
return $_[0] unless ref($_[0]) =~ /Driver/;
my ($type) = ref($_[0]) =~ /::([^:]+)$/;
my $cls = "REST::Neo4p::$type";
return $as_object ? $cls->new_from_json_response($_[0]) :
$cls->simple_from_json_response($_[0]);
};
for (ref($elt)) {
/Driver/ && do {
$elt = $cvt->($elt);
};
/HASH/ && do {
for (keys %$elt) {
$elt->{$_} = $cvt->($elt->{$_})
}
};
/ARRAY/ && do {
for (@$elt) {
$_ = $cvt->($_);
}
};
#else
push @row, $elt;
}
}
};
if (my $e = Exception::Class->caught()) {
if ($e =~ /j_parse|json/i) {
$e = REST::Neo4p::StreamException->new(message => $e);
$self->{_error} = $e;
$e->throw if $self->{RaiseError};
return;
}
else {
die $e;
}
}
# flatten if single array ref returned
if (@row==1 and ref($row[0]) eq 'ARRAY') {
return $row[0];
}
else {
return \@row;
}
};
return;
}
# _parse_response sets up an iterator that pulls a row's worth of objects from
# the servers JSON stream, parses the row into objects, and returns the row.
# this iterator is placed in $self->{_iterator} as a side effect.
# It is hit in fetchrow_arrayref.
sub _parse_response {
my $self = shift;
my $jsonr = JSON::XS->new->utf8;
my ($buf,$res,$str,$rowstr,$obj);
my $row_count;
$self->tmpf->read($buf, $BUFSIZE);
$jsonr->incr_parse($buf);
eval { # capture j_parse errors
$res = j_parse($jsonr);
die 'j_parse: No text to parse' unless $res;
die 'j_parse: JSON is not a query or txn response' unless $res->[0] =~ /QUERY|TXN/;
for ($res->[0]) {
/QUERY/ && do {
$obj = drop($str = $res->[1]->());
die 'j_parse: columns key not present' unless $obj && ($obj->[0] eq 'columns');
$self->{NAME} = $obj->[1];
$self->{NUM_OF_FIELDS} = scalar @{$obj->[1]};
$obj = drop($str);
die 'j_parse: data key not present' unless $obj->[0] eq 'data';
$rowstr = $obj->[1]->();
# query iterator
$self->{_iterator} = sub {
return unless defined $self->tmpf;
my $row;
my $item;
$item = drop($rowstr);
unless ($item) {
undef $rowstr;
return;
}
$row = $item->[1];
if (ref $row) {
return $self->_process_row($row);
}
else {
my $ret;
eval {
if ($row eq 'PENDING') {
if ($self->tmpf->read($buf, $BUFSIZE)) {
$jsonr->incr_parse($buf);
$ret = $self->{_iterator}->();
}
else {
$item = drop($rowstr);
$ret = $self->_process_row($item->[1]);
}
}
else {
die "j_parse: barf(qry)"
}
};
if (my $e = Exception::Class->caught()) {
if ($e =~ /j_parse|json/i) {
$e = REST::Neo4p::StreamException->new(message => $e);
$self->{_error} = $e;
$e->throw if $self->{RaiseError};
return;
}
else {
die $e;
}
}
return $ret;
}
};
# error check
last;
};
/TXN/ && do {
$obj = drop($str = $res->[1]->());
die 'j_parse: commit key not present' unless $obj && ($obj->[0] eq 'commit');
$obj = drop($str);
die 'j_parse: results key not present' unless $obj && ($obj->[0] eq 'results');
my $res_str = $obj->[1]->();
my $row_str;
my $item = drop($res_str);
$self->{_iterator} = sub {
return unless defined $self->tmpf;
my $row;
unless ($item) {
undef $row_str;
undef $res_str;
return;
}
my $ret;
eval {
if ($item->[0] eq 'columns') {
$self->{NAME} = $item->[1];
$self->{NUM_OF_FIELDS} = scalar @{$item->[1]};
$item = drop($res_str); # move to data
die 'j_parse: data key not present' unless $item->[0] eq 'data';
}
if ($item->[0] eq 'data' && ref($item->[1])) {
$row_str = $item->[1]->();
}
if ($row_str) {
$row = drop($row_str);
if (ref $row && ref $row->[1]) {
$ret = $self->_process_row($row->[1]->{row}, $row->[1]->{meta});
}
elsif (!defined $row) {
$item = drop($res_str);
$ret = $self->{_iterator}->();
}
else {
if ($row->[1] eq 'PENDING') {
$self->tmpf->read($buf, $BUFSIZE);
$jsonr->incr_parse($buf);
$ret = $self->{_iterator}->();
}
else {
die "j_parse: barf(txn)";
}
}
}
else { # $row_str undef
$item = drop($res_str);
$item = drop($res_str) if $item->[1] =~ /STREAM/;
}
return if $ret || ($self->err && $self->errobj->isa('REST::Neo4p::TxQueryException'));
if ($item && $item->[0] eq 'transaction') {
$item = drop($res_str) # skip
}
if ($item && $item->[0] eq 'errors') {
my $err_str = $item->[1]->();
my @error_list;
while (my $err_item = drop($err_str)) {
my $err = $err_item->[1];
if (ref $err) {
push @error_list, $err;
}
elsif ($err eq 'PENDING') {
$self->tmpf->read($buf,$BUFSIZE);
$jsonr->incr_parse($buf);
}
else {
die 'j_parse: error parsing txn error list';
}
}
my $e = REST::Neo4p::TxQueryException->new(
message => "Query within transaction returned errors (see error_list)\n",
error_list => \@error_list, code => '304'
) if @error_list;
$item = drop($item);
$e->throw if $e;
}
};
if (my $e = Exception::Class->caught()) {
if (ref $e) {
$self->{_error} = $e;
$e->rethrow if $self->{RaiseError};
}
elsif ($e =~ /j_parse|json/i) {
$e = REST::Neo4p::StreamException->new(message => $e);
$self->{_error} = $e;
$e->throw if $self->{RaiseError};
return;
}
else {
die $e;
}
}
return $ret;
};
last;
};
# default
REST::Neo4p::StreamException->throw( "j_parse: unknown item" );
}
};
if (my $e = Exception::Class->caught('REST::Neo4p::LocalException')) {
$self->{_error} = $e;
$e->rethrow if ($self->{RaiseError});
return;
}
elsif ($e = Exception::Class->caught()) {
if (ref $e) {
$e->rethrow;
}
else {
if ($e =~ /j_parse|json/i) {
$e = REST::Neo4p::StreamException->new(message => $e);
$self->{_error} = $e;
$e->throw if $self->{RaiseError};
return;
}
else {
die $e;
}
}
}
}
sub _response_entity {
my ($resp,$meta) = @_;
if ( ref($resp) eq '' ) { #handle arrays of barewords
return 'bareword';
}
elsif ($meta) {
my $type = $meta->{type};
$type =~ s/^(.)/\U$1\E/;
return $type;
}
elsif (defined $resp->{self}) {
for ($resp->{self}) {
m|data/node| && do {
return 'Node';
};
m|data/relationship| && do {
return 'Relationship';
};
do {
REST::Neo4p::QueryResponseException->throw(message => "Can't identify object type by JSON response\n");
};
}
}
elsif (defined $resp->{start} && defined $resp->{end}
&& defined $resp->{nodes}) {
return 'Path';
}
else {
return 'Simple';
}
}
sub _process_row {
my $self = shift;
my ($row,$meta) = @_;
my @ret;
foreach my $elt (@$row) {
my $info;
if ($meta) {
$info = shift @$meta;
}
for ($elt) {
!ref && do { #bareword
push @ret, $elt;
last;
};
(ref =~ /HASH/) && do {
my $entity_type;
eval {
if ($info && $info->{type}) {
$elt->{self} = "$$info{type}/$$info{id}";
$entity_type = $info->{type};
$entity_type =~ s/^(.)/\U$1\E/;
}
else {
$entity_type = _response_entity($elt);
}
};
my $e;
if ($e = Exception::Class->caught()) {
(ref $e && $e->can("rethrow")) ? $e->rethrow : die $e;
}
my $entity_class = 'REST::Neo4p::'.$entity_type;
push @ret, $self->{ResponseAsObjects} ?
$entity_class->new_from_json_response($elt) :
$entity_class->simple_from_json_response($elt);
last;
};
(ref =~ /ARRAY/) && do {
my $array;
for my $ary_elt (@$elt) {
my $entity_type;
eval {
if ($info && $info->{type}) {
$elt->{self} = "$$info{type}/$$info{id}";
$entity_type = $info->{type};
$entity_type =~ s/^(.)/\U$1\E/;
}
else {
$entity_type = _response_entity($ary_elt);
}
};
my $e;
if ($e = Exception::Class->caught()) {
(ref $e && $e->can("rethrow")) ? $e->rethrow : die $e;
}
if ($entity_type eq 'bareword') {
push @$array, $ary_elt;
}
else {
my $entity_class = 'REST::Neo4p::'.$entity_type;
push @$array, $self->{ResponseAsObjects} ?
$entity_class->new_from_json_response($ary_elt) :
$entity_class->simple_from_json_response($ary_elt) ;
}
}
push @ret, $array;
last;
};
do {
REST::Neo4p::QueryResponseException->throw("Can't parse query response (row doesn't make sense)\n");
last;
};
}
}
# guess whether to flatten response:
# if more than one row element, don't flatten,
# return an array reference in the response
return (@ret == 1 and ref($ret[0]) eq 'ARRAY') ? $ret[0] : \@ret;
}
sub finish {
my $self = shift;
delete $self->{_iterator};
unlink $self->tmpf->filename if ($self->tmpf);
delete $self->{_tempfile};
return 1;
}
sub DESTROY { shift->finish }
=head1 NAME
REST::Neo4p::Query - Execute Neo4j Cypher queries
=head1 SYNOPSIS
REST::Neo4p->connect('http:/127.0.0.1:7474');
$query = REST::Neo4p::Query->new('MATCH (n) WHERE n.name = "Boris" RETURN n');
$query->execute;
$node = $query->fetch->[0];
$node->relate_to($other_node, 'link');
=head1 DESCRIPTION
REST::Neo4p::Query encapsulates Neo4j Cypher language queries,
executing them via L<REST::Neo4p::Agent> and returning an iterator
over the rows, in the spirit of L<DBI>.
=head2 Streaming
L<C<execute()>|/execute()> captures the Neo4j query response in a temp
file. L<C<fetch()>|/fetch()> iterates (in a non-blocking way if
possible) over the JSON in the response using the incremental parser
of L<JSON::XS|JSON::XS> (see L<REST::Neo4p::ParseStream> if
interested). So go ahead and make those 100 meg queries. The tempfile
is unlinked after the iterator runs out of rows, or upon object
destruction, whichever comes first.
=head2 Parameters
C<REST::Neo4p::Query> understands Cypher L<query
parameters|http://docs.neo4j.org/chunked/stable/cypher-parameters.html>. These
are represented in Cypher, unfortunately, as dollar-prefixed tokens.
MATCH (n) WHERE n.first_name = $name RETURN n
Here, C<$name> is the named parameter.
Don't forget to escape the dollar sign if you're also doing string interpolation:
$prop = "n.name";
$qry = "MATCH (n) WHERE $prop = \$name RETURN n";
A single query object can be executed multiple times with different parameter values:
my $q = REST::Neo4p::Query->new(
'MATCH (n) WHERE n.first_name = $name RETURN n'
);
foreach (@names) {
$q->execute(name => $_);
while ($row = $q->fetch) {
...process
}
}
This is very highly recommended over creating multiple query objects like so:
foreach (@names) {
my $q = REST::Neo4p::Query->new(
"MATCH (n) WHERE n.first_name = '$_' RETURN n"
);
$q->execute;
...
}
As with any database engine, a large amount of overhead is saved by
planning a parameterized query once. In addition, the REST side of the
Neo4j server will balk at handling 1000s of individual queries in a row.
Parameterizing queries gets around this issue.
=head2 Paths
If your query returns a path, L<C<fetch()>|/fetch()> returns a
L<REST::Neo4p::Path> object from which you can obtain the Nodes and
Relationships.
=head2 Transactions
See L<REST::Neo4p/Transaction Support (Neo4j Version 2.0+)>.
=head1 METHODS
=over
=item new()
$stmt = 'MATCH (n) WHERE id(n) = $node_id RETURN n';
$query = REST::Neo4p::Query->new($stmt,{node_id => 1});
Create a new query object. First argument is the Cypher query
(required). Second argument is a hashref of parameters (optional).
=item execute()
$numrows = $query->execute;
$numrows = $query->execute( param1 => 'value1', param2 => 'value2');
$numrows = $query->execute( $param_hashref );
Execute the query on the server. Not supported in batch mode.
=item fetch()
=item fetchrow_arrayref()
$query = REST::Neo4p::Query->new('MATCH (n) RETURN n, n.name LIMIT 10');
$query->execute;
while ($row = $query->fetch) {
print 'It works!' if ($row->[0]->get_property('name') == $row->[1]);
}
Fetch the next row of returned data (as an arrayref). Nodes are
returned as L<REST::Neo4p::Node|REST::Neo4p::Node> objects,
relationships are returned as
L<REST::Neo4p::Relationship|REST::Neo4p::Relationship> objects,
scalars are returned as-is.
=item err(), errstr(), errobj()
$query->execute;
if ($query->err) {
printf "status code: %d\n", $query->err;
printf "error message: %s\n", $query->errstr;
printf "Exception class was %s\n", ref $query->errobj;
}
Returns the HTTP error code, Neo4j server error message, and exception
object if an error was encountered on execution.
=item err_list()
=item finish()
while (my $row = $q->fetch) {
if ($row->[0] eq 'What I needed') {
$q->finish();
last;
}
}
Call finish() to unlink the tempfile before all items have been
fetched.
=back
=head2 ATTRIBUTES
=over
=item RaiseError
$q->{RaiseError} = 1;
Set C<$query-E<gt>{RaiseError}> to die immediately (e.g., to catch the exception in an C<eval> block).
=item ResponseAsObjects
$q->{ResponseAsObjects} = 0;
$row_as_plain_perl = $q->fetch;
If set to true (the default), query reponses are returned as
REST::Neo4p objects. If false, nodes, relationships and paths are
returned as simple perl structures. See
L<REST::Neo4p::Node/as_simple()>,
L<REST::Neo4p::Relationship/as_simple()>,
L<REST::Neo4p::Path/as_simple()> for details.
=item Statement
$stmt = $q->{Statement};
Get the Cypher statement associated with the query object.
=back
=head1 SEE ALSO
L<DBD::Neo4p>, L<REST::Neo4p>, L<REST::Neo4p::Path>, L<REST::Neo4p::Agent>.
=head1 AUTHOR
Mark A. Jensen
CPAN ID: MAJENSEN
majensen -at- cpan -dot- org
=head1 LICENSE
Copyright (c) 2012-2022 Mark A. Jensen. This program is free software; you
can redistribute it and/or modify it under the same terms as Perl
itself.
=cut
1;