Author ThreadPool (was: Re: Thread.list confusion)
Andrew S. Townley

2005-08-30, 7:58 am


Hi Eric

On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:
> On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:

[snip]
> ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
> thread you spawn so you can see who is spawning the extra threads.


Hmmm... I think I might use a ThreadGroup, no? ;)


Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...
> What do you want?


Something like this (actually, it was straightforward enough once I
thought about it a little):

$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new
    @workers = []
    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new
    size.times do
      @workers << t = Thread.new { Thread.stop; thread_work }
      @group.add(t)
    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping"
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start
    @workers.each { |w| w.run }
    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)
    @sh_mutex.synchronize { @shutdown = true }
    @workers.each { |w| w.join if w.alive? } if wait
  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

puts "Thread.list"
Thread.list.each { |w| puts w.inspect }




Eric Hodel

2005-08-30, 7:02 pm

On 30 Aug 2005, at 04:25, Andrew S. Townley wrote:

> Hi Eric
>
> On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:
>
> [snip]
>
>
> Hmmm... I think I might use a ThreadGroup, no? ;)
>
>
> Ok, this was just selective stupidity on my part. Apologies. After
> digging out my Doug Lea Concurrent Java book, I saw what I was
> forgetting...
>
>
> Something like this (actually, it was straightforward enough once I
> thought about it a little):
>
> $ cat tpool.rb
> require 'thread'
>
> class ThreadPool
> def initialize(size)
> @work = Queue.new

# @workers = []

You don't need to keep threads in an Array because you don't keep
track of their status or #value. (It's better to just push a value
onto a Queue than to check #value because you can be sure you're
always getting something that is valid.)
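
A minimal sketch of the "push a value onto a Queue" idea, for illustration only. The names jobs, results and squares, and the use of nil as a stop marker, are assumptions of this sketch and are not part of the posted ThreadPool.

```ruby
require 'thread'

jobs    = Queue.new
results = Queue.new

# Anonymous workers: no Array of threads is kept anywhere.
3.times do
  Thread.new do
    # A nil job is this sketch's (hypothetical) stop marker.
    while (n = jobs.deq)
      results << [n, n * n]   # report the result instead of relying on #value
    end
  end
end

inputs = [1, 2, 3, 4, 5]
inputs.each { |n| jobs << n }

squares = {}
inputs.size.times do
  n, sq = results.deq         # blocks until some worker has pushed a result
  squares[n] = sq
end

3.times { jobs << nil }       # let the workers fall out of their loops
```

Because the caller takes exactly one result per submitted job, it never has to ask a particular thread whether it has finished.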

> @group = ThreadGroup.new
> @shutdown = false
> @sh_mutex = Mutex.new


I don't think this mutex protects anything; assignment of a constant
will be atomic.
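
A small sketch of a shutdown flag that is set and read without a Mutex. It assumes the standard C implementation of Ruby, where one thread assigning true to a variable is not seen half-written by another; other implementations may give weaker guarantees. The names stop and worker are made up for the example.

```ruby
stop = false

worker = Thread.new do
  # Poll the flag directly; no lock is taken around the read.
  sleep 0.01 until stop
  :stopped
end

sleep 0.05
stop = true      # a single assignment, done without a lock
worker.join
```

A lock would still be needed if the flag had to change together with other state, which is not the case in the pool above.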

> size.times do

Thread.new { @group.add Thread.current; Thread.stop; thread_work }
> end
> @monitor = Thread.new do
> Thread.stop
> loop do
> @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
> sleep(1)
> end
> end
> end
>
> def <<(runnable)
> @work << runnable
> self
> end
>
> def thread_work
> loop do
> @sh_mutex.synchronize do
> if @shutdown
> puts "#{Thread.current} stopping";
> Thread.current.terminate
> end
> end
> puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
> job = @work.deq
> begin
> job.run if job != nil
> Thread.pass
> rescue => e
> puts e
> next
> end
> end
> end
>
> def start

@group.list.each { |w| w.run }
> @monitor.run
> end
>
> def join
> @monitor.join
> end
>
> def shutdown(wait = true)

@group.enclose
> @sh_mutex.synchronize { @shutdown = true }

@group.list.first.join until @group.list.empty? if wait
> end
>
> attr_reader :group
> end
>
> class Runnable
> def initialize(*args, &block)
> @block = block
> end
>
> def run
> @block.call
> end
> end
>
> pool = ThreadPool.new(8)
>
> pool.start
> job1 = Runnable.new do
> 3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
> end
>
> vagrant = Runnable.new { raise "broken" }
>
> pool << job1 << vagrant << job1 << job1 << job1 << job1
> pool << vagrant << job1 << job1 << vagrant << vagrant << job1
>
> Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
> pool.join
> pool.shutdown
>
> puts "Thread group"
> pool.group.list.each { |w| puts w.inspect }


The group should always be empty after shutdown if you waited. A
ThreadGroup does not hold dead threads.
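
A quick way to watch a ThreadGroup drop a thread once it has finished. This is only a sketch; gate, during and after are names invented for the example.

```ruby
require 'thread'

group = ThreadGroup.new
gate  = Queue.new

t = Thread.new { gate.deq }   # stays alive until something is pushed
group.add(t)

during = group.list           # taken while t is still alive
gate << :go
t.join
after = group.list            # taken once t has finished

puts "while running: #{during.size} thread(s)"
puts "after join:    #{after.size} thread(s)"
```

If the second count is ever non-zero immediately after the join, that would suggest the interpreter had not yet finished tearing the thread down; asking for group.list again a moment later should show it gone.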

> puts "Thread.list"
> Thread.list.each { |w| puts w.inspect }


--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04


