dsUtils

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package dsUtils provides data structures

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPipeClosed = errors.New("pipe is closed")
	ErrPipeFull   = errors.New("pipe is full")
)

Functions

func RunWorkerPool

func RunWorkerPool(
	ctx context.Context,
	tasksChan chan common.Task,
	capacity int,
	errHanlder func(error) error,
) (err error)

RunWorkerPool is a single function worker pool implementation

Create a channel to which you will send tasks; tasks sent to it will be executed. If there are too many tasks, they will be saved to the queue.

Also this function may be extended to have resizable capacity, more callbacks, tasks priority, but those modifications would harm performance.

If you need a more flexible out-of-the-box approach, use the WorkerPool struct instead. Also, this worker pool is worse in terms of performance. It is unclear why it exists; it is simply inferior.

Types

type CBuf

type CBuf[T any] struct {
	ICBuf[T]
	Buffer   []T
	Cursor   int
	Capacity int
}

CBuf is the implementation of the ICBuf interface. It is the simplest circular buffer implementation. If thread safety is needed, use CBufSync.

WARNING: It's not thread-safe by default, use it with a lock

func NewCBuf

func NewCBuf[T any](capacity int) *CBuf[T]

NewCBuf creates a new CBuf instance

func (*CBuf[T]) Get

func (cbuf *CBuf[T]) Get() []T

Get returns the whole buffer in the correct order

func (*CBuf[T]) Push

func (cbuf *CBuf[T]) Push(item T)

func (*CBuf[T]) PushFront

func (cbuf *CBuf[T]) PushFront(item T)

This method isn't a part of ICBuf interface

It pushes the item to the "buffer start" (right before the cursor), removing an item from the back.

type Cache

type Cache struct {
	ICache
	// contains filtered or unexported fields
}

Cache provides simple cache functionality. All objects in the same cache instance get the same TTL.

func NewCache

func NewCache(ttl time.Duration) *Cache

NewCache creates a new Cache instance.

func (*Cache) Get

func (c *Cache) Get(key string) (data []byte, isSet bool)

func (*Cache) GetAndRemove

func (c *Cache) GetAndRemove(key string) (data []byte, isSet bool)

func (*Cache) Remove

func (c *Cache) Remove(key string)

func (*Cache) Run

func (c *Cache) Run(ctx context.Context) error

func (*Cache) Set

func (c *Cache) Set(data map[string][]byte, canExpire bool, updateExpirationTime bool)

func (*Cache) SetIfNotExists

func (c *Cache) SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool)

type Counter32

type Counter32 struct {
	// contains filtered or unexported fields
}

Counter32 is a 32 bit implementation of ICounter

func (*Counter32) Add

func (c *Counter32) Add(delta int32) (newVal int32)

func (*Counter32) Get

func (c *Counter32) Get() (val int32)

func (*Counter32) Set

func (c *Counter32) Set(new int32) (old int32)

type Counter64

type Counter64 struct {
	// contains filtered or unexported fields
}

Counter64 is a 64 bit implementation of ICounter

func (*Counter64) Add

func (c *Counter64) Add(delta int64) (newVal int64)

func (*Counter64) Get

func (c *Counter64) Get() (val int64)

func (*Counter64) Set

func (c *Counter64) Set(new int64) (old int64)

type ICBuf

type ICBuf[T any] interface {
	Push(item T)
	Get() []T
}

ICBuf is a basic Circular buffer interface

type ICache

type ICache interface {
	Get(key string) (item []byte, isSet bool)
	Set(data map[string][]byte, canExpire bool, updateExpirationTime bool)
	Remove(key string)
	SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool)
	GetAndRemove(key string) (item []byte, isSet bool)

	Run(ctx context.Context) error
}

type ICounter

type ICounter[i int32 | int64] interface {
	Set(new i) (old i)
	Add(delta i) (newVal i)
	Get() (val i)
}

func NewCounter32

func NewCounter32(initVal int32) ICounter[int32]

NewCounter32 creates new Counter32 instance

func NewCounter64

func NewCounter64(initVal int64) ICounter[int64]

NewCounter64 creates new Counter64 instance

type IIdSyncMap

type IIdSyncMap[IDType common.Int, ItemType any] interface {
	GetAll() map[IDType]ItemType
	Add(items ...ItemType) (id IDType)
	Rm(id IDType)
	Pop(id IDType) (item ItemType, exists bool)
}

func NewIDSyncMap

func NewIDSyncMap[IDType common.Int, ItemType any]() IIdSyncMap[IDType, ItemType]

type IPipe

type IPipe interface {
	io.Closer
	Write(ctx context.Context, data ...[]byte) (n int, err error)
	WaitWrite(ctx context.Context, data ...[]byte) (n int, err error)
	Read(ctx context.Context) (data [][]byte, isOpen bool, err error)
	WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)

	Cap() int
	Grow(n int)
	Len() int
	Reset()
}

IPipe provides the Pipe interface. A pipe is a data structure to which you can write and from which you can read chunks of data, almost like a buffer.

type IQueue

type IQueue[T any] interface {
	Push(item T)
	Get() T
	Len() int
}

type IStack

type IStack[T any] interface {
	Push(item T)
	Get() T
	Len() int
}

type IWorkerPool

type IWorkerPool interface {
	Exec(task common.Task)
	QueueLen() int
	WorkersCount() WorkersCount
	SetWorkersCountLim(newSize int32)
}

type IdSyncMap

type IdSyncMap[IDType common.Int, ItemType any] struct {
	IIdSyncMap[IDType, ItemType]
	Lock   *sync.RWMutex
	Data   map[IDType]ItemType
	NextID IDType
}

func (*IdSyncMap[IDType, ItemType]) Add

func (idsyncmap *IdSyncMap[IDType, ItemType]) Add(items ...ItemType) (id IDType)

func (*IdSyncMap[IDType, ItemType]) GetAll

func (idsyncmap *IdSyncMap[IDType, ItemType]) GetAll() map[IDType]ItemType

func (*IdSyncMap[IDType, ItemType]) Pop

func (idsyncmap *IdSyncMap[IDType, ItemType]) Pop(id IDType) (item ItemType, exists bool)

func (*IdSyncMap[IDType, ItemType]) Rm

func (idsyncmap *IdSyncMap[IDType, ItemType]) Rm(id IDType)

type KeyValue

type KeyValue struct {
	Key   string
	Value []byte
}

type MaybeExpirableData

type MaybeExpirableData struct {
	Data           []byte
	ExpirationTime time.Time
	CanExpire      bool
}

type Pipe

type Pipe struct {
	IPipe
	// contains filtered or unexported fields
}

Pipe implements IPipe interface

func NewPipe

func NewPipe(sizeLimit int) *Pipe

NewPipe creates a new Pipe and returns a pointer to it. Set sizeLimit = -1 for no limit discard.

func (*Pipe) Cap

func (p *Pipe) Cap() int

func (*Pipe) Close

func (p *Pipe) Close() error

func (*Pipe) GetData

func (p *Pipe) GetData() *[][]byte

func (*Pipe) Grow

func (p *Pipe) Grow(n int)

func (*Pipe) Len

func (p *Pipe) Len() int

func (*Pipe) LenCap

func (p *Pipe) LenCap() (length int, capacity int)

func (*Pipe) Read

func (p *Pipe) Read(ctx context.Context) (data [][]byte, isOpen bool, err error)

Read reads all the chunks from the pipe and resets the pipe data after reading. It also returns whether the pipe is open, and may return io.EOF as an error if there is no data and the pipe was closed.

func (*Pipe) Reset

func (p *Pipe) Reset()

func (*Pipe) WaitRead

func (p *Pipe) WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)

When the pipe is empty, WaitRead waits for some data to be written. It then reads all the chunks from the pipe and resets the pipe data after reading. It also returns whether the pipe is open, and may return io.EOF as an error if there is no data and the pipe was closed. It may also return ErrPipeClosed if the pipe is closed during reading and there is no data in the pipe.

func (*Pipe) WaitWrite

func (p *Pipe) WaitWrite(ctx context.Context, data ...[]byte) (overflow [][]byte, err error)

WaitWrite waits for the pipe to free up if there isn't enough space, then saves the chunks into the pipe and returns the overflow. May return ErrPipeClosed and ErrPipeFull as an error.

func (*Pipe) Write

func (p *Pipe) Write(ctx context.Context, data ...[]byte) (overflow [][]byte, err error)

Write saves chunks into the pipe and returns the overflow. May return ErrPipeClosed and ErrPipeFull as an error

type Queue

type Queue[T any] struct {
	IQueue[T]
	Buffer []T
}

Queue isn't thread-safe and is supposed to be used only in a single thread. For multithreaded usage, use QueueSync.

func NewQueue

func NewQueue[T any]() *Queue[T]

NewQueue creates a new Queue instance

func (*Queue[T]) Get

func (q *Queue[T]) Get() (T, error)

Get returns a single item from a queue. If queue is empty -- it will return io.EOF

func (*Queue[T]) Len

func (q *Queue[T]) Len() int

func (*Queue[T]) Push

func (q *Queue[T]) Push(item T)

type QueueSync

type QueueSync[T any] struct {
	IQueue[T]
	Buffer       []T
	Mu           *sync.Mutex
	NonEmptyCond *sync.Cond
}

QueueSync is a Queue but for multi thread usage

func NewQueueSync

func NewQueueSync[T any]() *QueueSync[T]

NewQueueSync creates a new QueueSync instance

func (*QueueSync[T]) Get

func (q *QueueSync[T]) Get() T

func (*QueueSync[T]) Len

func (q *QueueSync[T]) Len() int

func (*QueueSync[T]) Push

func (q *QueueSync[T]) Push(item T)

type Stack

type Stack[T any] struct {
	IStack[T]
	Buffer []T
}

Stack isn't thread-safe and is supposed to be used only in a single thread. For multithreaded usage, use StackSync.

func NewStack

func NewStack[T any]() *Stack[T]

NewStack creates a new Stack instance

func (*Stack[T]) Get

func (s *Stack[T]) Get() (T, error)

func (*Stack[T]) Len

func (s *Stack[T]) Len() int

func (*Stack[T]) Push

func (s *Stack[T]) Push(item T)

type StackSync

type StackSync[T any] struct {
	IStack[T]
	Buffer       []T
	Mu           *sync.Mutex
	NonEmptyCond *sync.Cond
}

StackSync is a Stack but for multi thread usage

func NewStackSync

func NewStackSync[T any]() *StackSync[T]

func (*StackSync[T]) Get

func (s *StackSync[T]) Get() T

func (*StackSync[T]) Len

func (s *StackSync[T]) Len() int

func (*StackSync[T]) Push

func (s *StackSync[T]) Push(item T)

type ToBeInvalidatedData

type ToBeInvalidatedData struct {
	Key            string
	ExpirationTime time.Time
}

type WorkerPool

type WorkerPool struct {
	TaskQueue    []common.Task
	Mu           *sync.RWMutex
	CountLim     int32
	CountBusy    int32
	TaskDeadline time.Duration
}

WorkerPool is just a worker pool with a queue in front of it. It is thread-safe.

func NewWorkerPool

func NewWorkerPool(workersCountLim int32, taskDeadline time.Duration) *WorkerPool

func (*WorkerPool) Exec

func (wpq *WorkerPool) Exec(task common.Task)

func (*WorkerPool) QueueLen

func (wpq *WorkerPool) QueueLen() int

func (*WorkerPool) SetWorkersCountLim

func (wpq *WorkerPool) SetWorkersCountLim(newSize int32)

func (*WorkerPool) WorkersCount

func (wpq *WorkerPool) WorkersCount() WorkersCount

type WorkersCount

type WorkersCount struct {
	Lim  int32
	Busy int32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL