Documentation
¶
Index ¶
- Variables
- func Await(timeout, interval time.Duration, done func() (bool, error)) error
- func AwaitTrue(timeout, interval time.Duration, done func() bool) (rv bool)
- func FatalPanic(x any, msg string)
- func HashBytes(b []byte) uintptr
- func HashString(s string) uintptr
- func IgnorePanic()
- func LogPanic(x any, msg string)
- func LogPanicIn(msg string, activity func())
- type AtomicBool
- type AtomicBool32
- type AtomicBools
- func (this *AtomicBools) Clear(bit int)
- func (this *AtomicBools) GetAll() uint64
- func (this *AtomicBools) IsSet(bit int) bool
- func (this *AtomicBools) Set(bit int)
- func (this *AtomicBools) SetAll(newV uint64)
- func (this *AtomicBools) SetAllUnlessSet(oldV, newV uint64) (changed bool)
- func (this *AtomicBools) SetUnlessSet(bit int) (changed bool)
- type AtomicBools32
- func (this *AtomicBools32) Clear(bit int)
- func (this *AtomicBools32) GetAll() uint32
- func (this *AtomicBools32) IsSet(bit int) bool
- func (this *AtomicBools32) Set(bit int)
- func (this *AtomicBools32) SetAll(newV uint32)
- func (this *AtomicBools32) SetAllUnlessSet(oldV, newV uint32) (changed bool)
- func (this *AtomicBools32) SetUnlessSet(bit int) (changed bool)
- type AtomicInt
- func (this *AtomicInt) Add(amount int64) (result int64)
- func (this *AtomicInt) AddIfLessThan(amount, lessThan int64) (result int64, added bool)
- func (this *AtomicInt) Cas(oldV, newV int64) (swapped bool)
- func (this *AtomicInt) Get() (rv int64)
- func (this *AtomicInt) Set(amount int64)
- func (this AtomicInt) String() (rv string)
- type Chan
- func (this Chan[T]) Get() (rv T, ok bool)
- func (this Chan[T]) GetWait(d time.Duration) (rv T, ok bool)
- func (this Chan[T]) Put(it T)
- func (this Chan[T]) PutRecover(it T) (ok bool)
- func (this Chan[T]) PutWait(it T, d time.Duration) (ok bool)
- func (this Chan[T]) PutWaitRecover(it T, d time.Duration) (ok bool)
- func (this Chan[T]) TryGet() (rv T, ok bool)
- type DeathChan
- type DelayChan
- type Delayed
- type Delayer
- func (this Delayer) Close()
- func (this Delayer) Drain()
- func (this Delayer) IsDrained() bool
- func (this *Delayer) Open()
- func (this Delayer) Plug()
- func (this Delayer) Put(it any)
- func (this Delayer) PutWait(it any, d time.Duration) (ok bool)
- func (this Delayer) PutWaitRecover(it any, timeout time.Duration) (ok bool)
- func (this Delayer) Shutdown()
- func (this Delayer) Wrap(value any) (rv Delayed)
- type LruMap
- func (this *LruMap) Get(key string) (rv interface{}, ok bool)
- func (this *LruMap) GetAsString(key string) (rv string, ok bool)
- func (this *LruMap) GetByBytes(key []byte) (rv interface{}, ok bool)
- func (this *LruMap) GetByHash(hash uintptr) (rv interface{}, ok bool)
- func (this *LruMap) GetOrAdd(key string, value interface{}) (rv interface{})
- func (this *LruMap) GetOrAddAsString(key, value string) (rv string)
- func (this *LruMap) GetOrAddByBytes(key []byte, add func() (value interface{})) (rv interface{})
- func (this *LruMap) GetOrAddByHash(hash uintptr, add func() (value interface{})) (rv interface{})
- func (this *LruMap) GetOrAddF(key string, add func() (value interface{})) (rv interface{})
- func (this *LruMap) Len() (size int)
- func (this *LruMap) SetByHash(hash uintptr, v interface{})
- type Map
- func (this *Map) Add(key string, value interface{})
- func (this *Map) Each(visit func(key string, value interface{}))
- func (this *Map) Get(key string) (value interface{})
- func (this *Map) GetOk(key string) (value interface{}, ok bool)
- func (this *Map) Len() (size int)
- func (this *Map) Remove(key string) (value interface{})
- func (this *Map) RemoveUsing(rmf func(key string, value interface{}) bool)
- type Pool
- type Proc
- type ProcAny
- type ProcF
- type Semaphore
- type Ticker
- type WorkF
- type WorkGang
- type Workers
- func (this *Workers) Close()
- func (this *Workers) Drain()
- func (this *Workers) Go(workers int, factory func() (work WorkF))
- func (this *Workers) IsDrained() bool
- func (this *Workers) IsDraining() bool
- func (this *Workers) IsEmpty() bool
- func (this *Workers) Plug()
- func (this *Workers) Put(req any)
- func (this *Workers) PutWait(req any, d time.Duration) (ok bool)
- func (this *Workers) PutWaitRecover(req any, wait time.Duration) (ok bool)
- func (this *Workers) Shutdown()
- func (this *Workers) WaitDone()
- func (this *Workers) WaitDrained(deadline time.Duration) bool
Constants ¶
This section is empty.
Variables ¶
var ErrTimeout = errors.New("Operation timed out")
Functions ¶
func Await ¶
invoke done until it returns true or error, or the timeout occurs. return error if done errors or timeout occurs
func AwaitTrue ¶
invoke done() until it returns true, or the timeout occurs. return true iff done() returns true before timeout
func FatalPanic ¶
Log any panics and exit the program.
Use: defer usync.FatalPanic(recover(), "boo")
func HashString ¶
compute a unique hash for a short string
this saves about 80% compared to HashBytes([]byte(s)) and is more convenient
func IgnorePanic ¶
func IgnorePanic()
Ignore any panics. Prefer IgnorePanicIn instead.
Use: defer usync.IgnorePanic()
Do not: defer func() { usync.IgnorePanic() }
note that there is no outer func - this can only be used if directly used by defer.
Types ¶
type AtomicBool ¶
type AtomicBool struct {
// contains filtered or unexported fields
}
boolean that is safe to access by multiple threads
func (*AtomicBool) Clear ¶
func (this *AtomicBool) Clear()
func (*AtomicBool) ClearUnlessClear ¶
func (this *AtomicBool) ClearUnlessClear() (changed bool)
return true if able to clear
func (*AtomicBool) IsSet ¶
func (this *AtomicBool) IsSet() bool
func (*AtomicBool) Set ¶
func (this *AtomicBool) Set()
func (*AtomicBool) SetUnlessSet ¶
func (this *AtomicBool) SetUnlessSet() (changed bool)
return true if able to set
type AtomicBool32 ¶
type AtomicBool32 struct {
// contains filtered or unexported fields
}
smaller boolean that is safe to access by multiple threads
func (*AtomicBool32) Clear ¶
func (this *AtomicBool32) Clear()
func (*AtomicBool32) ClearUnlessClear ¶
func (this *AtomicBool32) ClearUnlessClear() (changed bool)
return true if able to clear
func (*AtomicBool32) IsSet ¶
func (this *AtomicBool32) IsSet() bool
func (*AtomicBool32) Set ¶
func (this *AtomicBool32) Set()
func (*AtomicBool32) SetUnlessSet ¶
func (this *AtomicBool32) SetUnlessSet() (changed bool)
return true if able to set
type AtomicBools ¶
type AtomicBools struct {
// contains filtered or unexported fields
}
a set of 64 booleans that are safe to access by multiple threads
func (*AtomicBools) Clear ¶
func (this *AtomicBools) Clear(bit int)
func (*AtomicBools) GetAll ¶
func (this *AtomicBools) GetAll() uint64
func (*AtomicBools) IsSet ¶
func (this *AtomicBools) IsSet(bit int) bool
func (*AtomicBools) Set ¶
func (this *AtomicBools) Set(bit int)
func (*AtomicBools) SetAll ¶
func (this *AtomicBools) SetAll(newV uint64)
func (*AtomicBools) SetAllUnlessSet ¶
func (this *AtomicBools) SetAllUnlessSet(oldV, newV uint64) (changed bool)
return true if able to set
func (*AtomicBools) SetUnlessSet ¶
func (this *AtomicBools) SetUnlessSet(bit int) (changed bool)
return true if able to set
type AtomicBools32 ¶
type AtomicBools32 struct {
// contains filtered or unexported fields
}
a set of 32 booleans that are safe to access by multiple threads
func (*AtomicBools32) Clear ¶
func (this *AtomicBools32) Clear(bit int)
func (*AtomicBools32) GetAll ¶
func (this *AtomicBools32) GetAll() uint32
func (*AtomicBools32) IsSet ¶
func (this *AtomicBools32) IsSet(bit int) bool
func (*AtomicBools32) Set ¶
func (this *AtomicBools32) Set(bit int)
func (*AtomicBools32) SetAll ¶
func (this *AtomicBools32) SetAll(newV uint32)
func (*AtomicBools32) SetAllUnlessSet ¶
func (this *AtomicBools32) SetAllUnlessSet(oldV, newV uint32) (changed bool)
return true if able to set
func (*AtomicBools32) SetUnlessSet ¶
func (this *AtomicBools32) SetUnlessSet(bit int) (changed bool)
return true if able to set
type AtomicInt ¶
type AtomicInt struct {
// contains filtered or unexported fields
}
func (*AtomicInt) AddIfLessThan ¶
type Chan ¶
type Chan[T any] chan T
Generic channel - syntactic sugar for common uses
ch := NewChan[any](capacity) ... close(ch)
func (Chan[T]) GetWait ¶
try to get an item, waiting up to duration time, return true if got an item
func (Chan[T]) Put ¶
func (this Chan[T]) Put(it T)
put an item into this, waiting forever
the item is automatically wrapped in a Delayed
func (Chan[T]) PutRecover ¶
put an item into this, waiting forever, recover in case of chan close, returning false if chan closed
in general, the writer should 'know' the chan is closed because they closed it, but there are sometimes cases where this is not true
func (Chan[T]) PutWait ¶
try to put an item into this, waiting no more than specified, returning true if successful
func (Chan[T]) PutWaitRecover ¶
try to put an item into this, waiting no more than specified, returning true if successful and not closed
in general, the writer should 'know' the chan is closed because they closed it, but there are sometimes cases where this is not true
type DeathChan ¶
type DeathChan chan struct{}
a channel to signal death
type DelayChan ¶
A channel with a time delay
Items inserted to InC become available on OutC after delay.
func (DelayChan) Get ¶
get an item from this, waiting forever. false indicates channel closed and no item received
func (DelayChan) GetWait ¶
try to get an item from this, waiting at most specified time, returning true if got item, false may indicate channel closed
func (DelayChan) ShutdownAndDrain ¶
func (this DelayChan) ShutdownAndDrain()
Shutdown and drain OutC in the background, discarding all items
type Delayed ¶
All values added to Delayer are wrapped in one of these so that delay time can be properly accounted for.
The Put() methods do this automatically, but you may need to use the Wrap() method if using InC directly.
type Delayer ¶
type Delayer struct {
Name string //
Cap int // capacity - max items to hold
Delay time.Duration // amount of time to delay each item
InC chan Delayed // where to put things, or use Put() methods
OnItem func(value any) // call for each item after delay
OnClose func() // if set, call after InC closed and drained
// contains filtered or unexported fields
}
Delays things
func (Delayer) Close ¶
func (this Delayer) Close()
close this down
InC will be drained and all items will be processed with normal delay
func (Delayer) Drain ¶
func (this Delayer) Drain()
Cause all items to be discarded
Once drained, use Plug() to ready Delayer for use
func (Delayer) Put ¶
put an item into this, waiting forever
the item is automatically wrapped in a Delayed
func (Delayer) PutWait ¶
try to put an item into this, waiting no more than specified, returning OK if successful
the item is automatically wrapped in a Delayed
func (Delayer) PutWaitRecover ¶
try to put an item into this, waiting no more than the specified timeout
if chan closed, then return false
the item is automatically wrapped in a Delayed
type LruMap ¶
type LruMap struct {
// contains filtered or unexported fields
}
Least Recently Used (LRU) eviction map
Lookup is by unique hash, which can be computed with HashString, HashBytes,
or by using one of the convenience accessors.
func (*LruMap) GetByBytes ¶
get the value from the map, setting ok to true if value found
func (*LruMap) GetOrAddAsString ¶
func (*LruMap) GetOrAddByBytes ¶
func (this *LruMap) GetOrAddByBytes( key []byte, add func() (value interface{}), ) ( rv interface{}, )
get the value from the map, or add it from values produced by func if not found.
func (*LruMap) GetOrAddByHash ¶
func (this *LruMap) GetOrAddByHash( hash uintptr, add func() (value interface{}), ) ( rv interface{}, )
Refer to HashString or HashBytes
type Map ¶
type Map struct {
// contains filtered or unexported fields
}
Map for read heavy use.
func (*Map) RemoveUsing ¶
remove values from the map using the provided func
type Pool ¶
type Pool struct {
New func() interface{} // if set, manufacture a new value
Delete func(v interface{}) // if set and New not set, use to dispose of Puts
// contains filtered or unexported fields
}
Unlike sync.Pool, this does not lose things. Use to share/reuse values between multiple threads.
Meant to enable reuse of expensive to create values that need to be cleaned up properly at some point.
Do not change New or Delete once the pool is in use.
If set, New() will manufacture a new value if no values are in the cache. New() will be called while locked, so only one caller will be in New() at a time.
If New() is not set or Stop() has been called, and Delete() is set, then Put() will result in Delete() being called instead of caching the value. This enables orderly shutdown after Stop().
func (*Pool) Get ¶
func (this *Pool) Get() (rv interface{})
try to get an existing object, or create a new one
type Proc ¶
type Proc struct {
ProcC chan ProcF // queue of closures to invoke
}
mixin to be added to a service goroutine to allow clients to make sync and/or async calls to the service. rather than marshaling args into a struct and passing them to the service, a closure is passed to the service to execute.
The closure then has access to the variables it closes on, as well as the internal service state, which it can safely manipulate or use because it is operating within the service thread context.
When a closure is invoked by the service, it may return an error to the service to indicate that there's a problem. This error is not passed to the caller unless the caller captures the error in a different error variable.
Async calls are fire and forget. They will be executed in the future by the service.
Sync calls wait for the service to perform the invocation before returning.
type Svc struct {
proc usync.Proc
state string
}
func(svc *Svc) AsyncSvcCall(arg string) {
svc.proc.Async(func() (svcErr error) {
// ... do something with arg in Svc context
// ... do something with state in Svc context
return svcErr // tell service call succeeded or failed
})
}
func(svc *Svc) SyncSvcCall(arg string) (err error) {
// this call will not return until service done invoking it
svc.proc.Call(func() (svcErr error) {
// ... do something with arg in Svc context
// ... do something with state in Svc context
// ... set err if needed
return svcErr // tell service call succeeded or failed
})
return
}
func (*Proc) Async ¶
fire and forget call.
this is approx 5x faster than Sync
return true if backend accepted the call (it's not dead)
func (*Proc) Call ¶
wait for service to invoke
return true if backend accepted the call (it's not dead)
func (*Proc) InvokeUntilError ¶
semi-example of a possible use
type ProcAny ¶
type ProcAny struct {
ProcC chan any // add closures to invoke to this
}
the same pattern can be applied to a service that already has an any chan.
is it just me, or is this impossible to do with go generics?
type ProcF ¶
type ProcF func() (svcError error)
a closure to invoke on a service. any error returned is for the service to handle
type Ticker ¶
type Ticker struct {
C <-chan time.Time // user gets time on this (just like time.Ticker)
// contains filtered or unexported fields
}
A ticker that takes an initial time and a period
Just like time.Ticker but with an initial time
type WorkGang ¶
type WorkGang struct {
Pool Workers
//
// produce next req for workers, or nil if done.
//
// returning nil causes pool to Close() and this will no longer be called
//
// if !ok, entire operation is aborted.
//
// this will be run from a separate goroutine
//
OnFeed func() (req any, ok bool)
//
// Function called by worker goroutines to take requests and produce responses
//
// resp is passed to OnResponse. if !ok, then entire operation is aborted.
//
OnRequest func(req any) (resp any, ok bool)
//
// Function called by workers
//
// if !ok, entire operation is aborted
//
// this runs in the callers goroutine
//
OnResponse func(resp any) (ok bool)
}
a feeder -> workers -> collector flow
type Workers ¶
type Workers struct {
OnDone func() // if set, call this when workers done
RequestC Chan[any] // where to post requests to be worked on
//
// if this is set, call whenever a goroutine panics
// if this returns true, then cause the worker to end
//
// if this is not set, panic will be captured and logged, and worker will end
//
OnPanic func(cause any) (die bool)
// contains filtered or unexported fields
}
A pool of workers
Example:
pool := usync.Workers{}
// start a bunch of workers
pool.Go( 2,
func(req any) {
...
})
go func() { // feeder feeds requests to pool
for ... {
pool.RequestC <- ...
}
pool.Close() // feeder closes pool
}()
func (*Workers) Close ¶
func (this *Workers) Close()
Tell the pool there's no more work coming
There may be still results being worked on after this
func (*Workers) Drain ¶
func (this *Workers) Drain()
tell workers to pull items from RequestC, but not do any work.
this does not close the request chan, so any workers blocked on that chan will remain blocked until Close() is called.