For Programmers: Free Programming Magazines  


Home > Archive > PERL POE > February 2006 > Concurrency not working for Server::TCP









You are viewing an archived text-only version of the thread. To view this thread in its original format and/or if you want to reply to this thread please [click here]

 

Author Concurrency not working for Server::TCP
Gil Vidals

2006-02-21, 3:59 am

I cannot get concurrency to work. I want to get the new feature below to
work:

Concurrency => $MAX_CONCURENCY

My Server::TCP component works great when $MAX_CONCURRENCY is -1
(unlimited), but when I set $MAX_CONCURRENCY to 3, I can only make 3
connections and then everything fails after that. The way I test it is to
make four telnet connections to my Server::TCP component. I issue a command
on the first three connections. After one of the first three connections
responds with output and then says "Connection closed by foreign host", I
try to issue a command to the fourth connection, but I never get a response.
Even if I try to re-use one of the first three connections, I never get a
response back.

It seems that everything is locked. New telnet connections don't work
either. I want to take advantage of max_concurrency, but I must be missing
some key concept on how to implement concurrency. I'm open to any advice
that might point me in the right direction.


<mailto:Gil.Vidals@PositionResearch.com> Gil.Vidals@PositionResearch.com
Position Research, Inc.
Search engine results by research
tel: (760) 480-8291 fax: (760) 480-8271
<http://www.PositionResearch.com> www.PositionResearch.com



my $class = shift;

# Build the spider front end: a POE::Component::Server::TCP listener whose
# per-connection sessions crawl one site on request.
#
# Request  (one line from the client):  "<url> [<max_pages>]"
# Response (lines written to the client):
#   INT::<url>              a valid HTML page on the requested host
#   EXT::<page> -> <link>   a link on <page> that leaves the requested host
# An empty request line, the page limit, the time limit ($MAXTIME minutes)
# or an exhausted queue ends the connection via the component's 'shutdown'.
#
# Per-connection heap keys used below:
#   START_TIME  epoch seconds when the crawl started
#   MAX         page limit for this crawl
#   COUNT       valid pages reported so far
#   KIDS        downloads currently queued or running
#   TODO        array of URLs still to fetch
#   DONE        hash of canonical URLs and content digests already seen
#   BASE        authority (host[:port]) of the start URL
my $server = POE::Component::Server::TCP->new(

    Address => ListenAddr,
    Port    => ListenPort,

    # NOTE(review): spelled with a single "R"; it has to match the
    # declaration elsewhere in this file.
    Concurrency => $MAX_CONCURENCY,

    # Parse one request line and start (or refuse) a crawl.
    ClientInput => sub {
        my ( $kernel, $input ) = @_[ KERNEL, ARG0 ];

        # Deliberately no $kernel->alias_set() here.  POE keeps a session
        # that owns an alias alive for as long as other sessions are active,
        # and the listener session is always active, so an aliased client
        # session never stops.
        # NOTE(review): the component appears to release a Concurrency slot
        # only when the client session stops, which would explain why the
        # server locked up after $MAX_CONCURENCY connections - confirm
        # against the installed POE::Component::Server::TCP.

        my ( $url, $max ) = split ' ', $input;

        if ( defined $url and $url ne '' ) {
            $kernel->yield( 'StartSpider', $url, $max );
        }
        else {
            $kernel->yield('shutdown');
        }
    },

    InlineStates => {

        # Initialise the crawl state for this connection and queue the
        # start URL.  ARG0 = start URL, ARG1 = optional page limit.
        StartSpider => sub {
            my ( $kernel, $heap, $session, $url, $max ) =
              @_[ KERNEL, HEAP, SESSION, ARG0, ARG1 ];

            # Caller's responsibility to specify where to start.
            $log->info(
                'SESS:' . $session->ID . ", QUERY: '$url' '$max'" );

            $heap->{START_TIME} = time;
            $log->debug( "START TIME for $url --> " . $heap->{START_TIME} );

            # Default depth applies only if the user gave no limit.
            $max ||= $MAX_DEPTH;
            $heap->{MAX} = $max;

            push @{ $heap->{TODO} }, $url;

            # $heap->{UA} = "ua1";
            # UA commented out since we are no longer using the PoCo
            # version of HTTP::Client.

            $heap->{KIDS}  = 0;
            $heap->{COUNT} = 0;

            my $uri = URI->new($url);
            $heap->{BASE} = $uri->authority;

            my $canonical_url = scalar( make_canonical($url) );
            $log->debug("Canonicalized: $canonical_url");
            $heap->{DONE}{$canonical_url}++;

            $kernel->yield( "ReadySpider", "initial" );
            $log->debug("Ready to spider: Yielding to ReadySpider");
        },

        # Take the next URL off the queue and schedule its download, as
        # long as fewer than $KIDMAX downloads are outstanding.
        ReadySpider => sub {
            my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

            if ( ( ( time - $heap->{START_TIME} ) / 60 ) > $MAXTIME ) {
                $kernel->yield('shutdown');
                return;
            }

            return if $heap->{KIDS} >= $KIDMAX;

            my $url = shift @{ $heap->{TODO} };

            # A URL is skipped when it is empty, not plain http, on another
            # host, or carries more than three query parameters.
            my $skip = !$url;
            unless ($skip) {
                use URI;
                my $uri       = URI->new($url);
                my $scheme    = $uri->scheme;
                my $authority = $uri->authority;
                my $query     = $uri->query;

                $skip = 1
                  unless defined $scheme and $scheme eq 'http';
                $skip = 1
                  unless defined $authority
                  and $authority eq $heap->{BASE};

                if ( !$skip and $query ) {
                    my @query_arr = split( "&", $query );
                    $skip = 1 if @query_arr > 3;
                }
            }

            if ($skip) {

                # Previously a skipped URL simply returned, which could
                # leave the session idle forever with work still queued
                # (or with nothing left and nobody to call shutdown).
                if ( @{ $heap->{TODO} } ) {
                    $kernel->yield( "ReadySpider", "skipped" );
                }
                elsif ( !$heap->{KIDS} ) {
                    $kernel->yield('shutdown');
                }
                return;
            }

            $heap->{KIDS}++;

            # Disabled: former component-based checker.
            # POE::Component::Xray::SpiderChecker->spawn
            # (
            #     UA       => $heap->{UA},
            #     URL      => $url,
            #     TOP      => $heap->{BASE},
            #     POSTBACK => $_[SESSION]->postback("SpiderDone", $url),
            # );

            $kernel->yield( 'DownloadPage', $url );
            $kernel->yield( "ReadySpider",  "looping" );
        },

        # Fetch one page and hand the outcome to SpiderDone as
        # ( [$url], [ \@links, $valid ] ).  ARG0 = URL to fetch.
        DownloadPage => sub {
            use LWP::UserAgent;
            my ( $kernel, $heap, $url ) = @_[ KERNEL, HEAP, ARG0 ];

            my $ua = LWP::UserAgent->new;

            # If the file is bigger than max_size, only the bytes up to
            # max_size are retrieved.
            $ua->max_size(550000);
            $ua->max_redirect(0);    # do not follow redirects
            $ua->timeout(25);        # allow up to 25 seconds to get connection

            # Latest user agent as of 01/20/06.
            $ua->agent(
                'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)');

            # NOTE(review): LWP::UserAgent->get is synchronous, so no other
            # POE event (for any connection) is dispatched until it returns.
            my $response = $ua->get($url);
            my $valid    = 0;
            my @links;

            if (    $response->is_success
                and $response->base =~ m{\Q$heap->{BASE}} )
            {
                $log->info("Internal link: $url");

                if ( $heap->{DONE}{ md5( $response->content ) }++ ) {

                    # The md5 for this page has already been recorded, so
                    # report it with no links and $valid left at 0.
                    $log->info("$url skipped due to same md5 match");
                }
                elsif ( $response->content_type eq "text/html" ) {
                    $log->info("Valid HTML");
                    $valid = 1;
                    require HTML::SimpleLinkExtor;
                    my $e = HTML::SimpleLinkExtor->new( $response->base );
                    $e->parse( $response->content );
                    @links = grep m{^http:}, $e->href;
                }
            }

            $kernel->yield( 'SpiderDone', [$url], [ \@links, $valid ] );
        },

        # Account for a finished download, report results to the client and
        # queue newly discovered internal links.
        # ARG0 = [$url], ARG1 = [ \@links, $valid ].
        SpiderDone => sub {
            my ( $kernel, $heap, $request, $response ) =
              @_[ KERNEL, HEAP, ARG0, ARG1 ];

            $heap->{KIDS}--;

            # The client may already have disconnected: this can happen if
            # the max count or time has been reached while downloads were
            # still outstanding.
            return if not $heap->{client};

            # Once the time budget is spent, pull the page limit down to the
            # current count so the check below ends the crawl.
            $heap->{MAX} = $heap->{COUNT}
              if ( ( time - $heap->{START_TIME} ) / 60 ) > $MAXTIME;

            if ( $heap->{COUNT} >= $heap->{MAX} ) {
                $kernel->yield("shutdown");
                return;
            }

            my ($url) = @$request;
            my @links = @{ $response->[0] };
            my $valid = $response->[1];

            if ($valid) {
                $log->info("Valid URL: $url");
                $heap->{COUNT}++;
                $heap->{client}->put("INT::$url")
                  if ( $url =~ m{\Q$heap->{BASE}} and $url );
            }

            # Process the links extracted from the page.  Internal links go
            # onto the TODO queue; external links are fed back to the client
            # in "EXT::url -> ext_url" format.
            for my $link (@links) {
                my $new_url = scalar( make_canonical($link) );

                # Always count the sighting, even for external links.
                my $seen     = $heap->{DONE}{$new_url}++;
                my $external = ( $new_url !~ m{\Q$heap->{BASE}} and $valid );

                push @{ $heap->{TODO} }, $new_url
                  unless $seen || $external;

                # Output if not of the same host.
                $heap->{client}->put("EXT::$url -> $new_url") if $external;
            }

            if ( not @{ $heap->{TODO} }
                or ( $heap->{MAX} == $heap->{COUNT} ) )
            {
                $kernel->yield("shutdown");
                return;
            }

            $kernel->yield( "ReadySpider", "child done" );
        },
    },

);
return $server;
}



Sponsored Links







Also available: Server administration forum archive | Web Design forum archive | Software forum archive | Hardware reviews archive

Copyright 2008 codecomments.com