Code Comments

[PATCH] Advanced tagging and filtering
Hi Justin,

I was looking for a nice IPC module on CPAN and what did I find?  Yet
another Perl Module created by you :)

Unfortunately, IPC::DirQueue was really slow in one of the cases I tried to
use it for.  That's why I wrote the attached patch.  But first:  What do I
need?

I have a server which has a tcpdump running, listening for UDP packets, and
where you can connect via TCP to request notifications about some kind of
packets.  The notifications are based on ids which are inside the UDP
packets.  (This is actually used to do some UDP hole punching as a
University project.)

So we've got two parallel queues, one for the incoming requests and one for
the outgoing replies.  On the one side we can have n processes pushing
requests into the incoming queue and waiting for replies on the outgoing
one.  On the other side is just a single consumer/producer, which parses
the tcpdump output, checks whether there's a request for the id in the
incoming queue and if so, puts a reply into the outgoing queue which is
then sent via TCP to the user.

That worked pretty well, but at some point I got an almost-deadlock:  If one
of the clients pushes loads of requests into the incoming queue, it can
overwhelm the consumer, which may have to look at all those new requests
first before it sees the one it was actually looking for.  Additionally,
there is quite some overhead because each time the metadata has to be read
first before I can identify the owner.

Actually the following simple code can have the effect I described above:
    use IPC::DirQueue;

    my $dq = IPC::DirQueue->new({ dir => 'dq' });
    $dq->enqueue_string("foo!", {id => 'foo'});
    $dq->enqueue_string("bar!", {id => 'bar'});
    while (1) {
        # picks up whichever job comes next, regardless of its id
        my $job = $dq->wait_for_queued_job(0, 0.1);
        my $id = $job->{metadata}->{id};
        #print $id;
        if ($id eq 'foo') {
            my $data = $job->get_data();
            $job->finish();
            print $data;
        }
        else {
            # not ours: put it back, where it will be found again
            $job->return_to_queue();
        }
    }
If started often enough, at some point we will have loads of bars which
block the foos.  Just uncomment the print $id and the app will scream bar
all the time but never foo.  Hmmm... I guess there's actually another bug
hidden because sometimes I see no foo at all; maybe the dir iterator is
reset or something.

Whatever, I actually set out to increase the speed for applications like this.
The attached patch does the following:

a)  Adds a parameter file_mode to the constructor;  doesn't really increase
speed but I guess in most cases one wants to change both modes and this way
it's less code :)

b)  Makes the TAG part of the filename configurable.  Ah, yeah, TAG was
called HASH before :)  I added the parameters tag (static tag), tag_sub
(callback to generate a dynamic tag) and two more (tag_max_length and
tag_warn, cf. inline doc).

c)  Adds $filter parameter to wait_for_queued_job() and pickup_queued_job().
These accept a regular expression which is matched against the queue file
name.  I don't really like the position of the $filter parameter in the
wait method, I'd prefer to have it as the first parameter and experimented
with a 'ref $_[1] eq Regexp' to keep it backwards compatible but then just
went the simple way and put it to the end :)

d)  The actual filtering is done in the iterators.  I did quite some
refactoring/rewriting there.  The most important things are the
$iter->{filter} code and that I made the iterator more or less independent
of the $self object, so I could use it to rewrite the fanout stuff with a
recursive (unordered) iterator, thus reducing code duplication.

The tests all work (again) and I attached another one for the stuff I did.

So, with the patch, change the code above into the following and it's fast
as... dunno, but something really fast :)
    use IPC::DirQueue;

    # tag every queue file with the id from the job's metadata
    my $dq = IPC::DirQueue->new({ dir => 'dq', tag_sub => sub {
        return $_[0]->{metadata}->{id};
    }});
    $dq->enqueue_string("foo!", {id => 'foo'});
    $dq->enqueue_string("bar!", {id => 'bar'});
    while (1) {
        # only pick up files whose name contains the tag "foo"
        my $job = $dq->wait_for_queued_job(0, 0.1, qr/\.foo(\..*|)$/);
        my $id = $job->{metadata}->{id};
        #print $id;
        my $data = $job->get_data();
        print $data;
        $job->finish();
    }
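
To see what that filter selects without touching a real queue, here is a small sketch that applies the same regular expression to a few made-up file names.  It assumes the layout of this first patch, NN.TIMESTAMP.TAG[.PID.RAND]; the names themselves are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical queue file names in the first patch's layout,
# NN.TIMESTAMP.TAG[.PID.RAND]; all values are made up.
my @files = (
    '50.20061230002301000001.foo',
    '50.20061230002301000002.bar',
    '50.20061230002301000003.foo.4711.12345',   # collision retry
    '50.20061230002301000004.foobar',
);

# The filter from the example above: ".foo" has to be a complete
# dot-separated field, either the last one or followed by more fields.
my $filter = qr/\.foo(\..*|)$/;

my @matched = grep { $_ =~ $filter } @files;
print "$_\n" for @matched;
```

Only the first and the third name pass; "bar" and "foobar" are skipped because the pattern requires a dot or the end of the name right after "foo".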

Cheers,
Malte


Malte S. Stretz
12-30-06 12:23 AM


Re: [PATCH] Advanced tagging and filtering
hi Malte!

Thanks for this, it looks very useful indeed.  One tweak, though:

> b)  Makes the TAG part of the filename configurable.  Ah, yeah, TAG was
> called HASH before :)  I added the parameters tag (static tag), tag_sub
> (callback to generate a dynamic tag) and two more (tag_max_length and
> tag_warn, cf. inline doc).

I would prefer to *add* the TAG part *as well as* the HASH stuff.
The hash is useful, since it avoids collisions -- TAG doesn't make
that guarantee (nor should it have to).

That should be OK, right?

otherwise, all looks good...

--j.


01-03-07 01:26 PM


Re: [PATCH] Advanced tagging and filtering
On Wednesday 03 January 2007 13:02 CET Justin Mason wrote:
> hi Malte!
>
> Thanks for this, it looks very useful indeed.  One tweak, though: 
>
> I would prefer to *add* the TAG part *as well as* the HASH stuff.
> The hash is useful, since it avoids collisions -- TAG doesn't make
> that guarantee (nor should it have to).
>
> That should be OK, right?

Yeah, that was my original plan.  But I wanted to keep the filename format
backwards compatible.  Not sure if that's really needed; I also wasn't sure
whether I should add the tag at the front, at the end or in between.  In the
end I went for compatibility and assumed that in most cases the tags will
still be quite unique -- at least in the producer/consumer case, where you've
most probably got some kind of pid in there, they will be at least as unique
as the current hostname/pid based ones.  And if not, we've got the timestamp
plus the additional really random part at the end :)

Actually, once people start to filter the files based on their name, the
format must not change anymore in the future.  Maybe the filter should apply
to the tag only...

Cheers,
Malte

Malte S. Stretz
01-04-07 12:25 AM


Re: [PATCH] Advanced tagging and filtering
Malte S. Stretz writes:
> On Wednesday 03 January 2007 13:02 CET Justin Mason wrote: 
>
> Yeah, that was my original plan.  But I wanted to keep the filename format
> backwards compatible.  Not sure if thats really needed, I also wasn't sure
> if I should add the tag in the front, the end or in between.  In the end I
> went for compatibility and assumed that in most cases the tags will still
> be quite unique -- at least in the producer/consumer case where you've got
> most probably some kind of pid in there they will be at least as unique as
> the current hostname/pid based ones.  And if not, we've got the timestamp
> plus the additional really random part at the end :)

backwards compatibility is not a problem here, so go ahead and
add the tag in addition to the hash.

I think the best location is just before the hash, e.g.

50.20040909232529941258.TAG.HASH[.PID.RAND]

(vs the simple

50.20040909232529941258.HASH[.PID.RAND]

when tags are not in use.)
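
As a rough illustration of that layout, the sketch below assembles such a name from its parts.  build_q_filename and its named arguments are hypothetical and not part of IPC::DirQueue; only the sprintf format follows the module's existing naming code.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of IPC::DirQueue: assembles a name in the
# layout PRI.TIMESTAMP[.TAG].HASH[.PID.RAND] proposed above.
sub build_q_filename {
    my (%arg) = @_;
    my @gmt = gmtime($arg{secs});
    my $name = sprintf("%02d.%04d%02d%02d%02d%02d%02d%06d",
        $arg{pri},
        $gmt[5] + 1900, $gmt[4] + 1, $gmt[3], $gmt[2], $gmt[1], $gmt[0],
        $arg{usecs});
    $name .= ".$arg{tag}" if defined $arg{tag};       # optional tag
    $name .= ".$arg{hash}";                           # hash is always present
    $name .= ".$arg{pid}.$arg{rand}" if $arg{extra};  # collision retries only
    return $name;
}

# Epoch 0 is used so that the timestamp part is predictable.
print build_q_filename(pri => 50, secs => 0, usecs => 941258,
                       tag => 'foo', hash => 'HASH'), "\n";
# prints 50.19700101000000941258.foo.HASH
print build_q_filename(pri => 50, secs => 0, usecs => 941258,
                       hash => 'HASH'), "\n";
# prints 50.19700101000000941258.HASH
```

Note that a tagged name always has an even number of dot-separated fields and an untagged one an odd number, with or without the .PID.RAND suffix.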

> Actually, when people start to filter the files based on their name, the
> format must not change anymore in future.  Maybe the filter should apply to
> the tag only...

Yes, I think that's a good idea... people shouldn't have to worry about
the rest of the filename changing.

--j.


01-04-07 12:25 AM


Re: [PATCH] Advanced tagging and filtering
On Wednesday 03 January 2007 16:38 CET Justin Mason wrote:
> backwards compatibility is not a problem here, so go ahead and
> add the tag in addition to the hash.
>
> I think the best location is just before the hash, e.g.
>
>     50.20040909232529941258.TAG.HASH[.PID.RAND]
>
> (vs the simple
>
>     50.20040909232529941258.HASH[.PID.RAND]
>
> when tags are not in use.)
> 
>
> Yes, I think that's a good idea... people shouldn't have to worry about
> the rest of the filename changing.

Ok, attached is a patch on top of the last one which changes the code as you
described it above.  Additionally there's a revamped test.

While I was hunting another totally stupid bug, I found out that I had
actually forgotten to pass the $filter in the second call to
pickup_queued_job() in wait_for_queued_job().  That gave some nasty
surprises.  And visit_all_jobs() now supports $filter, too.

The syntax for $filter has changed a bit:  If it is a string, it must be
equal to the tag; if it is a RE created with qr//, it is matched as a RE.
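
The two kinds of filter can be sketched roughly as follows.  The helper names filter_to_regex and tag_matches are hypothetical; the sketch only assumes the PRI.TIMESTAMP.TAG.HASH[.PID.RAND] layout discussed above and is not the patch itself.

```perl
use strict;
use warnings;

# Hypothetical helper: turns a filter into a regex.  A plain string has to
# equal the whole tag; a qr// object is used as given.
sub filter_to_regex {
    my ($filter) = @_;
    return $filter if ref($filter) eq 'Regexp';
    my $quoted = quotemeta($filter);
    return qr/^${quoted}$/;
}

# Hypothetical helper: true if the tag field of a name in the layout
# PRI.TIMESTAMP.TAG.HASH[.PID.RAND] matches the filter.
sub tag_matches {
    my ($filename, $filter) = @_;
    my @fields = split /\./, $filename;
    return 0 if @fields % 2;    # odd field count: no tag in this layout
    return $fields[2] =~ filter_to_regex($filter) ? 1 : 0;
}

my $name = '50.20040909232529941258.foo.HASH';
print tag_matches($name, 'foo'),    "\n";   # string equal to the tag
print tag_matches($name, 'fo'),     "\n";   # string is only a prefix
print tag_matches($name, qr/^fo/),  "\n";   # regex matching the prefix
```

With a string, only the exact tag "foo" is accepted, so 'fo' is rejected; to match on a prefix the caller has to pass a regular expression such as qr/^fo/.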

Can you update the QUEUE DIRECTORY STRUCTURE section for me?  Your English
is a lot better than mine :)

Cheers,
Malte


Malte S. Stretz
01-04-07 12:25 AM


Re: [PATCH] Advanced tagging and filtering
hey, btw -- something's up with the latest version, at least; when
I apply the patches and make test, it hangs in t/10enq_string as soon
as pickup_queued_job() is called.  SVN trunk works fine, as does
the first patch, but as soon as the second patch is applied this
happens.
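
One possible explanation, which nobody in this thread confirms:  the filter closure in the quoted diff below skips every file name with an odd number of dot-separated fields, and an untagged name (PRI.TIMESTAMP.HASH) has exactly three.  If that reading is right, a queue without tags would never hand out a job, even when no filter is given.  The sketch below mirrors only that check; passes_filter is a hypothetical name, and a match-everything regex stands in for the default the patch installs when no filter is passed.

```perl
use strict;
use warnings;

# Mirrors the per-file checks of the second patch's filter closure.
sub passes_filter {
    my ($f, $re) = @_;
    return 0 unless $f =~ /^\d/;
    my @f = split /\./, $f;
    return 0 if @f % 2;         # odd number of fields: treated as untagged
    return $f[2] =~ $re ? 1 : 0;
}

# Match-everything filter, standing in for the patch's default.
my $match_all = qr/^/;

print passes_filter('50.20040909232529941258.HASH', $match_all), "\n";
print passes_filter('50.20040909232529941258.foo.HASH', $match_all), "\n";
```

The untagged name is rejected and the tagged one is accepted.  Whether this is what makes t/10enq_string hang would still have to be checked against the real code.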

Here's the current rev of the combined patch against SVN:
http://taint.org/x/2007/filter.patch

That contains your two patches, my documentation changes, and a "warn" in
the test script to illustrate this hang.

--j.

Malte S. Stretz writes:
> On Wednesday 03 January 2007 16:38 CET Justin Mason wrote: 
>
> Ok, attached is a patch on top of the last one which changes the code as you
> described it above.  Additionally there's a revamped test.
>
> While I was hunting another totally stupid bug, I found out that I actually
> forgot to add the $filter in the second call to pickup_job in wait_for_job.
> Gave some nasty surprises.  And visit_all_jobs now supports $filter, too.
>
> The syntax for $filter has changed a bit:  If it is a string, it must be
> equal the tag, if it is a RE created with qr// it is matched as a RE.
>
> Can you update the QUEUE DIRECTORY STRUCTURE section for me?  Your English
> is a lot better than mine :)
>
> Cheers,
> Malte
> part 3     text/x-diff               7472
> --- DirQueue.pm.patch1	2006-12-30 01:27:31.000000000 +0100
> +++ DirQueue.pm	2007-01-03 21:08:17.000000000 +0100
> @@ -173,8 +173,10 @@
>      $self->{ordered} = 1;
>    }
>
> -  $self->{tag} ||= hash_string_to_filename($self->gethostname().$$);
> -  $self->{tag_sub} ||= sub { return $self->{tag}; };
> +  $self->{hash} ||= hash_string_to_filename($self->gethostname().$$);
> +  if (defined $self->{tag}) {
> +    $self->{tag_sub} ||= sub { return $self->{tag}; };
> +  }
>    $self->{tag_max_length} ||= 128;
>    if (!defined $self->{tag_warn}) {
>      $self->{tag_warn} = 1;
> @@ -456,9 +458,10 @@
>
>  Pick up the next job in the queue, so that it can be processed.
>
> -The parameter C<$filter> can be used to specify a regular expression which
> -is matched against the queued filename.  All files which don't match will be
> -skipped.
> +The parameter C<$filter> can be used to specify either a string or a regular
> +expression (with qr//) which is compared (the the first case) or matched
> +(in the latter case) against the tag part of the queued filename.  All files
> +which don't match will be skipped.
>
>  If no job is available for processing, either because the queue is
>  empty or because other worker processes are already working on
> @@ -709,7 +712,7 @@
>      while (time == $qdirlaststat) {
>        Time::HiRes::usleep ($pollintvl);
>        dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
> -      my $job = $self->pickup_queued_job();
> +      my $job = $self->pickup_queued_job($filter);
>        if ($job) { return $job; }
>      }
>
> @@ -739,7 +742,7 @@
>
>   ###########################################################################
>
> -=item $job = $dq->visit_all_jobs($visitor, $visitcontext);
> +=item $job = $dq->visit_all_jobs($visitor, $visitcontext, $filter);
>
>  Visit all the jobs in the queue, in a read-only mode.  Used to list
>  the entire queue.
> @@ -759,15 +762,17 @@
>    'active_host': the hostname on which the job is active
>    'active_pid': the process ID of the process which picked up the job
>
> +The jobs can be filtered with C<$filter> as in C<pickup_queued_job()>.
> +
>  =cut
>
>  sub visit_all_jobs {
> -  my ($self, $visitor, $visitcontext) = @_;
> +  my ($self, $visitor, $visitcontext, $filter) = @_;
>
>    my $pathqueuedir = $self->q_subdir('queue');
>    my $pathactivedir = $self->q_subdir('active');
>
> -  my $iter = $self->queue_iter_start($pathqueuedir);
> +  my $iter = $self->queue_iter_start($pathqueuedir, $filter);
>
>    my $nextfile;
>    while (1) {
> @@ -1118,30 +1123,39 @@
>
>    my @gmt = gmtime ($job->{time_submitted_secs});
>
> -  # NN.20040718140300MMMM.tag[.rand]
> +  # NN.20040718140300MMMM[.tag].hash[.rand]
>    #
>    # NN = priority, default 50
>    # MMMM = microseconds from Time::HiRes::gettimeofday()
> -  # tag = some base64-ish string, default hash(hostname.$$)
> +  # hash = hash(hostname.$$)
> +  # tag = some base64-ish string
>    # hostname = current hostname
>
> -  my $base = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.",
> +  my $file = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d",
>          $job->{pri},
>          $gmt[5]+1900, $gmt[4]+1, $gmt[3], $gmt[2], $gmt[1], $gmt[0],
>          $job->{time_submitted_msecs});
>
> +  # add tag (including leading dot, only if wanted) and hash
> +  $file .= $self->get_q_filename_tag($job).".".$self->{hash};
> +
>    # normally, this isn't used.  but if there's a collision,
>    # all retries after that will do this; in this case, the
>    # extra anti-collision stuff is useful
> -  my $extra = $addextra ? ".".$$.".".$self->get_random_int() : "";
> +  if ($addextra) {
> +    $file .= ".".$$.".".$self->get_random_int();
> +  }
>
> -  return $base.$self->get_q_filename_tag($job, $base, $extra).$extra;
> +  return $file;
>  }
>
>  sub get_q_filename_tag {
> -  my($self, $job, $base, $extra) = @_;
> -  # create a (new?) tag
> -  my $str = $self->{tag_sub}->($job, $base, $extra) || '';
> +  my($self, $job) = @_;
> +  # return an empty string if there's nothing to do
> +  return '' unless defined $self->{tag_sub};
> +  # create a (new?) tag, possibly empty
> +  my $str = $self->{tag_sub}->($job);
> +  return '' unless defined $str;
>    # weed out all dangerous chars
>    my $tag = filter_unsafe_chars($str);
>    # limit the length
> @@ -1150,7 +1164,7 @@
>    if ($self->{tag_warn} && $tag ne $str) {
>      warn "IPC::DirQueue: the tag was filtered\n";
>    }
> -  return $tag;
> +  return ".$tag";
>  }
>
>  sub hash_string_to_filename {
> @@ -1242,23 +1256,37 @@
>
>  sub queue_iter_start {
>    my ($self, $pathqueuedir, $filter, $type) = @_;
> -
> -  $filter ||= qr/^/;
> +
> +  $filter = qr// unless defined $filter;
>    dbg ("queue iter: filter $filter in $pathqueuedir");
>    unless (ref $filter eq 'CODE') {
>      # we need to copy $filter here else the closure will get annoyed
> -    my $f = $filter;
> +    my $re = $filter;
> +    unless (ref ($re) eq 'Regexp') {
> +      $re = quotemeta($re);
> +      $re = qr/^${re}$/;
> +    }
>      $filter = sub {
> -                if (wantarray) { # grep is picky about list context
> -                  return grep { /^\d/ && /$f/ } @_;
> -                }
> -                else {
> -                  $_ = shift;
> -                  return unless defined;
> -                  return unless /^\d/;
> -                  return unless /$f/;
> -                  return $_;
> +                # we can't use grep because it is picky about
> +                # list context and this sub can be called for
> +                # single files and grep would return the count
> +                # in those cases
> +                my @r;
> +                while (@_) {
> +                  my $f = shift;
> +                  next unless defined $f;
> +                  next unless $f =~ /^\d/;
> +                  # we've got to go split here to make the ^
> +                  # and $ anchors work
> +                  my @f = split(/\./, $f);
> +                  # does this file have a tag (ie. even number
> +                  # of elements)?
> +                  next if scalar (@f) % 2;
> +                  # apply the tag
> +                  next unless $f[2] =~ $re;
> +                  push(@r, $f);
>                  }
> +                return wantarray ? @r : $r[0];
>                };
>    }
>
> @@ -1358,11 +1386,11 @@
>      @files = sort $iter->{filter}->(readdir(DIR));
>      closedir DIR;
>    }
> -
> +
>    if (scalar @files <= 0) {
>      return if $self->queuedir_is_bad($iter->{dir});
>    }
> -
> +
>    $iter->{type} = 'files';
>    $iter->{files} = \@files;
>    return $iter;
> @@ -1555,8 +1583,7 @@
>
>  The filename format is as follows:
>
> -    50.20040909232529941258.TAG[.PID.RAND]
> -    |        base          |tag | extra |
> +    50.20040909232529941258[.TAG].HASH[.PID.RAND]
>
>  The first two digits (C<50> ) are the priority of the job.  Lower priority
>  numbers are run first.  C<20040909232529> is the current date and time when the
> @@ -1568,10 +1595,8 @@
>  possible to set this manually with the C<tag> parameter of the constructor, or even
>  to have it generated dynamically by the routine set with C<tag_sub>.
>
> -The sub routine is called with the parameters C<$job>, C<$base> and C<$extra>.  The
> -first is the job currently enqueued and thus allows access to the metadata.  The
> -latter two parameters correspond to the already known parts of the filename as
> -shown above (including the dots).
> +The sub routine is called with a single parameter C<$job>.  This is the job
> +currently enqueued and thus allows access to the metadata.
>
>  Note that only characters from the set [A-Za-z0-9+_] are allowed and the length
>  of the string is limited to 128 characters.  If any of these restrictions are


01-04-07 01:23 PM




IPC DirQueue archive


Copyrights CodeComments.com 2004 - 2006
