package DBIx::BulkUtil;

use DBI;
use Carp qw(confess);
use strict;
use warnings;

our $VERSION = '0.05';

# Override this
# Returns the password to use when connect() is not given a Password option.
sub passwd { return ''; }

# Override this
# Returns the user name to use when connect() is not given a User option.
sub user { return ''; }

{
    # Names of the options accepted by connect() and its wrappers.
    my @connect_options = qw(
        Server Database Env Type User Password DataDir ConnectMethod
        RetryCount RetryMinutes BulkLogin NoBlankNull Silent NoCharset
        NoServer Dsl DslOptions DateFormat DatetimeFormat DatetimeTzFormat
    );
    my %is_valid;
    $is_valid{$_}++ for @connect_options;

    # Validate a key/value option list.
    # Returns the name of an option that is not recognised, or nothing
    # (undef / empty list) when every option name is valid.
    sub _options_valid {
        my $class = shift;
        my %opts  = @_;
        for my $opt ( keys %opts ) {
            return $opt if !$is_valid{$opt};
        }
        return;
    }
}

# Override this to set server, db, env, type based on whatever
# Default behaviour: when Type is not set, a Database without a Server means
# Oracle, anything else means Sybase. Type 'SybaseIQ' is mapped to the
# Sybase driver with the IsIQ flag set. Modifies the $args hash in place.
sub env2db {
    my ( $self, $args ) = @_;
    $args->{Type} ||= ( !$args->{Server} && $args->{Database} ) ? 'Oracle' : 'Sybase';
    if ( $args->{Type} eq 'SybaseIQ' ) {
        $args->{IsIQ}++;
        $args->{Type} = 'Sybase';
    }
}

# Connect to a Sybase or Oracle database.
#
# Arguments: a key/value list of the options accepted by _options_valid(),
# optionally followed by a hash reference of DBI attributes that override
# the defaults below.
#
# Returns the database handle in scalar context, or ($dbh, $util) in list
# context, where $util is the DBIx::BulkUtil::Obj built for the handle.
#
# Dies on an invalid option, on an unsupported database type, and when the
# connection (or the Sybase "USE database") still fails after RetryCount
# retries.
sub connect {
    my $class = shift;

    # Use HandleError sub instead of more straightforward RaiseError
    # attribute because Sybase 1.09 does not include line numbers in its
    # RaiseError die messages. And a stacktrace is usually more helpful
    # anyway.
    my $dbi_opts = {
        ChopBlanks  => 1,
        AutoCommit  => 1,
        PrintError  => 0,
        RaiseError  => 1,
        LongReadLen => 1_024 * 1_024,
    };

    # A trailing reference is taken as caller supplied DBI attributes
    if ( @_ and ref( $_[-1] ) ) {
        my $tmp_opts = pop @_;
        @$dbi_opts{ keys %$tmp_opts } = values %$tmp_opts;
    }

    my $bad_opt = $class->_options_valid(@_);
    die "Invalid option $bad_opt to ${class}->connect" if $bad_opt;

    # TODO: Log or Output option?
    my %args = @_;

    # With Silent, progress messages printed to STDOUT go to /dev/null for
    # the duration of this call.
    my $fh;
    if ( $args{Silent} ) {
        # Plain die (not confess): a stack trace here would show the
        # Password argument.
        open( $fh, ">", "/dev/null" ) or die "Can not open /dev/null: $!";
    }
    my $stdout = $args{Silent} ? $fh : \*STDOUT;
    local *STDOUT = $stdout;

    my $connect = $args{ConnectMethod} || 'connect';

    $class->env2db( \%args );

    # Dsl may be a single string or an array reference of dsl components
    my @dsl_args =
        $args{Dsl}
      ? ref( $args{Dsl} )
          ? @{ $args{Dsl} }
          : $args{Dsl}
      : ();

    my $type     = $args{Type};
    my $database = $args{Database};
    my $server   = $args{Server} || '';

    if ( !@dsl_args ) {
        if ( $type eq 'Sybase' ) {

            # Server-side charset is iso, need to specify it as client-side charset
            # or else we get utf8 to iso charset conversion error when database handle is cloned
            # (which happens automatically when you need multiple active statement handles).
            push @dsl_args, "server=$server" unless $args{NoServer};
            push @dsl_args, 'charset=iso_1'  unless $args{NoCharset};
            push @dsl_args, 'bulkLogin=1' if $args{BulkLogin};
        }
        elsif ( $type eq 'Oracle' ) {
            push @dsl_args, $database unless $args{NoServer};
        }
        else {
            die "Unable to connect to database type $type";
        }
    }

    # For Xtra Dsl options
    push @dsl_args, ref( $args{DslOptions} ) ? @{ $args{DslOptions} } : $args{DslOptions}
      if $args{DslOptions};

    my $dsl = "dbi:$type:";
    $dsl .= join( ";", @dsl_args );

    my $user   = $args{User}     || $class->user( \%args );
    my $passwd = $args{Password} || $class->passwd( \%args );

    my $dbh;
    my $retry         = int( $args{RetryCount} || 0 );
    my $retry_seconds = 60 * ( $args{RetryMinutes} || 10 );
    $retry_seconds = 60 * 10 if $retry_seconds < 0;

    my $conn_name = ( $type eq 'Sybase' ) ? $server : $database;

    # redo restarts the whole attempt; the retry budget is shared between
    # connect failures and "USE database" failures.
    while (1) {
        print "Connecting to $conn_name\n";
        $dbh = eval { DBI->$connect( $dsl, $user, $passwd, $dbi_opts ) };
        my $err = $@;
        unless ($dbh) {
            die $err unless $retry-- > 0;
            print "Unable to connect to $conn_name. Will retry in $retry_seconds seconds\n";
            sleep $retry_seconds;
            redo;
        }

        # Make selected Sybase database the current database
        # And make date formats consistent
        if ( $type eq 'Sybase' ) {

            # Switch database after connect so that we get a helpful error message
            # and an error instead of a warning
            if ($database) {
                print "Using $database database\n";
                my $result  = eval { $dbh->do("USE $database") };
                my $use_err = $@;
                unless ($result) {
                    die $use_err unless $retry-- > 0;
                    $dbh->disconnect();
                    print "Unable to use database $database on $server. Will retry in $retry_seconds seconds\n";
                    sleep $retry_seconds;
                    redo;
                }
            }
            $dbh->{syb_date_fmt} = $args{DateFormat} || 'ISO';
            $dbh->do("set temporary option Load_ZeroLength_AsNULL = 'ON'")
              if $args{IsIQ} and !$args{NoBlankNull};
        }
        elsif ( $type eq 'Oracle' ) {

            # Fractions on Oracle "DATE" format not allowed
            my $date_fmt        = $args{DateFormat}     || 'YYYY-MM-DD HH24:MI:SS';
            my $datetime_fmt    = $args{DatetimeFormat} || 'YYYY-MM-DD HH24:MI:SS.FF';
            my $datetime_tz_fmt = $args{DatetimeTzFormat} || $args{DatetimeFormat} || $datetime_fmt;
            $_ = $dbh->quote($_) for $date_fmt, $datetime_fmt, $datetime_tz_fmt;
            $dbh->do("alter session set nls_date_format=$date_fmt");
            $dbh->do("alter session set nls_timestamp_format=$datetime_fmt");
            $dbh->do("alter session set nls_timestamp_tz_format=$datetime_tz_fmt");
        }
        last;
    }

    # We do not want stack trace on connect so that we do not expose password
    # But everywhere else it is useful
    $dbh->{RaiseError}  = 0;
    $dbh->{HandleError} = sub { confess $_[0] };

    return $dbh unless wantarray;

    my $util = DBIx::BulkUtil::Obj->new( $dbh, $passwd, \%args );
    return $dbh, $util;
}

# Just set the connect method and call connect()
sub connect_cached {
    my $class = shift;
    my @args  = $class->override( { ConnectMethod => 'connect_cached' }, @_ );
    return $class->connect(@args);
}

# connect() with Type forced to Sybase
sub syb_connect {
    my $class = shift;
    my @args  = $class->override( { Type => 'Sybase' }, @_ );
    return $class->connect(@args);
}

# connect_cached() with Type forced to Sybase
sub syb_connect_cached {
    my $class = shift;
    my @args  = $class->override( { Type => 'Sybase' }, @_ );
    return $class->connect_cached(@args);
}

# connect() with Type forced to Oracle
sub ora_connect {
    my $class = shift;
    my @args  = $class->override( { Type => 'Oracle' }, @_ );
    return $class->connect(@args);
}

# connect_cached() with Type forced to Oracle
sub ora_connect_cached {
    my $class = shift;
    my @args  = $class->override( { Type => 'Oracle' }, @_ );
    return $class->connect_cached(@args);
}

# connect() with Type forced to SybaseIQ
sub iq_connect {
    my $class = shift;
    my @args  = $class->override( { Type => 'SybaseIQ' }, @_ );
    return $class->connect(@args);
}

# connect_cached() with Type forced to SybaseIQ
sub iq_connect_cached {
    my $class = shift;
    my @args  = $class->override( { Type => 'SybaseIQ' }, @_ );
    return $class->connect_cached(@args);
}

# Overridden connect args need to be spliced in before any dbi options
# $ovr is a hash reference of options to append to the key/value list in
# @args. An odd number of remaining arguments means the last one is the DBI
# attribute hash reference, which must stay last.
sub override {
    my $self = shift;
    my ( $ovr, @args ) = @_;

    if ( ( @args % 2 ) == 0 ) {
        return @args, %$ovr;
    }

    my $dbi_opts = pop @args;
    die "Last argument to connect must be hash reference"
      unless $dbi_opts and ref($dbi_opts);
    return @args, %$ovr, $dbi_opts;
}

package DBIx::BulkUtil::Obj;

our $BCP_DELIMITER = '|';

use Memoize qw(memoize);
use Carp qw(confess);

# Build the utility object for a handle. The concrete class is
# DBIx::BulkUtil::<driver name>, or DBIx::BulkUtil::SybaseIQ when the Sybase
# server version string mentions IQ.
sub new {
    my ( $class, $dbh, $passwd, $args ) = @_;

    my $type = $dbh->{Driver}{Name};
    if ( $type eq 'Sybase' ) {
        my $version = $dbh->{syb_server_version_string};
        $version = '' unless defined $version;    # avoid uninitialized warnings
        $version =~ s|/.*||;
        $type = 'SybaseIQ' if $version =~ /IQ/;
    }

    $class =~ s/::Obj$// or die "Invalid class $class";
    $class .= "::" . $type;

    return $class->util( $dbh, $passwd, $args );
}

# Constructor proper; must be called on a subclass of this package.
# Stores the handle, password and default bcp delimiter in the object.
sub util {
    my $class = shift;
    my ( $dbh, $pw, $args ) = @_;

    confess "Must use subclass of this package" if __PACKAGE__ eq $class;

    my %util_args;
    if ( $args and ref($args) ) {
        $util_args{NoBlankNull} = 1 if $args->{NoBlankNull};
    }

    # Prevent dbh from disconnecting after fork in child processes
    my $dbh_pid = $$;
    my $release = DBIx::BulkUtil::Release->new(
        sub { $dbh->{InactiveDestroy} = 1 if $dbh_pid != $$ }
    );

    bless {
        DBH       => $dbh,
        PASSWORD  => $pw,
        DELIMITER => $BCP_DELIMITER,
        RELEASE   => $release,
        %util_args
    }, $class;
}

# Accessor for the underlying database handle
sub dbh { $_[0]->{DBH} }

# Select a single row of expressions (via the subclass row_select()).
# Returns the single value when exactly one column came back, otherwise the
# list of column values.
sub get {
    my $self   = shift;
    my $select = shift;

    my $dbh    = $self->{DBH};
    my @result = $dbh->selectrow_array( $self->row_select($select) );
    return $result[0] if @result == 1;
    return @result;
}

# Execute a stored procedure; the SQL is built by the subclass sp_sql()
sub exec_sp {
    my $self = shift;
    my $dbh  = $self->{DBH};
    $dbh->do( $self->sp_sql(@_) );
}

# Export a table to a delimited file using a SELECT.
#
# Arguments: table, file (default "<table>.bcp"), optional trailing hash
# reference of options: Delimiter, RowDelimiter, EscapeChar, QuoteFields,
# Columns (string or array reference), Encoding, Filter (appended to the
# SELECT), TrimBlanks, Header.
#
# Returns the number of rows written.
sub bcp_out {
    my $self = shift;

    my $opts = {};
    if ( ref $_[-1] ) { $opts = pop @_; }

    my ( $table, $file ) = @_;
    $file ||= "$table.bcp";

    my $delimiter = $opts->{Delimiter}    || $self->{DELIMITER};
    my $row_delim = $opts->{RowDelimiter} || $/;

    # Ternary instead of "my ... if ..." whose behaviour is undefined
    my @esc = $opts->{EscapeChar} ? ( escape_char => $opts->{EscapeChar} ) : ();

    # Default to no quote char to be more compatible w/Sybase
    # (listed after @esc below, so it also clears the escape char)
    my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef, escape_char => undef );

    # TODO: Give up on Text::CSV ??
    # Text::CSV_XS is only used for single character delimiters; longer
    # delimiters fall back to a plain join.
    my $csv;
    if ( length($delimiter) == 1 ) {
        require Text::CSV_XS;
        $csv = Text::CSV_XS->new({
            binary   => 1,
            eol      => $row_delim,
            sep_char => $delimiter,
            @esc,
            @quote_char,
        });
    }

    # Columns may be a select list string or an array reference of names
    my $col_list = $opts->{Columns} ? $opts->{Columns} : "*";
    $col_list = join( ",", @$col_list ) if ref $col_list;

    # Only for HP?
    #local $ENV{NLS_LANG} = "AMERICAN_AMERICA.WE8ROMAN8";
    my $enc_opt = $opts->{Encoding} || '';

    my $db_type = $self->type();
    if ( $db_type eq 'Oracle' ) {

        # "table:partition" selects a single partition
        my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';

        my $nls_lang = $ENV{NLS_LANG} || '';
        if ( $nls_lang =~ /utf8/i ) {
            $enc_opt ||= 'utf8';
        }

        # XMLTYPE columns are selected through XMLType.getclobval()
        if ( $col_list eq '*' ) {
            my @col_list;
            my $col_info = $self->column_info($table);
            my $list     = $col_info->{LIST};
            my $col_map  = $col_info->{MAP};
            my $xml_cnt;
            for my $col (@$list) {
                if ( $col_map->{$col}{TYPE_NAME} eq 'XMLTYPE' ) {
                    $xml_cnt++;
                    push @col_list, "XMLType.getclobval($col)";
                    next;
                }
                push @col_list, $col;
            }
            if ($xml_cnt) {
                $col_list = join( ",", @col_list );
            }
        }

        $table = "$table PARTITION ($partition)" if $partition;
    }

    my $enc = $enc_opt ? ":encoding($enc_opt)" : '';
    open( my $fh, ">$enc", $file ) or confess "Can not write to $file: $!";

    my $sql = "SELECT $col_list FROM $table\n";
    $sql .= $opts->{Filter} if $opts->{Filter};

    my $dbh = $self->{DBH};
    my $sth = $dbh->prepare($sql);
    $sth->{ChopBlanks} = 0 unless $opts->{TrimBlanks};
    $sth->execute();

    if ( $opts->{Header} ) {
        if ($csv) {
            $csv->print( $fh, $sth->{NAME_lc} );
        }
        else {
            print $fh join( $delimiter, @{ $sth->{NAME_lc} } ), $row_delim;
        }
    }

    my $cnt = 0;
    while ( my $row = $sth->fetchrow_arrayref() ) {
        no warnings 'uninitialized';    # NULL columns are written as empty fields
        if ($csv) {
            $csv->print( $fh, $row );
        }
        else {
            print $fh join( $delimiter, @$row ), $row_delim;
        }
        $cnt++;
    }

    # Buffered write errors only surface at close
    close $fh or confess "Error closing $file: $!";

    return $cnt;
}

{
    no warnings 'once';
    *select2file = \&bcp_out;
}

# Copy a delimited file that has a header row, keeping or dropping columns.
#
# Arguments: input file, output file, optional trailing hash reference of
# options: Delimiter, EscapeChar, QuoteFields, KeepCols, DropCols (array
# references of header names), Header (write the header to the output).
# The first line of the input is always consumed as the header.
sub bcp_file {
    my ( $self, $file_in, $file_out ) = @_;

    my $opts = {};
    if ( ref $_[-1] ) { $opts = pop @_; }

    my $delimiter  = $opts->{Delimiter}  || $self->{DELIMITER};
    my $esc        = $opts->{EscapeChar} || "\\";
    my @quote_char = $opts->{QuoteFields} ? () : ( quote_char => undef );

    require Text::CSV_XS;
    my $csv = Text::CSV_XS->new({
        binary      => 1,
        eol         => $/,
        sep_char    => $delimiter,
        escape_char => $esc,
        @quote_char,
    });

    open( my $in_fh,  "<", $file_in )  or die "Err: Can not read $file_in: $!";
    open( my $out_fh, ">", $file_out ) or die "Err: Can not write $file_out: $!";

    my $hdr = $csv->getline($in_fh);
    $csv->column_names($hdr);

    my @drop_cols = $opts->{DropCols} ? @{ $opts->{DropCols} } : ();
    my %drop;
    $drop{$_}++ for @drop_cols;

    # KeepCols wins over DropCols; with neither, every column is kept
    my @cols =
        $opts->{KeepCols} ? @{ $opts->{KeepCols} }
      : @drop_cols        ? grep !$drop{$_}, @$hdr
      :                     @$hdr;

    my %hdr_idx;
    @hdr_idx{@$hdr} = 0 .. $#$hdr;
    $csv->print( $out_fh, [ @$hdr[ @hdr_idx{@cols} ] ] ) if $opts->{Header};

    while ( my $row = $csv->getline_hr($in_fh) ) {
        $csv->print( $out_fh, [ @$row{@cols} ] );
    }

    close $in_fh;
    close $out_fh or die "Err: closing $file_out: $!";
}

# Prepend a header line of column names to a bcp file.
#
# Arguments: table, file, optional hash reference of options: Header,
# Columns (string or array reference), QuoteFields, Delimiter, RowDelimiter.
# With QuoteFields the work is delegated to add_quotes(). Otherwise the
# result is written to "$file.bak" and the original file is left untouched.
#
# Returns "$file.bak".
sub add_header {
    my ( $self, $table, $file, $opts ) = @_;
    $opts ||= {};

    # The plain (non quoting) path below always needs the column names, so
    # fetch them there even when neither Header nor Columns was given.
    my $cols;
    if ( $opts->{Header} || $opts->{Columns} || !$opts->{QuoteFields} ) {
        my $sel_str =
            $opts->{Columns}
          ? ref( $opts->{Columns} )
              ? join( ",", @{ $opts->{Columns} } )
              : $opts->{Columns}
          : '*';

        # WHERE 1=0: we only want the column names, not any rows
        my $sth = $self->{DBH}->prepare("SELECT $sel_str FROM $table WHERE 1=0");
        $sth->execute();
        $cols = $sth->{NAME_lc};
        $sth->finish();
    }

    return $self->add_quotes( $table, $file, $cols, $opts ) if $opts->{QuoteFields};

    # If quotes are not required, this is more efficient
    # I doubt anyone uses either option anyway
    # but highly doubt anyone uses the quoting
    require File::Copy;

    my $d = $opts->{Delimiter} || $self->{DELIMITER};
    local $/ = $opts->{RowDelimiter} || "\n";

    open( my $fh, ">", "$file.bak" ) or die "Failed to open $file.bak: $!";

    # Unbuffer the filehandle for printing header
    # because File::Copy uses unbuffered syswrite
    # $fh->flush() after the print would also work depending on
    # version of perl and whether IO::Handle is loaded
    for ( select $fh ) { $| = 1; select $_ }

    print $fh join( $d, @$cols ), $/;
    File::Copy::copy( $file, $fh ) or die "Failed to copy $file to $file.bak: $!";
    close $fh or die "Failed to close $file.bak: $!";

    return "$file.bak";
}

# Edit a bcp file in place (the original is kept as "$file.bak"):
# optionally print a header line first (Header) and wrap every field that
# contains whitespace in double quotes (QuoteFields).
#
# Returns "$file.bak".
sub add_quotes {
    my ( $self, $table, $file, $cols, $opts ) = @_;

    my $d   = $opts->{Delimiter} || $self->{DELIMITER};
    my $dre = quotemeta($d);

    # Localise everything the in-place edit (<> with $^I) touches
    local ( $_, $., $ARGV, *ARGV );
    local ( $^I, @ARGV ) = ( '.bak', $file );
    local $/ = $opts->{RowDelimiter} || "\n";

    my $done;
    while (<>) {
        print join( $d, @$cols ), $/ if !$done++ && $opts->{Header};
        if ( $opts->{QuoteFields} ) {
            chomp;
            my @fields = split /$dre/;
            /\s/ and $_ = qq("$_") for @fields;
            $_ = join( $d, @fields ) . $/;
        }
        print;
    }

    return "$file.bak";
}

# Name of the DBD driver behind the handle (e.g. 'Sybase' or 'Oracle')
sub type {
    my $self = shift;
    return $self->{DBH}{Driver}{Name};
}

# Because of Sybase and its stupid mixed case column names,
# we need to be able to find the actual cased name for a given
# uncased column name.
# Just pray that there are not two columns with the same name
# in the same table that are differently cased.
memoize('column_info'); sub column_info { my $self = shift; my $table = shift; my $schema; my $dbtype = $self->type(); my ($tmp_db, $curr_db) = (undef,''); my $dbh = $self->{DBH}; my %col_dflt; if ( $dbtype eq 'Oracle' ) { $table = uc($table); if ( $table =~ /^(\w+)\.(\w+)$/ ) { ($schema, $table) = ($1,$2); } else { $schema = $self->curr_schema() } } elsif ( $dbtype eq 'Sybase' ) { $tmp_db = $curr_db = $self->curr_db(); if ( $table =~ /^#/ ) { $table = $self->temp_table_name($table); } if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(#?\w+)$/ ) { ($tmp_db, $schema, $table) = ($1,$2,$3); $schema ||= undef; # We can only get column info on the current database $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db; } $schema ||= '%'; # Sybase gets metadata through a (under the DBD hood) stored proc, but does not return defaults. # So get defaults here. my $sth = $dbh->prepare( sprintf( $self->default_sql(), $table ) ); $sth->execute(); $sth->bind_columns( \my ( $col_name, $default ) ); while ( $sth->fetch ) { $col_dflt{$col_name} .= $default; } } my $sth = $self->{DBH}->column_info($tmp_db, $schema, $table, '%'); my @names = @{$sth->{NAME_uc}}; my %row; $sth->bind_columns(\@row{@names}); my @list; my %col_map; my $col_cnt = 0; while ( $sth->fetch() ) { # Data is probably in order, but we are not guaranteed # So assign by index instead of pushing to array if possible # IQ does not have ORDINAL_POSITION so fall back to select order my $idx = defined($row{ORDINAL_POSITION}) ? 
$row{ORDINAL_POSITION}-1 : $col_cnt; $col_cnt++; my $name = lc($row{COLUMN_NAME}); $list[$idx] = $name; ($row{COLUMN_DEF} = $col_dflt{$name}) =~ s/^default\s*//i if defined($col_dflt{$name}) and !defined($row{COLUMN_DEF}); $col_map{$name} = { %row }; } $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db; return unless $col_cnt; my %col_info = ( LIST => \@list, MAP => \%col_map, ); return \%col_info; } sub last_chg_list { my $self = shift; my ($table, $columns) = @_; # Determine if last_chg_user, last_chg_date need to be updated my %chg_field = (last_chg_user => 1, last_chg_date => 1); delete $chg_field{$_} for map lc, @$columns; my %chg_cols; if (%chg_field) { # Are chg columns in table my $col_info = $self->column_info($table); my $col_map = $col_info->{MAP}; for my $c (keys %chg_field) { $chg_cols{$c} = $col_map->{$c}{COLUMN_SIZE} if $col_map->{$c}; } } return %chg_cols; } sub key_columns { my ($self, $table) = @_; my $pk = $self->primary_key($table); return $pk if $pk; my $idx = $self->index_info($table); return unless $idx; # Look for unique indexes with suffixes uk, pk, or key my ($pk_name) = sort grep /(?i)(?:[pu]k|key)\d*$/, keys %$idx; return $idx->{$pk_name} if $pk_name; my ($idx_name) = sort keys %$idx; return $idx->{$idx_name}; } sub upd_columns { my ($self, $table, $key_cols) = @_; my $col_data = $self->column_info($table)->{LIST}; $key_cols ||= $self->key_columns($table); return unless $key_cols; my %is_key_col; $is_key_col{$_}++ for @$key_cols; return [ grep !$is_key_col{$_}, @$col_data ]; } sub delete { my ($self, $table, $where) = @_; my $dbh = $self->{DBH}; my $sql = "DELETE FROM $table"; $sql .= " WHERE $where" if $where; my $rows = $dbh->do($sql) + 0; print "$rows rows deleted\n"; return $rows; } # Execute sql with retry on deadlocks sub execute { my ($self,$sth,@args) = @_; # We can pass a sql statement or a sth $sth = $self->{DBH}->prepare($sth) if !ref($sth); my $retry = 5; for (1..$retry) { my $status = eval { 
$sth->execute(@args) }; return $status if $status; confess $@ unless $@ =~ /deadlock/i; print "Deadlock detected on retry $_ of 5\n"; sleep 2 if $_ < $retry; } confess $@; } sub ora_date_fmt { # Not very OO-ish but allow calling the Oracle date mask routine # From any generic utility object my $self = shift; DBIx::BulkUtil::Oracle->date_mask(@_); } sub strptime_fmt { my ($class, $str, $fmt) = @_; $fmt ||= DBIx::BulkUtil::Oracle->date_mask($str); return undef unless $fmt; for ($fmt) { s/MONTH/%B/; s/MON/%b/; s/MM/%m/; s/DD/%d/; s/YYYY/%Y/; s/YY/%y/; s/RRRR/%Y/; s/RR/%y/; s/HH24/%H/; s/HH(?:12)?/%I/; s/MI/%M/; s/SS/%S/; s/AM/%p/; s/DY/%a/; s/DAY/%a/; s/TZD/%Z/; s/TZH.TZD/%z/; s/"(.)"/$1/g; } return $fmt; } sub blk_prepare { my ($self, $table, %args) = @_; my $blk_opts = $args{BlkOpts} || {}; my $commit = $args{CommitSize} || 1000; my $con = $args{Constants}; my $col_info = $self->column_info($table) or confess "Table $table does not exist"; my @col_list = @{$col_info->{LIST}}; my $arg_len = @col_list; my $col_cnt = @col_list; my $sql = "INSERT INTO $table VALUES (" . join(",", ("?") x $col_cnt) . ")"; my $type = $self->type(); my @blk_opts = ($type eq 'Sybase') ? 
{ syb_bcp_attribs => $blk_opts } : (); my $dbh = $self->{DBH}; my $sth = $dbh->prepare($sql, @blk_opts); my ($exec_f,$commit_f,$finish_f); my @ex_arg_list = (undef) x @col_list; my $cnt = 0; if ($con) { my %const = %$con; my @c_list = keys %const; my %col_pos; @col_pos{@col_list} = 0..$#col_list; my %const_pos; @const_pos{@c_list} = delete @col_pos{@c_list}; $arg_len = keys %col_pos; # Create arg array for execute method # Set constants and create sub for all but constant args @ex_arg_list[@const_pos{@c_list}] = @const{@c_list}; my @non_const = sort { $a <=> $b } values %col_pos; $sth->{HandleError} = undef if $type eq 'Oracle'; if ($type eq 'Sybase') { $exec_f = sub { @ex_arg_list[@non_const] = @_; $sth->execute(@ex_arg_list) }; $commit_f = sub { $dbh->commit() }; $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef }; } else { $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$i++] for @non_const }; $commit_f = sub { my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list); unless (defined $t) { for my $i (0..$#status) { next unless ref $status[$i]; my @row = map { ref($ex_arg_list[$_]) ? qq('$ex_arg_list[$_][$i]') : $ex_arg_list[$_] } 0..$#ex_arg_list; confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]"; } } $_ = [] for @ex_arg_list[@non_const]; $r; }; $finish_f = sub { $commit_f->() if $cnt > 0 }; } } else { if ($type eq 'Sybase') { $exec_f = sub { $sth->execute(@_); '0E0' }; $commit_f = sub { $dbh->commit(); $cnt }; $finish_f = sub { $dbh->commit(); $sth->finish(); $sth = undef; ( $cnt > 0 ) ? 
$cnt : '0E0' }; } else { $exec_f = sub { my $i=0; push @{$ex_arg_list[$_]}, $_[$_] for 0..$#ex_arg_list; return '0E0' }; $commit_f = sub { my ($t,$r) = $sth->execute_array({ ArrayTupleStatus => \my @status }, @ex_arg_list); unless (defined $t) { for my $i (0..$#status) { next unless ref $status[$i]; my @row = map { qq('$ex_arg_list[$_][$i]') } 0..$#ex_arg_list; confess "Error: [$status[$i][1]] inserting [".join(",", @row)."]"; } } $_ = [] for @ex_arg_list; $r; }; $finish_f = sub { ( $cnt > 0 ) ? $commit_f->() : '0E0' }; } } bless { CNT => \$cnt, COMMIT_SIZE => $commit, EXEC_FUNC => $exec_f, COMMIT_FUNC => $commit_f, FINISH_FUNC => $finish_f, ARG_LEN => $arg_len, }, "DBIx::BulkUtil::BLK"; } sub prepare { my $self = shift; my %opt = @_; my $table = $opt{Table}; my $sql = $opt{Sql}; my $columns = $opt{Columns}; my $href = $opt{BindHash}; my $aref = $opt{BindArray}; my $by_name = defined($opt{ByName}) ? $opt{ByName} : ( !$href && !$aref ) ? 0 : ($self->type() eq 'Sybase' ) ? 0 : 1; confess "Can not supply both BindHash and BindArray" if $href && $aref; confess "Can not use BindHash or BindArray without ByName" if ( $href || $aref ) && !$by_name; confess "Must supply Table or Sql to prepare" unless $table || $sql; confess "Can not supply both Table and Sql to prepare" if $table && $sql; my $dflt_col = eval { $columns ||= $self->column_info($table)->{LIST} if $table; 1; }; confess "Table $table not found in datbase" unless $dflt_col; if ( $columns && @$columns ) { # A little overkill to get a nicely formatted SQL statement my $c_sep = ( @$columns > 5 ) ? "\n" : ''; my $cnt; my $c_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $cnt++ ? ' ' : '' }; my $h_cnt; my $h_ind = ( @$columns > 5 ) ? sub { ' ' } : sub { $h_cnt++ ? ' ' : '' }; my $v_sep = $by_name ? ( @$columns > 5 ) ? "\n" : '' : ''; my $hold = $by_name ? sub { $h_ind->() . ":$_" } : sub { "?" }; $sql ||= sprintf("INSERT INTO $table ($c_sep%s$c_sep) VALUES ($v_sep%s$v_sep)", join(",$c_sep", map $c_ind->() . 
$_, @$columns), join(",$v_sep", map $hold->(), @$columns), ); } print "Preparing: $sql\n"; my $sth = $self->dbh->prepare($sql); if ($href) { $sth->bind_param_inout( ":$_" => \$href->{$_}, 0 ) for @$columns; } elsif ($aref) { $sth->bind_param_inout( ":$columns->[$_]" => \$aref->[$_], 0 ) for 0..$#$columns; } return $sth; } sub prepare_upd { my $self = shift; my %args = @_; my $table = $args{Table} || die "Must supply Table option"; my $col_info = $self->column_info($table); my $col_list = $args{Columns} || $col_info->{LIST}; my $key_cols = $args{KeyCols} || $self->key_columns($table); my $upd_cols = $args{UpdCols} || $self->upd_columns($table); my $sql = <{DBH}->prepare($sql); my %col_pos; my $cnt = 0; $col_pos{$_} = $cnt++ for @$col_list; my @sth_pos; push @sth_pos, $col_pos{$_} for @$upd_cols, @$key_cols; return sub { unless (@_) { $sth->finish(); undef $sth; undef @sth_pos; return; } $sth->execute(@_[@sth_pos]); } } sub is_iq { 0 } package DBIx::BulkUtil::BLK; use Carp qw(confess); sub execute { my $self = shift; unless (@_ == $self->{ARG_LEN}) { my $arg_cnt = @_; confess "Execute argument count $arg_cnt must be $self->{ARG_LEN}"; } my $f = $self->{ARG_FUNC}; my $rows = $self->{EXEC_FUNC}->(@_); my $cnt = $self->{CNT}; if ( ++$$cnt >= $self->{COMMIT_SIZE} ) { $rows = $self->{COMMIT_FUNC}->(); $$cnt = 0; } return $rows; } sub finish { my $self = shift; $self->{FINISH_FUNC}->(); } package DBIx::BulkUtil::Sybase; use Carp qw(confess); our @ISA = qw(DBIx::BulkUtil::Obj); sub now { 'getdate()' }; sub add { my $self = shift; my $date = shift; my $n = shift; my $unit = shift; my $new_date = "dateadd( $unit, $n, $date)"; return $new_date unless @_; return $self->add( $new_date, @_ ); } sub diff { my $self = shift; my $date1 = shift; my $date2 = shift; my $unit = shift; my $new_date = "datediff( $unit, $date1, $date2)"; return $new_date; } sub row_select { my $self = shift; my $sel = shift; return "select $sel"; } sub sp_sth { my $self = shift; my $sth = 
$self->{DBH}->prepare($self->sp_sql(@_)); $sth->execute(); return $sth; } sub sp_sql { my $self = shift; my ($stored_proc, @args) = @_; return "exec " . join(" ", $stored_proc, join(",", map {$self->{DBH}->quote($_)} grep !/^:cursor$/, @args)); } # This is trivial in Sybase, but a necessary function for Oracle # and so makes this portably compatible sub to_datetime { my $self = shift; my $date = shift; return "'$date'"; } sub bcp_in { my $self = shift; my $optref = (ref $_[-1]) ? pop @_ : {}; my %opts = %$optref; my ( $table, $file, $dir ) = @_; my $partition = ( $table =~ s/(:\d+)$// ) ? $1 : ''; $file ||= "$table.bcp"; $dir ||= 'in'; my $dbh = $self->{DBH}; my $db = $dbh->{Name}; $db =~ /server=(\w+)/ or confess "Can't determine server for bcp"; my $server = $1; my $database = $self->curr_db(); my $user = $dbh->{Username}; my $delimiter = $opts{Delimiter} || $self->{DELIMITER}; my $row_delimiter = $opts{RowDelimiter} || "\n"; my $commit_size = $opts{CommitSize} || 1000; my $bcp_table = (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table : ($table =~ /^\w+$/) ? "$database..$table" : ($table =~ /^\w*\.\w+$/) ? "$database.$table" : confess "Can not determine database for bcp"; $bcp_table .= $partition; # Simulate Oracle sqlldr Append/Replace/Truncate my $id_cnt; if ( $dir eq 'in' ) { my $mode = $opts{Action} || "A"; if ( $mode eq 'T' ) { my $sql = "TRUNCATE TABLE $bcp_table"; print "Executing: $sql\n"; $dbh->do($sql); } elsif ($mode eq 'R') { $self->delete($bcp_table, '', $commit_size); } confess "BCP file $file does not exist" unless -f $file; # Save some work # checking underscore ok, we just did -f above unless ( -s _ ) { print "$file is empty. 
Skipping bcp\n"; # Make any log file parsers happy print "0 rows copied\n"; return 0; } # All this to decide whether or not to use '-E' # Only use '-E' if there is an identity column # And GenerateId is false unless ( $opts{GenerateId} ) { my $col_info = $self->column_info($table); my $col_map = $col_info->{MAP}; if ($col_map) { for my $c ( values %$col_map ) { ++$id_cnt and last if $c->{TYPE_NAME} =~ /identity/; } } } } my ($action,$to_from) = ($dir eq 'in') ? ('Loading', 'from') : ('Exporting', 'to'); print "$action $server/$bcp_table $to_from $file\n"; my (@max_err_opt, @commit_opt, @header_opt, @id_opt); my $max_err_cnt = $opts{MaxErrors} || 0; if ( $dir eq 'in' ) { @max_err_opt = (-m => $max_err_cnt); @commit_opt = (-b => $commit_size); @header_opt = (-F => $opts{Header}+1) if $opts{Header}; @id_opt = "-E" if $id_cnt; } my $keep_temp = $opts{KeepTempFiles} || $opts{Debug}; my $in_temp_dir = $opts{TempDir} || $opts{Debug}; my $temp_dir; $temp_dir = $opts{TempDir} || "." if $in_temp_dir; require File::Temp; my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : (); my @unlink = $keep_temp ? (UNLINK => 0) : (); my $error_file = File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".err", @temp_dir, @unlink, ); chmod(0664, $error_file->filename()); $error_file->close(); my @packet_size = $opts{PacketSize} ? ( -A => $opts{PacketSize} ) : (); my @passthru = $opts{PassThru} ? @{$opts{PassThru}} : (); my ( $fmt_file, $tmp_fmt_file ); if ( $opts{FormatFile} ) { $fmt_file = $opts{FormatFile}; } elsif ( ( $opts{ColumnList} && $opts{ColumnList} ) || ( $opts{Filler} && @{$opts{Filler}} ) ) { ($tmp_fmt_file,$fmt_file) = $self->mk_fmt_file( Table => $table, Delimiter => $delimiter, RowDelimiter => $row_delimiter, ColumnList => $opts{ColumnList}, Filler => $opts{Filler}, TempDir => $opts{TempDir}, FormatFileName => $opts{FormatFileName}, KeepTempFiles => $keep_temp, ); } my @fmt_file_opt = $fmt_file ? 
( -f => $fmt_file ) : '-c'; # UTF-8 doesn't work on HP - default is roman8 on HP # Should probably make '-J' some kind of option, with maybe # a map of OS types and default values. But leave that for # a later date. my @cmd = ( bcp => $bcp_table, $dir, $file, -U => $user, #-J => "utf8", -S => $server, -t => $delimiter, -r => $row_delimiter, -e => $error_file->filename(), @header_opt, @id_opt, @commit_opt, @max_err_opt, @packet_size, @passthru, @fmt_file_opt, ); print "Executing: @cmd\n"; push @cmd, -P => $self->{PASSWORD}; open(my $fh, "-|", @cmd) or confess "Can't exec bcp: $!"; my ($rows, $failed, $partially_failed); local ($_, $.); my $err_cnt = my $c_lib_err_cnt = my $srvr_err_cnt = 0; while (<$fh>) { print; if ( /^(Server|C[TS]LIB) Message/ ) { my $msg_type = $1; if ( $msg_type eq 'CSLIB' ) { if ( m|/N(\d+)| ) { # Sybase says truncation is not an error, so we will too # Or else we might get > 1 error on the same row unless ( $1 == 36 ) { $err_cnt++; $c_lib_err_cnt++; } } } elsif ( $msg_type eq 'CTLIB' ) { $err_cnt++; $c_lib_err_cnt++; } else { # On server errors the whole batch is an error if ( /\s(\d+)/ ) { # Ignore 'slow bcp' warning unless ( $1 == 4852 ) { $err_cnt += $commit_size; $srvr_err_cnt += $commit_size; } } else { $err_cnt += $commit_size; $srvr_err_cnt += $commit_size; } } } $rows = $1 if /^(\d+) rows copied/; # failed or partially failed if ( /^bcp copy in ((?:partially )?)failed/ ) { $partially_failed++ if $1; $failed++; } } # "NaN" (literally "NaN") to numeric errors # do not show up on STDOUT. # So we may as well search the err file to count # all CSLIB and CTLIB errors. # Truncation errors do not show up in file, so we # don't have to filter them out as we would if we # were parsing STDOUT. 
my $err_file_cnt = 0; open(my $err_h, "<", $error_file->filename()) or die "Failed to open $error_file: $!"; while (<$err_h>) { $err_file_cnt++ if /^#@ Row \d+: Not transferred/; } close $err_h; if ( $err_file_cnt > $c_lib_err_cnt ) { $err_cnt += $err_file_cnt - $c_lib_err_cnt; } # BCP 11.x,12.x returns meaningful exit status # 10.x does not (returns 0 even on errors) my $close_success = close $fh; unless ($close_success) { my $exit_stat = $? >> 8; my $exit_sig = $? & 127; my $exit_core = $? & 128; # bcp will exit with non-zero status on any 'Server' error, # but not on 'CSLIB' errors unless 'CSLIB' error count exceeds max. if ( $exit_stat != 0 ) { if ( $dir eq 'in' ) { # Some of this may seem unneccessary, but Sybase bcp is # horribly inconsistent. # Exceeded the error count confess "BCP error - max error count ($max_err_cnt) exceeded - bcp returned status $exit_stat: $!" if $err_cnt > $max_err_cnt; # The load was aborted before bcp indicated that it finished confess "BCP error - bcp aborted [$exit_stat]: $!" if !defined($rows) and !$failed; # BCP failed - even if we allow some errors on a small file, if zero rows are copied # then call it a total failure. confess "BCP error - bcp failed [$exit_stat]: $!" if $failed and !$partially_failed; } else { confess "BCP error - bcp returned status $exit_stat: $!"; } } confess "BCP error - bcp recieved signal $exit_sig" if $exit_sig > 0; confess "BCP error - bcp coredumped" if $exit_core; } # Will miss error count exceeded error on 10.x # But will catch other errors if load is aborted # Or no rows are loaded. 
confess "BCP error - no rows copied" if !defined($rows); # CTLIB errors do not cause non-zero exit - so catch them here confess "BCP error - max error count ($max_err_cnt) exceeded" if $err_cnt > $max_err_cnt; $rows ||= 0; return $rows; } { no warnings 'once'; *bcp = \&bcp_in; } sub mk_fmt_file { my $self = shift; my %opts = @_; my $table = $opts{Table} || die "Table required for mk_fmt_file"; my $col_info = $self->column_info($table); my $db_col_list = $col_info->{LIST}; my %is_db_column; $is_db_column{$_}++ for @$db_col_list; my %is_filler; if ( $opts{Filler} ) { $is_filler{lc($_)}++ for @{$opts{Filler}}; } my ($tmp_fmt_file,$fmt_file); if ( $opts{FormatFileName} ) { $fmt_file = $opts{FormatFileName}; } else { require File::Temp; my $keep_temp = $opts{KeepTempFiles} || $opts{Debug}; my $in_temp_dir = $opts{TempDir} || $opts{Debug}; my $temp_dir; $temp_dir = $opts{TempDir} || "." if $in_temp_dir; my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : (); my @unlink = ( $keep_temp || !defined(wantarray) ) ? (UNLINK => 0) : (); $tmp_fmt_file = File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".fmt", @temp_dir, @unlink, ); $fmt_file = $tmp_fmt_file->filename(); chmod(0664, $tmp_fmt_file); $tmp_fmt_file->close(); } my $delim = $opts{Delimiter} || "|"; my $row_delim = $opts{RowDelimiter} || "\n"; # Need escaped text in fmt file # for CR/LF for ($delim,$row_delim) { s/\n/\\n/g; s/\r/\\r/g; } my @col_list = ( $opts{ColumnList} && @{$opts{ColumnList}} ) ? @{$opts{ColumnList}} : @{$col_info->{LIST}}; my $ncols = @col_list; open( my $fh, ">", $fmt_file ) or confess "Failed to open $fmt_file: $!"; print $fh "10.0\n"; print $fh "$ncols\n"; my $col_map = $col_info->{MAP}; for my $i (1..$ncols) { my $name = $col_list[$i-1]; my $d = ( $i == $ncols ) ? 
$row_delim : $delim; my @row = ($i, 'SYBCHAR', 0); if ($is_filler{lc($name)}) { push @row, 0, qq["$d"], 0; } elsif ($is_db_column{lc($name)}) { my $info = $col_map->{lc($name)}; # Native Sybase date format size is 26 though metadata says 23 # For numbers, add extra for decimal my $size = ( $info->{TYPE_NAME} =~ /date/ ) ? 26 : ( $info->{TYPE_NAME} =~ /char|text/ ) ? $info->{COLUMN_SIZE} : $info->{COLUMN_SIZE} + 1; push @row, $size, qq["$d"], $info->{ORDINAL_POSITION}, $name; } else { confess "$name is neither a db nor filler column" } print $fh join("\t", @row), "\n"; } close $fh; # Also return temp object so it will not be cleaned up yet return wantarray ? ($tmp_fmt_file, $fmt_file) : $tmp_fmt_file ? $tmp_fmt_file : $fmt_file; } sub bcp_out { my $self = shift; my @opts; if (ref $_[-1]) { @opts = pop @_; } my ($table, $file) = @_; $file ||= "$table.bcp"; my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb'; # Sybase rounds money columns, need to bcp a view of it # if any exist. my $dbh = $self->{DBH}; # Need to save current db in case view is created my $curr_db = $self->curr_db(); my $view = $self->mk_view($table, @opts); my $rows = eval { $self->bcp($view || $table, $file, 'out', @opts) }; unless (defined $rows) { my $err = $@; if ($view) { warn "BCP error detected - dropping view $view\n"; my $result = eval { $dbh->do("DROP VIEW $view") }; warn "Unable to drop view $view: $@" unless $result; $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db; } confess $err; } if ($view) { print "Dropping view $view\n"; $dbh->do("DROP VIEW $view"); $dbh->do("USE $curr_db") if !$self->is_iq() and $curr_db; } if ( !@opts or !$opts[0]{NoFix} ) { my $bak = eval { $self->fix_bcp_file($file, @opts) }; if ( $bak ) { unlink $bak; } else { warn "Error processing $file. 
BCP file in $file.bak: $@\n";
      return;
    }
  }

  if ( @opts and ( $opts[0]{Header} || $opts[0]{QuoteFields} ) ) {
    my $bak = eval { $self->add_header($table, $file, @opts) };
    if ( $bak ) {
      unlink $bak;
    } else {
      warn "Error post processing $file. BCP file in $file.bak: $@\n";
      return;
    }
  }

  return $rows;
}

# Create a view over $table for bcp out, with money/smallmoney columns
# converted to decimal(19,4)/decimal(10,4).
#
# Arguments: the table name, optionally followed by an options hashref:
#   Columns - arrayref of column names, or a ready-made SELECT list string
#   Filter  - text appended to the view's SELECT statement
#   TempDb  - database the view is created in (default 'scratchdb'); no USE
#             is issued when is_iq() is true
#
# Returns the view name, or nothing when no view is needed (no money
# column, no Filter and no Columns). The view name is the first 19
# characters of the table name plus an id built from the pid and the
# current time; creation is retried under a new name while the database
# reports "already an object", and confesses after more than 20 retries.
sub mk_view {
  my ($self,$table) = @_;
  my @opts;
  @opts = pop @_ if ref $_[-1];
  my $scratchdb = @opts ? $opts[0]{TempDb} || 'scratchdb' : 'scratchdb';

  # Sybase rounds money columns, need to bcp a view of it
  # if any exist.
  my $dbh = $self->{DBH};
  my $col_info = $self->column_info($table);

  # Columns might be a string from a SELECT clause
  # Or it might be an arrayref of columns
  my $col_list = ( @opts && $opts[0]{Columns} ) ? $opts[0]{Columns} : $col_info->{LIST};
  my $col_map = $col_info->{MAP};
  my @columns;
  my $money_cnt = 0;
  my $column_str;
  if ( ref $col_list ) {
    for my $name (@$col_list) {
      my $col_name = $name;

      # Look the column up by lower-cased name, as the other column_info()
      # MAP lookups in this file do, so a mixed-case name passed in Columns
      # still gets its money conversion.
      if ( my $info = $col_map->{lc $name} ) {
        my $type = $info->{TYPE_NAME};
        $col_name = $info->{COLUMN_NAME};
        if ($type =~ /money/) {
          $money_cnt++;
          my $len = ($type =~ /small/) ? 10 : 19;
          $col_name = "convert(decimal($len,4), $col_name) $col_name";
        }
      }
      push @columns, $col_name;
    }
    $column_str = join ",", @columns;
  } else {
    $column_str = $col_list;
  }

  return if $money_cnt==0 and !$opts[0]{Filter} and !$opts[0]{Columns};

  my ($view, $db_view);
  my $curr_db = $self->curr_db();
  if ( !$curr_db and $table =~ /^(\w+)\.\w*\.\w+$/ ) {
    $curr_db = $1;
  }
  confess "Can not determine database" unless $curr_db;

  my $base_table = (!$curr_db or $table =~ /^\w+\.\w*\.\w+$/) ? $table
                 : ($table =~ /^\w+$/) ? "$curr_db..$table"
                 : ($table =~ /^\w*\.\w+$/) ? "$curr_db.$table"
                 : confess "Can not determine database for view";

  ( my $tmp_view = $base_table ) =~ s/.*\.//;
  $tmp_view = substr($tmp_view, 0, 19) if length($tmp_view) > 19;

  $dbh->do("USE $scratchdb") unless $self->is_iq();

  my $cnt;
  while (1) {
    my ($sec, $min, $hr) = localtime;
    my $id = sprintf("%05d%02d%02d%02d", $$, $hr, $min, $sec);
    $view = "${tmp_view}${id}";
    $db_view = $self->is_iq() ? $view : "$scratchdb..$view";
    my $sql = sprintf(
      "CREATE VIEW %s AS SELECT %s FROM %s",
      $view, $column_str, $base_table,
    );
    $sql .= " $opts[0]{Filter}" if @opts && $opts[0]{Filter};
    print "Creating view $db_view\n";
    print "Executing: $sql\n";
    my $result = eval { $dbh->do($sql) };
    return $view if $result;
    confess $@ unless $@ =~ /already an object/;
    $cnt++;
    confess "Too many retries trying to create view $db_view. Aborting" if $cnt > 20;
    print "View $db_view already exists, retrying #$cnt...";
    sleep 2;
  }
}

# Fix native date format from Sybase bcp out
{
  my %mons = qw(
    Jan 1 Feb 2 Mar 3 Apr 4 May 5 Jun 6
    Jul 7 Aug 8 Sep 9 Oct 10 Nov 11 Dec 12
  );
  my $mon_str = join '|', keys %mons;
  my $mon_re = qr/$mon_str/;

  # Rewrite $file in place, turning whole fields that hold a Sybase native
  # date into ISO style:
  #   Mon DD YYYY HH:MI:SS:mmmAM  ->  YYYY-MM-DD HH:MI:SS.mmm
  #   Mon DD YYYY HH:MIAM         ->  YYYY-MM-DD HH:MI
  #   Mon DD YYYY                 ->  YYYY-MM-DD
  # A field is only converted when it is bounded on the left by the start of
  # the record or a delimiter, and on the right by a delimiter or the row
  # delimiter.
  #
  # Arguments: the file name, optionally followed by an options hashref with
  # Delimiter (default $self->{DELIMITER}, then "|") and RowDelimiter
  # (default: the current $/).
  # Returns the name of the backup copy, "$file.bak".
  sub fix_bcp_file {
    my ( $self, $file ) = @_;
    my $opts = {};
    if (ref $_[-1]) { $opts = pop @_; }
    my $delimiter = $opts->{Delimiter} || $self->{DELIMITER} || '|';
    my $dre = quotemeta($delimiter);

    # In-place edit through <>: the original is kept as "$file.bak"
    local ($_, $., $ARGV, *ARGV);
    local ( $^I, @ARGV ) = ( '.bak', $file );
    local $/ = $opts->{RowDelimiter} || $/;

    # The row delimiter is literal text, not a pattern; quote it just like
    # the field delimiter so characters such as | + ^ $ . are matched as is.
    my $rre = quotemeta($/);

    while ( <> ) {
      # Each pass consumes the delimiter that follows a converted field, so
      # the substitutions are repeated until adjacent date fields are done.
      1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d):(\d\d):(\d{3})([AP])M($dre|$rre)!
        $1 . sprintf(
          '%04d-%02d-%02d %02d:%02d:%02d.%03d',
          $4, $mons{ $2 }, $3,
          ( $9 eq 'P' && $5 < 12) ? $5 + 12 : ( $9 eq 'A' && $5 == 12 ) ? 0 : $5,
          $6, $7, $8
        ) . $10
      !eg;
      1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})\s\s?(\d\d?):(\d\d)([AP])M($dre|$rre)!
        $1 . sprintf(
          '%04d-%02d-%02d %02d:%02d',
          $4, $mons{ $2 }, $3,
          ( $7 eq 'P' && $5 < 12) ? $5 + 12 : ( $7 eq 'A' && $5 == 12 ) ? 0 : $5,
          $6
        ) . $8
      !eg;
      1 while s!(^|$dre)($mon_re)\s{1,2}(\d{1,2})\s(\d{4})($dre|$rre)!
        $1 . sprintf(
          '%04d-%02d-%02d',
          $4, $mons{ $2 }, $3
        ) . $5
      !eg;
      print;
    }
    return "$file.bak";
  }
}

{
  my %type_map = (
    'V' => 'V',
    'P' => 'P',
    'U' => 'T'
  );

  sub obj_type {
    my ( $self, $name ) = @_;
    my $dbh = $self->{DBH};
    my $qname = $dbh->quote($name);
    my ( $type ) = $dbh->selectrow_array("select type from sysobjects where name = $qname");
    return unless $type;
    return $type_map{$type} || confess "Don't know about type $type for object $name";
  }
}

sub curr_db {
  my $self = shift;
  $self->get('db_name()');
}

sub curr_schema { undef }

{
  # Can get errors in some databases if you don't add dbo to everything
  # NOTE(review): the text between the heredoc opener and "{DBH}" below is
  # missing from this copy of the file (the SQL template and the start of
  # the sub that uses it). Everything from here on is kept exactly as found.
  my $sql_t = <{DBH};
  my $schema = '';
  $tmp_db = $curr_db = $self->curr_db();
  if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
    ($tmp_db, $schema, $table) = ($1,$2,$3);
    $table = "$schema.$table";

    # We can only get info on the current database
    if ( defined($tmp_db) and $tmp_db ne $curr_db ) {
      $dbh->do("USE $tmp_db");
    }
  }
  my $sql = sprintf $sql_t, $dbh->quote($table);
  $sql .= "AND dbo.sysindexes.status & 2 = 2\n" unless $all_indexes;
  $sql .= "ORDER BY dbo.syscolumns.colid\n";
  my $sth = $dbh->prepare($sql);
  $sth->execute();
  my @col_names = @{$sth->{NAME_lc}};
  my %row;
  $sth->bind_columns(\@row{@col_names});
  my %ind;
  while ($sth->fetch()) {
    if ( $row{col_name} ) {
      push @{$ind{$row{name}}}, lc($row{col_name});
    }
  }
  $dbh->do("USE $curr_db") if defined($tmp_db) and $tmp_db ne $curr_db;
  return unless %ind;
  return \%ind;
}
}

sub primary_key {
  my ( $self, $table ) = @_;
  my $schema;
  my ($tmp_db, $curr_db) = (undef,'');
  my $dbh = $self->{DBH};
  $tmp_db = $curr_db = $self->curr_db();
  if ( $table =~ /^(?:(\w+)\.)?(\w*)\.(\w+)$/ ) {
    ($tmp_db, $schema, $table) = ($1,$2,$3);
    $schema ||= undef;

    # We can only get column info on the current database
    $dbh->do("USE $tmp_db") if defined($tmp_db) and $tmp_db ne $curr_db;
  }
  my @pk = $self->{DBH}->primary_key($tmp_db, $schema, $table);
  $dbh->do("USE $curr_db") if defined($tmp_db) and 
$tmp_db ne $curr_db; return unless @pk; return \@pk; } { my $del_sql = <{DBH}; my $table = lc($args{Table}); my $stg_table = lc($args{StgTable}); my $tbl_info = $self->column_info($table); my $tbl_map = $tbl_info->{MAP}; my $stg_info = $self->column_info($stg_table); my $stg_map = $stg_info->{MAP}; my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}}; my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table); my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table, $key_col_ref); my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref; my %is_key_col; $is_key_col{$_}++ for map lc, @$key_col_ref; my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref; my %is_upd_col; $is_upd_col{$_}++ for map lc, @$upd_col_ref; my %tmp_col_map; %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap}; # Column map for upd statement, which must map the correct case # to the correct case. my %col_map = map {( $_ => ( $tmp_col_map{lc($_)} ? $stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME} : $stg_map->{lc($_)}{COLUMN_NAME} ))} @key_cols, @upd_cols; # Correctly cased field list for bcp select from stage table statement # Either it's in the explicit column map, or it's a key or upd column # with the same name as the target table, # or it can be last_chg_user or date my @fields = map { ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()' : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()' : $tmp_col_map{$_} ? $stg_has{$tmp_col_map{$_}} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME} : $tmp_col_map{$_} : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_has{$_} ? $stg_map->{$_}{COLUMN_NAME} : () : $stg_map->{$_} ? 
$stg_map->{$_}{COLUMN_NAME} : confess "Failed to map target column $table.$_" } @{$tbl_info->{LIST}}; my $field_str = join(",", @fields); my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols); my $del_merge_sql = sprintf($del_sql, $table, $table, $stg_table, $key_col_str, ); print("Executing: $del_merge_sql\n"); unless ($args{NoExec}) { my $del_rows = $dbh->do($del_merge_sql) + 0; print("$del_rows rows deleted from $table\n\n"); } my $ins_merge_sql = sprintf($ins_sql, $field_str, $stg_table, ); print("Inserting to $table: $ins_merge_sql\n"); return 1 if $args{NoExec}; my $ins_rows = ( $args{NoBCP} or ($stg_table =~ /^#/) ) ? $dbh->do("INSERT INTO $table\n$ins_merge_sql") + 0 : $self->bcp_sql($table, $ins_merge_sql) + 0; print("$ins_rows rows inserted to $table\n\n"); return 1; } } # This merge is destructive to the staging table # Only 'new' rows will be left in the staging table { my $upd_sql = <{DBH}; my $table = lc($args{Table}); my $stg_table = lc($args{StgTable}); my $tbl_info = $self->column_info($table); my $tbl_map = $tbl_info->{MAP}; my $stg_info = $self->column_info($stg_table); my $stg_map = $stg_info->{MAP}; my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}}; my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? $args{KeyCols} : $self->key_columns($table); my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table); my @key_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$key_col_ref; my %is_key_col; $is_key_col{$_}++ for map lc, @$key_col_ref; my @upd_cols = map $tbl_map->{lc($_)}{COLUMN_NAME}, @$upd_col_ref; my %is_upd_col; $is_upd_col{$_}++ for map lc, @$upd_col_ref; my %tmp_col_map; %tmp_col_map = map lc, %{$args{ColMap}} if $args{ColMap}; # Column map for upd statement, which must map the correct case # to the correct case. my %col_map = map {( $_ => ( $tmp_col_map{lc($_)} ? 
$stg_map->{lc($tmp_col_map{lc($_)})}{COLUMN_NAME} : $stg_map->{lc($_)}{COLUMN_NAME} ))} @key_cols, @upd_cols; # Correctly cased field list for bcp select from stage table statement # Either it's in the explicit column map, or it's a key or upd column # with the same name as the target table, # or it can be last_chg_user or date my @fields = map { ($_ eq 'last_chg_user' && !$stg_has{last_chg_user}) ? 'suser_name()' : ($_ eq 'last_chg_date' && !$stg_has{last_chg_date}) ? 'getdate()' : $tmp_col_map{$_} ? $stg_map->{$tmp_col_map{$_}}{COLUMN_NAME} : ( $is_key_col{$_} || $is_upd_col{$_} ) ? $stg_map->{$_}{COLUMN_NAME} : $stg_map->{$_} ? $stg_map->{$_}{COLUMN_NAME} : confess "Failed to map target column $table.$_" } @{$tbl_info->{LIST}}; my $field_str = join(",", @fields); my $key_col_str = join("\nAND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols); my $upd_col_str = join(",", map "$_=s.".($col_map{$_}||$_), @upd_cols); # Determine if last_chg_user, last_chg_date need to be updated my %chg_col = $self->last_chg_list($table, \@fields); for my $col ( sort { $b cmp $a } keys %chg_col ) { $upd_col_str .= ",$col=".( ($col eq 'last_chg_user') ? 
'suser_name()' : 'getdate()'); } unless ($args{InsertOnly}) { my $upd_merge_sql = sprintf($upd_sql, $table, $upd_col_str, $table, $stg_table, $key_col_str, ); print("Executing: $upd_merge_sql\n"); unless ($args{NoExec}) { my $upd_rows = $dbh->do($upd_merge_sql) + 0; print("$upd_rows rows updated in $table\n\n"); } } my $del_merge_sql = sprintf($del_sql, $stg_table, $stg_table, $table, $key_col_str, ); print("Executing: $del_merge_sql\n"); unless ($args{NoExec}) { my $del_rows = $dbh->do($del_merge_sql) + 0; print("$del_rows rows deleted from $stg_table\n\n"); } my $ins_merge_sql = sprintf($ins_sql, $field_str, $stg_table, ); print("Inserting to $table: $ins_merge_sql\n"); return 1 if $args{NoExec}; my $ins_rows = $self->bcp_sql($table, $ins_merge_sql) + 0; print("$ins_rows rows inserted to $table\n\n"); return 1; } } # BCP (via sqsh) the results of a sql select statement into a table sub bcp_sql { my $self = shift; my ($table,$sql) = @_; my $dbh = $self->{DBH}; my $db = $dbh->{Name}; $db =~ /server=(\w+)/ or confess "Can't determine server for bcp"; my $server = $1; my $database = $self->curr_db(); my $user = $dbh->{Username}; my $bcp_table = (!$database or $table =~ /^\w+\.\w*\.\w+$/) ? $table : ($table =~ /^\w+$/) ? "$database..$table" : ($table =~ /^\w*\.\w+$/) ? "$database.$table" : confess "Can not determine database for sqsh/bcp"; local $ENV{SQSH} = "-U $dbh->{Username} -P $self->{PASSWORD}"; my $pid = open(my $fh, "-|"); confess "Can't fork: $!" unless defined $pid; unless ($pid) { # sqsh needs library path set - make sure it is set # Don't know where it is in generic environment, or best way # to universally set this, or even if this is necessary in general... 
# local $ENV{LD_LIBRARY_PATH} = '/path/to/sybase/OCS-12_5/lib'; my @cmd = (sqsh => -S => $server, -D => $database); my $sqsh_fh; # sqsh outputs to stderr open(STDERR, ">&STDOUT"); unless ( open($sqsh_fh, "|-", @cmd) ) { warn "Unable to exec @cmd: $!"; exit(1); } print $sqsh_fh "$sql\n"; print $sqsh_fh "\\bcp -b 1000 $bcp_table\n"; my $status = close $sqsh_fh; exit($status ? 0 : 1); } my $rows; local ($_, $.); my $cnt; while (<$fh>) { if (/^Batch successfully bulk-copied/) { $cnt += 1000; print "$cnt: $_" unless $cnt % 10_000; next; } print; $rows = $1 if /^\s*(\d+) rows copied/; } my $close_status = close $fh; confess "SQSH BCP error - no rows copied" unless defined $rows; confess "SQSH BCP error - $rows rows copied" unless $close_status; # Return true value return $rows; } # SQL to return table column defaults { my $sql = <{DBH}; my ($spid) = $dbh->selectrow_array('select @@spid'); print "SPid: $spid\n"; my $who = $dbh->selectrow_hashref("exec sp_who '$spid'"); my $tempdb = $who->{tempdbname} || 'tempdb'; print "TempDb: $tempdb\n"; my ($id) = $dbh->selectrow_array("select object_id('$tempdb..$name')"); print "ID: $id\n"; my ($real_name) = $dbh->selectrow_array("select object_name($id, db_id('$tempdb'))"); print "RealName: $real_name\n"; return "$tempdb..$real_name"; } sub delete { my ($self, $table, $where, $limit) = @_; my $dbh = $self->{DBH}; $dbh->{syb_rowcount} = $limit || 1000; my $sql = "DELETE FROM $table"; $sql .= " WHERE $where" if $where; my ($rows, $tot_rows); my ($err, $err_msg); print "Executing: $sql\n"; do { $rows = eval { $dbh->do($sql) }; unless ($rows) { $err_msg = $@; $err++; $rows = 0; } $tot_rows += $rows; print "Deleted $tot_rows rows\n" if $rows > 0; } while $rows > 0; $dbh->{syb_rowcount} = 0; confess $err_msg if $err; print "$tot_rows rows deleted from $table\n"; return $tot_rows; } package DBIx::BulkUtil::SybaseIQ; use Carp qw(confess); use Cwd qw(abs_path); our @ISA = qw(DBIx::BulkUtil::Sybase); { my $sql = <{DBH}; my $delimiter = 
$opts->{Delimiter} || $self->{DELIMITER}; my $row_delimiter = $opts->{RowDelimiter} || "\n"; my $id_cnt; my $mode = $opts->{Action} || "A"; if ( $mode eq 'T' ) { my $sql = "TRUNCATE TABLE $table"; print "Executing: $sql\n"; $dbh->do($sql); } elsif ($mode eq 'R') { my $sql = "DELETE FROM $table"; print "Executing: $sql\n"; $dbh->do($sql); } my @bcp_list; for my $file (@files) { confess "BCP file $file does not exist" unless -f $file; unless ( -s _ ) { print "$file is empty. Skipping ...\n"; next; } push @bcp_list, $file; } unless ( @bcp_list ) { print "All files are empty. Skipping bcp of $table\n"; # Make any log file parsers happy print "0 rows copied\n"; return 0; } my $info = $self->column_info($table); my $col_list = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? $opts->{ColumnList} : $info->{LIST}; my @filler = $opts->{Filler} ? @{$opts->{Filler}} : (); my %is_filler; $is_filler{$_}++ for @filler; # Convert empty string to NULL # Should be default but we don't want to break existing apps my $null_blanks = $self->{NoBlankNull} ? ' NULL(BLANKS)' : ''; # Columns that we will let default to the schema default my $dflt = $opts->{Default} || []; my %dflt; $dflt{$_}++ for @$dflt; my $constant = $opts->{Constants} || {}; my @list = grep !defined($constant->{$_})&&!$dflt{$_}, @$col_list; my $last_col = $list[-1]; # It is best to explicitly put the row delimiter on the last column my $load_sql = sprintf( $sql, $table, join( ",\n", map { defined($constant->{$_}) ? qq( [$_] DEFAULT '$constant->{$_}') : ( $_ ne $last_col ) ? $is_filler{$_} ? qq( FILLER('$delimiter')) : qq( [$_] '$delimiter'$null_blanks) : ( $opts->{TrailingDelimiter} ) ? $is_filler{$_} ? qq( FILLER('$delimiter$row_delimiter')) : qq( [$_] '$delimiter$row_delimiter'$null_blanks) : $is_filler{$_} ? qq( FILLER('$row_delimiter')) : qq( [$_] '$row_delimiter'$null_blanks) } grep !$dflt{$_}, @$col_list), join( ",\n ", map { "'". abs_path($_) . 
"'" } @bcp_list), ); $load_sql .= "SKIP $opts->{Header}\n" if $opts->{Header}; # '0' indicates unlimited errors to IQ, but will be skipped here since '0' is false # That's okay, '00' might work (it is 'true' and == 0). $load_sql .= "IGNORE CONSTRAINT ALL $opts->{MaxErrors}\n" if $opts->{MaxErrors}; my $db = $dbh->{Name}; $db =~ /server=(\w+)/ or confess "Can't determine server for bcp"; my $server = $1; my $database = $self->curr_db(); print "Loading $server/$database/$table\n"; print "Executing: $load_sql\n"; my $rows = $dbh->do($load_sql) + 0; print "$rows rows copied\n"; return $rows; } } { my $sql = <{DBH}->do("INSERT INTO $table\n$sql"); } sub is_iq {1} sub index_info { my ( $self, $table, $all_indexes ) = @_; my $dbh = $self->{DBH}; my $sql = "exec sp_iqindex [$table]"; my $sth = $dbh->prepare($sql); $sth->execute(); my @col_names = @{$sth->{NAME_lc}}; my %row; $sth->bind_columns(\@row{@col_names}); my %ind; while ($sth->fetch()) { next if !$all_indexes and $row{unique_index} ne 'Y'; $ind{$row{index_name}} = [ split /,/, $row{column_name} ]; } return unless %ind; return \%ind; } package DBIx::BulkUtil::Oracle; use Carp qw(confess); use Cwd qw(abs_path); our @ISA = qw(DBIx::BulkUtil::Obj); sub now { 'systimestamp' } sub add { my $self = shift; my $date = shift; while (my ( $n, $unit ) = splice( @_, 0, 2 ) ) { $date .= " + numtodsinterval( $n, '$unit' )"; } return $date; } { my %intervals = ( year => '/ 365', month => '/ 30', hour => '* 24', minute => '* 24 * 60', second => '* 24 * 60 * 60', ); sub diff { my $self = shift; my $date1 = shift; my $date2 = shift; my $unit = shift; my $diff_str = "$date2 - $date1"; if (my $str = $intervals{$unit}) { $diff_str = "($diff_str) $str"; } return "trunc($diff_str)"; } } # This is necessary when you want to use a literal # date in a datetime calculation sub to_datetime { my $self = shift; my $date = shift; return "to_timestamp('$date', 'YYYY-MM-DD HH24:MI:SS.FF')"; } # Don't need this with new version of DBI/DBD #sub 
to_char { # my $self = shift; # my $date = shift; # return "to_char($date, 'YYYY-MM-DD HH24:MI:SS')"; #} # #sub fmt { return $_[1] } sub row_select { my $self = shift; my $sel = shift; return "select $sel from dual"; } sub sp_sth { my $self = shift; my $sth = $self->{DBH}->prepare($self->sp_sql(@_)); $sth->bind_param_inout(":cursor", \my $sth2, 0, { ora_type => DBD::Oracle::ORA_RSET() }); $sth->execute(); return $sth2; } sub sp_sql { my $self = shift; my ($stored_proc, @args) = @_; return "BEGIN\n$stored_proc(" . join(",", map { /^:cursor$/ ? $_ : $self->{DBH}->quote($_) } @args) . ");\nEND;\n"; } { my %action_map = ( A => "APPEND", R => "REPLACE", T => "TRUNCATE", ); sub bcp_in { my $self = shift; my $opts = {}; if (ref $_[-1]) { $opts = pop @_; } my $action_opt = uc($opts->{Action} || "A"); my ( $table, @files ) = @_; my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : ''; my $dbh = $self->{DBH}; my $stdin = $opts->{Stdin}; @files = "$table.bcp" if !@files && !$stdin; my $has_stdin; for my $file (@files) { if ( $file eq "-" ) { $has_stdin++; next; } confess "BCP file $file does not exist" unless -f $file; } if ( $has_stdin && !$stdin ) { $stdin = \*STDIN; } elsif ( $stdin && !$has_stdin ) { push @files, "-"; } # Save some work, skip load on empty file # Let sqlldr do a heavy handed truncate or delete # if that is the chosen action my @bcp_files = grep { $_ eq "-" or -s } @files; if ( !@bcp_files ) { if ( $action_opt eq 'A') { print "$files[0],... is empty. 
Skipping sqlldr\n"; # Make any log file parsers happy print "0 Rows successfully loaded\n"; return 0; } # Need some files if we run sqlldr @bcp_files = @files; } require File::Temp; my $constants = $opts->{Constants} || {}; my %const = map { uc($_) => $constants->{$_} } keys %$constants; my $sizes = $opts->{CharSizes} || {}; my %char_sizes = map { uc($_) => $sizes->{$_} } keys %$sizes; my $keep_temp = $opts->{KeepTempFiles} || $opts->{Debug}; my $in_temp_dir = $opts->{TempDir} || $opts->{Debug}; my $temp_dir; $temp_dir = $opts->{TempDir} || "." if $in_temp_dir; my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : (); my @unlink = $keep_temp ? (UNLINK => 0) : (); my $ctl_fh = File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".ctl", @temp_dir, @unlink, ); chmod(0664, $ctl_fh->filename()); my $bad_fh = File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".bad", @temp_dir, @unlink, ); chmod(0664, $bad_fh->filename()); my $log_fh = File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".log", @temp_dir, @unlink, ); chmod(0664, $log_fh->filename()); my $prm_fh = $stdin ? File::Temp->new( TEMPLATE => "${table}_XXXXX", SUFFIX => ".prm", @temp_dir, ) : undef; # NLS date format env variable does not work # for sqlldr. # So we must determine date fields and # specify the format in the control file. 
my $db = $self->{DBH}->{Name}; my $user = $dbh->{Username}; my ($schema, $tbl_name) = split /\./, uc($table); if (!$tbl_name) { $tbl_name = $schema; $schema = $self->curr_schema(); } my $sth = $dbh->column_info(undef, $schema, $tbl_name, undef); my @info_names = @{$sth->{NAME_uc}}; my %row; $sth->bind_columns(\@row{@info_names}); my (@columns, %is_date, %char_sz, %is_lob); print "ColumnName Type Size\n" if $opts->{Debug}; print "----------------\n" if $opts->{Debug}; while ($sth->fetch()) { push @columns, $row{COLUMN_NAME}; print "$row{COLUMN_NAME}\t$row{TYPE_NAME}\t$row{COLUMN_SIZE}\n" if $opts->{Debug}; $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : $row{COLUMN_SIZE} if $row{TYPE_NAME} =~ /CHAR/; $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : 20_000_000, $is_lob{$row{COLUMN_NAME}} = 1 if $row{TYPE_NAME} =~ /TEXT|LOB|XML/; $is_date{$row{COLUMN_NAME}} = $1 if $row{TYPE_NAME} =~ /(DATE|TIMESTAMP)/; } confess("Table $schema.$tbl_name not found in database $db") unless @columns; # Find date formats in file, remove constants from column list my %date_fmt; my @file_columns = grep !defined($const{$_}), ( ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? ( map uc, @{$opts->{ColumnList}} ) : @columns ); if (%is_date) { # We don't want to sample rows from stdin my @real_files = grep { $_ ne "-" } @files; %date_fmt = $self->date_masks_from_file( \@real_files, \@file_columns, \%is_date, $opts) if @real_files; } my $row_delim_str = $opts->{RowDelimiter} ? 
qq("str '$opts->{RowDelimiter}'"\n) : ''; my $delimiter = $opts->{Delimiter} || $self->{DELIMITER}; my $action = $action_map{$action_opt} || "APPEND"; my $direct_load_pre = ''; my $direct_load_post = ''; my $sqlldr_opts = ''; my $max_errors = $opts->{MaxErrors} || 0; $sqlldr_opts .= "ERRORS=$max_errors"; $sqlldr_opts .= ", SKIP=$opts->{Header}" if $opts->{Header}; if ($opts->{DirectPath}) { my $parallel = ( uc($opts->{DirectPath}) eq 'P' ) ? ", PARALLEL=TRUE" : ''; $direct_load_pre = "OPTIONS(DIRECT=TRUE$parallel, ROWS=1000000, $sqlldr_opts)\nUNRECOVERABLE\n"; $direct_load_post = "REENABLE DISABLED_CONSTRAINTS\n"; } else { my $commit_rows = $opts->{CommitSize} || 2000; $direct_load_pre = "OPTIONS (ROWS=$commit_rows, BINDSIZE=5000000, READSIZE=20970000, $sqlldr_opts)\n"; } my $default_date_fmt = $opts->{SybaseDateFmt} ? 'MON DD YYYY HH12:MI:SS:FF3AM' : $opts->{DateFormat} ? $opts->{DateFormat} : 'YYYY-MM-DD HH24:MI:SS.FF3' ; for ( keys %is_date ) { $date_fmt{$_} ||= $default_date_fmt; $is_date{$_} = 'TIMESTAMP' if $date_fmt{$_} =~ /FF|TZ[DHMR]/; } my $quote_str = $opts->{QuoteFields} ? 
qq( OPTIONALLY ENCLOSED BY '"') : '' ; if ( $opts->{LoadWhen} ) { $direct_load_post .= "WHEN $opts->{LoadWhen}\n"; } my $nls_str = ''; $nls_str = "CHARACTERSET $opts->{NLSLang}" if $opts->{NLSLang}; $nls_str .= " LENGTH SEMANTICS $opts->{Semantics}" if $opts->{Semantics}; $nls_str .= "\n" if $nls_str; my %sybase_type; @sybase_type{@file_columns} = @{$opts->{SybaseTypes}} if $opts->{SybaseTypes}; # Logic for trimming or preserving blanks on char/varchar columns my $blank_control = sub { my $size = $char_sz{$_}; return " $_ CHAR($size) PRESERVE BLANKS" if $opts->{PreserveBlanks} or $size == 1; if ( $opts->{SybaseTypes} ) { # On the off chance a Sybase char column becomes an Oracle BLOB return qq[ $_ CHAR($size)] if $is_lob{$_}; return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $sybase_type{$_} eq 'char'; } else { return qq[ $_ CHAR($size)] if $is_lob{$_}; return qq[ $_ CHAR($size) "NVL(RTRIM(:$_),' ')"] if $opts->{TrimBlanks}; } return " $_ CHAR($size)"; }; my $field_ref = $opts->{FieldRef} || {}; my %field_ref = map { my $col = $_; my $tmp = $field_ref->{$col}; my $v = ( $tmp =~ s/^~// ) ? "POSITION $tmp" : qq("$tmp"); uc($col) => $v; } keys %$field_ref; # Field ref columns that don't reference themselves # will be considered similar to constant columns, but they must come # last, otherwise column alignment will be off my %field_ref_const; for ( keys %field_ref ) { next if $field_ref{$_} =~ /:$_\b/i; $field_ref_const{$_}++; } @columns = ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? 
map uc($_), @{$opts->{ColumnList}} : ( (grep !$field_ref_const{$_}, @columns), keys %field_ref_const, ); my %is_filler; if ( $opts->{Filler} ) { $is_filler{uc($_)}++ for @{$opts->{Filler}}; } my $file_str = join(",", @bcp_files); my $sqlldr_file_str = join("\n", map "INFILE '$_'", @bcp_files); my $disp_table = my $sqlldr_table = $table; if ($partition) { $sqlldr_table .= " PARTITION ($partition)"; $disp_table .= ":$partition"; } # Default charset is roman8 on HP # Must set it here printf $ctl_fh $direct_load_pre. "LOAD DATA\n". #"CHARACTERSET WE8ROMAN8\n". $nls_str. "%s\n". $row_delim_str. "INTO TABLE %s %s\n". $direct_load_post. qq(FIELDS TERMINATED BY '$delimiter'$quote_str\n). "TRAILING NULLCOLS\n". "(\n%s\n)\n", $sqlldr_file_str, $sqlldr_table, $action, join(",\n", map { ( exists($const{$_}) ? qq[ $_ CONSTANT '$const{$_}'] : exists($is_filler{$_}) ? qq[ $_ FILLER] : exists($field_ref{$_}) ? qq[ $_ $field_ref{$_}] : $is_date{$_} ? " $_ $is_date{$_} '$date_fmt{$_}'" : $char_sz{$_} ? $blank_control->() : " $_" ) } @columns); if ($prm_fh) { print $prm_fh "userid=$user/$self->{PASSWORD}\@$db\n"; close $prm_fh; } $_->close() for $ctl_fh, $bad_fh, $log_fh; print "Loading $db..$disp_table from $file_str\n"; my $ctl_file = $ctl_fh->filename(); my $bad_file = $bad_fh->filename(); my $log_file = $log_fh->filename(); my $prm_file = $prm_fh ? 
$prm_fh->filename() : undef; if ($keep_temp) { print "SqlldrControlFile: ", abs_path($ctl_file), "\n"; print "SqlldrBadRowFile : ", abs_path($bad_file), "\n"; print "SqlldrLogFile : ", abs_path($log_file), "\n"; } local $ENV{NLS_DATE_FORMAT} = 'YYYY-MM-DD HH24:MI:SS'; local $ENV{NLS_TIMESTAMP_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF'; local $ENV{NLS_TIMESTAMP_TZ_FORMAT} = 'YYYY-MM-DD HH24:MI:SS.FF'; my @prm_opt; @prm_opt = "parfile=$prm_file" if $prm_file; my @cmd = ( sqlldr => "control=$ctl_file", "log=$log_file", "bad=$bad_file", @prm_opt, "silent=(header,discards,feedback,partitions)", ); print "Executing: @cmd\n" if $opts->{Debug} || $opts->{NoExec}; return "@cmd" if $opts->{NoExec}; my $close_success; # We could do this either way with IPC::Run # But lets not require it unless necessary. if ($stdin) { require IPC::Run; $close_success = IPC::Run::run( \@cmd, '<', $stdin ); } else { # Hide user/passwd from ps open(my $cmd_fh, "|-", @cmd) or confess "Could not exec sqlldr: $!"; print $cmd_fh "$user/$self->{PASSWORD}\@$db\n"; # We don't want to exit right away on failure # We want to see the log file and bad record if any $close_success = close $cmd_fh; } # We don't want to exit right away on failure # We want to see the log file and bad record if any my $exit_stat = $? >> 8; my $exit_sig = $? & 127; my $exit_core = $? & 128; # We have a limit of one rejected row. If we have a bad row # we'll just include it in the error. 
# Oops thats no longer true now that we have a MaxErrors option # Just show the first bad row if we allow > 1 error my $bad_row; if ( -s $bad_file ) { if ( $max_errors > 0 ) { local ($_, $.); local $/ = $opts->{RowDelimiter} || "\n"; open(my $fh, "<", $bad_file) or confess "Can't open sqlldr reject file $bad_file: $!"; $bad_row = <$fh>; close $fh; } else { warn "sqlldr error loading $file_str into $disp_table on row:\n"; $bad_row = `cat $bad_file`; } } open(my $fh, "<", $log_file) or confess "Can't open sqlldr log $log_file: $!"; print "Opened $log_file\n"; local ($_, $.); my ( $rows, $error_rows, $failed_rows, $null_rows, $error_msg, $discontinued, $dp_errors ); # Only save first 1000 errors my $err_cnt = 0; while (<$fh>) { print; if ( /^\s*(\d+)/ ) { my $tmp_rows = $1; $rows = $tmp_rows if /successfully loaded/; $error_rows = $tmp_rows if /not loaded due to data errors/; $failed_rows = $tmp_rows if /not loaded because all WHEN clauses/; $null_rows = $tmp_rows if /not loaded because all fields were null/; next; } if ( /^Record \d+: Rejected/ ) { $error_msg .= $_ if $err_cnt < 1000; next; } if ( /^(?:SQL\*Loader|ORA)-\d+:/ ) { $error_msg .= $_ if ++$err_cnt <= 1000; $discontinued++ if /discontinued|aborted/; next; } # Catch direct path errors if ( /was not re-(?:enabled|validated)/ ) { # These errors do not cause non-zero exit status $dp_errors++; $error_msg .= $_ if $err_cnt < 1000; next; } if ( /^index \S+ was made unusable/ ) { $dp_errors++; $error_msg .= $_ if ++$err_cnt <= 1000; next; } } close $fh; if (!$close_success or $dp_errors) { $error_msg ||= ''; if ( $exit_stat != 0 or $dp_errors ) { if ( $exit_stat == 2 or $dp_errors ) { # Exit status 2 is just a warning # But we should consider it an error if we exceeded the max errors allowed # Or if load was discontinued for any reason # Or for any direct path errors my $first = ($max_errors > 0) ? 
'first ' : '';
          confess "sqlldr exited with status $exit_stat [$error_msg]" if $dp_errors;
          confess "sqlldr exited with status $exit_stat [$error_msg] - ${first}rejected record:[$bad_row]" if $error_rows > $max_errors;
          confess "sqlldr exited with status $exit_stat [$error_msg]" if $discontinued;
        } else {
          confess "sqlldr exited with status $exit_stat [$error_msg]";
        }
      }
      confess "sqlldr received signal $exit_sig [$error_msg]" if $exit_sig > 0;
      confess "sqlldr coredumped [$error_msg]" if $exit_core;
    }

    return $rows;
  }
}

# Dummy method for compatibility with Sybase
sub mk_view { }

# Work out a date format mask for each date column by sampling data files.
#
# Arguments:
#   $files   - arrayref of data file names to read
#   $columns - arrayref of column names, in file field order
#   $is_date - hashref whose keys are the date column names
#   $opts    - optional hashref: DateSampleRows (default 1000), Delimiter
#              (default $self->{DELIMITER}), RowDelimiter, Year2Mask
#              (default 'YY'), Header (number of leading rows to skip),
#              QuoteFields (split with split_quoted), DateColumnFmt
#              (hashref of column => mask supplied by the caller)
#
# Returns a hash of column name => mask. Masks given in DateColumnFmt are
# returned under the upper-cased column name and those columns are not
# sampled. For the others, the first true value seen for the column is
# passed to date_mask(). Returns nothing when there are no date columns.
sub date_masks_from_file {
  my $self = shift;
  my ($files, $columns, $is_date, $opts) = @_;

  return unless $is_date and %$is_date;

  $opts ||= {};
  my $sample_rows = $opts->{DateSampleRows} || 1000;
  my $d = $opts->{Delimiter} || $self->{DELIMITER};
  my $rd = $opts->{RowDelimiter};
  my $year_mask = $opts->{Year2Mask} || 'YY';

  local ($., $_, $ARGV, *ARGV);
  local $/ = $rd if $rd;
  local @ARGV = @$files;
  my $row_cnt;

  my (%remaining, %got);
  $remaining{$_}++ for keys %$is_date;

  my %fmt;
  my $dc_fmt = $opts->{DateColumnFmt} || {};
  for my $col ( keys %$dc_fmt ) {
    my $c = uc($col);
    $fmt{$c} = $dc_fmt->{$col};
    delete $remaining{$c}
  }

  my %row;
  while (<>) {
    next if $opts->{Header} and $. <= $opts->{Header};
    chomp;
    @row{@$columns} = $opts->{QuoteFields} ? split_quoted( $_, $d ) : split /\Q$d/;
    for (keys %remaining) {
      if ( $row{$_} ) {
        delete $remaining{$_};
        $got{$_} = $row{$_};
      }
    }

    # Every date column has a sample value: stop reading the files
    last if !%remaining;

    # If we haven't found values by now, give up
    last if ++$row_cnt >= $sample_rows;
  }

  $fmt{$_} = $self->date_mask($got{$_}, $year_mask) for keys %got;

  return %fmt;
}

# If we allow quoted fields, need to split correctly and
# handle embedded quotes and delimiters
#
# Splits $line on the delimiter $d and returns the list of fields. A field
# may be wrapped in double quotes; the quotes are removed, delimiters inside
# them are kept, and a doubled quote ("") becomes a single one.
sub split_quoted {
  my ($line,$d) = @_;
  my @result;
  while ( $line =~ s/\A("?)((?:""|.)*?)\1(\Q$d\E|\z)//s ) {
    my ( $q, $s,$got_d ) = ( $1, $2, $3 );
    $s =~ s/""/"/g if $q;
    push @result, $s;

    # No delimiter after the field means the end of the line was reached
    last if length($got_d) == 0;
  }
  return @result;
}

{
  my @mon = qw( Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec );
  my $mon_str = join("|", @mon);
  my $mon_re = qr/(?i)$mon_str/;

  my @months = qw(
    January February March April May June July
    August September October November December
  );
  my $month_str = join("|", @months);
  my $month_re = qr/(?i)$month_str/;

  my @days = qw( Mon Tue Wed Thu Fri Sat Sun );
  my $day_str = join("|", @days);
  my $day_re = qr/(?i)$day_str/;

  # Derive a date format mask (YYYY, MM, DD, MON, MONTH, DY ... elements)
  # from one sample date string.
  #
  # Arguments: the sample string and, optionally, the mask element to use
  # for a two-digit year (default 'YY').
  # Returns the mask, or nothing when the string is empty, is not one of the
  # recognized layouts, or has text left over after the date and the time
  # part (see time_mask) have been consumed.
  #
  # Recognized layouts, each optionally followed by a time:
  #   YYYY-MM-DD / YYYYMMDD
  #   [Dy] Mon DD YYYY, Mon DD, YYYY, Mon DD <time> YYYY
  #   [Dy] Month DD, YYYY, Month DD YYYY, Month DD <time> YYYY
  #   DD-Mon-YYYY, DD-Mon-YY
  #   MM/DD/YYYY
  sub date_mask {
    my ($self, $str, $year2mask) = @_;
    return unless $str;
    local $_ = $str;
    my $fmt = '';
    $year2mask ||= 'YY';

    # YYYY-MM-DD or YYYYMMDD
    if ( s/^\d{4}(\D?)\d\d(\D?)\d\d// ) {
      $fmt .= "YYYY${1}MM${2}DD";
      $fmt .= time_mask();
      return if length($_);
      return $fmt;
    }

    # Allow day abbreviation (Mon Tue etc.)
    $fmt .= "DY " if s/^$day_re\s+//;

    # Jan 23 2010 or Jan 23, 2010
    if ( s/^$mon_re\s+\d+// ) {
      my $end_year;
      $fmt .= "MON DD";

      # Accept one punctuation character (typically a comma) between the
      # day and the year and carry it into the mask. "May" always lands in
      # this branch, so the comma has to be handled here as well as in the
      # full month name branch below.
      if ( s/^([^\w\s]?)\s\d{4}// ) {
        $fmt .= "$1 YYYY";
      } elsif ( s/\s+\d{4}$// ) {
        $end_year++;
      } else {
        return;
      }
      $fmt .= time_mask();
      return if length($_);
      $fmt .= " YYYY" if $end_year;
      return $fmt;
    }

    # January 23, 2010
    if ( s/^$month_re\s+\d+// ) {
      my $end_year;
      $fmt .= "MONTH DD";
      if ( s/^(\W?)\s\d{4}// ) {
        my $comma = $1;
        $fmt .= "$comma YYYY";
      } elsif ( s/\s+\d{4}$// ) {
        $end_year++;
      } else {
        return;
      }
      $fmt .= time_mask();
      return if length($_);
      $fmt .= " YYYY" if $end_year;
      return $fmt;
    }

    # 02-Jan-2010
    if ( s/^\d\d?(\D?)$mon_re(\D?)\d{4}// ) {
      $fmt .= "DD${1}MON${2}YYYY";
      $fmt .= time_mask();
      return if length($_);
      return $fmt;
    }

    # 02-Jan-10
    if ( s/^\d\d?(\D?)$mon_re(\D?)\d\d?// ) {
      $fmt .= "DD${1}MON${2}$year2mask";
      $fmt .= time_mask();
      return if length($_);
      return $fmt;
    }

    # MM/DD/YYYY
    if ( s|^\d\d?(\D)\d\d?(\D)\d{4}|| ) {
      $fmt .= "MM${1}DD${2}YYYY";
      $fmt .= time_mask();
      return if length($_);
      return $fmt;
    }

    # Unrecognized layout
    return;
  }
}

# Operates on and modifies current $_
sub time_mask {
  my $fmt = '';
  if ( s/^(\D?)[\s\d]\d// ) {
    my $sep = $1;
    $sep = qq("$sep") if $sep =~ /\S/;
    $fmt .= "${sep}HH";
    $fmt .= /[AP]M\b/i ? "12" : "24";
    if ( s/^(\D)\d\d// ) {
      $fmt .= "${1}MI";
      if ( s/^(\D)\d\d// ) {
        $fmt .= "${1}SS";
        if ( s/^(\D)(\d+)// ) {
          $fmt .= $1 . "FF" . 
length($2); } } } if ( s/^(\s?)[AP]M// ) { $fmt .= "${1}AM"; } if ( s/^(\s*)\w{2,3}T//i ) { $fmt .= "${1}TZD"; } if ( s/^\s[+-]\d\d(\D)\d\d// ) { $fmt .= " TZH${1}TZM"; } } return $fmt; } { my %type_map = ( TABLE => 'T', VIEW => 'V', PROCEDURE => 'P' ); sub obj_type { my ( $self, $name ) = @_; $name = uc($name); my $type; if ( $name =~ /^([^.]+)\.(.+)/ ) { my ($schema, $table) = ($1, $2); $type = $self->{DBH}->selectrow_array( "select object_type from all_objects where owner = ? and object_name = ?", undef, $schema, $table, ); } else { $type = $self->{DBH}->selectrow_array( "select object_type from user_objects where object_name = ?", undef, $name ); } return unless $type; return $type_map{$type} || confess "Don't know about type $type for object $name"; } } sub curr_schema { my $self = shift; return $self->get("sys_context('USERENV', 'SESSION_SCHEMA')"); } { my $sql_t = <{DBH}; my ( $schema, $tbl ) = split /\./, uc($table); if ( !$tbl ) { $tbl = $schema; $schema = $self->curr_schema(); } my $sql = sprintf $sql_t, $dbh->quote($schema), $dbh->quote($tbl); $sql .= "and a.uniqueness = 'UNIQUE'\n" unless $all_indexes; $sql .= "ORDER BY b.column_position\n"; my $sth = $dbh->prepare($sql); $sth->execute(); my @col_names = @{$sth->{NAME_lc}}; my %row; $sth->bind_columns(\@row{@col_names}); my %ind; while ($sth->fetch()) { push @{$ind{$row{index_name}}}, lc($row{column_name}); } return unless %ind; return \%ind; } } sub primary_key { my ($self, $table) = @_; $table = uc($table); my ($schema, $tbl) = split /\./, $table; if ( !$tbl ) { $tbl = $schema; $schema = $self->curr_schema(); } my @pk = map lc, $self->{DBH}->primary_key(undef, $schema, $tbl); return unless @pk; return \@pk; } { my $sql = <{DBH}; my $table = $args{Table}; my $stg_table = $args{StgTable}; my $stg_info = $self->column_info($stg_table); my $stg_map = $stg_info->{MAP}; my %stg_has; $stg_has{$_}++ for @{$stg_info->{LIST}}; my $key_col_ref = ($args{KeyCols} && @{$args{KeyCols}}) ? 
$args{KeyCols} : $self->key_columns($table); my $upd_col_ref = ($args{UpdCols} && @{$args{UpdCols}}) ? $args{UpdCols} : $self->upd_columns($table); # Normalize all columns and maps to lowercase my @key_cols = map lc, @$key_col_ref; my @upd_cols = map lc, @$upd_col_ref; my @fields = (@key_cols, @upd_cols); my %col_map = $args{ColMap} ? map lc, %{$args{ColMap}} : (); my $upd_col_str = join(",", map { $col_map{$_} ? $stg_has{$col_map{$_}} ? "d.$_=s.$col_map{$_}" : "d.$_=$col_map{$_}" : $stg_has{$_} ? "d.$_=s.$_" : () } @upd_cols), # Determine if last_chg_user, last_chg_date need to be updated # If staging table does not have the columns, and the target table does # Then default the values my %chg_col = $self->last_chg_list($table, \@fields); delete $chg_col{$_} for grep $stg_has{$_}, qw(last_chg_user last_chg_date); for my $col ( sort { $b cmp $a } keys %chg_col ) { $upd_col_str .= ",$col=".( ($col eq 'last_chg_user') ? "'".uc(substr($dbh->{Username}, 0, $chg_col{$col}))."'" : 'SYSTIMESTAMP' ); } my $parallel = $args{Parallel} ? '/* parallel(8) append */' : ''; my $merge_sql = sprintf($sql, $parallel, $table, $args{MergeFilter} ? "$args{StgTable} WHERE $args{MergeFilter}": $args{StgTable}, join(" AND ", map "d.$_=s.".($col_map{$_}||$_), @key_cols), $upd_col_str, join(",", @fields), join(",", map { $col_map{$_} ? $stg_has{$col_map{$_}} ? "s.$col_map{$_}" : $col_map{$_} : $stg_has{$_} ? "s.$_" : "NULL" } @fields), ); # No update if no update columns $merge_sql =~ s/^WHEN MATCHED.*\n//m unless @upd_cols; print("Executing: $merge_sql\n"); return 1 if $args{NoExec}; $dbh->do("ALTER SESSION ENABLE PARALLEL DML") if $args{Parallel}; my $rows = $dbh->do($merge_sql) + 0; print("$rows rows updated/inserted\n\n"); return $rows; } } # #!!!UNFINISHED!!! 
# Static block for mk_ext_table
{
    # NOTE(review): text was lost from this section between "<" and
    # "column_info" -- presumably the heredoc CREATE TABLE template assigned
    # to $sql and the opening of the mk_ext_table sub (argument unpacking and
    # "my $cols = $self->"). The damaged fragment is kept exactly as found;
    # restore it from the upstream module before use. The comment preceding
    # this block marks the code as unfinished.
    my $sql = <column_info($table);
        my $cmap = $cols->{MAP};

        # Build one "name TYPE[(size[,digits])]" definition per column
        my @col_list;
        for my $col (@{$cols->{LIST}}) {
            my $col_str = $col;
            my $cdata = $cmap->{$col};
            my $type = $cdata->{TYPE_NAME};
            my $dec = $cdata->{DECIMAL_DIGITS};
            $col_str .= " $type";
            my $size = $cdata->{COLUMN_SIZE};
            for ($type) {
                $col_str .= /CHAR/   ? "($size)"
                          : /NUMBER/ ? ( (defined $dec) ? "($size,$dec)" : '' )
                          :            '';
            }
            #$col_str .= " DEFAULT $cdata->{COLUMN_DEF}" if defined $cdata->{COLUMN_DEF};
            #$col_str =~ s/\s+$//;
            #$col_str .= " NOT NULL" unless $cdata->{NULLABLE};
            push @col_list, $col_str;
        }

        my $create_sql = sprintf($sql,
            $ext_table,
            join(",\n", @col_list ),
            $dir,
            #$args{RowDelimiter} || "\\n",
            $args{Delimiter} || "|",
            $file,
        );

        $self->{DBH}->do($create_sql);
        return $ext_table;
    }
}

package DBIx::BulkUtil::Release;

# Scope guard: wraps a code reference that is called when the object is
# destroyed.

# Blesses the given code reference into this class and returns it.
sub new {
    my ($class, $f) = @_;
    return bless $f, $class;
}

# Runs the wrapped code reference.
sub DESTROY {
    my ($self) = @_;

    # A destructor can run at any time, including while an exception is
    # propagating or during exit, so keep the callback from clobbering the
    # global status variables.
    local ($., $@, $!, $^E, $?);
    $self->();
    return;
}

1;

__END__

=head1 NAME

DBIx::BulkUtil - Sybase/SybaseIQ/Oracle bulk load and other utilities

=head1 SYNOPSIS

  use DBIx::BulkUtil;

  # Return just the regular DBI handle
  my $dbh = DBIx::BulkUtil->connect(%options, \%dbi_options);

  # Or return a DBI handle and a 'Utility' object.
# syb_connect, ora_connect, and iq_connect methods are also provided # to directly specify database type my ($dbh, $db_util) = DBIx::BulkUtil->connect(%options, \%dbi_options); # Wrappers for Sybase bcp, Oracle sqlldr, IQ 'load table' $db_util->bcp_in($table, [$file], [\%options]); $db_util->bcp_out($table, [$file], [\%options]); $column_info = $db_util->column_info($table); $insert_sth = $db_util->prepare(%options); $cnt = $db_util->merge(%options); $blk_sth = $db_util->blk_prepare($table, %options); $blk_sth->execute(@args); $blk_sth->finish(); $index_info = $db_util->index_info($table, [$all_indexes]); $primary_key = $db_util->primary_key($table); $stored_proc_sql = $db_util->sp_sql($stored_proc, @args); $stored_proc_sth = $db_util->sp_sth($stored_proc, @args); my $passwd = DBIx::BulkUtil->passwd(); $object_type = $db_util->obj_type($object_name); $current_date_function = $db_util->now(); $date_function_str = $db_util->add($date, $amount, $units); $one_row_no_table_sql = $db_util->row_select($select_clause); @no_table_results = $db_util->get($select_clause); $ten_minutes_from_now = $db_util->get( $db_util->add( $db_util->now(), 10, 'minute' ) ); =head1 DESCRIPTION Provide easy to use bulk load and other utility methods. =head1 CLASS METHODS =over 4 =item connect() Returns a DBI database handle. In list context, returns a database handle and a database utility object. The default connection attribute values for the database handle are: ChopBlanks => 1, AutoCommit => 1, PrintError => 0, RaiseError => 1, LongReadLen => 1_024 * 1_024, If the last argument to connect() is a hash reference, then you can either override these attributes or set other attributes on connect. (Note: The connect is made using RaiseError => 1, but after the connection is successful, HandleError is set to a subroutine that calls Carp::confess which gives a stack trace when DBI throws an exception). The first argument to connect is a hash with the following keys: =over 4 =item Server The database server to connect to. 
If no server or database is provided, then it is defaulted to the value of environment variable DSQUERY. =item Database The database to connect to. If a database but no server is provided, then it is assumed to be an Oracle database. If no server or database is provided, then the default is the pm database. =item Type The database type (Oracle, Sybase, or SybaseIQ). The default depends on what combination of Server and Database is provided. =item User The user to connect to the database as. Defaults to calling the user() method. =item DataDir Meant to be a data directory to keep config info in, to be used in the env2db method in any way you see fit. =item Password The password to use to login to the database. Default is to call the passwd method. =item RetryCount Will retry connection to the database this many times. =item RetryMinutes Will wait this many minutes before trying to connect to the database again. =item BulkLogin For Sybase, enables the use of the blk_prepare method on the utility handle for bulk inserts (i.e. the syb_bcp_attribs attribute on insert statements). =item NoBlankNull For SybaseIQ, when loading a file via bcp_in, will not convert blank columns to null values. =item Dsl A string or arrayref of connect options to use as the dsl connect string instead of using the Server or Database after 'dbi:$db_type:' =item DslOptions A string or arrayref of connect options to add to the dsl connect string. E.g.: DBIx::BulkUtil->connect( Server => $server, DslOptions => [ 'interfaces=/my/interfaces_file', 'port=1234' ], ); Will result in the connect string: 'dbi:Sybase:server=$server;interfaces=/my/interfaces_file;port=1234' =item NoServer For Sybase, will not add the 'server=...' argument to the DSL connect string. For Oracle will not add the database name to the DSL string. The DslOptions option is necessary if this option is used. 
=back =item connect_cached() Same as connect method, but calls the DBI connect_cached method to make the actual database connection, and will return the same database handle previously returned for the same database, user, and DBI options. =item syb_connect() Same as connect method, but calls the DBIx::BulkUtil connect method with the Type option set to 'Sybase' =item ora_connect() Same as connect method, but calls the DBIx::BulkUtil connect method with the Type option set to 'Oracle' =item ora_connect_cached() Same as connect method, but calls the DBIx::BulkUtil connect_cached method with the Type option set to 'Oracle' =item iq_connect() Same as connect method, but calls the DBIx::BulkUtil connect method with the Type option set to 'SybaseIQ' =item iq_connect_cached() Same as connect method, but calls the DBIx::BulkUtil connect_cached method with the Type option set to 'SybaseIQ' =item passwd() Dummy function to override for determining password. =back =head1 UTILITY OBJECT METHODS Methods that may be called on the utility object that is optionally returned from the connect or connect_cached DBIx::BulkUtil class methods. These methods provide convenience and/or make some operations between Oracle and Sybase databases more transparent. =over 4 =item now Returns sql that will return the current date/time of the database (e.g. to be used as a column in a select statement). =item add Returns sql that will add some unit of time to a datetime expression. E.g. $util->add($util->now(), 10, 'hour') adds 10 hours to the current time. =item row_select Given just a select clause (the part after the "SELECT" keyword), returns sql to select a row from no table (e.g. for fetching the current time from the database). =item get Fetches the row from a select clause with no table. E.g. $util->get($util->now()) returns the current database date/time. =item obj_type Returns T/V/P depending on whether the given object is a Table, View, or Procedure. 
=item sp_sql Returns sql to execute a given stored procedure with arguments. If one of the arguments is ":cursor", then for Sybase it is filtered out, for Oracle we assume it is a parameter name and not a literal string to be bound. Sybase stored procedures can return multiple result sets, and also a list of output parameters. Oracle does not return result sets, but you can pass in a cursor as an output parameter. When you pass in a parameter ":cursor", we assume it's an output parameter that will hold a statement handle, so you can return a single result set in a nearly "backwards compatible" way. But we don't handle "multiple" result sets (yet), we don't deal with other output parameters, and so this method is not meant to be completely transparent for all stored procedures. =item sp_sth Prepares and executes a stored procedure with arguments, and returns a statement handle for fetching. If one of the arguments is ":cursor", then we assume for Oracle it is a cursor type output parameter, and the statement handle for the cursor is returned. For Sybase, we ignore any ":cursor" argument. =item bcp_in For Sybase, uses BCP, for Oracle, SQL Loader, to load a pipe-delimited file into a database table. If the last argument is a hash reference, then additional options may be specified. Current options are: =over 8 =item Delimiter Specifies the delimiter in the bcp file (default: "|"). =item RowDelimiter Specifies the record terminator in the bcp file (default: "\n"). =item Header For bcp_in, the number of rows to ignore at the start of the file. For bcp_out, if true, the first row will be the column names of the table. =item DirectPath For Oracle only, if true, does Direct path instead of conventional load. If value is 'P', also does parallel load. For parallel loads, indexes are not rebuilt after the load. =item Constants (Oracle and SybaseIQ only). A hashref of column names and constant values to set the columns to which are not in the file. 
=item FieldRef (Oracle only). A hashref of column names and sqlldr expressions to specify the value of the column. If the expression includes the column itself (e.g. ':column_name'), then the field will appear in the same position in the control file corresponding to its position in the table. If it does not (e.g. "to_date('2014-02-01','YYYY-MM-DD')"), then the column appears at the end of the control file field list (i.e. it assumes the column is not in the file). Also, if the expression begins with "~", then assume the expression is position information for a fixed width file. =item Filler Generally used with the ColumnList option, a list of column names in the file which are filler and not loaded into the database. =item Default (SybaseIQ only). A reference to an array of column names not in the file, which will be set to their default values. =item Stdin (Oracle only). A file handle, subroutine, or reference to a scalar to supply data to sqlldr through stdin. If it is a subroutine, return values will be used as input until it returns undef. If this option is used, and none of the files supplied to bcp_in is named '-', then '-' is automatically added to the list. If one of the files is named '-', and this option is not used, then this option is assumed to be the *STDIN filehandle. =item TrailingDelimiter (SybaseIQ only). Boolean flag which indicates that the last column of each record has a trailing column delimiter. =item DateFormat (Oracle only). Sets the default date format mask for sqlldr. By default, the process will try to determine the date format for each date column from the input file. If the format can not be determined, then this format will be used. =item SybaseDateFmt (Oracle only). Sets the default DateFormat to 'MON DD YYYY HH12:MI:SS:FF3AM'. =item QuoteFields (Oracle only). Allows fields in bcp file to be quoted, thereby allowing delimiters within the field. =item NLSLang (Oracle only). Sets the characterset option set for sqlldr. 
=item Semantics (Oracle only). Sets the 'LENGTH SEMANTICS' in the control file. Allowable values are CHAR or BYTE. =item Action Sets sqlldr mode to APPEND, REPLACE, or TRUNCATE (valid values are A, R, or T, default is A). Simulates same thing for Sybase through truncate or delete sql statements for T and R. Replace does not replace individual rows, it deletes all rows first. =item SybaseTypes (Oracle only). An array of the Sybase data types being loaded. When the type is 'char' (not 'varchar'), then PRESERVE BLANKS is added in the control file for char(1) columns, and trim logic for char > 1 columns. =item Debug (Oracle only). Displays the sqlldr command line executed, and does not remove the sqlldr control, log, and bad record files. =item NoExec (Oracle only). Displays and returns but does not execute the sqlldr command line that would be executed. =item CommitSize The number of rows loaded before committing each batch (default: 1000). =item MaxErrors The maximum number of errors allowed before aborting the load (default: 0). =item LoadWhen (Oracle only). Adds this text to a WHEN clause in the sqlldr control file which determines which rows in the data file are loaded. =item ColumnList List of ordered column names in bcp file. =item PacketSize (Sybase only). Sets network packet size for bcp. =item PassThru (Sybase only). Allows an arbitrary list of arguments to be passed to the bcp command line. =back If the file is not provided, it is assumed to be the table name with a ".bcp" extension. Sybase bcp is broken. If you have delimiter characters in your data, there is no way to escape them. If your fields are quoted as in csv files, Sybase bcp will complain. For bcp_in, unquote fields and convert your file to a format with a new delimiter that does not appear in your data. For bcp_out, choose a delimiter that does not appear in your data. 
=item bcp_out For Sybase uses BCP, for Oracle, just select and print (Oracle has no "bcp out" type functionality) to export a database table to a file. See bcp_in for options. If the file is not provided, it is assumed to be the table name with a ".bcp" extension. For Sybase, if there are any money columns, or if the Filter option is used, then a view is temporarily created to bcp from. Money columns are converted to decimal so that they are not truncated. Sybase bcp_out is broken. It does not escape delimiter characters. If you have delimiter characters in your data, you can call the select2file method, although bcp_in will not load the resulting file. See bcp_in. If the last argument is a hash reference, then additional options may be specified. Current options are: =over =item Delimiter Same as bcp_in. =item RowDelimiter Same as bcp_in. =item Header Same as bcp_in. =item NoFix When using Sybase native bcp out, the default is to transform the dates into ANSI standard format (which historically used to be the only reason to use this library). This option, if true, disables that transformation, and can save time on large transfers. =item Filter Appends additional SQL clauses to the SELECT * statement, e.g. "WHERE asof_date > '2011-01-01'". =item Columns Comma separated list of columns to select from table. =back =item blk_prepare Prepares an insert statement for bulk insert into a table. For Sybase, the BulkLogin option must have been supplied with a true value on connect, or else this method will fail. Returns a statement handle that will insert arguments into the table. E.g.: my $sth = $db_util->blk_prepare('some_table'); while (<$fh>) { chomp; my @data = split /,/; $sth->execute(@data); } $sth->finish(); Inserts are batched, and so the finish() method must be called to commit the final batch. The execute() method must be called with a list of arguments corresponding to the list of columns from the table (excluding any columns in the Constants option below). 
The first argument is the table name. The following optional arguments are key/value pairs with the following keys: =over =item Constants A hash reference of column name and constant value pairs that will be inserted on every execute call. Values for these columns should not be included in the list of arguments in the call to execute(). =item CommitSize The number of inserts per batch (default: 1000). =item BlkOpts A hash reference of options to pass to the Sybase syb_bcp_attribs options. Needed if inserting to identity columns. See L<DBD::Sybase> for these options. =back =item bcp_sql (Sybase only). Given a table name and a sql statement, uses sqsh to execute a sql statement and bcp the results into a table. =item select2file Calls the non-Sybase version of bcp_out which just selects from a table and saves to a delimited file. Accepts an optional hashref as the last argument with the same options as bcp_out. Also accepts the option Filter which appends additional SQL clauses to the SELECT * statement. Include any additional keywords (e.g. "WHERE", "GROUP BY", etc.) in the Filter option. Returns the number of rows selected. =item bcp_file Modifies a bcp_file that contains a header row. Arguments are ($input_file, $output_file, {%options}), with available options KeepCols and DropCols. KeepCols is a list of column names to keep from the input file (and the order in which they will appear in the output file), DropCols is a list of columns to drop from the input file. KeepCols overrides DropCols. Also accepts the following same options as bcp_out: Delimiter. =item merge Merges data from a staging table into a target table. For Oracle, issues a MERGE statement, for Sybase, it deletes from the target table corresponding rows from the staging table, then inserts records from the staging to the target table. Columns last_chg_user and last_chg_date are appropriately updated in the target table. Accepts a hash with the following keys as its argument: =over =item Table The target table. 
=item StgTable The staging table. =item KeyCols A list of the key columns in the target table. Defaults to the columns in the first unique index found on the table. =item UpdCols A list of the columns to update in the target table. Defaults to all columns in the target table not in the list of key columns. Ignored in Sybase since rows are deleted and inserted, not updated. =item ColMap A hashref of target to staging table column name mappings. =item NoExec Display but do not execute SQL. =back =item column_info Given a table name, returns a hash reference with keys LIST and MAP. LIST will contain a list of all columns in the table lowercased. MAP contains a hash reference with the lowercase column names as keys, and the value is a hash reference with keys NAME and PRECISION. NAME is the column name in the actual (upper/lower/mixed) case in the database, and PRECISION is the size of the column. =item index_info Given a table name, returns a hash reference of the names of any indexes on the table and an array reference of the column names in the index. If the optional second argument is true, returns all indexes, otherwise returns only unique indexes. =item key_columns Given a table name, returns a reference to an array of column names that are in the primary_key, or if no primary_key exists, returns the columns in the first unique index found on the table. =item upd_columns Given a table name, returns a reference to an array of all of the column names in a table that are not the key columns of a table. =item primary_key Returns an array reference of the column names in the primary key of a table. =item strptime_fmt Given a date string, returns a template suitable for passing to strptime. Returns an undefined value if the format can not be determined. =item ora_date_fmt Given a date string, returns an Oracle date format string. Returns an undefined value if the format can not be determined. 
=item prepare Prepares a SQL statement, optionally binds a reference to a hash or array to its input parameters, and returns a statement handle. Example: my $sth = $dbu->prepare( Table => 'eqa_own.some_table', Columns => [qw(column1 column2)], BindHash => \my %href, ); $href{column1} = 'Col1Value'; $href{column2} = 'Col2Value1'; $sth->execute(); $href{column2} = 'Col2Value2'; $sth->execute(); Accepts as arguments a hash with the following keys: =over 8 =item Table Table name. If provided, will construct an insert statement for this table. (Required if Sql not provided). =item Sql Sql statement. Prepares this SQL statement. (Required if Table not provided). =item Columns List of column names. If Table is provided, must be names of columns in the table. (default: all columns in Table if Table is provided). =item BindHash (Oracle only). Reference to hash. Placeholders in SQL statement will be bound to this hash reference. (May not be used with BindArray). =item BindArray (Oracle only). Reference to array. Placeholders in SQL statement will be bound to this array reference. (May not be used with BindHash). =item ByName If true, use ":column_name" type placeholders in SQL statement and in binding to hash or array. If false (but defined), use "?" as placeholders. "?" placeholders may not be used with BindHash or BindArray. (default: Oracle true, Sybase false). =back =back =cut