Documentation ¶
Overview ¶
Copyright © 2021 Optable Technologies Inc. All rights reserved. See LICENSE for details.
Index ¶
- Variables
- func NewBufferWriteCloser(w io.Writer) io.WriteCloser
- func NewBufferWriteCloserSize(w io.Writer, size int) io.WriteCloser
- func NewChainedCloser(w io.Writer, cs ...io.Closer) io.WriteCloser
- func ReadAllFrames(r FrameReader) ([][]byte, error)
- func SafeCloser(closer io.Closer) io.Closer
- type ChunkReader
- func NewNewlineDelimitedChunkReader(reader io.Reader, chunkSize int) (ChunkReader, error)
- type CloserFn
- type FrameReader
- func MultiFrameReader(readers ...FrameReader) FrameReader
- func NewNewlineDelimitedFrameReader(r io.Reader, skipEmpty bool) FrameReader
- func NewVarLenFrameReader(r io.Reader) FrameReader
- func ReadAllChunks(chunker ChunkReader) (readers []FrameReader, err error)
- func SliceFrameReader(frames [][]byte) FrameReader
- type FrameWriter
- func ConcurrentFrameWriter(w FrameWriter) FrameWriter
- func NewNewlineDelimitedFrameWriter(w io.Writer) FrameWriter
- func NewVarLenFrameWriter(w io.Writer) FrameWriter
Constants ¶
This section is empty.
Variables ¶
var InvalidArgErr = errors.New("Invalid argument")
var NoFrameFoundErr = errors.New("No frame found in chunk")
Functions ¶
func NewBufferWriteCloser ¶
func NewBufferWriteCloser(w io.Writer) io.WriteCloser
NewBufferWriteCloser wraps an io.Writer in a buffer that is flushed when the returned io.WriteCloser is closed. If the writer also implements io.Closer, it is closed as well.
func NewBufferWriteCloserSize ¶
func NewBufferWriteCloserSize(w io.Writer, size int) io.WriteCloser
NewBufferWriteCloserSize wraps an io.Writer in a buffer of the given size that is flushed when the returned io.WriteCloser is closed. If the writer also implements io.Closer, it is closed as well.
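The flush-then-close behaviour described above can be sketched with bufio.Writer. This is a minimal illustration, not the package's implementation: the type `bufferWriteCloser` and the helpers `newBufferWriteCloser` and `writeThenClose` are hypothetical names introduced here.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// bufferWriteCloser is a hypothetical stand-in for the value returned by
// NewBufferWriteCloser; the real implementation may differ.
type bufferWriteCloser struct {
	buf   *bufio.Writer
	under io.Writer
}

func newBufferWriteCloser(w io.Writer) io.WriteCloser {
	return &bufferWriteCloser{buf: bufio.NewWriter(w), under: w}
}

// Write stores bytes in the buffer; they may not reach the sink yet.
func (b *bufferWriteCloser) Write(p []byte) (int, error) {
	return b.buf.Write(p)
}

// Close flushes buffered bytes, then closes the underlying writer if it
// happens to implement io.Closer.
func (b *bufferWriteCloser) Close() error {
	if err := b.buf.Flush(); err != nil {
		return err
	}
	if c, ok := b.under.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// writeThenClose reports what reached the sink before and after Close.
func writeThenClose(msg string) (before, after string) {
	var sink bytes.Buffer
	wc := newBufferWriteCloser(&sink)
	_, _ = wc.Write([]byte(msg))
	before = sink.String()
	_ = wc.Close()
	return before, sink.String()
}

func main() {
	before, after := writeThenClose("hello")
	fmt.Printf("before=%q after=%q\n", before, after)
}
```

A short message stays in the buffer until Close is called, which is why forgetting to close a buffered writer can silently drop trailing data.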
func NewChainedCloser ¶
func NewChainedCloser(w io.Writer, cs ...io.Closer) io.WriteCloser
NewChainedCloser returns an io.WriteCloser that closes the provided closers in order. This is often required when transferring ownership of a composed io.Writer. Some of the chained writers may need to be closed, e.g. os.File, net.Conn, gzip.Writer, etc. This wrapper closes them in the proper order, so the new owner does not need to know about the underlying hierarchy.
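To make the closing order concrete, here is a minimal sketch of a chained closer. It is an assumption-laden illustration: `chainedCloser`, `newChainedCloser`, and `recordingCloser` are hypothetical, and returning the first error while still closing the remaining closers is a design choice of this sketch, not documented behaviour of the package.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// chainedCloser is a hypothetical stand-in for the value returned by
// NewChainedCloser: writes go to the embedded writer, Close walks the closers.
type chainedCloser struct {
	io.Writer
	closers []io.Closer
}

func newChainedCloser(w io.Writer, cs ...io.Closer) io.WriteCloser {
	return &chainedCloser{Writer: w, closers: cs}
}

// Close closes every closer in the order given and returns the first error.
// (Continuing after an error is an assumption of this sketch.)
func (c *chainedCloser) Close() error {
	var first error
	for _, cl := range c.closers {
		if err := cl.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// recordingCloser appends its name to a shared log when closed.
type recordingCloser struct {
	name string
	log  *[]string
}

func (r recordingCloser) Close() error {
	*r.log = append(*r.log, r.name)
	return nil
}

// closeOrder closes a pretend "gzip over file" stack and reports the order.
func closeOrder() string {
	var log []string
	wc := newChainedCloser(io.Discard,
		recordingCloser{"gzip", &log},
		recordingCloser{"file", &log},
	)
	_, _ = wc.Write([]byte("payload"))
	_ = wc.Close()
	return strings.Join(log, ",")
}

func main() {
	fmt.Println(closeOrder())
}
```

The outermost writer (here the pretend gzip layer) is listed first so that it can flush its trailer before the layer beneath it is closed.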
func ReadAllFrames ¶
func ReadAllFrames(r FrameReader) ([][]byte, error)
ReadAllFrames returns all frames exposed by a FrameReader until io.EOF is reached. If an error is encountered, it returns that error with an empty slice.
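The read-until-io.EOF loop can be sketched as follows. The FrameReader interface is copied from this page; `sliceReader` and `readAll` are hypothetical helpers written for the example and are not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// FrameReader mirrors the interface documented on this page.
type FrameReader interface {
	Read() ([]byte, error)
}

// sliceReader is a hypothetical in-memory FrameReader used for illustration.
type sliceReader struct {
	frames [][]byte
}

func (s *sliceReader) Read() ([]byte, error) {
	if len(s.frames) == 0 {
		return nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return f, nil
}

// readAll collects frames until io.EOF. Any other error discards what was
// read so far, matching the contract described above.
func readAll(r FrameReader) ([][]byte, error) {
	var frames [][]byte
	for {
		f, err := r.Read()
		if errors.Is(err, io.EOF) {
			return frames, nil
		}
		if err != nil {
			return nil, err
		}
		frames = append(frames, f)
	}
}

func main() {
	r := &sliceReader{frames: [][]byte{[]byte("a"), []byte("b")}}
	frames, err := readAll(r)
	fmt.Println(len(frames), err)
}
```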
Types ¶
type ChunkReader ¶
type ChunkReader interface {
// NextChunk returns a FrameReader.
NextChunk() (FrameReader, error)
}
ChunkReader breaks a stream into chunks amenable to parallel parsing.
func NewNewlineDelimitedChunkReader ¶
func NewNewlineDelimitedChunkReader(reader io.Reader, chunkSize int) (ChunkReader, error)
NewNewlineDelimitedChunkReader returns a ChunkReader that breaks a stream into chunks of newline-delimited frames. The caller is expected to provide a chunkSize large enough to include a full frame. This is a limitation similar to that of bufio.Scanner. We recommend a chunkSize large enough to hold a handful of frames; otherwise, use a FrameReader directly.
Unlike bufio.Scanner (and NewNewlineDelimitedFrameReader), the chunker does not look for the `\r` rune.
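One way to picture the chunking rule is to cut each window of at most chunkSize bytes at its last newline. The sketch below works on an in-memory string for brevity; `splitChunks` and `errNoFrame` are hypothetical, and details such as how the final partial window is handled are assumptions rather than the package's documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errNoFrame plays the role of NoFrameFoundErr in this sketch.
var errNoFrame = errors.New("no frame found in chunk")

// splitChunks cuts data into chunks of at most chunkSize bytes, each ending
// on a frame boundary ('\n'). Only '\n' is considered; '\r' is left alone.
func splitChunks(data string, chunkSize int) ([]string, error) {
	if chunkSize <= 0 {
		return nil, errors.New("invalid argument")
	}
	var chunks []string
	for len(data) > 0 {
		if len(data) <= chunkSize {
			// The remainder fits in one chunk.
			chunks = append(chunks, data)
			break
		}
		// Find the last complete frame inside the window.
		end := strings.LastIndexByte(data[:chunkSize], '\n')
		if end < 0 {
			// chunkSize is too small to hold a single frame.
			return nil, errNoFrame
		}
		chunks = append(chunks, data[:end+1])
		data = data[end+1:]
	}
	return chunks, nil
}

func main() {
	chunks, err := splitChunks("aa\nbb\ncc\n", 7)
	fmt.Printf("%q %v\n", chunks, err)

	_, err = splitChunks("aaaa\n", 2)
	fmt.Println(err)
}
```

Each resulting chunk holds only whole frames, so chunks can be handed to separate goroutines and parsed independently.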
type CloserFn ¶
type CloserFn func() error
CloserFn implements the io.Closer interface for closures of the same signature.
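The adapter pattern is the same one net/http uses for http.HandlerFunc. A minimal sketch, assuming the Close method simply invokes the closure (the type is redeclared locally so the example is self-contained; `closeCount` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"io"
)

// CloserFn is redeclared here for a self-contained example.
type CloserFn func() error

// Close calls the underlying closure, which makes CloserFn an io.Closer.
func (f CloserFn) Close() error { return f() }

// closeCount wraps a closure as an io.Closer and reports how often it ran.
func closeCount() int {
	n := 0
	var c io.Closer = CloserFn(func() error {
		n++
		return nil
	})
	_ = c.Close()
	return n
}

func main() {
	fmt.Println(closeCount())
}
```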
type FrameReader ¶
type FrameReader interface {
// Read a single message. Returns the payload of the message.
Read() ([]byte, error)
}
FrameReader reads messages framed in a stream. A FrameReader is usually the counterpart of a FrameWriter. The implementer is not required to provide any concurrency guarantees. Read returns io.EOF when no frames are left.
func MultiFrameReader ¶
func MultiFrameReader(readers ...FrameReader) FrameReader
MultiFrameReader returns a FrameReader that concatenates FrameReaders into a single virtual FrameReader. This is similar to io.MultiReader.
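Concatenation in the style of io.MultiReader can be sketched by draining each reader in turn and moving on when it reports io.EOF. The names `multiReader`, `multi`, `sliceReader`, `drain`, and `example` are hypothetical and exist only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// FrameReader mirrors the interface documented on this page.
type FrameReader interface {
	Read() ([]byte, error)
}

// sliceReader is a hypothetical in-memory FrameReader.
type sliceReader struct{ frames [][]byte }

func (s *sliceReader) Read() ([]byte, error) {
	if len(s.frames) == 0 {
		return nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return f, nil
}

// multiReader reads from each reader in order.
type multiReader struct{ readers []FrameReader }

func multi(readers ...FrameReader) FrameReader {
	return &multiReader{readers: readers}
}

// Read skips exhausted readers and reports io.EOF only when all are done.
func (m *multiReader) Read() ([]byte, error) {
	for len(m.readers) > 0 {
		f, err := m.readers[0].Read()
		if errors.Is(err, io.EOF) {
			m.readers = m.readers[1:]
			continue
		}
		return f, err
	}
	return nil, io.EOF
}

// drain joins every frame of r with commas, stopping at the first error.
func drain(r FrameReader) string {
	var parts []string
	for {
		f, err := r.Read()
		if err != nil {
			break
		}
		parts = append(parts, string(f))
	}
	return strings.Join(parts, ",")
}

// example concatenates three readers, one of them empty.
func example() string {
	a := &sliceReader{frames: [][]byte{[]byte("a"), []byte("b")}}
	b := &sliceReader{}
	c := &sliceReader{frames: [][]byte{[]byte("c")}}
	return drain(multi(a, b, c))
}

func main() {
	fmt.Println(example())
}
```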
func NewNewlineDelimitedFrameReader ¶
func NewNewlineDelimitedFrameReader(r io.Reader, skipEmpty bool) FrameReader
NewNewlineDelimitedFrameReader parses a stream of messages separated by newlines. The implementation uses bufio.Scanner underneath, which has some notable peculiarities:
- It supports messages delimited by `\r?\n`, i.e. it also supports the Windows newline format.
- It is capped to messages that fit in the buffer. If a message is larger than the default buffer (see `bufio.MaxScanTokenSize`), it fails with `bufio.ErrTooLong`.
Due to the previous points, NewNewlineDelimitedFrameReader is not the exact inverse of NewNewlineDelimitedFrameWriter.
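Both peculiarities can be observed directly with bufio.Scanner from the standard library. The helpers `scanLines` and `oversizedLine` below are hypothetical and only exercise the scanner; they do not call this package.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// scanLines splits input with a default bufio.Scanner (bufio.ScanLines).
func scanLines(input string) ([]string, error) {
	sc := bufio.NewScanner(strings.NewReader(input))
	var lines []string
	for sc.Scan() {
		lines = append(lines, sc.Text())
	}
	return lines, sc.Err()
}

// oversizedLine builds a single line longer than the default token limit.
func oversizedLine() string {
	return strings.Repeat("x", bufio.MaxScanTokenSize+1) + "\n"
}

func main() {
	// The trailing '\r' of a Windows-style line is stripped by the scanner,
	// so a payload ending in '\r' does not round-trip unchanged.
	lines, err := scanLines("unix\nwindows\r\n")
	fmt.Printf("%q %v\n", lines, err)

	// A line that does not fit in the default buffer stops the scan.
	_, err = scanLines(oversizedLine())
	fmt.Println(errors.Is(err, bufio.ErrTooLong))
}
```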
func NewVarLenFrameReader ¶
func NewVarLenFrameReader(r io.Reader) FrameReader
NewVarLenFrameReader creates a FrameReader reading the framing format defined by NewVarLenFrameWriter.
func ReadAllChunks ¶
func ReadAllChunks(chunker ChunkReader) (readers []FrameReader, err error)
ReadAllChunks consumes all FrameReaders from the chunker and returns them in a slice. If an error other than io.EOF is encountered, it is returned immediately with a nil slice.
Use this function with care: it may hold the entire io.Reader in memory, and it never returns on an infinite stream. This utility function is used mostly for testing.
func SliceFrameReader ¶
func SliceFrameReader(frames [][]byte) FrameReader
SliceFrameReader wraps a slice of frames in a FrameReader.
type FrameWriter ¶
type FrameWriter interface {
// Write a single message. Returns the number of bytes required to write
// the message with framing.
Write(payload []byte) (int, error)
}
FrameWriter wraps messages (payloads) and takes care of framing them in a stream. A popular example of framing is the newline-delimited file format, where each message is a single line.
Framing is required for readers to provide an iterator-like interface without knowing about the content. The implementer is not required to provide any concurrency guarantees.
func ConcurrentFrameWriter ¶
func ConcurrentFrameWriter(w FrameWriter) FrameWriter
ConcurrentFrameWriter protects a FrameWriter with a mutex.
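Guarding a writer with a mutex can be sketched as below. The FrameWriter interface is copied from this page; `lockedWriter`, `countingWriter`, and `writeConcurrently` are hypothetical names, and the real wrapper may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// FrameWriter mirrors the interface documented on this page.
type FrameWriter interface {
	Write(payload []byte) (int, error)
}

// lockedWriter serializes calls to the wrapped FrameWriter.
type lockedWriter struct {
	mu sync.Mutex
	w  FrameWriter
}

func (l *lockedWriter) Write(p []byte) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.w.Write(p)
}

// countingWriter is deliberately not safe for concurrent use on its own.
type countingWriter struct{ frames int }

func (c *countingWriter) Write(p []byte) (int, error) {
	c.frames++
	return len(p), nil
}

// writeConcurrently writes n frames from n goroutines through the lock and
// returns how many frames the inner writer saw.
func writeConcurrently(n int) int {
	cw := &countingWriter{}
	w := &lockedWriter{w: cw}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = w.Write([]byte("x"))
		}()
	}
	wg.Wait()
	return cw.frames
}

func main() {
	fmt.Println(writeConcurrently(100))
}
```

Because each Write holds the lock for a whole frame, frames from different goroutines cannot interleave inside the stream.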
func NewNewlineDelimitedFrameWriter ¶
func NewNewlineDelimitedFrameWriter(w io.Writer) FrameWriter
NewNewlineDelimitedFrameWriter uses the trivial delimiter-based framing, i.e. it separates messages with a `\n`. It comes with the limitation that the payload must not contain a newline; enforcing this is the responsibility of the caller.
This framing is not robust because of that limitation, but it is provided for the ubiquitous newline-delimited JSON format.
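The limitation is easy to reproduce: a payload that contains a newline is read back as two frames. The sketch below uses a hypothetical `nlWriter` that appends `\n` to each payload and reads the result back with bufio.Scanner; it does not call this package.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// nlWriter is a hypothetical newline-delimited frame writer.
type nlWriter struct{ w io.Writer }

// Write emits the payload followed by '\n' and returns the framed size.
func (n nlWriter) Write(payload []byte) (int, error) {
	framed := append(append([]byte{}, payload...), '\n')
	return n.w.Write(framed)
}

// roundTrip writes each payload as a frame and reads the frames back.
func roundTrip(payloads ...string) []string {
	var buf bytes.Buffer
	w := nlWriter{w: &buf}
	for _, p := range payloads {
		_, _ = w.Write([]byte(p))
	}
	var out []string
	sc := bufio.NewScanner(&buf)
	for sc.Scan() {
		out = append(out, sc.Text())
	}
	return out
}

func main() {
	// One JSON object per line survives the round trip.
	fmt.Printf("%q\n", roundTrip(`{"id":1}`, `{"id":2}`))
	// A single payload with an embedded newline comes back as two frames.
	fmt.Printf("%q\n", roundTrip("line1\nline2"))
}
```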
func NewVarLenFrameWriter ¶
func NewVarLenFrameWriter(w io.Writer) FrameWriter
NewVarLenFrameWriter creates a FrameWriter where each frame is composed of the size (uint64) of the message encoded with varlen encoding followed by the message itself.
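Length-prefixed framing can be sketched with the standard library's unsigned varint helpers. This assumes that "varlen encoding" corresponds to the uvarint format of encoding/binary, which the text above does not confirm; `writeFrame`, `readFrame`, and `roundTrip` are hypothetical helpers.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// writeFrame writes a uvarint length prefix followed by the payload and
// returns the total number of bytes written, framing included.
func writeFrame(w io.Writer, payload []byte) (int, error) {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	if _, err := w.Write(hdr[:n]); err != nil {
		return 0, err
	}
	m, err := w.Write(payload)
	return n + m, err
}

// readFrame reads one length-prefixed frame. It returns io.EOF when the
// stream ends cleanly before a new length prefix.
func readFrame(r *bufio.Reader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip frames the payloads, reads them back, and reports bytes written.
func roundTrip(payloads ...string) ([]string, int, error) {
	var buf bytes.Buffer
	written := 0
	for _, p := range payloads {
		n, err := writeFrame(&buf, []byte(p))
		if err != nil {
			return nil, 0, err
		}
		written += n
	}
	var frames []string
	r := bufio.NewReader(&buf)
	for {
		f, err := readFrame(r)
		if errors.Is(err, io.EOF) {
			return frames, written, nil
		}
		if err != nil {
			return nil, 0, err
		}
		frames = append(frames, string(f))
	}
}

func main() {
	frames, written, err := roundTrip("a\nb", "hello")
	fmt.Printf("%q %d %v\n", frames, written, err)
}
```

Unlike the newline-delimited framing, a payload containing `\n` survives intact here, because the reader relies on the length prefix rather than on a delimiter.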