concurrency

package module
v0.0.0-...-d42cb4e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2026 License: Apache-2.0 Imports: 12 Imported by: 5

Documentation

Index

Constants

View Source
const (
	NoLimit = 0 // NoLimit denotes no limit imposed on the concurrency
)

Variables

View Source
var (

	// ErrExpectByteSlicePtr denotes that the assertion of a byte slice pointer failed
	ErrExpectByteSlicePtr = errors.New("expected byte slice reference / pointer argument")

	// ErrExpectByteSlice denotes that the assertion of a byte slice failed
	ErrExpectByteSlice = errors.New("expected byte slice argument")
)
View Source
var (
	JSONEncoder = func(w io.Writer) Encoder {
		return jsoniter.NewEncoder(w)
	}
	JSONDecoder = func(r io.Reader) Decoder {
		return jsoniter.NewDecoder(r)
	}
	YAMLEncoder = func(w io.Writer) Encoder {
		return yaml.NewEncoder(w)
	}
	YAMLDecoder = func(r io.Reader) Decoder {
		return yaml.NewDecoder(r)
	}
	BytesEncoder = func(w io.Writer) Encoder {
		return &byteEncoder{Writer: w}
	}
	BytesDecoder = func(r io.Reader) Decoder {
		return &byteDecoder{Reader: r}
	}
	BytesDecoderZeroCopy = func(r io.Reader) Decoder {
		return &byteDecoder{Reader: r, zeroCopy: true}
	}
)

Some default encoder wrapper / convenience functions

View Source
var (

	// ErrLockConfirmTimeout signifies that the lock request has not been confirmed
	// by the main routine (in a timely manner)
	ErrLockConfirmTimeout = errors.New("timeout waiting for lock confirmation")

	// ErrLockNotifyTimeout signifies that the lock request could not be sent
	// to the main routine (in a timely manner), e.g. because it wasn't handled
	ErrLockNotifyTimeout = errors.New("timeout notifying for lock")

	// ErrUnlockConfirmTimeout signifies that the unlock request has not been confirmed
	// by the main routine (in a timely manner)
	ErrUnlockConfirmTimeout = errors.New("timeout waiting for unlock confirmation")
)
View Source
var (
	//ErrNoSlotAvailable denotes that there is no slot available at present
	ErrNoSlotAvailable = errors.New("no semaphore slot available")
)

Functions

This section is empty.

Types

type Decoder

type Decoder interface {
	Decode(v any) error
}

Decoder denotes a generic decoder interface

type DecoderFn

type DecoderFn func(r io.Reader) Decoder

DecoderFn denotes an io.Reader based decoder function / method

type Encoder

type Encoder interface {
	Encode(v any) error
}

Encoder denotes a generic encoder interface

type EncoderFn

type EncoderFn func(w io.Writer) Encoder

EncoderFn denotes an io.Writer based encoder function / method

type GZIPReader

type GZIPReader struct {
	*gzip.Reader
}

GZIPReader provides a wrapper around a standard gzip.Reader instance

func NewGZIPReader

func NewGZIPReader() *GZIPReader

NewGZIPReader initializes a new (wrapped) gzip.Reader instance, fulfilling the Reader interface

func (*GZIPReader) Close

func (g *GZIPReader) Close() error

Close closes a (wrapped) gzip.Reader instance

func (*GZIPReader) Init

func (g *GZIPReader) Init(r io.Reader) (io.Reader, error)

Init resets a (wrapped) gzip.Reader instance from the pool for reuse

func (*GZIPReader) Return

func (g *GZIPReader) Return()

Return returns a (wrapped) gzip.Reader instance to the pool

type GZIPWriter

type GZIPWriter struct {
	*gzip.Writer
}

GZIPWriter provides a wrapper around a standard gzip.Writer instance

func NewGZIPWriter

func NewGZIPWriter() *GZIPWriter

NewGZIPWriter initializes a new (wrapped) gzip.Writer instance, fulfilling the Writer interface

func (*GZIPWriter) Close

func (g *GZIPWriter) Close() error

Close closes a (wrapped) gzip.Writer instance

func (*GZIPWriter) Init

func (g *GZIPWriter) Init(w io.Writer) io.Writer

Init resets a (wrapped) gzip.Writer instance from the pool for reuse

func (*GZIPWriter) Return

func (g *GZIPWriter) Return()

Return returns a (wrapped) gzip.Writer instance to the pool

type Limit

type Limit = Semaphore

Limt provides backward compatibility and an alternative naming scheme for the Semaphore type

type MemFile

type MemFile struct {
	// contains filtered or unexported fields
}

MemFile denotes an in-memory abstraction of an underlying file, acting as a buffer (drawing memory from a pool)

func NewMemFile

func NewMemFile(r ReadWriteSeekCloser, pool MemPool) (*MemFile, error)

NewMemFile instantiates a new in-memory file buffer

func (*MemFile) Close

func (m *MemFile) Close() error

Close fulfils the underlying io.Closer interface (returning the buffer to the pool)

func (*MemFile) Data

func (m *MemFile) Data() []byte

Data provides zero-copy access to the underlying data of the MemFile

func (*MemFile) Read

func (m *MemFile) Read(p []byte) (n int, err error)

Read fulfils the io.Reader interface (reading len(p) bytes from the buffer)

func (*MemFile) Seek

func (m *MemFile) Seek(offset int64, whence int) (int64, error)

Seek fulfils the io.Seeker interface (seeking to a designated position)

func (*MemFile) Stat

func (m *MemFile) Stat() (fs.FileInfo, error)

Stat return the (stub) Stat element providing the length of the underlying data

func (*MemFile) Write

func (m *MemFile) Write(p []byte) (n int, err error)

Write fulfils the io.Writer interface (writing len(p) bytes to the buffer)

type MemPool

type MemPool interface {

	// Slice element Get / Put operations
	Get(size int) (elem []byte)
	Put(elem []byte)

	// io.ReadWriter Get / Put operations
	GetReadWriter(size int) *ReadWriter
	PutReadWriter(elem *ReadWriter)
}

MemPool denotes a generic memory buffer pool

type MemPoolGCable

type MemPoolGCable interface {
	Clear()

	MemPool
}

MemPoolGCable denotes a generic memory buffer pool that can be "cleaned", i.e. that allows making all resources available for garbage collection

type MemPoolLimit

type MemPoolLimit struct {
	// contains filtered or unexported fields
}

MemPoolLimit provides a channel-based memory buffer pool (limiting the number of resources and allowing for cleanup)

func NewMemPool

func NewMemPool(n int) *MemPoolLimit

NewMemPool instantiates a new memory pool that manages bytes slices

func (*MemPoolLimit) Clear

func (p *MemPoolLimit) Clear()

Clear releases all pool resources and makes them available for garbage collection

func (*MemPoolLimit) Get

func (p *MemPoolLimit) Get(size int) (elem []byte)

Get retrieves a memory element (already performing the type assertion)

func (*MemPoolLimit) GetReadWriter

func (p *MemPoolLimit) GetReadWriter(size int) *ReadWriter

GetReadWriter return a wrapped element providing an io.ReadWriter

func (*MemPoolLimit) Put

func (p *MemPoolLimit) Put(elem []byte)

Put returns a memory element to the pool, resetting its size to capacity in the process

func (*MemPoolLimit) PutReadWriter

func (p *MemPoolLimit) PutReadWriter(elem *ReadWriter)

PutReadWriter returns a wrapped element providing an io.ReadWriter to the pool

type MemPoolLimitUnique

type MemPoolLimitUnique struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MemPoolLimitUnique provides a channel-based memory buffer pool (limiting the number of resources, enforcing their uniqueness and allowing for cleanup)

func NewMemPoolLimitUnique

func NewMemPoolLimitUnique(n int, initialElementSize int) *MemPoolLimitUnique

NewMemPoolLimitUnique instantiates a new memory pool that manages bytes slices

func (*MemPoolLimitUnique) Clear

func (p *MemPoolLimitUnique) Clear()

Clear releases all pool resources and makes them available for garbage collection

func (*MemPoolLimitUnique) Get

func (p *MemPoolLimitUnique) Get(size int) (elem []byte)

Get retrieves a memory element (already performing the type assertion)

func (*MemPoolLimitUnique) Put

func (p *MemPoolLimitUnique) Put(elem []byte)

Put returns a memory element to the pool, resetting its size to capacity in the process

func (*MemPoolLimitUnique) Resize

func (p *MemPoolLimitUnique) Resize(elem []byte, size int) []byte

Resize resizes an element of the pool, updating its tracking information in the process

type MemPoolNoLimit

type MemPoolNoLimit struct {
	sync.Pool
}

MemPoolNoLimit wraps a standard sync.Pool (no limit to resources)

func NewMemPoolNoLimit

func NewMemPoolNoLimit() *MemPoolNoLimit

NewMemPoolNoLimit instantiates a new memory pool that manages bytes slices of arbitrary capacity

func (*MemPoolNoLimit) Get

func (p *MemPoolNoLimit) Get(size int) (elem []byte)

Get retrieves a memory element (already performing the type assertion)

func (*MemPoolNoLimit) GetReadWriter

func (p *MemPoolNoLimit) GetReadWriter(size int) *ReadWriter

GetReadWriter returns a wrapped element providing an io.ReadWriter

func (*MemPoolNoLimit) Put

func (p *MemPoolNoLimit) Put(elem []byte)

Put returns a memory element to the pool, resetting its size to capacity in the process

func (*MemPoolNoLimit) PutReadWriter

func (p *MemPoolNoLimit) PutReadWriter(elem *ReadWriter)

PutReadWriter returns a wrapped element providing an io.ReadWriter to the pool

type ReadWriteSeekCloser

type ReadWriteSeekCloser interface {
	Stat() (fs.FileInfo, error)

	io.Reader
	io.Writer
	io.Seeker
	io.Closer
}

ReadWriteSeekCloser provides an interface to all the wrapped interfaces in one instance

type ReadWriter

type ReadWriter struct {
	// contains filtered or unexported fields
}

ReadWriter denotes a wrapper around a data slice from a memory pool that fulfils the io.Reader and io.Writer interfaces (similar to a bytes.Buffer, on which parts of the implementation are based on)

func (*ReadWriter) Bytes

func (rw *ReadWriter) Bytes() []byte

Bytes returns a slice holding the unread portion of the ReadWriter, valid for use only until the next buffer modification (that is, only until the next call to a method like Read(), Write() or Reset() The slice aliases the buffer content at least until the next buffer modification, so immediate changes to the slice will affect the result of future reads

func (*ReadWriter) BytesCopy

func (rw *ReadWriter) BytesCopy() []byte

BytesCopy returns a slice holding a copy of the unread portion of the ReadWriter

func (*ReadWriter) Read

func (rw *ReadWriter) Read(p []byte) (n int, err error)

Read reads the next len(p) bytes from the buffer or until the buffer is drained. The return value n is the number of bytes read. If the buffer has no data to return, err is io.EOF (unless len(p) is zero); otherwise it is nil

func (*ReadWriter) Reset

func (rw *ReadWriter) Reset()

Reset resets the buffer to be empty, but it retains the underlying storage for use by future writes

func (*ReadWriter) Write

func (rw *ReadWriter) Write(p []byte) (int, error)

Write appends the contents of p to the buffer, growing the buffer as needed. The return value n is the length of p; err is always nil

type Reader

type Reader interface {
	Init(r io.Reader) (io.Reader, error)
	Close() error
	Return()
}

Reader denotes a generic reader interface (enforcing an initialization and closing method)

type ReaderChain

type ReaderChain struct {
	io.Reader
	// contains filtered or unexported fields
}

ReaderChain provides convenient access to a chained io.Reader sequence (and potentially decoding)

func NewReaderChain

func NewReaderChain(r io.Reader) *ReaderChain

NewReaderChain instantiates a new ReaderChain

func (*ReaderChain) AddReader

func (rc *ReaderChain) AddReader(r Reader) *ReaderChain

AddReader adds a Reader instance to the chain

func (*ReaderChain) Build

func (rc *ReaderChain) Build() *ReaderChain

Build constructs the chain of Readers and defines / defers potential cleanup function calls

func (*ReaderChain) Close

func (rc *ReaderChain) Close() (err error)

Close closes the Reader chain, flushing all underlying Readers

func (*ReaderChain) Decode

func (rc *ReaderChain) Decode(fn DecoderFn, v any) error

Decode decodes from an object using the provided decoder function

func (*ReaderChain) DecodeAndClose

func (rc *ReaderChain) DecodeAndClose(fn DecoderFn, v any) error

DecodeAndClose performs the decoding and closes / flushes all Readers in the chain simultaneously

func (*ReaderChain) MemPool

func (rc *ReaderChain) MemPool(memPool *MemPoolNoLimit) *ReaderChain

MemPool sets an (external) memory pool for the chain of Readers

func (*ReaderChain) PostFn

func (rc *ReaderChain) PostFn(fn func(rw *ReadWriter) error) *ReaderChain

PostFn sets a function to be executed at the end of the Reader / decoding chain

type ReaderFn

type ReaderFn func(r io.Reader) (io.Reader, error)

ReaderFn denotes a chained io.Reader based function / method

type Semaphore

type Semaphore chan struct{}

Semaphore provides a generic concurrency / work semaphore

func New

func New(n int) (l Semaphore)

New instantiates a new semaphore with the given maximum concurrency

func (Semaphore) Add

func (l Semaphore) Add()

Add adds a new worker / task to be taken into account

func (Semaphore) Done

func (l Semaphore) Done()

Done releases a worker / task back into the pool

func (Semaphore) TryAdd

func (l Semaphore) TryAdd() (func(), error)

TryAdd attempts to add a new worker / task to be taken into account and aborts with an error if not possible

func (Semaphore) TryAddFor

func (l Semaphore) TryAddFor(timeout time.Duration) (func(), error)

TryAddFor attempts to add a new worker / task to be taken into account for a certain period of time, otherwise aborts with an error

type SemaphoreBuffer

type SemaphoreBuffer = []byte

SemaphoreBuffer is simply the underlying byte slice (from a memory pool), serving as both shared buffer and SemaphoreBuffer representing the held lock

type ThreePointLock

type ThreePointLock struct {
	// contains filtered or unexported fields
}

ThreePointLock denotes a concurrency pattern that allows for rare locks on a high- throughput main loop function with minimal performance impact on said routine

func NewThreePointLock

func NewThreePointLock(options ...ThreePointLockOption) *ThreePointLock

NewThreePointLock creates a new instance of ThreePointLock with the given options

func (*ThreePointLock) Close

func (tpl *ThreePointLock) Close()

Close ensures that all channels are closed, releasing any potentially waiting goroutines

func (*ThreePointLock) ConfirmLockRequest

func (tpl *ThreePointLock) ConfirmLockRequest()

ConfirmLockRequest confirms that the main loop is not processing

func (*ThreePointLock) ConsumeLockRequest

func (tpl *ThreePointLock) ConsumeLockRequest() SemaphoreBuffer

ConsumeLockRequest consumes the lock request

func (*ThreePointLock) ConsumeUnlockRequest

func (tpl *ThreePointLock) ConsumeUnlockRequest()

ConsumeUnlockRequest consumes the unlock request

func (*ThreePointLock) HasLockRequest

func (tpl *ThreePointLock) HasLockRequest() bool

HasLockRequest checks if there is a lock request

func (*ThreePointLock) HasUnlockRequest

func (tpl *ThreePointLock) HasUnlockRequest() bool

HasUnlockRequest checks if there is an unlock request

func (*ThreePointLock) Lock

func (tpl *ThreePointLock) Lock() (err error)

Lock acquires the lock and returns the semaphore If a timeout is specified, the method waits until the timeout expires

func (*ThreePointLock) MustLock

func (tpl *ThreePointLock) MustLock()

MustLock acquires the lock and returns the semaphore (panics on failure)

func (*ThreePointLock) MustUnlock

func (tpl *ThreePointLock) MustUnlock()

MustUnlock releases the lock (panics on failure)

func (*ThreePointLock) Release

func (tpl *ThreePointLock) Release(sem SemaphoreBuffer)

Release releases the semaphore back to the memory pool

func (*ThreePointLock) Unlock

func (tpl *ThreePointLock) Unlock() (err error)

Unlock releases the lock

type ThreePointLockOption

type ThreePointLockOption func(*ThreePointLock)

ThreePointLockOption denotes a functional option for the three-point lock type

func WithLockRequestFn

func WithLockRequestFn(fn func() error) ThreePointLockOption

WithLockRequestFn sets the lock request function for the ThreePointLock

func WithMemPool

func WithMemPool(memPool *MemPoolLimitUnique) ThreePointLockOption

WithMemPool sets the memory pool for the ThreePointLock

func WithMinElementSize

func WithMinElementSize(size int) ThreePointLockOption

WithMinElementSize sets the minimum element size for the ThreePointLock

func WithTimeout

func WithTimeout(timeout time.Duration) ThreePointLockOption

WithTimeout sets the timeout for the lock operation in the ThreePointLock

func WithUnlockRequestFn

func WithUnlockRequestFn(fn func() error) ThreePointLockOption

WithUnlockRequestFn sets the unlock request function for the ThreePointLock

type Writer

type Writer interface {
	Init(w io.Writer) io.Writer
	Close() error
	Return()
}

Writer denotes a generic writer interface (enforcing an initialization and closing method)

type WriterChain

type WriterChain struct {
	io.Writer
	// contains filtered or unexported fields
}

WriterChain provides convenient access to a chained io.Writer sequence (and potentially encoding)

func NewWriterChain

func NewWriterChain() *WriterChain

NewWriterChain instantiates a new WriterChain

func (*WriterChain) AddWriter

func (wc *WriterChain) AddWriter(w Writer) *WriterChain

AddWriter adds a Writer instance to the chain

func (*WriterChain) Build

func (wc *WriterChain) Build() *WriterChain

Build constructs the chain of Writers and defines / defers potential cleanup function calls

func (*WriterChain) Close

func (wc *WriterChain) Close() (err error)

Close closes the Writer chain, flushing all underlying Writers

func (*WriterChain) Encode

func (wc *WriterChain) Encode(fn EncoderFn, v any) (*ReadWriter, error)

Encode encodes the output of the chain of Writers into an object using the provided encoder function

func (*WriterChain) EncodeAndClose

func (wc *WriterChain) EncodeAndClose(fn EncoderFn, v any) error

EncodeAndClose performs the encoding and closes / flushes all Writers in the chain simultaneously

func (*WriterChain) MemPool

func (wc *WriterChain) MemPool(memPool *MemPoolNoLimit) *WriterChain

MemPool sets an (external) memory pool for the chain of Writers

func (*WriterChain) PostFn

func (wc *WriterChain) PostFn(fn func(rw *ReadWriter) error) *WriterChain

PostFn sets a function to be executed at the end of the Writer / encoding chain

type WriterFn

type WriterFn func(w io.Writer) io.Writer

WriterFn denotes a chained io.Writer based function / method

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL