usync

package
v0.0.0-...-ea06120 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTimeout = errors.New("Operation timed out")

Functions

func Await

func Await(timeout, interval time.Duration, done func() (bool, error)) error

invoke done until it returns true or error, or the timeout occurs. return error if done errors or timeout occurs

func AwaitTrue

func AwaitTrue(timeout, interval time.Duration, done func() bool) (rv bool)

invoke done() until it returns true, or the timeout occurs. return true iff done() returns true before timeout

func FatalPanic

func FatalPanic(x any, msg string)

Log any panics and exit the program.

Use: defer usync.FatalPanic(recover(), "boo")

func HashBytes

func HashBytes(b []byte) uintptr

compute a unique hash for a short byte slice

func HashString

func HashString(s string) uintptr

compute a unique hash for a short string

this saves about 80% compared to HashBytes([]byte(s)) and is more convenient

func IgnorePanic

func IgnorePanic()

Ignore any panics. Prefer IgnorePanicIn instead.

Use: defer usync.IgnorePanic()

Do not: defer func() { usync.IgnorePanic() }

note that there is no outer func - this can only be used if directly used by defer.

func LogPanic

func LogPanic(x any, msg string)

Log any panics.

Use: defer usync.LogPanic(recover(), msg)

note that there is no outer func - this can only be used if directly used by defer.

func LogPanicIn

func LogPanicIn(msg string, activity func())

Log any panics in activity().

Types

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

boolean that is safe to access by multiple threads

func (*AtomicBool) Clear

func (this *AtomicBool) Clear()

func (*AtomicBool) ClearUnlessClear

func (this *AtomicBool) ClearUnlessClear() (changed bool)

return true if able to clear

func (*AtomicBool) IsSet

func (this *AtomicBool) IsSet() bool

func (*AtomicBool) Set

func (this *AtomicBool) Set()

func (*AtomicBool) SetUnlessSet

func (this *AtomicBool) SetUnlessSet() (changed bool)

return true if able to set

type AtomicBool32

type AtomicBool32 struct {
	// contains filtered or unexported fields
}

smaller boolean that is safe to access by multiple threads

func (*AtomicBool32) Clear

func (this *AtomicBool32) Clear()

func (*AtomicBool32) ClearUnlessClear

func (this *AtomicBool32) ClearUnlessClear() (changed bool)

return true if able to clear

func (*AtomicBool32) IsSet

func (this *AtomicBool32) IsSet() bool

func (*AtomicBool32) Set

func (this *AtomicBool32) Set()

func (*AtomicBool32) SetUnlessSet

func (this *AtomicBool32) SetUnlessSet() (changed bool)

return true if able to set

type AtomicBools

type AtomicBools struct {
	// contains filtered or unexported fields
}

a set of 64 booleans that are safe to access by multiple threads

func (*AtomicBools) Clear

func (this *AtomicBools) Clear(bit int)

func (*AtomicBools) GetAll

func (this *AtomicBools) GetAll() uint64

func (*AtomicBools) IsSet

func (this *AtomicBools) IsSet(bit int) bool

func (*AtomicBools) Set

func (this *AtomicBools) Set(bit int)

func (*AtomicBools) SetAll

func (this *AtomicBools) SetAll(newV uint64)

func (*AtomicBools) SetAllUnlessSet

func (this *AtomicBools) SetAllUnlessSet(oldV, newV uint64) (changed bool)

return true if able to set

func (*AtomicBools) SetUnlessSet

func (this *AtomicBools) SetUnlessSet(bit int) (changed bool)

return true if able to set

type AtomicBools32

type AtomicBools32 struct {
	// contains filtered or unexported fields
}

a set of 32 booleans that are safe to access by multiple threads

func (*AtomicBools32) Clear

func (this *AtomicBools32) Clear(bit int)

func (*AtomicBools32) GetAll

func (this *AtomicBools32) GetAll() uint32

func (*AtomicBools32) IsSet

func (this *AtomicBools32) IsSet(bit int) bool

func (*AtomicBools32) Set

func (this *AtomicBools32) Set(bit int)

func (*AtomicBools32) SetAll

func (this *AtomicBools32) SetAll(newV uint32)

func (*AtomicBools32) SetAllUnlessSet

func (this *AtomicBools32) SetAllUnlessSet(oldV, newV uint32) (changed bool)

return true if able to set

func (*AtomicBools32) SetUnlessSet

func (this *AtomicBools32) SetUnlessSet(bit int) (changed bool)

return true if able to set

type AtomicInt

type AtomicInt struct {
	// contains filtered or unexported fields
}

func (*AtomicInt) Add

func (this *AtomicInt) Add(amount int64) (result int64)

func (*AtomicInt) AddIfLessThan

func (this *AtomicInt) AddIfLessThan(
	amount, lessThan int64,
) (
	result int64, added bool,
)

func (*AtomicInt) Cas

func (this *AtomicInt) Cas(oldV, newV int64) (swapped bool)

func (*AtomicInt) Get

func (this *AtomicInt) Get() (rv int64)

func (*AtomicInt) Set

func (this *AtomicInt) Set(amount int64)

func (AtomicInt) String

func (this AtomicInt) String() (rv string)

type Chan

type Chan[T any] chan T

Generic channel - syntactic sugar for common uses

ch := NewChan[any](capacity)
...
close(ch)

func NewChan

func NewChan[T any](capacity int) (rv Chan[T])

func (Chan[T]) Get

func (this Chan[T]) Get() (rv T, ok bool)

get an item, wait forever or until closed. return false if closed

func (Chan[T]) GetWait

func (this Chan[T]) GetWait(d time.Duration) (rv T, ok bool)

try to get an item, waiting up to duration time, return true of got an item

func (Chan[T]) Put

func (this Chan[T]) Put(it T)

put an item into this, waiting forever

the item is automatically wrapped in a Delayed

func (Chan[T]) PutRecover

func (this Chan[T]) PutRecover(it T) (ok bool)

put an item into this, waiting forever, recover in case of chan close, returning false if chan closed

in general, the writer should 'know' the chan is closed because they closed it, but there are sometimes cases where this is not true

func (Chan[T]) PutWait

func (this Chan[T]) PutWait(it T, d time.Duration) (ok bool)

try to put an item into this, waiting no more than specified, returning true if successful

func (Chan[T]) PutWaitRecover

func (this Chan[T]) PutWaitRecover(it T, d time.Duration) (ok bool)

try to put an item into this, waiting no more than specified, returning true if successful and not closed

in general, the writer should 'know' the chan is closed because they closed it, but there are sometimes cases where this is not true

func (Chan[T]) TryGet

func (this Chan[T]) TryGet() (rv T, ok bool)

try to get an item, returning immediately, return true of got an item

type DeathChan

type DeathChan chan struct{}

a channel to signal death

func NewDeathChan

func NewDeathChan() (rv DeathChan)

a new channel of death!

func (DeathChan) Check

func (this DeathChan) Check() (timeToDie bool)

reader: check to see if it's time to die

func (DeathChan) Close

func (this DeathChan) Close()

writer: signal to any reader it's time to die

func (DeathChan) Wait

func (this DeathChan) Wait(timeout time.Duration) (timeToDie bool)

reader: wait up to timeout for death to occur

type DelayChan

type DelayChan struct {
	Delayer
	OutC Chan[any] // where to get delayed things
}

A channel with a time delay

Items inserted to InC become available on OutC after delay.

func (DelayChan) Get

func (this DelayChan) Get() (rv any, ok bool)

get an item from this, waiting forever false indicates channel closed and not item received

func (DelayChan) GetWait

func (this DelayChan) GetWait(d time.Duration) (rv any, ok bool)

try to get an item from this, waiting at most specified time, returning true if got item, false may indicate channel closed

func (*DelayChan) Open

func (this *DelayChan) Open()

func (DelayChan) ShutdownAndDrain

func (this DelayChan) ShutdownAndDrain()

Shutdown and drain OutC in the background, disgarding all items

func (DelayChan) TryGet

func (this DelayChan) TryGet() (rv any, ok bool)

try to get an item from this, without waiting, returning true if got item false may indicate channel closed

type Delayed

type Delayed struct {
	Deadline time.Time
	Value    any
}

All values added to Delayer are wrapped in one of these so that delay time can be properly accounted for.

The Put() methods do this automatically, but you may need to use the Wrap() method if using InC directly.

type Delayer

type Delayer struct {
	Name  string        //
	Cap   int           // capacity - max items to hold
	Delay time.Duration // amount of time to delay each item
	InC   chan Delayed  // where to put things, or use Put() methods

	OnItem  func(value any) // call for each item after delay
	OnClose func()          // if set, call after InC closed and drained
	// contains filtered or unexported fields
}

Delays things

func (Delayer) Close

func (this Delayer) Close()

close this down

InC will be drained and all items will be processed with normal delay

func (Delayer) Drain

func (this Delayer) Drain()

Cause all items to be discarded

Once drained, use Plug() to ready Delayer for use

func (Delayer) IsDrained

func (this Delayer) IsDrained() bool

Are all items drained?

func (*Delayer) Open

func (this *Delayer) Open()

Start this up

func (Delayer) Plug

func (this Delayer) Plug()

Return Delayer to service

func (Delayer) Put

func (this Delayer) Put(it any)

put an item into this, waiting forever

the item is automatically wrapped in a Delayed

func (Delayer) PutWait

func (this Delayer) PutWait(it any, d time.Duration) (ok bool)

try to put an item into this, waiting no more than specified, returning OK if successful

the item is automatically wrapped in a Delayed

func (Delayer) PutWaitRecover

func (this Delayer) PutWaitRecover(it any, timeout time.Duration) (ok bool)

put an item into this, waiting forever

if chan closed, then return false

the item is automatically wrapped in a Delayed

func (Delayer) Shutdown

func (this Delayer) Shutdown()

Shutdown this ASAP

All items will be discarded and all delays cancelled

func (Delayer) Wrap

func (this Delayer) Wrap(value any) (rv Delayed)

wrap value in Delayed (if using InC directly)

type LruMap

type LruMap struct {
	// contains filtered or unexported fields
}

Least Recently Used (LRU) eviction map

Lookup is by unique hash, which can be computed with HashString, HashBytes,

or by using one of he convenience accessors.

func NewLruMap

func NewLruMap(capacity int) (rv *LruMap)

func (*LruMap) Get

func (this *LruMap) Get(key string) (rv interface{}, ok bool)

get the value from the map, setting ok to true if value found

func (*LruMap) GetAsString

func (this *LruMap) GetAsString(key string) (rv string, ok bool)

func (*LruMap) GetByBytes

func (this *LruMap) GetByBytes(key []byte) (rv interface{}, ok bool)

get the value from the map, setting ok to true if value found

func (*LruMap) GetByHash

func (this *LruMap) GetByHash(hash uintptr) (rv interface{}, ok bool)

Refer to HashString or HashBytes

func (*LruMap) GetOrAdd

func (this *LruMap) GetOrAdd(key string, value interface{}) (rv interface{})

get the value from the map, or add it if not found

func (*LruMap) GetOrAddAsString

func (this *LruMap) GetOrAddAsString(key, value string) (rv string)

func (*LruMap) GetOrAddByBytes

func (this *LruMap) GetOrAddByBytes(
	key []byte,
	add func() (value interface{}),
) (
	rv interface{},
)

get the value from the map, or add it from values produced by func if not found.

func (*LruMap) GetOrAddByHash

func (this *LruMap) GetOrAddByHash(
	hash uintptr,
	add func() (value interface{}),
) (
	rv interface{},
)

Refer to HashString or HashBytes

func (*LruMap) GetOrAddF

func (this *LruMap) GetOrAddF(
	key string,
	add func() (value interface{}),
) (
	rv interface{},
)

get the value from the map, or add it from values produced by func if not found.

func (*LruMap) Len

func (this *LruMap) Len() (size int)

get the size of the map

func (*LruMap) SetByHash

func (this *LruMap) SetByHash(hash uintptr, v interface{})

Refer to HashString or HashBytes

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map for read heavy use.

func (*Map) Add

func (this *Map) Add(key string, value interface{})

add the value to the map, replacing any existing value

func (*Map) Each

func (this *Map) Each(visit func(key string, value interface{}))

Iterate through each entry

func (*Map) Get

func (this *Map) Get(key string) (value interface{})

get the value from the map

func (*Map) GetOk

func (this *Map) GetOk(key string) (value interface{}, ok bool)

get the value from the map, setting ok to true if value found

func (*Map) Len

func (this *Map) Len() (size int)

get the size of the map

func (*Map) Remove

func (this *Map) Remove(key string) (value interface{})

remove the value from the map, returning the value if found

func (*Map) RemoveUsing

func (this *Map) RemoveUsing(rmf func(key string, value interface{}) bool)

remove values from the map using the provided func

type Pool

type Pool struct {
	New    func() interface{}  // if set, manufacture a new value
	Delete func(v interface{}) // if set and New not set, use to dispose of Puts
	// contains filtered or unexported fields
}

Unlike sync.Pool, this does not lose things. Use to share/reuse values between multiple threads.

Meant to enable reuse of expensive to create values that need to be cleaned up properly at some point.

Do not change New or Delete once the pool is in use.

If set, New() will manufacture a new value if no values are in the cache. New() will be called while locked, so only one caller at a time will be in New() at a time.

If New() is not set or Stop() has been called, and Delete() is set, the Put() will result in Delete() being called instead of caching the value. This enables orderly shutdown after Stop().

func (*Pool) Get

func (this *Pool) Get() (rv interface{})

try to get an existing object, or create a new one

func (*Pool) Put

func (this *Pool) Put(v interface{})

Put a value back into the pool.

If New is not set and Delete is set, then Delete will be called on value

func (*Pool) Stop

func (this *Pool) Stop()

prevent allocation of new objects from this point on

if Delete set, run it on all current values and clear the cache

type Proc

type Proc struct {
	ProcC chan ProcF // queue of closures to invoke
}

mixin to be added to a service goroutine to allow clients to make sync and/or async calls to the service. rather than marshaling args into a struct and passing them to the service, a closure is passed to the service to execute.

The closure then has access to the variables it closes on, as well as the internal service state, which it can safely manipulate or use because it is operating within the service thread context.

When a closure is invoked by the service, an error may be passed to the service that there's a problem. This error is not passed to the caller unless the caller capture the error in a different arror variable.

Async calls are fire and forget. They will be executed in the future by the service.

Sync calls wait for the service to perform the invocation before returning.

type Svc struct {
	proc: usync.Proc
	state: string
}
func(svc *Svc) AsyncSvcCall(arg string) {
	proc.Async(func() (svcErr error) {
		// ... do something with arg in Svc context
		// ... do something with state in Svc context
		return svcErr // tell service call succeeded or failed
	})
}
func(svc *Svc) SyncSvcCall(arg string) (err error) {
	// this call will not return until service done invoking it
	proc.Call(func() (svcErr error) {
		// ... do something with arg in Svc context
		// ... do something with state in Svc context
		// ... set err if needed
		return svcErr       // tell service call succeeded or failed
	})
	return
}

func NewProc

func NewProc(backlog int) (this *Proc)

func (*Proc) Async

func (this *Proc) Async(closure ProcF) (ok bool)

fire and forget call.

this is approx 5x faster than Sync

return true if backend accepted the call (it's not dead)

func (*Proc) Call

func (this *Proc) Call(closure ProcF) (ok bool)

wait for service to invoke

return true if backend accepted the call (it's not dead)

func (*Proc) Close

func (this *Proc) Close()

func (*Proc) Construct

func (this *Proc) Construct(backlog int)

func (*Proc) InvokeUntilError

func (this *Proc) InvokeUntilError() (err error)

semi-example of a possible use

type ProcAny

type ProcAny struct {
	ProcC chan any // add closures to invoke to this
}

the same pattern can be applied to a service that already has an any chan.

is it just me, or is this impossible to do with go generics?

func (*ProcAny) Async

func (this *ProcAny) Async(closure ProcF) (ok bool)

func (*ProcAny) Call

func (this *ProcAny) Call(closure ProcF) (ok bool)

func (*ProcAny) Construct

func (this *ProcAny) Construct(c chan any)

type ProcF

type ProcF func() (svcError error)

a closure to invoke on a service. any error returned is for the service to handle

type Semaphore

type Semaphore chan empty_

a counting semaphore

func NewSemaphore

func NewSemaphore(size int) Semaphore

func (Semaphore) Acquire

func (this Semaphore) Acquire()

func (Semaphore) AcquireN

func (this Semaphore) AcquireN(amount int)

func (Semaphore) Release

func (this Semaphore) Release()

func (Semaphore) ReleaseN

func (this Semaphore) ReleaseN(amount int)

type Ticker

type Ticker struct {
	C <-chan time.Time // user gets time on this (just like time.Ticker)
	// contains filtered or unexported fields
}

A ticker that takes an initial time and a period

Just like time.Ticker but with an initial time

func NewTicker

func NewTicker(initial, period time.Duration) (rv *Ticker)

create a ticker that will emit a time after an initial time, and then periodically after that

func (*Ticker) Stop

func (this *Ticker) Stop()

make sure to Stop the ticker when done!

type WorkF

type WorkF func(any)

type WorkGang

type WorkGang struct {
	Pool Workers

	//
	// produce next req for workers, or nil if done.
	//
	// returning nil causes pool to Close() and this will no longer be called
	//
	// if !ok, entire operation is aborted.
	//
	// this will be run from a separate goroutine
	//
	OnFeed func() (req any, ok bool)

	//
	// Function called by worker goroutines to take requests and produce responses
	//
	// resp is passed to OnResponse.  if !ok, then entire operation is aborted.
	//
	OnRequest func(req any) (resp any, ok bool)

	//
	// Function called by workers
	//
	// if !ok, entire operation is aborted
	//
	// this runs in the callers goroutine
	//
	OnResponse func(resp any) (ok bool)
}

a feeder -> workers -> collector flow

func (*WorkGang) Work

func (this *WorkGang) Work(workers int)

perform the work, returning when done

type Workers

type Workers struct {
	OnDone   func()    // if set, call this when workers done
	RequestC Chan[any] // where to post requests to be worked on

	//
	// if this is set, call whenever a goroutine panics
	// if this returns true, then cause the worker to end
	//
	// if this is not set, panic will be captured and logged, and worker will end
	//
	OnPanic func(cause any) (die bool)
	// contains filtered or unexported fields
}

A pool of workers

Example:

pool := usync.Workers{}

// start a bunch of workers
pool.Go( 2,
    func(req any) {
        ...
    })

go func() { // feeder feeds requests to pool
    for ... {
        pool.RequestC <- ...
    }
    pool.Close() // feeder closes pool
}()

func (*Workers) Close

func (this *Workers) Close()

Tell the pool there's no more work coming

There may be still results being worked on after this

func (*Workers) Drain

func (this *Workers) Drain()

tell workers to pull items from RequestC, but not do any work.

this does not close the request chan, so any workers blocked on that chan will remain blocked until Close() is called.

func (*Workers) Go

func (this *Workers) Go(workers int, factory func() (work WorkF))

start N workers to perform processing

func (*Workers) IsDrained

func (this *Workers) IsDrained() bool

Return true if drained. This can only be true if Drain is set.

func (*Workers) IsDraining

func (this *Workers) IsDraining() bool

did someone set Drain?

func (*Workers) IsEmpty

func (this *Workers) IsEmpty() bool

Return true if requestC currently empty

func (*Workers) Plug

func (this *Workers) Plug()

stop the drain

func (*Workers) Put

func (this *Workers) Put(req any)

func (*Workers) PutWait

func (this *Workers) PutWait(req any, d time.Duration) (ok bool)

func (*Workers) PutWaitRecover

func (this *Workers) PutWaitRecover(req any, wait time.Duration) (ok bool)

func (*Workers) Shutdown

func (this *Workers) Shutdown()

Cease all work as soon as possible and close this down, throwing away any queued requests

func (*Workers) WaitDone

func (this *Workers) WaitDone()

Wait til all workers done

func (*Workers) WaitDrained

func (this *Workers) WaitDrained(deadline time.Duration) bool

Return true when drained. This can only occur if Drain is set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL