ThreadPool (was: Re: Thread.list confusion)
Andrew S. Townley 2005-08-30, 7:58 am
Hi Eric
On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:
> On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:
[snip]
> ThreadGroup, ThreadGroup, ThreadGroup. Create a ThreadGroup for each
> thread you spawn so you can see who is spawning the extra threads.
Hmmm... I think I might use a ThreadGroup, no? ;)
Ok, this was just selective stupidity on my part. Apologies. After
digging out my Doug Lea Concurrent Java book, I saw what I was
forgetting...
> What do you want?
Something like this (actually, it was straightforward enough once I
thought about it a little):
$ cat tpool.rb
require 'thread'

class ThreadPool
  def initialize(size)
    @work = Queue.new
    @workers = []
    @group = ThreadGroup.new
    @shutdown = false
    @sh_mutex = Mutex.new
    size.times do
      @workers << t = Thread.new { Thread.stop; thread_work };
      @group.add(t)
    end
    @monitor = Thread.new do
      Thread.stop
      loop do
        @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
        sleep(1)
      end
    end
  end

  def <<(runnable)
    @work << runnable
    self
  end

  def thread_work
    loop do
      @sh_mutex.synchronize do
        if @shutdown
          puts "#{Thread.current} stopping";
          Thread.current.terminate
        end
      end
      puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
      job = @work.deq
      begin
        job.run if job != nil
        Thread.pass
      rescue => e
        puts e
        next
      end
    end
  end

  def start
    @workers.each { |w| w.run }
    @monitor.run
  end

  def join
    @monitor.join
  end

  def shutdown(wait = true)
    @sh_mutex.synchronize { @shutdown = true }
    @workers.each { |w| w.join if w.alive? } if wait
  end

  attr_reader :group
end

class Runnable
  def initialize(*args, &block)
    @block = block
  end

  def run
    @block.call
  end
end

pool = ThreadPool.new(8)

pool.start
job1 = Runnable.new do
  3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
end

vagrant = Runnable.new { raise "broken" }

pool << job1 << vagrant << job1 << job1 << job1 << job1
pool << vagrant << job1 << job1 << vagrant << vagrant << job1

Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
pool.join
pool.shutdown

puts "Thread group"
pool.group.list.each { |w| puts w.inspect }

puts "Thread.list"
Thread.list.each { |w| puts w.inspect }

Eric Hodel 2005-08-30, 7:02 pm
On 30 Aug 2005, at 04:25, Andrew S. Townley wrote:
> Something like this (actually, it was straightforward enough once I
> thought about it a little):
>
> $ cat tpool.rb
> require 'thread'
>
> class ThreadPool
>   def initialize(size)
>     @work = Queue.new
      # @workers = []
You don't need to keep the threads in an Array because you don't keep
track of their status or #value. (It's better to push a value onto a
Queue than to check #value, because you can be sure you're always
getting something valid.)
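As an illustration of the Queue suggestion above, here is a minimal
sketch rather than code from the thread. The `jobs` and `results`
queues, the squaring work, and the use of nil as a stop marker are all
assumptions made for the example. Note that no Array of threads is kept.

```ruby
require 'thread'  # Queue lives here on older Rubies; harmless on newer ones

jobs    = Queue.new
results = Queue.new   # workers publish finished values here

3.times do
  Thread.new do
    # Assumption for this sketch: a nil job means "no more work".
    while (n = jobs.deq)
      results << [n, n * n]   # push the result instead of exposing it via Thread#value
    end
  end
end

1.upto(5) { |n| jobs << n }
3.times { jobs << nil }       # one stop marker per worker

# deq blocks until a result is ready, so collecting five results
# also waits for all five jobs to finish.
squares = Array.new(5) { results.deq }.sort
p squares
```

Because the consumer counts results instead of joining threads, it
never needs a handle on the workers themselves.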
>     @group = ThreadGroup.new
>     @shutdown = false
>     @sh_mutex = Mutex.new
I don't think this mutex protects anything; assigning a constant value
is atomic.
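To make the point concrete, here is a hedged sketch, not code from the
thread: a worker polls a plain boolean flag with no Mutex around it. It
assumes MRI, where storing true into an instance variable is a single
reference assignment. `StopFlag` and its method names are hypothetical.

```ruby
# Hypothetical helper: a shutdown flag with no lock around it.
class StopFlag
  def initialize
    @shutdown = false
  end

  def stop!
    @shutdown = true   # a single assignment; nothing else changes with it
  end

  def stopped?
    @shutdown
  end
end

flag = StopFlag.new

worker = Thread.new do
  sleep 0.01 until flag.stopped?   # poll the flag between (imaginary) units of work
  :stopped
end

sleep 0.05
flag.stop!
result = worker.value   # joins the worker and returns the block's value
puts result
```

A mutex would still be needed if the flag had to change together with
some other piece of state.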
>     size.times do
        Thread.new { @group.add Thread.current; Thread.stop; thread_work }
>     end
>     @monitor = Thread.new do
>       Thread.stop
>       loop do
>         @sh_mutex.synchronize { Thread.current.terminate if @shutdown }
>         sleep(1)
>       end
>     end
>   end
>
>   def <<(runnable)
>     @work << runnable
>     self
>   end
>
>   def thread_work
>     loop do
>       @sh_mutex.synchronize do
>         if @shutdown
>           puts "#{Thread.current} stopping";
>           Thread.current.terminate
>         end
>       end
>       puts "#{Thread.current.inspect} is one of #{@work.num_waiting} waiting for work"
>       job = @work.deq
>       begin
>         job.run if job != nil
>         Thread.pass
>       rescue => e
>         puts e
>         next
>       end
>     end
>   end
>
>   def start
      @group.list.each { |w| w.run }
>     @monitor.run
>   end
>
>   def join
>     @monitor.join
>   end
>
>   def shutdown(wait = true)
      @group.enclose
>     @sh_mutex.synchronize { @shutdown = true }
      @group.list.first.join until @group.list.empty? if wait
>   end
>
>   attr_reader :group
> end
>
> class Runnable
>   def initialize(*args, &block)
>     @block = block
>   end
>
>   def run
>     @block.call
>   end
> end
>
> pool = ThreadPool.new(8)
>
> pool.start
> job1 = Runnable.new do
>   3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
> end
>
> vagrant = Runnable.new { raise "broken" }
>
> pool << job1 << vagrant << job1 << job1 << job1 << job1
> pool << vagrant << job1 << job1 << vagrant << vagrant << job1
>
> Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t); pool.shutdown(false) }
> pool.join
> pool.shutdown
>
> puts "Thread group"
> pool.group.list.each { |w| puts w.inspect }
The group should always be empty after shutdown if you waited. A
ThreadGroup does not hold dead threads.
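The behaviour described above can be checked with a small sketch. This
is an illustration, not code from the thread: the `gate` queue is a
hypothetical helper that keeps the workers alive until released, and
the short sleep after the joins is a defensive assumption, not
something the thread says is required.

```ruby
require 'thread'

group = ThreadGroup.new
gate  = Queue.new   # hypothetical helper: workers block here until released

threads = Array.new(3) do
  t = Thread.new { gate.deq }
  group.add(t)
  t
end

live_before = group.list.size        # all three are blocked on the gate, so still alive

threads.size.times { gate << :go }   # release every worker
threads.each { |t| t.join }          # wait for them to finish
sleep 0.05                           # defensive pause; assumed to be enough for the interpreter to drop finished threads

live_after = group.list.size         # finished threads should no longer be listed

puts "in group before: #{live_before}, after: #{live_after}"
```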
> puts "Thread.list"
> Thread.list.each { |w| puts w.inspect }
--
Eric Hodel - drbrain@segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E 7C11 332A 551C 796C 9F04