Documentation ¶
Overview ¶
Package dsUtils provides data structures
Index ¶
- Variables
- func RunWorkerPool(ctx context.Context, tasksChan chan common.Task, capacity int, ...) (err error)
- type CBuf
- type Cache
- func (c *Cache) Get(key string) (data []byte, isSet bool)
- func (c *Cache) GetAndRemove(key string) (data []byte, isSet bool)
- func (c *Cache) Remove(key string)
- func (c *Cache) Run(ctx context.Context) error
- func (c *Cache) Set(data map[string][]byte, canExpire bool, updateExpirationTime bool)
- func (c *Cache) SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool)
- type Counter32
- type Counter64
- type ICBuf
- type ICache
- type ICounter
- type IIdSyncMap
- type IPipe
- type IQueue
- type IStack
- type IWorkerPool
- type IdSyncMap
- func (idsyncmap *IdSyncMap[IDType, ItemType]) Add(items ...ItemType) (id IDType)
- func (idsyncmap *IdSyncMap[IDType, ItemType]) GetAll() map[IDType]ItemType
- func (idsyncmap *IdSyncMap[IDType, ItemType]) Pop(id IDType) (item ItemType, exists bool)
- func (idsyncmap *IdSyncMap[IDType, ItemType]) Rm(id IDType)
- type KeyValue
- type MaybeExpirableData
- type Pipe
- func (p *Pipe) Cap() int
- func (p *Pipe) Close() error
- func (p *Pipe) GetData() *[][]byte
- func (p *Pipe) Grow(n int)
- func (p *Pipe) Len() int
- func (p *Pipe) LenCap() (length int, capacity int)
- func (p *Pipe) Read(ctx context.Context) (data [][]byte, isOpen bool, err error)
- func (p *Pipe) Reset()
- func (p *Pipe) WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)
- func (p *Pipe) WaitWrite(ctx context.Context, data ...[]byte) (overflow [][]byte, err error)
- func (p *Pipe) Write(ctx context.Context, data ...[]byte) (overflow [][]byte, err error)
- type Queue
- type QueueSync
- type Stack
- type StackSync
- type ToBeInvalidatedData
- type WorkerPool
- type WorkersCount
Constants ¶
This section is empty.
Variables ¶
var (
	ErrPipeClosed = errors.New("pipe is closed")
	ErrPipeFull   = errors.New("pipe is full")
)
Functions ¶
func RunWorkerPool ¶
func RunWorkerPool(
	ctx context.Context,
	tasksChan chan common.Task,
	capacity int,
	errHanlder func(error) error,
) (err error)
RunWorkerPool is a worker pool implemented as a single function.
Create a channel, send tasks to it, and they will be executed. If more tasks arrive than the pool can run at once, the extra tasks are saved to a queue.
The function could be extended with resizable capacity, more callbacks, or task priorities, but those modifications would harm performance.
If you need a more flexible approach out of the box, use the WorkerPool struct instead. This worker pool is also worse in terms of performance, so there is little reason to prefer it.
Types ¶
type CBuf ¶
CBuf is an implementation of the ICBuf interface: the simplest circular buffer. If thread safety is needed, use CBufSync.
WARNING: CBuf is not thread-safe by default; use it with a lock.
type Cache ¶
type Cache struct {
	ICache
	// contains filtered or unexported fields
}
Cache provides simple cache functionality. All objects in the same cache instance share the same TTL.
type Counter32 ¶
type Counter32 struct {
// contains filtered or unexported fields
}
Counter32 is a 32-bit implementation of ICounter.
type Counter64 ¶
type Counter64 struct {
// contains filtered or unexported fields
}
Counter64 is a 64-bit implementation of ICounter.
type ICBuf ¶
type ICBuf[T any] interface {
	Push(item T)
	Get() []T
}
ICBuf is a basic circular buffer interface.
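A type satisfying this interface could look like the sketch below. `ringBuf` and `newRingBuf` are hypothetical names, and the choices to overwrite the oldest item when full and to return items oldest-first are assumptions; the package's own CBuf may behave differently.

```go
package main

import "fmt"

// ICBuf mirrors the interface listed above.
type ICBuf[T any] interface {
	Push(item T)
	Get() []T
}

// ringBuf is a hypothetical fixed-capacity circular buffer.
// Like CBuf, it is not safe for concurrent use without a lock.
type ringBuf[T any] struct {
	data  []T
	start int // index of the oldest item
	size  int // number of stored items
}

func newRingBuf[T any](capacity int) *ringBuf[T] {
	return &ringBuf[T]{data: make([]T, capacity)}
}

// Push stores item, overwriting the oldest one when the buffer is full.
func (r *ringBuf[T]) Push(item T) {
	if len(r.data) == 0 {
		return // zero-capacity buffer stores nothing
	}
	if r.size < len(r.data) {
		r.data[(r.start+r.size)%len(r.data)] = item
		r.size++
		return
	}
	r.data[r.start] = item
	r.start = (r.start + 1) % len(r.data)
}

// Get returns a copy of the stored items, oldest first.
func (r *ringBuf[T]) Get() []T {
	out := make([]T, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.data[(r.start+i)%len(r.data)])
	}
	return out
}

func main() {
	var buf ICBuf[int] = newRingBuf[int](3)
	for i := 1; i <= 5; i++ {
		buf.Push(i)
	}
	fmt.Println(buf.Get()) // [3 4 5]
}
```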
type ICache ¶
type ICache interface {
	Get(key string) (item []byte, isSet bool)
	Set(data map[string][]byte, canExpire bool, updateExpirationTime bool)
	Remove(key string)
	SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool)
	GetAndRemove(key string) (item []byte, isSet bool)
	Run(ctx context.Context) error
}
type ICounter ¶
type ICounter[i int32 | int64] interface {
	Set(new i) (old i)
	Add(delta i) (newVal i)
	Get() (val i)
}
func NewCounter32 ¶
NewCounter32 creates a new Counter32 instance.
func NewCounter64 ¶
NewCounter64 creates a new Counter64 instance.
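One way to satisfy ICounter is to wrap the standard `sync/atomic` types, as in the sketch below. `counter64` is a hypothetical type written for this example; whether Counter64 is actually built this way is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ICounter mirrors the interface listed above.
type ICounter[i int32 | int64] interface {
	Set(new i) (old i)
	Add(delta i) (newVal i)
	Get() (val i)
}

// counter64 is a hypothetical ICounter[int64] backed by atomic.Int64.
type counter64 struct {
	v atomic.Int64
}

// Set stores the new value and returns the previous one.
func (c *counter64) Set(new int64) (old int64) { return c.v.Swap(new) }

// Add adds delta and returns the updated value.
func (c *counter64) Add(delta int64) (newVal int64) { return c.v.Add(delta) }

// Get returns the current value.
func (c *counter64) Get() (val int64) { return c.v.Load() }

func main() {
	var c ICounter[int64] = &counter64{}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add(1)
		}()
	}
	wg.Wait()

	fmt.Println(c.Get())  // 100
	fmt.Println(c.Set(0)) // 100 (the value before the reset)
}
```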
type IIdSyncMap ¶
type IIdSyncMap[IDType common.Int, ItemType any] interface {
	GetAll() map[IDType]ItemType
	Add(items ...ItemType) (id IDType)
	Rm(id IDType)
	Pop(id IDType) (item ItemType, exists bool)
}
func NewIDSyncMap ¶
func NewIDSyncMap[IDType common.Int, ItemType any]() IIdSyncMap[IDType, ItemType]
type IPipe ¶
type IPipe interface {
	io.Closer
	Write(ctx context.Context, data ...[]byte) (n int, err error)
	WaitWrite(ctx context.Context, data ...[]byte) (n int, err error)
	Read(ctx context.Context) (data [][]byte, isOpen bool, err error)
	WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)
	Cap() int
	Grow(n int)
	Len() int
	Reset()
}
IPipe is the Pipe interface. A pipe is a data structure that you can write chunks of data to and read them back from, much like a buffer.
type IWorkerPool ¶
type IWorkerPool interface {
	Exec(task common.Task)
	QueueLen() int
	WorkersCount() WorkersCount
	SetWorkersCountLim(newSize int32)
}
type IdSyncMap ¶
type IdSyncMap[IDType common.Int, ItemType any] struct {
	IIdSyncMap[IDType, ItemType]
	Lock   *sync.RWMutex
	Data   map[IDType]ItemType
	NextID IDType
}
func (*IdSyncMap[IDType, ItemType]) Add ¶
func (idsyncmap *IdSyncMap[IDType, ItemType]) Add(items ...ItemType) (id IDType)
func (*IdSyncMap[IDType, ItemType]) GetAll ¶
func (idsyncmap *IdSyncMap[IDType, ItemType]) GetAll() map[IDType]ItemType
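The idea of a lock-protected map that hands out its own IDs can be sketched as follows. This is an illustration under several assumptions: `Int` is a hypothetical stand-in for `common.Int`, `idMap` and `newIDMap` are hypothetical names, IDs are assumed to start at zero and increase by one, and `Add` is assumed to return the ID of the last item stored. None of these details are confirmed by the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// Int is a hypothetical stand-in for common.Int.
type Int interface {
	~int | ~int32 | ~int64
}

// idMap is a hypothetical map that assigns sequential IDs under a lock.
type idMap[ID Int, Item any] struct {
	mu     sync.RWMutex
	data   map[ID]Item
	nextID ID
}

func newIDMap[ID Int, Item any]() *idMap[ID, Item] {
	return &idMap[ID, Item]{data: make(map[ID]Item)}
}

// Add stores each item under a fresh ID and returns the last ID assigned
// (an assumption about the intended semantics).
func (m *idMap[ID, Item]) Add(items ...Item) (id ID) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, item := range items {
		id = m.nextID
		m.data[id] = item
		m.nextID++
	}
	return id
}

// Pop removes and returns the item stored under id, if any.
func (m *idMap[ID, Item]) Pop(id ID) (item Item, exists bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	item, exists = m.data[id]
	delete(m.data, id)
	return item, exists
}

// GetAll returns a copy so callers cannot mutate the map without the lock.
func (m *idMap[ID, Item]) GetAll() map[ID]Item {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make(map[ID]Item, len(m.data))
	for k, v := range m.data {
		out[k] = v
	}
	return out
}

func main() {
	m := newIDMap[int32, string]()
	a := m.Add("first")
	b := m.Add("second")
	item, ok := m.Pop(a)
	fmt.Println(a, b, item, ok, len(m.GetAll())) // 0 1 first true 1
}
```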
type MaybeExpirableData ¶
type Pipe ¶
type Pipe struct {
	IPipe
	// contains filtered or unexported fields
}
Pipe implements the IPipe interface.
func NewPipe ¶
NewPipe creates a new Pipe and returns a pointer to it. Set sizeLimit = -1 for no limit discard
func (*Pipe) Read ¶
Read reads all chunks from the pipe and resets the pipe's data afterwards. It also reports whether the pipe is still open, and may return io.EOF if there is no data and the pipe has been closed.
func (*Pipe) WaitRead ¶
WaitRead waits for data to be written when the pipe is empty, then reads all chunks from the pipe and resets the pipe's data afterwards. It also reports whether the pipe is still open, and may return io.EOF if there is no data and the pipe has been closed. It may also return ErrPipeClosed if the pipe is closed during reading and there is no data in it.
type Queue ¶
Queue is not thread-safe and is supposed to be used from a single thread only. For multithreaded usage, use QueueSync.
type QueueSync ¶
QueueSync is a Queue for multithreaded usage.
func NewQueueSync ¶
NewQueueSync creates a new QueueSync instance.
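The relationship between a plain queue and its synchronized counterpart can be sketched as a mutex wrapped around the unsynchronized type. The names `queue` and `queueSync` and the `Push`/`Pop` method set are hypothetical, since the IQueue interface is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical FIFO with no locking, for single-goroutine use.
type queue[T any] struct {
	items []T
}

func (q *queue[T]) Push(item T) { q.items = append(q.items, item) }

// Pop removes and returns the oldest item; ok is false when the queue is empty.
func (q *queue[T]) Pop() (item T, ok bool) {
	if len(q.items) == 0 {
		return item, false
	}
	item = q.items[0]
	q.items = q.items[1:]
	return item, true
}

// queueSync guards the same queue with a mutex so that several
// goroutines can use it at once.
type queueSync[T any] struct {
	mu sync.Mutex
	q  queue[T]
}

func (s *queueSync[T]) Push(item T) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.q.Push(item)
}

func (s *queueSync[T]) Pop() (T, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.q.Pop()
}

func main() {
	var s queueSync[int]
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			s.Push(v)
		}(i)
	}
	wg.Wait()

	n := 0
	for {
		if _, ok := s.Pop(); !ok {
			break
		}
		n++
	}
	fmt.Println(n) // 50
}
```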
type Stack ¶
Stack is not thread-safe and is supposed to be used from a single thread only. For multithreaded usage, use StackSync.
type ToBeInvalidatedData ¶
type WorkerPool ¶
type WorkerPool struct {
	TaskQueue    []common.Task
	Mu           *sync.RWMutex
	CountLim     int32
	CountBusy    int32
	TaskDeadline time.Duration
}
WorkerPool is a thread-safe worker pool with a queue in front of it.
func NewWorkerPool ¶
func NewWorkerPool(workersCountLim int32, taskDeadline time.Duration) *WorkerPool
func (*WorkerPool) Exec ¶
func (wpq *WorkerPool) Exec(task common.Task)
func (*WorkerPool) QueueLen ¶
func (wpq *WorkerPool) QueueLen() int
func (*WorkerPool) SetWorkersCountLim ¶
func (wpq *WorkerPool) SetWorkersCountLim(newSize int32)
func (*WorkerPool) WorkersCount ¶
func (wpq *WorkerPool) WorkersCount() WorkersCount