Streams
A stream represents a sequence of bytes that is either read from some source or written to some destination. Storm provides different streams for different sources and destinations. These all present the same interface, which means that programs typically do not need to be aware of the exact source or destination of the data.
All streams in Storm present a synchronous interface. Because user-level threads in Storm are cheap, a nonblocking call can be made by simply spawning a new user-level thread (spawn in Basic Storm). When multiple user-level threads are used in this fashion, Storm issues nonblocking IO requests to the operating system, so that other user-level threads are not blocked by long-running IO requests. This is currently not the case for disk access on Linux due to the POSIX API, but it will be fixed eventually.
There are three types of streams in Storm: output streams, input streams, and random access input streams.
Output Streams
Output streams accept data and write it to some destination. Output streams are implemented as subclasses of the OStream class. Output streams are not buffered in general, so it is usually a good idea to write data in chunks when possible. The class core.io.BufferedOStream can be used to wrap an output stream to ensure that data is buffered.
The OStream class has the following members:
-
core.Nat write(core.io.Buffer buf)
Write all data from the start of the buffer up to, but not including, the filled marker in the buffer. Blocks until all data is written, unless an error occurs. Returns the number of bytes from buf that were written to the stream. The returned value will be equal to the number of bytes in the buffer unless an error occurred.
-
core.Nat write(core.io.Buffer buf, core.Nat start)
Write all data from start up to, but not including, the filled marker in the buffer. Blocks until all data is written, unless an error occurs. Returns the number of bytes from buf that were written to the stream. The returned value will be equal to the number of bytes in the buffer unless an error occurred.
-
core.Bool flush()
Flush any buffered data to the destination. The exact behavior of this operation depends on the stream that is used. For example, file and network streams are generally unbuffered by default, so it is not necessary to flush them. There are, however, streams that buffer written data for efficiency (e.g. core.io.BufferedOStream). These use flush() to allow manually flushing the buffer. Calls to flush are generally passed along chains of output streams. For example, calling flush on a BufferedOStream will flush the BufferedOStream and call flush on whichever stream the BufferedOStream is writing its data to. Returns false if the operation fails.
-
void close()
Closes the stream and frees any associated resources. As with flush, this is generally propagated in the case of chained streams.
The destructor of the OStream calls close as a last resort. Since output streams are garbage collected, it might be a while before the destructor is called. As such, manually closing streams is generally preferred.
-
core.io.ErrorCode error()
Check if any error has occurred during a write. Errors generally cause write operations to fail, but no exceptions are thrown. This makes it possible to check for errors at the end of a large write.
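The way flush propagates along a chain of output streams, and the way errors are reported through a return value and error() rather than through exceptions, can be modeled in a few lines. The sketch below is written in Python purely as a neutral illustration and is not Storm's API: the class MemoryOStream, the capacity parameter, and the use of a string as the error code are all assumptions made for this example, and the BufferedOStream shown here is a simplified stand-in that only borrows the name.

```python
class MemoryOStream:
    """Hypothetical unbuffered sink; stands in for a file or socket."""

    def __init__(self, capacity=None):
        self.data = bytearray()
        self.capacity = capacity  # assumed limit, used to provoke an error
        self._error = None        # None means "no error" in this model

    def write(self, buf, start=0):
        chunk = bytes(buf[start:])
        if self.capacity is not None:
            room = self.capacity - len(self.data)
            if len(chunk) > room:
                # Accept what fits and record the error; do not raise.
                chunk = chunk[:room]
                self._error = "sink full"
        self.data += chunk
        return len(chunk)  # number of bytes actually written

    def flush(self):
        return self._error is None

    def close(self):
        pass

    def error(self):
        return self._error


class BufferedOStream:
    """Simplified model of a buffering wrapper around another stream."""

    def __init__(self, dest, size=8):
        self.dest = dest
        self.size = size
        self.pending = bytearray()

    def _drain(self):
        written = self.dest.write(bytes(self.pending))
        del self.pending[:written]  # keep whatever could not be written

    def write(self, buf, start=0):
        chunk = bytes(buf[start:])
        self.pending += chunk
        if len(self.pending) >= self.size:
            self._drain()
        # Simplification: reports bytes accepted into the buffer.
        return len(chunk)

    def flush(self):
        # Flush our own buffer, then pass the call along the chain.
        self._drain()
        return self.dest.flush() and not self.pending

    def close(self):
        self.flush()
        self.dest.close()

    def error(self):
        return self.dest.error()
```

In this model, a caller can issue many writes and then check flush() and error() once at the end, which mirrors the pattern described for error() above.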
Input Streams
An input stream reads data from some source. Input streams are generally not buffered, so it is usually a good idea to read data in chunks. This is done by many consumers of data from input streams, such as the text IO system.
The IStream class has the following members:
-
core.Bool more()
Returns false if the end of the stream has been reached, and true otherwise. In some cases it is not possible to determine the end of the stream ahead of time. In such cases, more might return true even if read will later report that it managed to read zero bytes. Whenever read succeeds with zero bytes, more will start returning false afterwards, except for streams that support timeouts. A timed-out read does not count as an end of stream.
-
core.io.Buffer read(core.io.Buffer to)
Read data from the stream and attempt to fill the bytes from filled to count in the buffer passed as to with bytes from the stream. Returns a buffer with the newly acquired data, and a filled member that has been updated to reflect the number of read bytes. Generally, the returned buffer refers to the same contents as to, which avoids copying potentially large amounts of data. The implementation is, however, free to re-allocate the buffer as needed. In particular, this is always the case when the system needs to execute code on another thread, and thereby copy the buffers. Because of this, users of the read function shall always use the returned buffer and not rely on the buffer passed as to being updated.
The function blocks until at least one byte has been successfully read from the stream, or until the end of the stream has been reached. This means that if filled has not been updated from its original location, then it is guaranteed that the end of the stream has been reached. Note, however, that the read function does not guarantee that the entire buffer has been filled. This often happens in the case of networking, for example. If the remote end has only sent one byte, a read operation of 10 bytes will typically only result in 1 out of 10 bytes being returned, since filling the buffer would potentially block forever (for example, the remote end might wait for a response).
-
core.io.Buffer read(core.Nat maxBytes)
Like read(Buffer), but allocates a new buffer rather than reusing an existing one.
-
core.io.Buffer fill(core.io.Buffer to)
Like read, but repeats the read operation until the buffer is full, or until the end of the stream has been reached. This means that if the buffer is not full after fill has been called, then the end of the stream has always been reached. Note, however, that this behavior may not always be desirable when working with pipes or networking.
-
core.io.Buffer fill(core.Nat bytes)
Like fill(Buffer), but allocates a new buffer rather than reusing an existing one.
-
core.io.Buffer peek(core.io.Buffer to)
The exact number of bytes that can be safely acquired through a peek operation depends on the stream in use.
-
core.io.Buffer peek(core.Nat maxBytes)
Like peek(Buffer), but allocates a new buffer rather than reusing an existing one.
-
void close()
Closes the stream and frees any associated resources. As with flush for output streams, this is generally propagated in the case of chained streams.
The destructor of the IStream calls close as a last resort. Since input streams are garbage collected, it might be a while before the destructor is called. As such, manually closing streams is generally preferred.
-
core.io.RIStream randomAccess()
Create a random access stream based on this stream. For streams that are random access, this function simply returns the same stream, but cast to a random access stream. For streams that do not natively support random access, it instead creates a core.io.LazyMemIStream that buffers data to allow seeking from this point onwards. Regardless of which case was used, the original stream does not need to be closed.
-
core.io.ErrorCode error()
Check if any error has occurred while reading. When an error has occurred, the stream will act as if it has reached the end of file. The reason can be checked by calling this function.
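The difference between read and fill is easiest to see on a source that delivers data in pieces, as a network connection might. The following Python sketch is only a model of the behavior described above, not Storm code: ChunkedIStream is a hypothetical class, and for brevity it returns plain bytes objects instead of a Buffer with a filled marker.

```python
class ChunkedIStream:
    """Hypothetical source that hands out data in the pieces it arrived in."""

    def __init__(self, chunks):
        # Empty pieces are dropped so that zero bytes always means "end".
        self._chunks = [bytes(c) for c in chunks if c]
        self._at_end = False

    def more(self):
        # The end is only discovered once a read returns zero bytes.
        return not self._at_end

    def read(self, max_bytes):
        """Return at least one byte if any data remains, but possibly
        fewer than max_bytes."""
        if not self._chunks:
            self._at_end = True
            return b""
        head = self._chunks[0]
        out, rest = head[:max_bytes], head[max_bytes:]
        if rest:
            self._chunks[0] = rest
        else:
            self._chunks.pop(0)
        return out

    def fill(self, n):
        """Repeat read until n bytes are collected or the end is reached."""
        out = bytearray()
        while len(out) < n:
            part = self.read(n - len(out))
            if not part:
                break  # zero bytes: end of stream
            out += part
        return bytes(out)
```

With the pieces b"a", b"bcdef" and b"gh", a call to read(10) returns only the first piece, while a subsequent fill(10) keeps reading and returns the remaining seven bytes. A short result from fill therefore signals the end of the stream, whereas a short result from read does not.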
Random Access Input Streams
Random access input streams are derived from core.io.RIStream. The class RIStream inherits from IStream, which means that random access input streams are usable as regular input streams. Random access streams provide the following members in addition to those of regular input streams:
-
core.Word length()
Get the length of the input data, in bytes.
-
core.Word tell()
Get the current position in the stream.
-
void seek(core.Word to)
Seek to a position relative to the start of the stream.
-
core.io.ErrorCode error()
Check if any error has occurred while reading. When an error has occurred, the stream will act as if it has reached the end of file. The reason can be checked by calling this function.
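To illustrate how seeking can be offered on top of a stream that only supports forward reads, the sketch below keeps everything read so far in memory, so that tell and seek can move freely within the buffered data. It is a Python model written for this explanation and makes no claim about how core.io.LazyMemIStream is actually implemented: the class name LazyBufferedReader is hypothetical, and length() is left out because it would require reading the source to its end.

```python
import io


class LazyBufferedReader:
    """Hypothetical wrapper that adds tell/seek to a forward-only source."""

    def __init__(self, source):
        self._source = source      # any object with read(n) -> bytes
        self._cache = bytearray()  # everything read from the source so far
        self._pos = 0
        self._eof = False

    def tell(self):
        return self._pos

    def seek(self, to):
        # Positions are relative to the point where wrapping started.
        self._pos = to

    def _ensure(self, upto):
        # Pull data from the source until the cache covers `upto` bytes
        # or the source is exhausted.
        while len(self._cache) < upto and not self._eof:
            part = self._source.read(upto - len(self._cache))
            if part:
                self._cache += part
            else:
                self._eof = True

    def read(self, max_bytes):
        self._ensure(self._pos + max_bytes)
        out = bytes(self._cache[self._pos:self._pos + max_bytes])
        self._pos += len(out)
        return out


# Usage: only read() of the source is used, never its own seek().
reader = LazyBufferedReader(io.BytesIO(b"hello world"))
first = reader.read(5)   # reads "hello" from the source
reader.seek(0)
again = reader.read(5)   # served from the cache
```

The trade-off in this model is memory: the cache grows with the amount of data read, which is the price of seeking on a source that cannot rewind.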