# $Id: DBI.pm 362 2007-05-02 04:46:44Z btrott $

package Data::ObjectDriver::Driver::DBI;
use strict;
use warnings;

use base qw( Data::ObjectDriver Class::Accessor::Fast );

use DBI;
use Carp ();
use Data::ObjectDriver::Errors;
use Data::ObjectDriver::SQL;
use Data::ObjectDriver::Driver::DBD;

## Generate get/set accessors (mk_accessors comes from the Class::Accessor::Fast
## base class) for the connection settings and handles this driver works with:
## the DSN and credentials, extra connect options, a stored handle (dbh), a
## handle-returning callback (get_dbh), the DBD helper object (dbd) and the
## table-name prefix.
__PACKAGE__->mk_accessors(qw( dsn username password connect_options dbh get_dbh dbd prefix ));

## Initialise the driver from its constructor parameters.
##
## Every key in the parameter list is passed to the accessor of the same
## name. Unless a "dbd" parameter was supplied, a DSN-specific helper
## (e.g. for "mysql") is created and stored in the dbd accessor.
## Returns the driver itself.
sub init {
    my $driver = shift;
    my %param  = @_;

    $driver->$_($param{$_}) for keys %param;

    unless (exists $param{dbd}) {
        ## Work out the driver type from, in order of preference: the DSN
        ## string, a stored handle, or the handle getter.
        my $type;
        if (my $dsn = $driver->dsn) {
            ($type) = $dsn =~ /^dbi:(\w*)/;
        } elsif (my $stored = $driver->dbh) {
            $type = $stored->{Driver}{Name};
        } elsif (my $getter = $driver->get_dbh) {
            ## Ugly. Shouldn't have to connect just to get the driver name.
            my $fetched = $getter->();
            $type = $fetched->{Driver}{Name};
        }
        $driver->dbd(Data::ObjectDriver::Driver::DBD->new($type));
    }

    return $driver;
}

## Ask the driver's pk_generator callback (if one is set) to produce a
## primary key. The remaining arguments are handed to the callback
## unchanged and its result is returned; without a generator the (false)
## generator value itself is returned.
sub generate_pk {
    my $driver    = shift;
    my $generator = $driver->pk_generator;
    return $generator ? $generator->(@_) : $generator;
}

## Open a new database connection for this driver.
##
## Connects with the driver's dsn/username/password. RaiseError, PrintError
## and AutoCommit get defaults here that entries in connect_options may
## override. The new handle is passed to the DBD helper's init_dbh, the
## driver is flagged as owning the handle (DESTROY disconnects it), and the
## handle is returned.
##
## Croaks with "Connection timeout" when the connect attempt was interrupted
## by the ALRM handler installed below, and with the underlying error for
## any other failure.
sub init_db {
    my $driver = shift;
    my $dbh;
    eval {
        ## NOTE(review): no alarm is armed in this block; the handler only
        ## matters if something else schedules a SIGALRM -- confirm intent.
        local $SIG{ALRM} = sub { die "alarm\n" };
        $dbh = DBI->connect($driver->dsn, $driver->username, $driver->password,
            { RaiseError => 1, PrintError => 0, AutoCommit => 1,
              %{$driver->connect_options || {}} })
            or Carp::croak("Connection error: " . $DBI::errstr);
        alarm 0;
    };
    if (my $err = $@) {
        ## Compare the eval error ($@) itself. The previous "@$" was the
        ## array named "$" in scalar context, which never equals "alarm\n",
        ## so the timeout message could not be produced.
        Carp::croak($err eq "alarm\n" ? "Connection timeout" : $err);
    }
    $driver->dbd->init_dbh($dbh);
    $driver->{__dbh_init_by_driver} = 1;
    $dbh;
}

## Return a database handle for the given database name (default 'main').
##
## A stored handle that no longer answers ping is discarded first. If no
## handle is stored, the get_dbh callback is used when present (its result
## is not stored); otherwise a connection is opened with init_db and kept
## in the dbh accessor.
sub rw_handle {
    my ($driver, $db) = @_;
    $db ||= 'main';

    if ($driver->dbh and !$driver->dbh->ping) {
        $driver->dbh(undef);
    }

    my $dbh = $driver->dbh;
    if (!$dbh) {
        my $getter = $driver->get_dbh;
        if ($getter) {
            $dbh = $getter->();
        } else {
            $dbh = $driver->init_db($db) or die $driver->errstr;
            $driver->dbh($dbh);
        }
    }
    return $dbh;
}
## Reads go through the same handle as writes.
*r_handle = \&rw_handle;

## Load the raw column data for an object by its primary key.
##
## Returns nothing when the object has no primary key. Otherwise runs a
## one-row fetch and returns the hash reference the columns were bound to.
## The result of the row fetch itself is not checked.
sub fetch_data {
    my ($driver, $obj) = @_;
    return unless $obj->has_primary_key;

    my $pk_terms = $obj->primary_key_to_terms;
    my $row      = {};
    my $sth      = $driver->fetch($row, $obj, $pk_terms, { limit => 1 });

    $sth->fetch;
    $sth->finish;
    $driver->end_query($sth);

    return $row;
}

## Build and execute the SELECT for a search and return the executed
## statement handle.
##
## $rec is a hash reference whose entries get bound to the selected
## columns, so each later $sth->fetch fills it in place. $class supplies
## the pre_search trigger and the database name; $orig_terms and $orig_args
## are the caller's search terms and arguments.
## The caller is responsible for calling finish on the returned handle
## and end_query on the driver.
sub fetch {
    my $driver = shift;
    my ($rec, $class, $orig_terms, $orig_args) = @_;

    ## Work on shallow copies so the pre_search trigger can modify them
    ## without touching the caller's data.
    my $terms = defined $orig_terms ? { %$orig_terms } : undef;
    my $args  = defined $orig_args  ? { %$orig_args  } : undef;
    $class->call_trigger('pre_search', $terms, $args);

    my $stmt = $driver->prepare_statement($class, $terms, $args);

    ## One bind target per selected column, keyed by the mapped column name.
    my $col_for = $stmt->select_map;
    my @targets = map { \$rec->{ $col_for->{$_} } } @{ $stmt->select };

    my $sql = $stmt->as_sql;
    if ($orig_args->{for_update}) {
        $sql .= "\nFOR UPDATE";
    }

    my $dbh = $driver->r_handle($class->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $dbh->prepare_cached($sql);
    $sth->execute(@{ $stmt->{bind} });
    $sth->bind_columns(undef, @targets);

    ## Databases that cannot apply an offset themselves need the leading
    ## rows read and thrown away here.
    if (!$driver->dbd->offset_implemented && $args->{offset}) {
        $sth->fetch for 1 .. $args->{offset};
    }

    # TBD what happens if $sth goes out of scope without finish() being called ?
    return $sth;
}

## Search for objects of $class matching $terms and $args.
##
## In list context all matching objects are returned. Otherwise an
## iterator (code reference) is returned that yields one object per call
## and nothing once the rows are exhausted, at which point the statement
## is finished and end_query is called.
sub search {
    my $driver = shift;
    my ($class, $terms, $args) = @_;

    my $row = {};
    my $sth = $driver->fetch($row, $class, $terms, $args);

    my $next = sub {
        ## This is kind of a hack--the closure holds on to $driver so the
        ## driver's DESTROY method isn't called while the iterator is
        ## still in use.
        my $keep_alive = $driver;

        if (!$sth->fetch) {
            $sth->finish;
            $driver->end_query($sth);
            return;
        }

        my $obj = $class->new;
        $obj->set_values_internal($row);
        ## No duplicate needed: there is no previous in-memory version
        ## to preserve.
        $obj->call_trigger('post_load') unless $args->{no_triggers};
        return $obj;
    };

    return $next unless wantarray;

    my @objs;
    while (my $obj = $next->()) {
        push @objs, $obj;
    }
    return @objs;
}

## Look up a single object of $class by primary key.
## Returns nothing when $id is undefined; otherwise the first object found
## by a one-row primary-key search (undef if there is none).
sub lookup {
    my ($driver, $class, $id) = @_;
    return unless defined $id;

    my ($first) = $driver->search(
        $class,
        $class->primary_key_to_terms($id),
        { limit => 1 , is_pk => 1 },
    );
    return $first;
}

## Look up several objects of $class by primary key.
##
## Returns an array reference with one slot per requested id, in request
## order; ids without a matching row yield undef in the single-column case.
sub lookup_multi {
    my ($driver, $class, $ids) = @_;
    return [] unless @$ids;

    if (ref($ids->[0])) {
        ## Multi-column keys: fall back to one lookup per id.
        my @got = map { $class->driver->lookup($class, $_) } @$ids;
        return \@got;
    }

    ## If it's a single-column PK, assume it's in one partition, and
    ## use an OR search, then put the results back into request order.
    my $terms  = $class->primary_key_to_terms([ $ids ]);
    my @found  = $driver->search($class, $terms, { is_pk => 1 });
    my %obj_by_pk = map { $_->primary_key() => $_ } @found;
    my @got = @obj_by_pk{ @$ids };
    return \@got;
}

## Run an arbitrary SQL statement and return the first column of its first
## row. $bind is an array reference of bind values. Returns nothing when
## the statement yields no row.
sub select_one {
    my ($driver, $sql, $bind) = @_;
    my $dbh = $driver->r_handle;

    $driver->start_query($sql, $bind);
    my $sth = $dbh->prepare_cached($sql);
    $sth->execute(@$bind);

    my $val;
    $sth->bind_columns(undef, \$val);
    my $got_row = $sth->fetch;

    ## Clean-up is the same whether or not a row came back.
    $sth->finish;
    $driver->end_query($sth);

    return unless $got_row;
    return $val;
}

## Return the table name for a class or object: its datasource, with the
## driver's prefix prepended when one is set. Returns nothing when there
## is no datasource.
sub table_for {
    my ($driver, $this) = @_;

    my $src = $this->datasource;
    return unless $src;

    my $prefix = $driver->prefix;
    return $src unless $prefix;
    return $prefix . $src;
}

## Check whether a row with the object's primary key exists.
## Returns nothing when the object has no primary key; otherwise the
## result of fetching one row from a "SELECT 1" on the object's table.
sub exists {
    my ($driver, $obj) = @_;
    return unless $obj->has_primary_key;

    ## The pre_search trigger runs so that it can rewrite the key terms
    ## (e.g. enum columns that are part of the PK).
    my $pk_terms = $obj->primary_key_to_terms;
    my $class    = ref $obj;
    $class->call_trigger('pre_search', $pk_terms);

    my $tbl  = $driver->table_for($obj);
    my $stmt = $driver->prepare_statement($class, $pk_terms, { limit => 1 });
    my $sql  = "SELECT 1 FROM $tbl\n" . $stmt->as_sql_where;

    my $dbh = $driver->r_handle($obj->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $dbh->prepare_cached($sql);
    $sth->execute(@{ $stmt->{bind} });
    my $found = $sth->fetch;
    $sth->finish;
    $driver->end_query($sth);

    return $found;
}

## Replace the stored row for an object.
##
## When the DBD helper reports native REPLACE support, a single REPLACE
## statement is issued. Otherwise the replace is emulated as remove +
## insert inside a transaction. The arguments are passed through to
## _insert_or_replace, or to remove and insert.
##
## Croaks with "REPLACE transaction error ..." (including the original
## error text) if the emulated path fails; the transaction is rolled back
## first.
sub replace {
    my $driver = shift;
    if ($driver->dbd->can_replace) {
        return $driver->_insert_or_replace(@_, { replace => 1 });
    } else {
        $driver->begin_work;
        eval {
            $driver->remove(@_);
            $driver->insert(@_);
        };
        if (my $err = $@) {
            ## Capture the error before rolling back: rollback() goes
            ## through _end_txn, whose own eval resets $@, so reading $@
            ## afterwards would lose the original message.
            $driver->rollback;
            Carp::croak("REPLACE transaction error $driver: $err");
        }
        return $driver->commit;
    }
}

## Insert the given object as a new row (plain INSERT, never REPLACE).
sub insert {
    my ($driver, $orig_obj) = @_;
    my %options = (replace => 0);
    return $driver->_insert_or_replace($orig_obj, \%options);
}

## Shared implementation of insert() and the native-REPLACE path of
## replace().
##
## $orig_obj is the object to store; $options is an optional hash reference
## whose "replace" flag selects a REPLACE statement instead of INSERT.
## All work is done on a clone of the object so that triggers can modify
## it; the only things written back to $orig_obj are its primary key
## column(s) and a reset of its changed_cols. Returns 1.
sub _insert_or_replace {
    my $driver = shift;
    my($orig_obj, $options) = @_;

    ## Syntax switch between INSERT or REPLACE statement based on options
    $options ||= {};
    my $INSERT_OR_REPLACE = $options->{replace} ? 'REPLACE' : 'INSERT';
    
    ## Use a duplicate so the pre_save trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_save', $orig_obj);
    $obj->call_trigger('pre_insert', $orig_obj);

    my $cols = $obj->column_names;
    if (!$obj->is_pkless && ! $obj->has_primary_key) {
        ## If we don't already have a primary key assigned for this object, we
        ## may need to generate one (depending on the underlying DB
        ## driver). If the driver gives us a new ID, we insert that into
        ## the new record; otherwise, we assume that the DB is using an
        ## auto-increment column of some sort, so we don't specify an ID
        ## at all.
        my $pk = $obj->primary_key_tuple;
        ## $generated is only tested for truth; the key values copied back
        ## are read from $obj -- presumably the generator sets them on the
        ## object it is given (TODO confirm against pk_generator callers).
        if(my $generated = $driver->generate_pk($obj)) {
            ## The ID is the only thing we *are* allowed to change on
            ## the original object, so copy it back.
            $orig_obj->$_($obj->$_) for @$pk;
        } else {
            ## Filter the undefined key fields out of the columns to include
            ## in the query, so that we don't specify them in the query.
            my %pk = map { $_ => 1 } @$pk;
            $cols = [ grep !$pk{$_} || defined $obj->$_(), @$cols ];
        }
    }
    ## Build "INSERT|REPLACE INTO tbl (cols...) VALUES (?, ...)" with one
    ## placeholder per column.
    my $tbl = $driver->table_for($obj);
    my $sql = "$INSERT_OR_REPLACE INTO $tbl\n";
    my $dbd = $driver->dbd;
    $sql .= '(' . join(', ',
                  map { $dbd->db_column_name($tbl, $_) }
                  @$cols) .
            ')' . "\n" .
            'VALUES (' . join(', ', ('?') x @$cols) . ')' . "\n";
    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $obj->{column_values});
    my $sth = $dbh->prepare_cached($sql);
    ## Bind each value positionally, in the same order as the column list,
    ## with the bind attributes the DBD helper supplies for the column's
    ## declared type ('char' when no type is declared).
    my $i = 1;
    my $col_defs = $obj->properties->{column_defs};
    for my $col (@$cols) {
        my $val = $obj->column($col);
        my $type = $col_defs->{$col} || 'char';
        my $attr = $dbd->bind_param_attributes($type, $obj, $col);
        $sth->bind_param($i++, $val, $attr);
    }
    $sth->execute;
    $sth->finish;
    $driver->end_query($sth);

    ## Now, if we didn't have an object ID, we need to grab the
    ## newly-assigned ID.
    if (!$obj->is_pkless && ! $obj->has_primary_key) {
        my $pk = $obj->primary_key_tuple; ## but do that only for relation that aren't PK-less
        my $id_col = $pk->[0]; # XXX are we sure we will always use '0' ?
        my $id = $dbd->fetch_id(ref($obj), $dbh, $sth);
        $obj->$id_col($id);
        ## The ID is the only thing we *are* allowed to change on
        ## the original object.
        $orig_obj->$id_col($id);
    }

    $obj->call_trigger('post_save', $orig_obj);
    $obj->call_trigger('post_insert', $orig_obj);

    ## The object has just been stored, so it has no pending changes.
    $orig_obj->{changed_cols} = {};
    1;
}

## Write the changed columns of an object back to its row.
##
## $orig_obj is the object to update; $terms is an optional hash reference
## of extra WHERE terms that are merged over the primary-key terms.
## Triggers run on a clone of the object. Returns 1 without touching the
## database when no columns have changed, otherwise the value returned by
## the statement's execute.
sub update {
    my $driver = shift;
    my($orig_obj, $terms) = @_;

    ## Use a duplicate so the pre_save trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_save', $orig_obj);
    $obj->call_trigger('pre_update', $orig_obj);

    ## NOTE(review): $cols is not used anywhere below in this sub.
    my $cols = $obj->column_names;
    my @changed_cols = $obj->changed_cols;

    ## If there's no updated columns, update() is no-op
    ## but we should call post_* triggers
    unless (@changed_cols) {
        $obj->call_trigger('post_save', $orig_obj);
        $obj->call_trigger('post_update', $orig_obj);
        return 1;
    }

    ## Build "UPDATE tbl SET col = ?, ..." for the changed columns only.
    my $tbl = $driver->table_for($obj);
    my $sql = "UPDATE $tbl SET\n";
    my $dbd = $driver->dbd;
    $sql .= join(', ',
            map { $dbd->db_column_name($tbl, $_) . " = ?" }
            @changed_cols) . "\n";
    ## The WHERE clause comes from the primary key terms; caller-supplied
    ## $terms are listed second, so they win for any duplicate key.
    my $stmt = $driver->prepare_statement(ref($obj), {
            %{ $obj->primary_key_to_terms },
            %{ $terms || {} }
        });
    $sql .= $stmt->as_sql_where;

    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $obj->{column_values});
    my $sth = $dbh->prepare_cached($sql);
    ## Bind the SET values first, positionally, using the DBD helper's
    ## bind attributes for each column's declared type ('char' by default).
    my $i = 1;
    my $col_defs = $obj->properties->{column_defs};
    for my $col (@changed_cols) {
        my $val = $obj->column($col);
        my $type = $col_defs->{$col} || 'char';
        my $attr = $dbd->bind_param_attributes($type, $obj, $col);
        $sth->bind_param($i++, $val, $attr);
    }

    ## Bind the primary key value(s).
    ## These continue the placeholder numbering after the SET values.
    for my $val (@{ $stmt->{bind} }) {
        $sth->bind_param($i++, $val);
    }

    my $rows = $sth->execute;
    $sth->finish;
    $driver->end_query($sth);

    $obj->call_trigger('post_save', $orig_obj);
    $obj->call_trigger('post_update', $orig_obj);

    ## The changes have been written, so clear the pending-change record.
    $orig_obj->{changed_cols} = {};
    return $rows;
}

## Remove one object, or all objects of a class that match a search.
##
## Called with an object: deletes the row identified by its primary key,
## firing the pre_remove/post_remove triggers, and returns the value of the
## statement's execute. Returns nothing when the object has no primary key.
##
## Called with a class name followed by ($terms, $args):
##   * with $args->{nofetch} set, the rows are deleted directly through
##     direct_remove without creating objects (no triggers are fired);
##   * otherwise the matching objects are searched for and removed one by
##     one, and the number of objects is returned -- as the zero-but-true
##     string "0E0" when nothing matched, the same convention the DBI
##     execute results returned on the other paths follow.
sub remove {
    my $driver = shift;
    my $orig_obj = shift;

    ## If remove() is called on class method and we have 'nofetch'
    ## option, we remove the record using $term and won't create
    ## $object. This is for efficiency and PK-less tables
    ## Note: In this case, triggers won't be fired
    ## Otherwise, Class->remove is a shortcut for search+remove
    unless (ref($orig_obj)) {
        ## After the two shifts above, @_ holds ($terms, $args).
        my $args = $_[1];
        if ($args && $args->{nofetch}) {
            return $driver->direct_remove($orig_obj, @_);
        }

        my $result = 0;
        for my $obj ($driver->search($orig_obj, @_)) {
            $result++;
            $obj->remove;
        }
        ## The quotes matter: a bare 0E0 is just the number 0 (false),
        ## whereas the string "0E0" is numerically zero but true.
        return $result || "0E0";
    }

    return unless $orig_obj->has_primary_key;

    ## Use a duplicate so the pre_remove trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_remove', $orig_obj);

    my $tbl = $driver->table_for($obj);
    my $sql = "DELETE FROM $tbl\n";
    my $stmt = $driver->prepare_statement(ref($obj), $obj->primary_key_to_terms);
    $sql .= $stmt->as_sql_where;
    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $dbh->prepare_cached($sql);
    my $result = $sth->execute(@{ $stmt->{bind} });
    $sth->finish;
    $driver->end_query($sth);

    $obj->call_trigger('post_remove', $orig_obj);

    return $result;
}

## Delete rows of $class matching $orig_terms/$orig_args with a single
## DELETE statement, without loading any objects.
##
## Returns the value of the statement's execute. Croaks when a limit was
## requested but the DBD helper cannot combine DELETE with LIMIT.
sub direct_remove {
    my ($driver, $class, $orig_terms, $orig_args) = @_;

    ## Shallow copies, so the pre_search trigger can modify them freely.
    my $terms = defined $orig_terms ? { %$orig_terms } : undef;
    my $args  = defined $orig_args  ? { %$orig_args  } : undef;
    $class->call_trigger('pre_search', $terms, $args);

    my $stmt = $driver->prepare_statement($class, $terms, $args);
    my $tbl  = $driver->table_for($class);
    my $sql  = "DELETE from $tbl\n" . $stmt->as_sql_where;

    # not all DBD drivers can do this.  check.  better to die than do
    # unbounded DELETE when they requested a limit.
    if ($stmt->limit) {
        if (!$driver->dbd->can_delete_with_limit) {
            Carp::croak("Driver doesn't support DELETE with LIMIT");
        }
        $sql .= $stmt->as_limit;
    }

    my $dbh = $driver->rw_handle($class->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth     = $dbh->prepare_cached($sql);
    my $deleted = $sth->execute(@{ $stmt->{bind} });
    $sth->finish;
    $driver->end_query($sth);

    return $deleted;
}

## Insert many rows at once by delegating to the DBD helper.
##
## $class is the object class, $cols an array reference of its column
## names and $data the row data, passed to the helper untouched.
## Croaks when the helper has no bulk_insert method or when $cols names a
## column the class does not have. Returns whatever the helper returns.
sub bulk_insert {
    my ($driver, $class, $cols, $data) = @_;
    my $dbd = $driver->dbd;

    unless ($dbd->can('bulk_insert')) {
        Carp::croak("Driver doesn't support bulk_insert");
    }

    # check that cols are valid..
    my %is_valid = map { $_ => 1 } @{ $class->column_names };
    my @unknown  = grep { !$is_valid{$_} } @{$cols};
    if (@unknown) {
        ## Each name is followed by a single space in the message.
        my $invalid_cols = join '', map { "$_ " } @unknown;
        Carp::croak("Invalid columns $invalid_cols passed to bulk_insert");
    }

    # pass this directly to the backend DBD
    my $dbh     = $driver->rw_handle($class->properties->{db});
    my $tbl     = $driver->table_for($class);
    my @db_cols = map { $dbd->db_column_name($tbl, $_) } @{$cols};

    return $dbd->bulk_insert($dbh, $tbl, \@db_cols, $data);
}

## Start a transaction on the driver's handle.
##
## When no handle is stored yet, one is obtained from rw_handle and kept in
## the dbh accessor for the duration of the transaction; the
## __delete_dbh_after_txn flag tells _end_txn to drop it again afterwards.
## Croaks with "Begin work failed ..." (including the underlying error) if
## the handle's begin_work fails; a rollback is attempted first.
sub begin_work {
    my $driver = shift;
    my $dbh = $driver->dbh;
    unless ($dbh) {
        $driver->{__delete_dbh_after_txn} = 1;
        $dbh = $driver->rw_handle;
        $driver->dbh($dbh);
    }
    eval {
        $dbh->begin_work;
    };
    if (my $err = $@) {
        ## Capture the error before rolling back: rollback() goes through
        ## _end_txn, whose own eval resets $@, so reading $@ afterwards
        ## would lose the original message.
        $driver->rollback;
        Carp::croak("Begin work failed for driver $driver: $err");
    }
}

## Commit the current transaction (see _end_txn).
sub commit {
    my $driver = shift;
    return $driver->_end_txn('commit');
}

## Roll back the current transaction (see _end_txn).
sub rollback {
    my $driver = shift;
    return $driver->_end_txn('rollback');
}

## Finish the current transaction by calling $action ('commit' or
## 'rollback') on the stored handle.
##
## Croaks when no handle is stored or when the action fails. If the handle
## was only stored for this transaction (__delete_dbh_after_txn set by
## begin_work), it is dropped again. Returns 1.
sub _end_txn {
    my ($driver, $action) = @_;

    my $dbh = $driver->dbh;
    unless ($dbh) {
        Carp::croak("$action called without a stored handle--begin_work?");
    }

    eval { $dbh->$action() };
    Carp::croak("$action failed for driver $driver: $@") if $@;

    if ($driver->{__delete_dbh_after_txn}) {
        $driver->dbh(undef);
        delete $driver->{__delete_dbh_after_txn};
    }
    return 1;
}

## Disconnect the stored handle when the driver goes away -- but only if
## this driver opened the connection itself (flag set by init_db).
sub DESTROY {
    my $driver = shift;

    ## Don't take the responsibility of disconnecting a handle we
    ## haven't created ourselves.
    return unless $driver->{__dbh_init_by_driver};

    my $dbh = $driver->dbh
        or return;
    $dbh->disconnect;
}

## Build the SQL statement object for a query on $class.
##
## $terms is a hash reference of column => condition pairs for the WHERE
## clause (may be undef). $args is a hash reference of options; the keys
## read here are sql_statement (a pre-built statement to extend), fetchonly
## (array reference restricting the selected columns), sort and direction
## (ORDER BY), limit, offset and having. Returns the statement object.
sub prepare_statement {
    my $driver = shift;
    my($class, $terms, $args) = @_;

    my $dbd = $driver->dbd;
    ## Extend the caller's statement if one was passed in; otherwise start
    ## from a fresh instance of the DBD helper's SQL class.
    my $stmt = $args->{sql_statement} || $dbd->sql_class->new;

    ## Columns, FROM, WHERE and ORDER BY are only added when the class maps
    ## to a table.
    if (my $tbl = $driver->table_for($class)) {
        my $cols = $class->column_names;
        my %fetch = $args->{fetchonly} ?
            (map { $_ => 1 } @{ $args->{fetchonly} }) : ();
        ## Columns the statement already selects are not added again.
        my $skip = $stmt->select_map_reverse;
        for my $col (@$cols) {
            next if $skip->{$col};
            if (keys %fetch) {
                next unless $fetch{$col};
            }
            ## Select the table-qualified database column, mapped back to
            ## the object's column name.
            my $dbcol = join '.', $tbl, $dbd->db_column_name($tbl, $col);
            $stmt->add_select($dbcol => $col);
        }

        $stmt->from([ $tbl ]);

        if (defined($terms)) {
            for my $col (keys %$terms) {
                my $db_col = $dbd->db_column_name($tbl, $col);
                $stmt->add_where(join('.', $tbl, $db_col), $terms->{$col});
            }
        }

        ## Set statement's ORDER clause if any.
        if ($args->{sort} || $args->{direction}) {
            my @order;
            ## The sort column defaults to 'id' when only a direction is given.
            my $sort = $args->{sort} || 'id';
            ## A plain column name is normalised to the list-of-hashes form
            ## so both forms share the loop below.
            unless (ref $sort) {
                $sort = [{column    => $sort,
                          direction => $args->{direction}||''}];
            }

            foreach my $pair (@$sort) {
                my $col = $dbd->db_column_name($tbl, $pair->{column} || 'id');
                my $dir = $pair->{direction} || '';
                ## Only the exact direction 'descend' sorts descending;
                ## anything else sorts ascending.
                push @order, {column => $col,
                              desc   => ($dir eq 'descend') ? 'DESC' : 'ASC',
                             }
            }

            $stmt->order(\@order);
        }
    }
    ## Limit, offset and HAVING apply whether or not a table was found.
    $stmt->limit($args->{limit}) if $args->{limit};
    $stmt->offset($args->{offset}) if $args->{offset};

    ## Note: this $terms is a new lexical holding the HAVING terms; it
    ## shadows the WHERE terms parameter inside this block only.
    if (my $terms = $args->{having}) {
        for my $col (keys %$terms) {
            $stmt->add_having($col => $terms->{$col});
        }
    }

    $stmt;
}

## Translate the most recent DBI error ($DBI::err / $DBI::errstr) through
## the DBD helper's map_error_code and return the result.
sub last_error {
    my ($driver) = @_;
    my $dbd = $driver->dbd;
    return $dbd->map_error_code($DBI::err, $DBI::errstr);
}

1;


# syntax highlighted by Code2HTML, v. 0.9.1