## @file # (Enter your file info here) # # $Id: DbRegistry.pm 496 2008-08-20 14:18:09Z damjan $ # Database connection and handling registry ## @class RWDE::DB::DbRegistry # Database registry which provides the underlying access API for RWDE objects. Once instantiated and initialized the registry # provides a one stop shop for managing database connections (open/close), connection setting modifications and # transaction management (begin/prepare/commit/abort) package RWDE::DB::DbRegistry; use strict; use warnings; use DBI; use Error qw(:try); use RWDE::Configuration; use RWDE::Exceptions; use RWDE::RObject; use base qw(RWDE::Singleton); our (%dbh, %prepared_transactions, $transaction_connection, $transaction_sequence, $DB_CONFIG, $DB_CONNECTIONS, $impose_transaction); our ($unique_instance); use vars qw($VERSION); $VERSION = sprintf "%d", q$Revision: 561 $ =~ /(\d+)/; ## @method object get_instance() # Retrieve the registry instance # @return retrieved registry instance sub get_instance { my ($self, $params) = @_; if (ref $unique_instance ne $self) { $unique_instance = $self->new; } return $unique_instance; } ## @method void initialize() # Read the configuration file for DB names and locations # these data are used for opening connections sub initialize { my ($self, $params) = @_; # initialize the params for connecting to the db $DB_CONFIG = RWDE::Configuration->DB; # and mapping between the db names and connections $DB_CONNECTIONS = RWDE::Configuration->DB_CONNECTIONS; $transaction_sequence = 0; $impose_transaction = undef; return (); } ## @method object get_dbh() # Retrieve the database handle for a specific database name. In the event that a transaction was # previously signalled the transaction will begin here # ¶m db Database name # @return The database handle for a specific database sub get_dbh { my ($self, $params) = @_; my $connection_name = $self->_get_connection_name($params); unless (defined $dbh{$connection_name} and $dbh{$connection_name}->ping()) { $self->_connect_db({ connection => $connection_name }); } #we have received an earlier signal to start a transaction and it hasn't completed yet if (transaction_signalled()) { #if there is no transaction defined yet if (!defined($transaction_connection)) { $self->_begin_transaction($params); } } return $dbh{$connection_name}; } ## @method void add_db_settings($db_settings) # Pass a setting to the database. This needs to be set before the connection is established # @param db_settings Specific setting to be applied to the database at connect time sub add_db_settings { my ($self, $params) = @_; my $db_setting_param = $$params{db_settings}; if (not defined($db_setting_param) || ref($db_setting_param) ne "ARRAY") { throw RWDE::DevelException({ info => "DB setting supplied undefined or not an array" }); } my $connection_name = RWDE::DB::DbRegistry->_get_connection_name($params); push (@{$$DB_CONFIG{$connection_name}{db_settings}}, @$db_setting_param); return (); } ## @method object get_db_notifications($sleeptime) # Retrieve notifications (pg_notifies) from the database # ¶m db The database name that you want notifications for # @param sleeptime Length of time to block while waiting for notifications # @return Array reference of notifications the db raised sub get_db_notifications { my ($self, $params) = @_; my $dbh = $self->get_dbh($params); my @required = qw( sleeptime ); RWDE::RObject->check_params({ required => \@required, supplied => $params }); use IO::Select; my @notifications; my $dbpid = $dbh->{pg_pid}; my $dbsock = $dbh->{pg_socket}; my $select = new IO::Select($dbsock) or throw RWDE::DevelException({ info => "Failed to create selector" }); if (my @ready = $select->can_read($$params{sleeptime})) { while (my $notification = $dbh->func('pg_notifies')) { push(@notifications, $notification); # "got NOTIFY $n->[0] from $n->[1]"; } } return \@notifications; } ## @method void destroy_dbh() # Tear down and clear (from the registry) the database associated with the database parameter it was invoked with # param db The database name you want to destroy the handle for sub destroy_dbh { my ($self, $params) = @_; my $connection_name = $self->_get_connection_name($params); $dbh{$connection_name}->{InactiveDestroy} = 1; $self->closeDB($params); return; } ## @method void closeDB() # Close a specific database handle and the delete referenced db from the registry # - # Note: Don't do explicit disconnect if we're a child process clearing out the old handle just prior to re-opening it. # In this case, we set InactiveDestroy to keep DBI from doing the implicit disconnect when the handle goes away. sub closeDB { my ($self, $params) = @_; my $connection_name = $self->_get_connection_name($params); if ($dbh{$connection_name} and !$dbh{$connection_name}->{InactiveDestroy}) { $dbh{$connection_name}->disconnect; } delete $dbh{$connection_name}; return (); } ## @method void close_all() # Close all database connections we are currently maintaining sub close_all { my ($self, $params) = @_; foreach my $connection_name (keys %$DB_CONNECTIONS) { if ($dbh{$connection_name} and !$dbh{$connection_name}->{InactiveDestroy}) { $dbh{$connection_name}->disconnect; } delete $dbh{$connection_name}; } return (); } ## @method void close_all() # Release the handles w/o explicitly disconnecting: i.e. on fork sub destroy_all { my ($self, $params) = @_; foreach my $db_name (keys %$DB_CONNECTIONS) { $self->destroy_dbh({ db => $db_name}); } return (); } #================================================================ #everything below is transaction related - global can call these #================================================================ ## @method void signal_transaction() # Signal the database backend to begin a transaction before the next database operation takes place sub signal_transaction { my ($self, $params) = @_; #if this flag is already set and we ended up here something is wrong if (!transaction_signalled()) { $impose_transaction = 1; } else { warn 'Received a signal for impose transaction, but transaction already in progress'; } return (); } ## @method object transaction_signalled() # Determine if a transaction has been signalled in the backend # @return true if a transaction is signalled, false otherwise sub transaction_signalled { my ($self, $params) = @_; return (defined($impose_transaction)); } ## @method void commit_transaction() # Commit the running transaction within the database. Will invoke an exception if a transaction # is not currently defined or if the transaction does not execute properly. After committing a # transaction the related state variables are cleared. sub commit_transaction { my ($self, $params) = @_; if (!(defined $transaction_connection and $dbh{$transaction_connection} and $dbh{$transaction_connection}->commit())) { warn "Commit transaction called with no outstanding transaction -> probably aborted before"; } $transaction_connection = undef; $impose_transaction = undef; return (); } ## @method void abort_transaction() # Abort the running transaction within the database. If no transaction is running then this operation # acts as a no-op sub abort_transaction { my ($self, $params) = @_; if (defined($transaction_connection)) { $dbh{$transaction_connection}->rollback(); $transaction_connection = undef; } $impose_transaction = undef; return (); } ## @method object prepare_transaction() # Prepare and save the currently running transaction within the database. An exception will be thrown if # no transaction is running or there is a problem preparing the transaction. Once the transaction is prepared a # string handle/reference is returned back to the caller to allow for processing of the transaction at a later time # @return string handle/reference to the prepared transaction sub prepare_transaction { my ($self, $params) = @_; $self->check_transaction(); my $transaction_name = $self->_get_transaction_name($params); #unfortunately DBI does not have a call to support 2pc yet my $sth = $dbh{$transaction_connection}->prepare("PREPARE TRANSACTION ?"); if (!($sth && $sth->execute($transaction_name))) { throw RWDE::DevelException({ info => $dbh{$transaction_connection}->errstr() }); } #store the transaction connection name associated with this prepared transaction $prepared_transactions{$transaction_name} = $transaction_connection; #clear out transaction connection $transaction_connection = undef; $impose_transaction = undef; return $transaction_name; } ## @method void commit_prepared_transaction($transaction_name) # Commit a previously prepared transaction within the database. An exception will be thrown if # the transaction doesn't exist or if there is a problem committing the transaction. # @param transaction_name Handle/reference to a previously prepared transaction sub commit_prepared_transaction { my ($self, $params) = @_; #if the transaction is not defined then something is wrong if (!defined($$params{transaction_name})) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1); throw RWDE::DevelException({ info => "Attempt to commit an unspecified prepared transaction without a name: $package on $line" }); } my $transaction_name = $$params{transaction_name}; my $connection = $prepared_transactions{$transaction_name}; if (not defined $connection){ $connection = $self->_get_connection_name($params); } if (defined($connection) && defined($transaction_name)) { #unfortunately DBI does not have a call to support 2pc yet if (!defined($dbh{$connection}->do("COMMIT PREPARED " . $dbh{$connection}->quote($transaction_name)))) { throw RWDE::DevelException({ info => $dbh{$connection}->errstr() }); } } #clear out transaction $prepared_transactions{$transaction_name} = undef; return (); } ## @method void abort_prepared_transaction($transaction_name) # Abort a previously prepared transaction within the database. An exception will be thrown if # the transaction doesn't exist or if there is a problem committing the transaction. # @param transaction_name Handle/reference to a previously prepared transaction sub abort_prepared_transaction { my ($self, $params) = @_; #if the transaction is not defined then something is wrong if (!defined($$params{transaction_name})) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1); throw RWDE::DevelException({ info => "Attempt to commit an unspecified prepared transaction without a name: $package on $line" }); } my $transaction_name = $$params{transaction_name}; my $connection = $prepared_transactions{$transaction_name}; if (not defined $connection){ $connection = $self->_get_connection_name($params); } if (defined($connection) && defined($transaction_name)) { #unfortunately DBI does not have a call to support 2pc yet if (!defined($dbh{$connection}->do("ROLLBACK PREPARED " . $dbh{$connection}->quote($transaction_name)))) { throw RWDE::DevelException({ info => $dbh{$connection}->errstr() }); } } else { warn 'Either connection or transaction_name not defined'; } #clear out transaction $prepared_transactions{$transaction_name} = undef; return (); } ## @method void check_transaction() # Test to see if we have a transaction and that everything is consistent. Throw a devel exception if no # transaction is running. # - # This is used to ensure that we are in ANY transaction - regardless of connection. Maybe this is bad sub check_transaction { my ($self, $params) = @_; if (!defined($transaction_connection) || !defined($dbh{$transaction_connection}) || !transaction_signalled()) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1); throw RWDE::DevelException({ info => "Attempt to get transaction name for nonexistent transaction: $package on $line" }); } return (); } ## @method object has_transaction() # Determine if the argument db connection has a transaction # ¶m db name of the database supposedly in a transaction # @return true if the database is in a transaction, false otherwise sub has_transaction { my ($self, $params) = @_; #check to see if a transaction is signalled and if the argument connection is associated with such a signalled transaction return (transaction_signalled() && defined($transaction_connection) && ($transaction_connection eq $self->_get_connection_name($params))); } ## @method void db_check_transaction() # Determine if the current transaction is associated with the specified db param - otherwise devel exception sub db_check_transaction { my ($self, $params) = @_; unless (has_transaction($params)) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1); throw RWDE::DevelException({ info => "$package on $line requires exclusive access to the affecting rows. Add transaction around the function call." }); } return (); } ## @method void cleanup() # Cleanup running transactions on all connections by aborting them sub cleanup { my ($self, $params) = @_; foreach my $db (keys %dbh) { if (defined($transaction_connection)) { $self->abort_transaction(); } $dbh{$db}->do("RESET ALL"); # unset any special variables } return (); } #================================================================ #everything below is private - do not call externally #================================================================ ## @method protected object _get_connection_name($db) # Private call to determine the connection name for a database name argument # @param db Database name you are seeking the connection name for # @return the connection name associated with the database name argument sub _get_connection_name { my ($self, $params) = @_; my $label = $$params{db}; if (not defined $label) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(1); throw RWDE::DevelException({ info => "DB not specified: $package on $line" }); } my $connection_name = $$DB_CONNECTIONS{$label}; if (!defined($connection_name)) { my ($package, $filename, $line, $subroutine, $hasargs, $wantarray, $evaltext, $is_require, $hints, $bitmask) = caller(2); throw RWDE::DevelException({ info => "No connection specified for given db:$label: from $package on $line" }); } return $connection_name; } sub get_host { my ($self, $params) = @_; my $connection_name = $self->_get_connection_name({ db => $$params{db}}); my $DB = $$DB_CONFIG{$connection_name}; return $$DB{db_host}; } ## @method protected void _connect_db($connection) # Create a database connection for the connection_name argument given. If there are any problems an exception will be thrown. # Upon success the database connection handle is added to the registry and may be retrieved using "get_dbh". # @param connection The connection_name you are trying to create a connection for sub _connect_db { my ($self, $params) = @_; my $connection_name = $$params{connection}; my $DB = $$DB_CONFIG{$connection_name}; my $port = $$DB{db_port} ? $$DB{db_port} : 5432; my $datasource = 'dbi:' . $$DB{db_type} . ':dbname=' . $$DB{db_name} . ';host=' . $$DB{db_host} . ';port=' . $port; if (RWDE::Configuration->Debug) { #disable preparing of queries so we can see them $dbh{$connection_name} = DBI->connect($datasource, $$DB{db_user}, $$DB{db_pass}, { PrintError => 0, pg_server_prepare => 0 }) or throw RWDE::DevelException({ info => "dbi connect failure: " . $DBI::errstr }); } else { #tmp fix, put prepare to 0, looks like pg_bouncer is not liking it a lot $dbh{$connection_name} = DBI->connect($datasource, $$DB{db_user}, $$DB{db_pass}, { PrintError => 0, pg_server_prepare => 0 }) or throw RWDE::DevelException({ info => "dbi connect failure: " . $DBI::errstr }); } # and listen for events to wake us up foreach my $db_setting (@{$$DB{db_settings}}) { $dbh{$connection_name}->do($db_setting) or throw RWDE::DevelException({ info => "failed to set: $db_setting for $connection_name" }); } return (); } ## @method protected object _get_transaction_name() # This method provides a uniq naming scheme for naming prepared transactions. # @return Unique name that is suitable to be used for preparing a transaction sub _get_transaction_name { my ($self, $params) = @_; $self->check_transaction(); my $dbh = $dbh{$transaction_connection}; my $transaction_name = $$ . "|" . $dbh->{pg_pid} . "|" . $transaction_sequence; $transaction_sequence++; return $transaction_name; } ## @method protected void _begin_transaction() # Private method to begin a transaction. The only appropriate place to use it within this registry construct # is immediately after a transaction signal has been raised - but before any other database queries are executed. sub _begin_transaction { my ($self, $params) = @_; #if there are ANY other transactions then something is wrong if (defined($transaction_connection)) { throw RWDE::DevelException({ info => "Attempt to start multiple transactions" }); } my $connection_name = $self->_get_connection_name($params); $transaction_connection = $connection_name; if (!($dbh{$connection_name})) { throw RWDE::DevelException({ info => 'Error while attempting to _begin_transaction -- no connection_name' }); } if (!($dbh{$connection_name}->begin_work())) { throw RWDE::DevelException({ info => 'Error while attempting to _begin_transaction failed: ' . $dbh{$connection_name}->errstr() }); } return (); } 1;