| Paolo Bonzini 2007-06-06, 7:22 pm |
|
> 1) If you want to get serious with this, it would probably be good to
> replace the shared queue with some sort of shared, fixed size, circular
> buffer to prevent one process from generating a huge intermediate queue
> without letting the next one consume it. That should help with the flow
> through the pipeline. Ideally the buffer would also be fully polymorphic
> with streams, so that you can use the full arsenal, including
> #next:into:startingAt:, nextAvailable: and such.
In fact, this is what I implemented a while ago (remember the
PipeStream thread about turning a ReadStream decorator into a
WriteStream decorator?)
The PipeStream is a circular buffer, with the writer using #nextPut:
and the reader using #next.
Here is how it is used to turn the ReadStream decorator
(DeflateStream) into a WriteStream decorator. The user wants to write
uncompressed data with #nextPut:, but get deflated data when they send
#contents.
- the DeflateStream reads the data from the read side of the
PipeStream, and its #next method returns decorated data; it is not
visible to the user
- the PipeStream forks a process that reads the decorated data off the
DeflateStream and writes it on the write stream.
- the user sees the PipeStream as the decorator for its write stream,
since they will be able to use its write side.
- #contents uses a Promise object so that the connection process can
finish the compression
decorate: writeStream with: class
"A LazyPromise is the same as a Promise but with #value redefined
as `^super value value'. The idea is that #value: receives a
block,
which is quite expensive to evaluate -- so we have to delay it
until
the user actually asks for the value."
contents := LazyPromise new.
[
| readStream |
readStream := class on: self.
[
"This blocks the reader process if the buffer is empty."
writeStream nextPutAll: readStream nextHunk.
"Exit if they send #close to the PipeStream"
self isConnected and: [ readStream atEnd not ] ] whileTrue.
writeStream nextPutAll: readStream.
"Don't evaluate unless requested."
contents value: [ writeStream contents ] ] fork
contents
^contents value
|