For Programmers: Free Programming Magazines  


Home > Archive > ithreads > October 2006 > sharing objects









You are viewing an archived Text-only version of the thread. To view this thread in it's original format and/or if you want to reply to this thread please [click here]

 

Author sharing objects
Christopher Fowler

2006-10-06, 7:07 pm

I'm trying to figure out the best way to share objects. I've got a etst
program that has 2 threads.

Thread 1:
Look for SYN packets and create a Packet object
Add that object to a list.

Thread 2:
Look through list looking for any Packet objects
Display to stdout the packet.


I am not getting the sharing part done write. In order to share Packet
do I have to create the constructor like I did? What is the best
practice for what I'm trying to do?


--- [ Cut Here ] ---------------------------------
use Net::PcapUtils;
use NetPacket::Ethernet;
use NetPacket::IP;
use NetPacket::TCP;
use threads;
use threads::shared;
use Data::Dumper;
use strict;

{
package Packet;

sub new {
shift;
my ($v1, $v2 ) = @_;
my $pkt;
threads::shared::share($pkt);
$pkt = &threads::shared::share({});
bless $pkt, 'Packet'; # Tag object with pkg name
$pkt->{'packet'} = $v1;
$pkt->{'timestamp'} = $v2;

return $pkt; # Return object
}

sub get_timestamp {
my $self=shift;
return $self->{'timestamp'};
}

sub get_packet {
my $self= shift;
return $self->{'packet'};
}
}

$| = 1;
my @objs : shared = ();
my $prog = "tcp[13] = 2 and src net not 192.168.2";

sub get_timestamp {
my $ti = shift;
$ti = time() unless $ti;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,
$yday,$isdst) =
localtime($ti);

my $text = sprintf("%02d-%02d %02d:%02d:%02d",
($mon+1), $mday, $hour, $min, $sec);
return $text;

}

sub usleep {
my $time = shift;
$time = $time * 0.000001;
select(undef, undef, undef, $time);
return;
}

sub now {
return time();
}


sub redir {
my $file = shift;
return unless $file;
open STDOUT, ">> $file" or return;
open STDERR, ">&STDOUT";
open STDIN, "</dev/null";
return;
}

sub resolv {
my $addr = shift;

my $host = `host $addr`;
chomp $host;

return $addr if $host =~ m/not found/;
return $addr unless $host =~ m/pointer\s(.+?)\.$/;

$addr = $1;
return $addr;

}

sub process_thread {

while(1) {

# We do not want to consume much CPU and
# we do not want to sleep too long. Just
# sleep a bit then test again.
while($#objs == -1 ) { usleep(250000); }

# Get the packet off the list and unpack the time
my %p = shift @objs;
my $pkt = $p->get_packet();
my $ti = $p->get_timestamp();

#my ($ti, $pkt) = unpack "Ia*", $pkt;

# get the source IP adrress
my $src_ip = NetPacket::IP->decode(
NetPacket::Ethernet::strip($pkt))->{src_ip};

# I want incoming
next if $src_ip =~ m/209\.168\.246\.233/;

my $pt = NetPacket::TCP->decode(
NetPacket::IP::strip(
NetPacket::Ethernet::strip($pkt)));

print get_timestamp($ti)." SYN ".resolv($src_ip)."($src_ip) -> $pt->{'dest_port'}\n";
}
}

sub catch {
my ($arg,$hdr,$pkt) = @_ ;
#my $data = pack "Ia*", now(), $pkt;
push @objs, Packet->new($pkt, now());
}

sub main {
#my $pid = fork();
#return 0 if $pid;

# in Child
#redir "/opt/SAM/logs/syn.log";
my $pthr = threads->new(\&process_thread);
Net::PcapUtils::loop(\&catch, FILTER=> $prog);
return 0;

}

exit main;

Michael J. Pomraning

2006-10-07, 4:23 am

Christopher Fowler wrote:
> Thread 1:
> Look for SYN packets and create a Packet object
> Add that object to a list.
>
> Thread 2:
> Look through list looking for any Packet objects
> Display to stdout the packet.
>
> I am not getting the sharing part done write. In order to share Packet
> do I have to create the constructor like I did? What is the best
> practice for what I'm trying to do?


One line in your 'process_thread' sub looks typo'd to me:

> my %p = shift @objs;


In any case, sharing your blessings can be tricky in perl.
(threads::shared/BUGS) You might skip blessing to 'Packet' entirely --
it encapsulates only two data members in your test program -- and
instead pass an unblessed arrayref or hashref between threads.

I'd also recommend replacing the "@objs : shared" with a Thread::Queue
object, eliminating the need for sleep-poll behavior.

Good luck,
Mike
Christopher Fowler

2006-10-07, 7:13 pm

On Sat, 2006-10-07 at 00:49 -0500, Michael J. Pomraning wrote:
> In any case, sharing your blessings can be tricky in perl.
> (threads::shared/BUGS) You might skip blessing to 'Packet' entirely
> --
> it encapsulates only two data members in your test program -- and
> instead pass an unblessed arrayref or hashref between threads.


I was going to expand Packet to be an "object" that provides methods to
get any info from the packet. Like get_dest_ip(), etc.



Christopher Fowler

2006-10-12, 7:21 pm

On Sat, 2006-10-07 at 00:49 -0500, Michael J. Pomraning wrote:[color=darkred]
>
> One line in your 'process_thread' sub looks typo'd to me:
>

Yea that is bad code. The original is at

http://buford.linxdev.com/syn_orig.pl


This program simply places the packet in an array for the process thread
to display it. The next step is that I need to place a timestamp with
the packet so that the process thread nows when that packet arrived. To
do that I started heading down the road of creating a Packet object to
append to the list. Apparently not a good idea....



Michael J. Pomraning

2006-10-12, 7:21 pm

Christopher Fowler wrote:
> This program simply places the packet in an array for the process thread
> to display it. The next step is that I need to place a timestamp with
> the packet so that the process thread nows when that packet arrived. To


FWIW, the 'header' argument passed to the pcap loop callback already has
the timestamp of packet receipt from libpcap's perspective.
($header->{tv_sec}, $header->{tv_usec}).

> do that I started heading down the road of creating a Packet object to
> append to the list. Apparently not a good idea....


Perhaps pass an unblest reference through perl's prefab Thread::Queue class?

# untested
my $q = Thread::Queue->new;
...
sub pcap_callback {
my (undef, $hdr, $pkt) = @_;
my $job = &share([]);

@$job = ($pkt, $hdr->{tv_sec});
$q->enqueue($job);
}
...
sub worker_thread {
while (1) {
my $job = $q->dequeue();
my ($pkt, $when) = @$job;
...
}
}

HTH,
Mike
Christopher Fowler

2006-10-12, 7:21 pm

On Tue, 2006-10-10 at 09:23 -0500, Michael J. Pomraning wrote:
> Perhaps pass an unblest reference through perl's prefab Thread::Queue
> class?


That should be the best way. Is there any great way to share objects
between threads?

Dean Arnold

2006-10-12, 7:21 pm

Christopher Fowler wrote:
> On Tue, 2006-10-10 at 09:23 -0500, Michael J. Pomraning wrote:
>
> That should be the best way. Is there any great way to share objects
> between threads?
>
>


Sharing objects between threads is simple: just make sure that you
use a shared array or hash for your object, and only assign either
simple scalars or shared refs to its members. Then you can pass
the objects between threads via a simple Thread::Queue object.

I've done a quick rewrite of your example (note I have no idea what
the function of your script is, but the Packet class is a pretty simple container
class, so hopefully this captures what you're attempting).

Some major issues I've noticed:

1) using a `host $addr` to do resolution may be a problem, since
you're shelling out from a thread...I'm not certain how well that's going to work.
I've noted the use of gethostbyaddr() as an alternate (tho I don't know how thread
safe it is either)

2) You never waited to join() your threads; I don't know what effect that
might have on your exit condition

3) The way you setup your object sharing was a bit obtuse; I've simplified it

4) I've replaced your @obj ad-hoc queue with Thread::Queue, which handles
the needed locking (which it appears you weren't applying)

Code below. NOTE: I've not run it, it likely has some syntax errors.

HTH,
Dean Arnold
Presicient Corp.


package Packet;

use threads;
use threads::shared;
use strict;
use warnings;

sub new {
my ($class, $v1, $v2) = @_;
#
# NOTE: this assume that the packet and timestamp
# elements are simple scalars; if not, then you'll
# need more share()'ing to coerce them into something
# that can be assigned to a shared hash
#
my %pkt : shared = (
packet => $v1,
timestamp => $v2
);
return bless \%pkt, $class;
}

sub get_timestamp {
my $self = shift;
#
# FYI/Warning: I've encountered some instances where
# derefencing an element off a shared hash would
# silently discard the returned value, and had to do
# an intermediate assignment to a lexical variable
# before using the value; I've not yet determined
# the cause of the issue yet
#
return $self->{'timestamp'};
}

sub get_packet {
my $self = shift;
#
# see caveat above
#
return $self->{'packet'};
}

1;

package main;

use Net::PcapUtils;
use NetPacket::Ethernet;
use NetPacket::IP;
use NetPacket::TCP;
use threads;
use threads::shared;
use Thread::Queue;
use Data::Dumper;

use strict;
use warnings;

$| = 1;
my $prog = "tcp[13] = 2 and src net not 192.168.2";
#
# create a thread queue
#
my $q = Thread::Queue->new();

sub get_timestamp {
my $ti = shift;
$ti = time() unless $ti;
my ($sec,$min,$hour,$mday,$mon,$year,$wday,
$yday,$isdst) =
localtime($ti);

my $text = sprintf("%02d-%02d %02d:%02d:%02d",
($mon+1), $mday, $hour, $min, $sec);
return $text;

}

sub now {
return time();
}

sub resolv {
my $addr = shift;
#
# OUCH! this may never work properly...forking from a thread
# may lead to chaos, esp. on Win32...isn't there a better
# Perlish solution (e.g., get host by addr ?
#
# my $host = gethostbyaddr($addr, AF_INET);
#
my $host = `host $addr`;
chomp $host;

return $addr if $host =~ m/not found/;
return $addr unless $host =~ m/pointer\s(.+?)\.$/;

$addr = $1;
return $addr;

}

sub process_thread {
while(1) {

#
# wait for a packet (or for 'DIE')
#
my $pktobj = $q->dequeue();
#
# if not a ref, then it must be 'DIE'
#
return 1
unless ref $pktobj;
#
# Get the packet off the list and unpack the time
#
my $pkt = $pktobj->get_packet();
my $ti = $pktobj->get_timestamp();

# get the source IP adrress
my $src_ip = NetPacket::IP->decode(
NetPacket::Ethernet::strip($pkt))->{src_ip};

# I want incoming
next if $src_ip =~ m/209\.168\.246\.233/;

my $pt = NetPacket::TCP->decode(
NetPacket::IP::strip(
NetPacket::Ethernet::strip($pkt)));

print get_timestamp($ti)." SYN ".resolv($src_ip)."($src_ip) -> $pt->{'dest_port'}\n";
}
}

sub catch {
my ($arg,$hdr,$pkt) = @_ ;
#
# enqueue the packet for processing
#
$q->enqueue(Packet->new($pkt, now()));
}

#
# spawn the thread
#
my $pthr = threads->new(\&process_thread);
#
# start snooping
#
Net::PcapUtils::loop(\&catch, FILTER=> $prog);
#
# I don't know what your exit condition is ???
# but here's how I've instrumented cleanup
# NOTE that if you start >1 thread, you'll need
# to send multiple DIE's, and join all the threads
#
$q->enqueue('DIE');
$pthr->join();

Dean Arnold

2006-10-12, 7:21 pm

Dean Arnold wrote:
> Christopher Fowler wrote:
>
> I've done a quick rewrite of your example (note I have no idea what
> the function of your script is, but the Packet class is a pretty simple
> container
> class, so hopefully this captures what you're attempting).


Big OOOPS on my part. I forgot one little detail: you have to rebless
the object when its dequeue'd:

>
> sub process_thread {
> while(1) {
>
> #
> # wait for a packet (or for 'DIE')
> #
> my $pktobj = $q->dequeue();
> #
> # if not a ref, then it must be 'DIE'
> #
> return 1
> unless ref $pktobj;


#
# You have to rebless in the receiving thread
#
bless $pktobj, 'Packet';
> #
> # Get the packet off the list and unpack the time
> #
> my $pkt = $pktobj->get_packet();
> my $ti = $pktobj->get_timestamp();


FWIW (and blatant plug): if your Packet class becomes more complex, esp.
if it has object refs or file handles as members,
you might check out Thread::Queue::Duplex, which provides a
Thread::Queue::Queueable base class which your Packet class can subclass
to implement its own marshalling/unmarshalling (aka curse()/redeem())
methods to deal with recovering things.

Dean Arnold
Presicient Corp.
Christopher Fowler

2006-10-12, 7:21 pm

Thanks,

I'll try these ideas.

My programs are in no way going into production :). These are just
exercises so that I can understand how threads work in Perl. I
appreciate the help.

I think my next exercise will be a bit harder. I'm going to take a
program I've done in threads in C and convert to Perl. It is a device
monitor that uses simple ping to make sure devices are up. There is 1
thread that walks through the list of objects doing notification
(email). There is a thread for each IP address. The whole idea of
doing it in threads is so that we can guarantee that every 60 seconds we
ping a host. If we simply walked through a list a couple of unreachable
addresses could throw that guarantee out the window. In the C version I
created a linked list of objects. Each thread worked on one object and
the notification thread simply walked that list looking for things to
act on. Hopefully I can get the Perl program to do the same thing :)



On Tue, 2006-10-10 at 10:25 -0700, Dean Arnold wrote:
> Dean Arnold wrote:
>
> Big OOOPS on my part. I forgot one little detail: you have to rebless
> the object when its dequeue'd:
>
>
> #
> # You have to rebless in the receiving thread
> #
> bless $pktobj, 'Packet';
>
> FWIW (and blatant plug): if your Packet class becomes more complex, esp.
> if it has object refs or file handles as members,
> you might check out Thread::Queue::Duplex, which provides a
> Thread::Queue::Queueable base class which your Packet class can subclass
> to implement its own marshalling/unmarshalling (aka curse()/redeem())
> methods to deal with recovering things.
>
> Dean Arnold
> Presicient Corp.


Dean Arnold

2006-10-12, 7:21 pm

Jack Steadman wrote:
> On 10/10/06, Dean Arnold <darnold@presicient.com> wrote:
>
>
> I've found it convenient to store the class name as a data member of
> the class and implement a simple 'rebless' function to automatically
> rebless shared objects when necessary. So for example:
>
> $obj = rebless($queue->dequeue());
>
> sub rebless {
> my $obj = shift;
> return bless $obj, $obj->{'class_name'};
> }
>
> You could even subclass or wrap Thread::Queue to do this automatically
> if you know that all of your queued items will be objects which follow
> this pattern.
>
> Jack
>


Thats the general idea behind Thread::Queue::Duplex's Thread::Queue::Queueable
base class, except its more generic to permit app-specific marshalling.
I've considered making it a default behavior in TQQ, but since its hard
to know what the members of an object might be - which might require deep
traversal, or even Storable to handle - I've kept the default curse()/redeem()
methods fairly simple. Thread::Apartment implements a more complex/complete version
to marshall params back&forth between apartment threaded objects (including
support for passing closure proxies).

- Dean
Jack Steadman

2006-10-12, 7:21 pm

On 10/10/06, Dean Arnold <darnold@presicient.com> wrote:
> Dean Arnold wrote:
> Big OOOPS on my part. I forgot one little detail: you have to rebless
> the object when its dequeue'd:


> #
> # You have to rebless in the receiving thread
> #
> bless $pktobj, 'Packet';


I've found it convenient to store the class name as a data member of
the class and implement a simple 'rebless' function to automatically
rebless shared objects when necessary. So for example:

$obj = rebless($queue->dequeue());

sub rebless {
my $obj = shift;
return bless $obj, $obj->{'class_name'};
}

You could even subclass or wrap Thread::Queue to do this automatically
if you know that all of your queued items will be objects which follow
this pattern.

Jack
Sponsored Links







Also available: Server administration forum archive | Web Design forum archive | Software forum archive | Hardware reviews archive

Copyright 2008 codecomments.com