For Programmers: Free Programming Magazines  


Home > Archive > ithreads > June 2005 > One part solved, Threads::Synchronized still an issue









You are viewing an archived Text-only version of the thread. To view this thread in it's original format and/or if you want to reply to this thread please [click here]

 

Author One part solved, Threads::Synchronized still an issue
PerlDiscuss - Perl Newsgroups and mailing lists

2005-06-08, 9:03 pm

To answer my own question, and to provide some more information...

1. SOLUTION: My implementation of the shared variables in the object
fields does work. I was just calling lock() on an incorrect (undefined)
hash reference...DOH!

2. PROBLEM: Threads::Synchronized is still reporting lock issues when I
try to call a method that has 'synchronized method' specified in it's
definition. Any thoughts on this?

3. COMMENT: I discovered how to specify object methods in thread->new()
calls. Given a blessed object $obj with method 'foo' defined in the
object's package 'My::Pkg', you can start a thread with:

my $thr = thread->new(\&My::Pkg::foo, $obj);

Using this form, it is possible to specify 'foo : synchronized { ... }' in
the definition of foo, but 'synchronized method' still fails with the
usual 'thread failed to start: lock can only be used on shared values at
...' error. Is there another way I can create new threads on an object
that both start a proper thread AND use the synchronized method? For now,
it looks like I'll have to use Thread::Semaphore to emulate this behavior.

-Eric



Perldiscuss - Perl Newsgroups And Mailing Lists wrote:

> Hi,


> Currently, I am trying to implement an ithread-based DBI connection
> pool for an application I am developing for a client. I have verified
> that the DBD::Oracle driver I am using is ithread safe by properly sharing
> the ora_dbh_share parameter(see
> http://search.cpan.org/~timb/DBD-Or...nect_Attributes)
> within all threads that will be accessing the database.


> The issues I am running into consist of two parts, but appear to be
> related to a single parent problem: shared variables and blessed object
> references. First, I am attempting to use Thread::Synchronized to insure
> that my database pool control methods (add to, subtract from, and monitor
> pool) can only be called by one thread at a time (so I don't have to worry
> about sharing semaphores to do this). Secondly, in tune to the current
> inherent limitations of threads::shared, I am attempting to share only
> specific hashrefs and arrayrefs of my class fields that are cricital to be
> accessable/modifiable by all threads. With a non-object (e.g. blessed)
> hashref, I have been able to prove the concept that, given
> my $hash = {1=>0}
> I can set
> share($hash{1});
> $hash{1} = 0;
> Then, within any thread, do
> {
> lock($hash{1});
> $hash{1}++;
> }
> and the lock will be properly respected and the variable shared
> appropriatly incremented globally.


> The issue I am running into is: 'lock can only be used on shared values
> at ...' at any place where I am attempting to either declare a
> synchronized method (i.e. 'sub foo : synchronized method { ... }') or
> lock a variable I shared earlier using 'shared($self->{foo})'. As you
> will see in the code, I have followed the design assuming that share()
> will clobber any pre-exisiting data in the specified data structure.


> In threads::shared POD, I saw the following in the BUGS section:
> "share() allows you to share $hashref->{key} without giving any error
> message. But the $hashref->{key} is not shared, causing the error "locking
> can only be used on shared values" to occur when you attempt to lock
> $hasref->{key}." Thus, I assume I'm currently running into some variant
> of this issue when trying to access fields I shared within a blessed
> object.


> Any insight or suggestions would be greatly appreciated. I'm pretty
> sure I can fall back to flattening out the contents of this class into the
> main calling program (.pl) to subvert these issue, but it would be a very
> messy implementation IMO.


> The code is just a work in progress, but is enough to demonstrate the
> issues at hand.



> ########################################
######################
> # DBIPool.pm
> ########################################
######################


> require 5.008;
> package DBIPool;
> use strict;
> use warnings;
> use Carp;
> use vars qw($debug);
> use threads;
> use threads::shared;
> use Thread::Queue;
> use Thread::Synchronized;
> use DBI;
> $debug = 0;


> ### CLASS FIELDS
> my (%const, %var, %private);
> %const = (
> 'param_default' => {
> 'dbi_params' => undef
> ,'dbi_db' => 'ubd02'
> ,'dbi_user' => 'rerix_engine'
> ,'dbi_pass' => 'rerix_engine'
> ,'max_connect' => 12
> ,'min_connect' => 4
> ,'keep_alive' => 120 #seconds between keepalive signals for min

connections
> ,'timeout_extra' => 120 #seconds before extra connections (beyond min)
> disconnect
> ,'timeout' => 300 #seconds before warning given for unresponsive
> connection (not returned to pool)
> }
> ,'param' => {
> 'dbi_params' => undef
> ,'dbi_db' => undef
> ,'dbi_user' => undef
> ,'dbi_pass' => undef
> ,'max_connect' => undef
> ,'min_connect' => undef
> ,'keep_alive' => 120
> ,'timeout_extra' => 120
> ,'timeout' => 300
> }
> ,'max_connect_total' => 32 #max allowed at any time
> );
> %var = ();
> %private = (
> 'dbi_str_prefix' => 'DBI:Oracle:'
> ,'rs_pool' => [('') x $const{max_connect_total}]
> #elements:$ora_dbh_share_handle (must be initialized to empty string)
> ,'rs_avail' => Thread::Queue->new #elements:pool idx
> ,'rs_avail_num' => 0
> ,'rs_inuse' => [] #elements:in use start ts (epoch sec) (idx corresponds

to
> pool idx)
> ,'rs_inuse_num' => 0
> ,'rs_monitor_tid' => undef
> ,'rs_monitor_alive' => 0
> ,'rs_monitor_sleep' => 2 #seconds between monitor wakeup requests
> );


> ########################################
################################
> sub new (%)
> {
> my $proto = shift;
> my %param = @_;


> ### input params ###
> $const{param}->{max_connect} = ($param{max_connect} =~ m/D/ ||
> $param{max_connect} == 0)?
> $const{param_default}->{max_connect} : $param{max_connect} >
> $const{max_connect_total} ?
> $const{max_connect_total} : $param{max_connect};
> $const{param}->{min_connect} = ($param{min_connect} =~ m/D/ ||
> $param{min_connect} == 0)?
> $const{param_default}->{min_connect} : $param{min_connect} >
> $const{param}->{max_connect} ?
> $const{param}->{max_connect} : $param{min_connect};
> $const{param}->{dbi_params} = (defined $param{dbi_params} && ref
> $param{dbi_params} eq 'HASH') ?
> $param{dbi_params} : $const{param_default}->{dbi_params};
> undef $const{param}->{dbi_params}->{ora_dbh_share}; #explicitly undefine
> ora_dbh_share so DBD::Oracle isn't
> $const{param}->{dbi_db} = defined $param{dbi_db} ? $param{dbi_db} :
> $const{param_default}->{dbi_db};
> $const{param}->{dbi_user} = defined $param{dbi_user} ? $param{dbi_user} :
> $const{param_default}->{dbi_user};
> $const{param}->{dbi_pass} = defined $param{dbi_pass} ? $param{dbi_pass} :
> $const{param_default}->{dbi_pass};


> ### create object ###
> my $class = ref($proto) || $proto;
> my $self = { #fields
> %const
> ,%var
> ,%private
> ,'_final' => %const #constant class fields
> ,'_permitted' => %var #public class variables
> ,'_private' => %private #private class variables
> };


> bless ($self, $class);


> ### declare shared variables ###
> share($self->{rs_pool}); $self->{rs_pool} = $private{rs_pool}; #clumsy but
> necessary due to threads::shared limitations
> share($self->{rs_avail}); $self->{rs_avail} = $private{rs_avail};
> share($self->{rs_avail_num}); $self->{rs_avail_num} =

$private{rs_avail_num};
> share($self->{rs_inuse}); $self->{rs_inuse} = $private{rs_inuse};
> share($self->{rs_inuse_num}); $self->{rs_inuse_num} =

$private{rs_inuse_num};
> share($self->{rs_monitor_tid}); $self->{rs_monitor_tid} =
> $private{rs_monitor_tid};
> share($self->{rs_monitor_alive}); $self->{rs_monitor_alive} =
> $private{rs_monitor_alive};


> ### create initial pool ###
> my $connect_success = $self->_inc_pool() x
> $self->{param}->{min_connect}; #Database.pm reports errors
> if ($connect_success =~ m/0/)
> {
> my $monitor_thr = threads->new($self->_pool_monitor());
> $self->{rs_monitor_tid} = $monitor_thr->tid;
> }


> return $connect_success =~ m/0/o ? $self : undef;
> }


> sub DESTROY
> {
> my $self = shift;
> $self->{rs_monitor_alive} = 0; #terminate resource monitor (terminates once
> it wakes from sleep)
> my $rs_monitor = threads->object($self->{rs_monitor_tid});
> $rs_monitor->detach;
> while (@{$self->{rs_pool}})
> {
> $self->_dec_pool();
> }
> }
> ########################################
################################
> sub _get_dbh ($)
> {
> my $self = shift;
> my $ora_dbh_share = $_[0];


> return undef;
> }


> sub _pool_monitor () : synchronized method {
> my $self = shift;
> warn "***In pool Monitorn" if $debug;
> unless ($self->{rs_monitor_alive})
> {
> $self->{rs_monitor_alive} = 1;
> while ($self->{rs_monitor_alive})
> {
> warn "***Pool monitor awake...n" if $debug;
> ### send keepalive ping to each connection ###
> for (my $i = 0; $i < $self->{rs_avail_num}; $i++)
> {
> if (defined $self->{rs_pool}->[$i])
> {
> # my $dbh = _get_dbh($self->{rs_pool}->[$i]);
> }
> }


> ### check for extra sleeping connections that should be closed ###


> ### warn on connections that have been in use too long (caller thread
> may have died?) --debugging ##


> warn "***Pool monitor going to sleep...n" if $debug;
> sleep $self->{rs_monitor_sleep};
> }
> }


> return $self->{pool_monitor};
> }


> sub _inc_pool : synchronized method {
> my $self = shift;
> lock($self->{db_pool});
> lock($self->{db_pool_avail});
> lock($self->{db_pool_inuse});
> if ($self->{db_pool_avail}->pending >
> $self->{param}->{max_connect}) #debugging
> {
> die "thread issue: total connections ".$self->{db_pool_avail}->pending."
> }
> unless ($self->{db_pool_avail}->pending == $self->{param}->{max_connect})
> {
> my %dbi_params = (ora_dbh_share => $self->{rs_pool});
> %dbi_params = defined $self->{dbi_params} ? (%dbi_params,
> %{$self->{dbi_params}}) : ();
> # my $db = DBI->new(dbi_params=>{%{$self->{dbi_params}}});


> }
> return 1;
> }


> sub _dec_pool () : synchronized method {}


> sub select {


> }


> sub insert {


> }


> sub update {


> }


> sub query {


> }


> sub procedure {


> }


> 1;



> Thanks,
> Eric



Sponsored Links







Also available: Server administration forum archive | Web Design forum archive | Software forum archive | Hardware reviews archive

Copyright 2008 codecomments.com