linebuf

package module
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2021 License: MIT Imports: 9 Imported by: 0

README

LineBuf

This module allows JSON stream processing via line-buffered JSON objects.

See package documenation

How to use

package main

import (
	"fmt"
	"io"

	"github.com/arivum/linebuf"
)

type Test struct {
	ID   int64
	Text string
}

func main() {
	var array = make([]Test, 10000)
	for i := 0; i < len(array); i++ {
		array[i] = Test{
			Text: fmt.Sprintf("this is text #%d", i),
			ID:   int64(i),
		}
	}

	reader, writer := io.Pipe()
	go func() {
		// create new line buffered JSON encoder that writes to the write-end of the pipe
		encoder, _ := linebuf.NewEncoder(writer)

		// get stream channel to send stream entries onto the wire
		stream := encoder.Stream()
		for _, a := range array {
			// send each array entry to the encoding channel
			stream <- a
		}
		// don't forget to close the encoder. It signals the end to the decoder on the other end of the wire
		encoder.Close()
	}()

	// create new decoder that reads the read-end of the pipe
	decoder, _ := linebuf.NewDecoder(reader)

	// get stream channel to read each decoded entry. Specify an optional parameter to tell the decoder how to unmarshal each entry. If left empty, the chance of getting map[string]interface{} is high
	stream := decoder.Stream(&Test{})
	for entry := range stream {
		// cast to target type `Test` to access the ID field
		fmt.Println(entry.(Test).ID)
	}
}

Documentation

Index

Constants

View Source
const (
	ModeLinebuf = "linebuf"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Decoder

type Decoder struct {
	context.Context
	// contains filtered or unexported fields
}

Decoder reads line-buffered JSON from the underlaying io.Reader and decodes them in single-entry modde or stream mode

func NewDecoder

func NewDecoder(r io.Reader, options ...DecoderOption) (*Decoder, error)

NewDecoder returns a linebuf decoder from an io.Reader while allowing to specify custom options like unbuffered mode and/or setting the buffer size.

The default buffer size is 4k

An error will be returned e.g. if parsing of the bufsize fails

func (*Decoder) Decode

func (dec *Decoder) Decode(v interface{}) error

Decode decodes a single entry that is received by the underlaying io.Reader into a pointer to an object given via parameter "v".

If some errors occure, they will mainly be I/O errors

func (*Decoder) LastError

func (dec *Decoder) LastError() error

LastError returns the last error that occured or nil if everything worked well

func (*Decoder) Stream

func (dec *Decoder) Stream(entry ...interface{}) <-chan interface{}

Stream starts decoding entries that are received by the underlaying io.Reader. Specify an optional pointer parameter `entry` that's used to decode JSON into a specific structure.

It returns a channel that can be used to receive the encoded entries inside a loop like:

for entry := range dec.Stream() {
	// do sth.
}

To retrieve the last error that occured, use

dec.LastError()

type DecoderOption added in v0.3.0

type DecoderOption (func(*Decoder) error)

func WithDecoderBuffersize added in v0.3.0

func WithDecoderBuffersize(bufSize string) DecoderOption

WithDecoderBuffersize can be used as option to NewDecoder, thus providing another buffersize than the default 4k

func WithDecoderUnbuffered added in v0.3.0

func WithDecoderUnbuffered() DecoderOption

WithDecoderUnbuffered can be provided as option to NewDecoder, thus turning off buffer mode

type Encoder

type Encoder struct {
	context.Context
	// contains filtered or unexported fields
}

Encoder encodes objects and writes the resulting JSON in line-buffered-mode to the underlaying io.Writer

func NewEncoder

func NewEncoder(w io.Writer, options ...EncoderOption) (*Encoder, error)

NewEncoder returns a linebuf encoder from an io.Writer while allowing to sspecify custom options like unbuffered mode and/or setting the buffer size.

The default buffer size is 4k

An error will be returned e.g. if parsing of the bufsize fails

func (*Encoder) Close

func (enc *Encoder) Close()

Close closes the encoder and the underlaying io.Writer. This signals the opposite read-end that the transmission has ended.

func (*Encoder) Encode

func (enc *Encoder) Encode(v interface{}) error

Decode encodes a single entry to JSON and write it to the underlaying io.Writer.

If some errors occure, they will mainly be I/O errors

func (*Encoder) LastError

func (enc *Encoder) LastError() error

LastError returns the last error that occured or nil if everything worked well

func (*Encoder) Stream

func (enc *Encoder) Stream() chan<- interface{}

Stream creates and returns a channel that's used to send consecutive entries that will be encoded and written to the underlaying io.Writer.

A possible snippet:

for entry := range array {
	enc.Stream() <- entry
}

To retrieve the last error that occured, use

enc.LastError()

type EncoderOption added in v0.3.0

type EncoderOption (func(*Encoder) error)

func WithEncoderBuffersize added in v0.3.0

func WithEncoderBuffersize(bufSize string) EncoderOption

WithBuffersize can be used as option to NewEncoder, thus providing another buffersize than the default 4k

func WithEncoderUnbuffered added in v0.3.0

func WithEncoderUnbuffered() EncoderOption

WithUnbuffered can be provided as option to NewEncoder, thus turning off buffer mode

type LineSanitizedReader

type LineSanitizedReader struct {
	*io.PipeReader
	// contains filtered or unexported fields
}

LineSanitizedReader wraps an io.Reader and strips all newlines, to allow to differentiate between multiline JSON and linebuffered JSON.

func NewLineSanitizedReader

func NewLineSanitizedReader(r io.Reader) LineSanitizedReader

NewLineSanitizedReader returns a line-sanitizing reader that strips all newlines from JSON objects recieved by the underlaying io.Reader.

This allows to differntiate between multiline JSON and linebuffered JSON.

func (*LineSanitizedReader) LastError

func (s *LineSanitizedReader) LastError() error

LastError returns the last error that occured or nil if everything worked well

type LinebufJSONConverter added in v0.2.0

type LinebufJSONConverter struct {
	*bufio.Writer
	// contains filtered or unexported fields
}

LinebufJSONConverter wraps an io.Writer and converts line-buffered writes to regular JSON

func NewLinebufJSONConverter added in v0.2.0

func NewLinebufJSONConverter(w io.WriteCloser) *LinebufJSONConverter

NewLinebufJSONConverter returns a writer that converts line-buffered JSON back to valid JSON and writes it to the underlaying io.WriteCloser.

func (*LinebufJSONConverter) Close added in v0.2.2

func (l *LinebufJSONConverter) Close() error

Close waits until the last writes have finished and gracefully closes the underlaying writers

func (*LinebufJSONConverter) LastError added in v0.2.0

func (l *LinebufJSONConverter) LastError() error

LastError returns the last error that occured or nil if everything worked well

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL