October 15, 2010
Re: Streaming library
Denis Koroskin Wrote:

> // A generic stream
> interface Stream
> {
>      @property InputStream input();
>      @property OutputStream output();
>      @property SeekableStream seekable();
>      @property bool endOfStream();
>      void close();
> }

I think it's better for Stream to inherit from InputStream and OutputStream.
Do you even need endOfStream? In my experience, it's OK to do a blocking read and detect the end when 0 bytes are read. Even if you read from the network, is there a point in a non-blocking read?

> InputStream doesn't really have many methods:
> 
> interface InputStream
> {
> 	// reads up to buffer.length bytes from a stream
> 	// returns number of bytes read
> 	// throws on error
> 	size_t read(ubyte[] buffer);
> 

I've found
ubyte[] read(ubyte[] buffer)
more usable:

ubyte[] buffer=new ubyte[sz];
size_t rd=stream.read(buffer);
ubyte[] rdata=buffer[0..rd];

ubyte[] buffer=new ubyte[sz];
ubyte[] rdata=stream.read(buffer);

And you can instantly pass the read data to some other function.

myProcessor.Process(stream.read(buffer));
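To make the comparison concrete, here is a minimal sketch of both signatures side by side. `ArrayStream` and `readCount` are hypothetical names invented for this illustration (they are not part of the proposed library), and the stream is backed by an in-memory array so the example can run on its own.

```d
import std.algorithm : min;

// Hypothetical in-memory stream, used only to illustrate the two signatures.
struct ArrayStream
{
    const(ubyte)[] data; // bytes not yet read

    // Count-returning variant: the caller slices the buffer itself.
    size_t readCount(ubyte[] buffer)
    {
        immutable n = min(buffer.length, data.length);
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }

    // Slice-returning variant: returns the filled part of the buffer.
    ubyte[] read(ubyte[] buffer)
    {
        return buffer[0 .. readCount(buffer)];
    }
}

void main()
{
    ubyte[] source = [1, 2, 3, 4, 5];
    auto stream = ArrayStream(source);
    auto buffer = new ubyte[4];

    assert(stream.read(buffer) == cast(ubyte[])[1, 2, 3, 4]);
    assert(stream.read(buffer) == cast(ubyte[])[5]);
    assert(stream.read(buffer).length == 0); // an empty slice signals the end
}
```

Note that the returned slice aliases the caller's buffer, so it is overwritten by the next read into the same buffer.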

> 	// reads from current position
> 	AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
> }

I also have an implementation of an asynchronous stream with an interface similar to .NET's, though recently I came to another design.

///What one can name "an actual stream" that holds the handle
interface AsyncStreamSource
{
 ///Advances position on each read
 AsyncStream createStream();
 ///Leaves position intact - adjust manually or rather don't adjust
 AsyncStream createStillStream();
}

///An accessor for AsyncStreamSource that wraps an I/O completion port or its analogue. Holds the stream position at which I/O is performed on the AsyncStreamSource (that's why it's a stream: it works like an unshared blocking stream with asynchronous access).
interface AsyncStream
{
 void beginRead(ubyte[] buffer);
 ubyte[] endRead();
 long position() @property;
 void position(long newPosition) @property;
}

Multiple AsyncStreams can be created for one AsyncStreamSource. So effectively one AsyncStreamSource can be shared through different AsyncStreams, while individual AsyncStreams cannot be shared. With this design you don't need to allocate a new AsyncResult for each I/O operation.

Though such a design can be problematic for Linux, as its async I/O functionality is quite... errmmm... the Linux way, as I remember.
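The sharing model described above can be sketched with an in-memory stand-in. The two interfaces are copied from the post; `MemorySource` and `MemoryAccessor` are hypothetical classes written for this illustration, and the "asynchronous" read is completed synchronously inside `endRead`, so this only shows the ownership and position handling, not real completion-port I/O.

```d
import std.algorithm : min;

// The two interfaces as posted above.
interface AsyncStreamSource
{
    AsyncStream createStream();
    AsyncStream createStillStream();
}

interface AsyncStream
{
    void beginRead(ubyte[] buffer);
    ubyte[] endRead();
    long position() @property;
    void position(long newPosition) @property;
}

// Hypothetical source: owns the data (a real one would own the OS handle).
final class MemorySource : AsyncStreamSource
{
    private const(ubyte)[] data;
    this(const(ubyte)[] data) { this.data = data; }

    AsyncStream createStream()      { return new MemoryAccessor(this, true); }
    AsyncStream createStillStream() { return new MemoryAccessor(this, false); }
}

// Hypothetical accessor: the pending request is stored in the accessor itself,
// so nothing is allocated per operation.
final class MemoryAccessor : AsyncStream
{
    private MemorySource source;
    private bool advance;
    private ubyte[] pending;
    private long pos;

    this(MemorySource source, bool advance)
    {
        this.source = source;
        this.advance = advance;
    }

    @property long position() { return pos; }
    @property void position(long newPosition) { pos = newPosition; }

    void beginRead(ubyte[] buffer) { pending = buffer; }

    // A real implementation would wait for the OS completion here;
    // this sketch simply copies the bytes (position assumed non-negative).
    ubyte[] endRead()
    {
        immutable start = cast(size_t) min(pos, cast(long) source.data.length);
        immutable n = min(pending.length, source.data.length - start);
        pending[0 .. n] = source.data[start .. start + n];
        auto result = pending[0 .. n];
        pending = null;
        if (advance) pos += n;
        return result;
    }
}

void main()
{
    ubyte[] bytes = [10, 20, 30, 40, 50, 60];
    auto source = new MemorySource(bytes);
    auto a = source.createStream();
    auto b = source.createStream();
    auto bufA = new ubyte[2];
    auto bufB = new ubyte[4];

    a.beginRead(bufA);
    b.beginRead(bufB);
    assert(a.endRead() == cast(ubyte[])[10, 20]);
    assert(b.endRead() == cast(ubyte[])[10, 20, 30, 40]);
    assert(a.position == 2 && b.position == 4); // independent positions
}
```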
October 15, 2010
Re: Streaming library
On Fri, 15 Oct 2010 22:48:20 +0400, Kagamin <spam@here.lot> wrote:

> Denis Koroskin Wrote:
>
>> // A generic stream
>> interface Stream
>> {
>>      @property InputStream input();
>>      @property OutputStream output();
>>      @property SeekableStream seekable();
>>      @property bool endOfStream();
>>      void close();
>> }
>
> I think it's better for Stream to inherit from InputStream and OutputStream.
> Do you even need endOfStream? In my experience, it's OK to do a blocking
> read and detect the end when 0 bytes are read. Even if you read from the
> network, is there a point in a non-blocking read?
>

Probably, I think I'll try both ways and see which one turns out to be  
better.

>> InputStream doesn't really have many methods:
>>
>> interface InputStream
>> {
>> 	// reads up to buffer.length bytes from a stream
>> 	// returns number of bytes read
>> 	// throws on error
>> 	size_t read(ubyte[] buffer);
>>
>
> I've found
> ubyte[] read(ubyte[] buffer)
> more usable:
>
> ubyte[] buffer=new ubyte[sz];
> size_t rd=stream.read(buffer);
> ubyte[] rdata=buffer[0..rd];
>
> ubyte[] buffer=new ubyte[sz];
> ubyte[] rdata=stream.read(buffer);
>
> And you can instantly pass the read data to some other function.
>
> myProcessor.Process(stream.read(buffer));
>

Either way is fine with me. But I agree yours is handy, too.
I was actually thinking about a plain ubyte[] read(); method:

struct BufferedStream
{
	ubyte[] read(); // just give me something
}

because in many cases you don't really care about the buffer size or don't
even know the amount of data you can read (e.g. a socket stream).
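A minimal sketch of that parameterless `read()`, assuming an adapter that owns the buffer. `Buffered` and `BurstSource` are hypothetical names for this illustration; the only requirement placed on the wrapped type is a `size_t read(ubyte[])` method that returns 0 at the end of the stream.

```d
import std.algorithm : min;

// Hypothetical adapter: Raw is any type with size_t read(ubyte[]).
struct Buffered(Raw)
{
    Raw raw;
    ubyte[] storage;

    this(Raw raw, size_t capacity = 4096)
    {
        this.raw = raw;
        storage = new ubyte[capacity];
    }

    // Returns whatever one underlying read produced; an empty slice means
    // end of stream. The slice is valid only until the next call to read().
    ubyte[] read()
    {
        immutable n = raw.read(storage);
        return storage[0 .. n];
    }
}

// Hypothetical raw stream that delivers at most 3 bytes per call.
struct BurstSource
{
    const(ubyte)[] data;

    size_t read(ubyte[] buffer)
    {
        enum size_t burst = 3;
        immutable n = min(buffer.length, data.length, burst);
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }
}

void main()
{
    ubyte[] bytes = [1, 2, 3, 4, 5];
    auto stream = Buffered!BurstSource(BurstSource(bytes));
    size_t total;
    for (auto chunk = stream.read(); chunk.length != 0; chunk = stream.read())
        total += chunk.length; // the caller never chooses a buffer size
    assert(total == 5);
}
```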

>> 	// reads from current position
>> 	AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
>> }
>
> I also have an implementation of an asynchronous stream with an interface
> similar to .NET's, though recently I came to another design.
>
> ///What one can name "an actual stream" that holds the handle
> interface AsyncStreamSource
> {
>   ///Advances position on each read
>   AsyncStream createStream();
>   ///Leaves position intact - adjust manually or rather don't adjust
>   AsyncStream createStillStream();
> }
>
> ///An accessor for AsyncStreamSource that wraps an I/O completion port or
> its analogue. Holds the stream position at which I/O is performed on the
> AsyncStreamSource (that's why it's a stream: it works like an unshared
> blocking stream with asynchronous access).
> interface AsyncStream
> {
>   void beginRead(ubyte[] buffer);
>   ubyte[] endRead();
>   long position() @property;
>   void position(long newPosition) @property;
> }
>
> Multiple AsyncStreams can be created for one AsyncStreamSource. So
> effectively one AsyncStreamSource can be shared through different
> AsyncStreams, while individual AsyncStreams cannot be shared. With this
> design you don't need to allocate a new AsyncResult for each I/O operation.
>

Interesting, I think I'll give it a try. This will reduce basic Stream  
interface size, and some implementations can return null unless they  
support async read/write.

> Though such a design can be problematic for Linux, as its async I/O
> functionality is quite... errmmm... the Linux way, as I remember.

:)
October 15, 2010
Re: Streaming library
Denis Koroskin Wrote:

> > I think it's better for Stream to inherit from InputStream and OutputStream.
> > Do you even need endOfStream? In my experience, it's OK to do a blocking
> > read and detect the end when 0 bytes are read. Even if you read from the
> > network, is there a point in a non-blocking read?
> >
> 
> Probably, I think I'll try both ways and see which one turns out to be  
> better.

I should say that the implementation will be somewhat tricky, as different kinds of streams handle reads beyond the end in different ways. Say, reading from a pipe whose write end is closed results in an error.

> Either way is fine with me. But I agree yours is handy, too.
> I was actually thinking about a plain ubyte[] read(); method:
> 
> struct BufferedStream
> {
> 	ubyte[] read(); // just give me something
> }

Funny idea.
Here we can also think about MemoryStream: when you have all the data in memory, you don't need a user-side buffer, and can just return a direct slice of the data as const(ubyte)[].
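A small sketch of that idea, assuming a hypothetical `MemoryStream` struct whose `read` takes an optional upper bound. The point it illustrates is that the returned slice is a view into the original array, so no bytes are copied.

```d
// Hypothetical MemoryStream: all data already lives in memory.
struct MemoryStream
{
    const(ubyte)[] data; // bytes not yet handed out

    // Hands out up to `max` bytes as a view into the existing array.
    const(ubyte)[] read(size_t max = size_t.max)
    {
        immutable n = max < data.length ? max : data.length;
        auto chunk = data[0 .. n];
        data = data[n .. $];
        return chunk;
    }
}

void main()
{
    ubyte[] bytes = [1, 2, 3, 4, 5];
    auto stream = MemoryStream(bytes);

    auto first = stream.read(2);
    assert(first.length == 2);
    assert(first.ptr is bytes.ptr); // same memory: nothing was copied

    auto rest = stream.read();      // everything that is left
    assert(rest.length == 3);
    assert(stream.read().length == 0);
}
```

Returning `const(ubyte)[]` keeps callers from modifying the stream's backing data through the slice.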
October 15, 2010
Re: Streaming library
On 10/15/10 14:54 CDT, Kagamin wrote:
> Denis Koroskin Wrote:
>
>>> I think it's better for Stream to inherit from InputStream and OutputStream.
>>> Do you even need endOfStream? In my experience, it's OK to do a blocking
>>> read and detect the end when 0 bytes are read. Even if you read from the
>>> network, is there a point in a non-blocking read?
>>>
>>
>> Probably, I think I'll try both ways and see which one turns out to be
>> better.
>
> I should say that the implementation will be somewhat tricky, as different kinds of streams handle reads beyond the end in different ways. Say, reading from a pipe whose write end is closed results in an error.
>
>> Either way is fine with me. But I agree yours is handy, too.
>> I was actually thinking about a plain ubyte[] read(); method:
>>
>> struct BufferedStream
>> {
>> 	ubyte[] read(); // just give me something
>> }
>
> Funny idea.
> Here we can also think about MemoryStream: when you have all the data in memory, you don't need a user-side buffer, and can just return a direct slice of the data as const(ubyte)[].

We've circled all the way back to ranges a la byChunk.

bool empty();
ubyte[] front();
void popFront();

Look ma, no copying, no fuss, no muss.

Whatever interface(s) we find work best for various kinds of streams, we 
should make them play nice with ranges. Burst streams (the kind that 
offer data in variable-size chunks) work great with a range interface.
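As a sketch of how the three primitives above fit a chunked stream, here is a hypothetical `ChunkRange` over in-memory data (an assumption for illustration, not Phobos' `byChunk`). It satisfies `isInputRange`, so it works with `foreach` and range algorithms.

```d
import std.algorithm : min;
import std.range : isInputRange;

// Hypothetical input range that yields the data in chunks.
struct ChunkRange
{
    private const(ubyte)[] rest;    // bytes not yet handed out
    private const(ubyte)[] current; // the chunk returned by front
    private size_t chunkSize;

    this(const(ubyte)[] data, size_t chunkSize)
    {
        rest = data;
        this.chunkSize = chunkSize;
        popFront(); // prime the first chunk
    }

    @property bool empty() const { return current.length == 0; }
    @property const(ubyte)[] front() const { return current; }

    void popFront()
    {
        immutable n = min(chunkSize, rest.length);
        current = rest[0 .. n]; // a view, not a copy
        rest = rest[n .. $];
    }
}

static assert(isInputRange!ChunkRange);

void main()
{
    ubyte[] bytes = [1, 2, 3, 4, 5, 6, 7];
    size_t[] lengths;
    foreach (chunk; ChunkRange(bytes, 3))
        lengths ~= chunk.length;

    size_t[] expected = [3, 3, 1];
    assert(lengths == expected); // the last chunk is shorter
}
```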



Andrei
October 15, 2010
Re: Streaming library
On Fri, 15 Oct 2010 23:54:32 +0400, Kagamin <spam@here.lot> wrote:

> Denis Koroskin Wrote:
>
>> > I think it's better for Stream to inherit from InputStream and OutputStream.
>> > Do you even need endOfStream? In my experience, it's OK to do a blocking
>> > read and detect the end when 0 bytes are read. Even if you read from the
>> > network, is there a point in a non-blocking read?
>> >
>>
>> Probably, I think I'll try both ways and see which one turns out to be
>> better.
>
> I should say that the implementation will be somewhat tricky, as different
> kinds of streams handle reads beyond the end in different ways. Say, reading
> from a pipe whose write end is closed results in an error.
>
>> Either way is fine with me. But I agree yours is handy, too.
>> I was actually thinking about a plain ubyte[] read(); method:
>>
>> struct BufferedStream
>> {
>> 	ubyte[] read(); // just give me something
>> }
>
> Funny idea.
> Here we can also think about MemoryStream: when you have all the data in
> memory, you don't need a user-side buffer, and can just return a direct
> slice of the data as const(ubyte)[].

Yeah. I'm also looking into reading/sending multiple buffers at once
(aka scatter-gather I/O:
http://www.delorie.com/gnu/docs/glibc/libc_246.html)

It most likely won't be a part of a Stream interface, because I'd like to  
support different types of buffer ranges, and that asks for a templated  
implementation:

size_t writeRange(Range)(Range buffers);

Each element of Range needs to be of type ubyte[]; there are no other
requirements. It returns the number of bytes written (not the number of
buffers, because data transmission might stop in the middle of a buffer).
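A rough sketch of what such a templated `writeRange` could look like. `LimitedSink` is a hypothetical sink with a fixed capacity, chosen so the partial-write case is visible; the constraint is relaxed here to any element type convertible to `const(ubyte)[]`, which is an assumption of this example. A real implementation would presumably hand the buffers to the operating system's gather-write call rather than copy them one by one.

```d
import std.range : ElementType, isInputRange;

// Hypothetical sink that accepts at most `capacity` bytes in total.
struct LimitedSink
{
    ubyte[] written;
    size_t capacity;

    // Writes each buffer in turn and returns the number of bytes written,
    // which may end in the middle of a buffer.
    size_t writeRange(Range)(Range buffers)
        if (isInputRange!Range && is(ElementType!Range : const(ubyte)[]))
    {
        size_t total;
        foreach (buffer; buffers)
        {
            immutable room = capacity - written.length;
            immutable n = buffer.length < room ? buffer.length : room;
            written ~= buffer[0 .. n];
            total += n;
            if (n < buffer.length)
                break; // sink is full: stop in the middle of this buffer
        }
        return total;
    }
}

void main()
{
    ubyte[][] buffers = [[1, 2, 3], [4, 5, 6], [7, 8]];
    auto sink = LimitedSink(null, 5);

    // Five bytes fit: all of the first buffer and part of the second.
    assert(sink.writeRange(buffers) == 5);
    assert(sink.written == cast(ubyte[])[1, 2, 3, 4, 5]);
}
```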
October 15, 2010
Re: Streaming library
Andrei Alexandrescu Wrote:

> Whatever interface(s) we find work best for various kinds of streams, we 
> should make them play nice with ranges. Burst streams (the kind that 
> offer data in variable-size chunks) work great with a range interface.

I was thinking about the chunk size being supplied by the user, like this:

int readInt()
{
 ubyte[] buffer=read(4);
 assert(buffer.length==4); // can this trigger?
 return *cast(int*)buffer.ptr;
}
October 15, 2010
Re: Streaming library
On 10/15/10 15:54 CDT, Kagamin wrote:
> Andrei Alexandrescu Wrote:
>
>> Whatever interface(s) we find work best for various kinds of streams, we
>> should make them play nice with ranges. Burst streams (the kind that
>> offer data in variable-size chunks) work great with a range interface.
>
> I was thinking about the chunk size being supplied by the user, like this:
>
> int readInt()
> {
>    ubyte[] buffer=read(4);
>    assert(buffer.length==4); // can this trigger?
>    return *cast(int*)buffer.ptr;
> }

That's a fair point. I don't think you can assert; there could always be
a situation where there was not enough data. Anyhow, wrt byChunk I was
thinking of adding a property for changing the chunk size prior to
popFront():

auto chunks = File("file.bin").byChunk(8);
// now chunks.front() is a ubyte[] containing 8 bytes
chunks.chunkSize = 4;
chunks.popFront();
// now chunks.front() is a ubyte[] containing 4 bytes
chunks.chunkSize = 4096;
for (; !chunks.empty; chunks.popFront()) {
    // handle 4KB at a time
}


Andrei
October 15, 2010
Re: Streaming library
On Sat, 16 Oct 2010 01:01:33 +0400, Andrei Alexandrescu  
<SeeWebsiteForEmail@erdani.org> wrote:

> On 10/15/10 15:54 CDT, Kagamin wrote:
>> Andrei Alexandrescu Wrote:
>>
>>> Whatever interface(s) we find work best for various kinds of streams, we
>>> should make them play nice with ranges. Burst streams (the kind that
>>> offer data in variable-size chunks) work great with a range interface.
>>
>> I was thinking about the chunk size being supplied by the user, like this:
>>
>> int readInt()
>> {
>>    ubyte[] buffer=read(4);
>>    assert(buffer.length==4); // can this trigger?
>>    return *cast(int*)buffer.ptr;
>> }
>

ubyte[] read() and ubyte[] read(size_t size) both require buffering (e.g. a
BufferedStream adapter). As such, it can provide stronger guarantees than
raw streams.
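One way to read "stronger guarantees" is that a buffering layer can keep reading until the requested size is available, and report a short result only at the end of the stream. The sketch below assumes that interpretation; `BurstSource`, `readExactly`, and the exception on a short read are all choices made for this illustration, not part of the proposed design.

```d
import std.algorithm : min;
import std.exception : assertThrown, enforce;

// Hypothetical raw stream that delivers at most 3 bytes per call.
struct BurstSource
{
    const(ubyte)[] data;

    size_t read(ubyte[] buffer)
    {
        enum size_t burst = 3;
        immutable n = min(buffer.length, data.length, burst);
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }
}

// Keeps reading until `size` bytes are collected or the stream ends.
ubyte[] readExactly(Raw)(ref Raw raw, size_t size)
{
    auto buffer = new ubyte[size];
    size_t filled;
    while (filled < size)
    {
        immutable n = raw.read(buffer[filled .. $]);
        if (n == 0)
            break; // end of stream: the result will be short
        filled += n;
    }
    return buffer[0 .. filled];
}

// readInt from the thread, with the assert replaced by a runtime check.
int readInt(Raw)(ref Raw raw)
{
    auto bytes = readExactly(raw, int.sizeof);
    enforce(bytes.length == int.sizeof, "unexpected end of stream");
    return *cast(int*) bytes.ptr; // native byte order, as in the post
}

void main()
{
    int value = 0x01020304;
    auto raw = BurstSource((cast(ubyte*) &value)[0 .. int.sizeof].dup);

    assert(readInt(raw) == value); // took two bursts: 3 bytes, then 1
    assertThrown(readInt(raw));    // nothing left, so the check fails
}
```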

> That's a fair point. I don't think you can assert; there could always be
> a situation where there was not enough data. Anyhow, wrt byChunk I was
> thinking of adding a property for changing the chunk size prior to
> popFront():
>
> auto chunks = File("file.bin").byChunk(8);
> // now chunks.front() is a ubyte[] containing 8 bytes
> chunks.chunkSize = 4;
> chunks.popFront();
> // now chunks.front() is a ubyte[] containing 4 bytes
> chunks.chunkSize = 4096;
> for (; !chunks.empty; chunks.popFront()) {
>      // handle 4KB at a time
> }
>
>
> Andrei
October 15, 2010
Re: Streaming library
Denis Koroskin Wrote:

> ubyte[] read() and ubyte[] read(size_t size) both require buffering (e.g. a
> BufferedStream adapter). As such, it can provide stronger guarantees than
> raw streams.

What do you plan to do if the user requests too much data from BufferedStream? Ideally the stream could allocate a big buffer and store it in a weak pointer, so that it is efficient in both memory and allocations, but we don't have weak pointers, do we?
October 15, 2010
Re: Streaming library
On Sat, 16 Oct 2010 01:25:35 +0400, Kagamin <spam@here.lot> wrote:

> Denis Koroskin Wrote:
>
>> ubyte[] read() and ubyte[] read(size_t size) both require buffering (e.g. a
>> BufferedStream adapter). As such, it can provide stronger guarantees than
>> raw streams.
>
> What do you plan to do if the user requests too much data from
> BufferedStream?

Allocate, read, return.

> Ideally the stream could allocate a big buffer and store it in a weak
> pointer, so that it is efficient in both memory and allocations, but we
> don't have weak pointers, do we?

I don't see anything that would prevent weak references from working. I'm
storing some of my pointers as size_t, and they are being
garbage-collected as intended (the dtor updates the dead reference). I will
try implementing a WeakRef template and see how it turns out later.