Documentation
¶
Index ¶
- Constants
- Variables
- func BytesToUint64(b []byte) (uint64, error)
- func DefaultTimeoutWheel() *goetty.TimeoutWheel
- func DiskStats(path string) (*disk.UsageStat, error)
- func ExecuteInParallel(q *Queue, fn func(interface{}))
- func FileMarshal(fp string, v interface{}) (err error)
- func FileMmap(f *os.File) (data []byte, err error)
- func FileMunmap(data []byte) (err error)
- func FileUnmarshal(fp string, v interface{}) (err error)
- func Float32ToSortableUint64(valS string) (val uint64, err error)
- func Float64ToSortableUint64(valS string) (val uint64, err error)
- func FormatFloat64ToBytes(v float64) []byte
- func FormatInt64ToBytes(v int64) []byte
- func GZIP(path string) error
- func GetIntValue(value, defaultValue int) int
- func GetStringValue(value, defaultValue string) string
- func GetUint64Value(value, defaultValue uint64) uint64
- func InitMetric(runner *Runner, cfg *MetricCfg)
- func MustMarshal(target Marashal) []byte
- func MustMarshalTo(target Marashal, data []byte) int
- func MustUnmarshal(target Unmarshal, data []byte)
- func NoConvert(key []byte, do func([]byte) metapb.Cell) metapb.Cell
- func ParseUrls(s string) ([]url.URL, error)
- func ReplaceFpExt(fp string, newExt string) string
- func SliceToString(b []byte) (s string)
- func StrFloat64(v []byte) (float64, error)
- func StrInt64(v []byte) (int64, error)
- func StringToSlice(s string) (b []byte)
- func Uint64Convert(key []byte, do func([]byte) metapb.Cell) metapb.Cell
- func Uint64ToBytes(v uint64) []byte
- func UnGZIP(file string, dest string) error
- type CellItem
- type CellTree
- func (t *CellTree) Ascend(fn func(cell *metapb.Cell) bool)
- func (t *CellTree) AscendRange(start, end []byte, fn func(cell *metapb.Cell) bool)
- func (t *CellTree) NextCell(start []byte) *metapb.Cell
- func (t *CellTree) Remove(cell metapb.Cell) bool
- func (t *CellTree) Search(key []byte) metapb.Cell
- func (t *CellTree) Update(cell metapb.Cell)
- type Item
- type Job
- func (job *Job) Cancel()
- func (job *Job) GetResult() interface{}
- func (job *Job) IsCancelled() bool
- func (job *Job) IsCancelling() bool
- func (job *Job) IsComplete() bool
- func (job *Job) IsFailed() bool
- func (job *Job) IsFinished() bool
- func (job *Job) IsNotComplete() bool
- func (job *Job) IsPending() bool
- func (job *Job) IsRunning() bool
- func (job *Job) SetResult(result interface{})
- type JobState
- type KVTree
- func (kv *KVTree) Delete(key []byte) bool
- func (kv *KVTree) Get(key []byte) []byte
- func (kv *KVTree) Put(key, value []byte)
- func (kv *KVTree) RangeDelete(start, end []byte)
- func (kv *KVTree) Scan(start, end []byte, handler func(key, value []byte) (bool, error)) error
- func (kv *KVTree) Seek(key []byte) ([]byte, []byte)
- type Limiter
- type Marashal
- type MetricCfg
- type OffsetQueue
- type OrderedArray
- type PriorityQueue
- type Queue
- func (q *Queue) Dispose() []interface{}
- func (q *Queue) Disposed() bool
- func (q *Queue) Empty() bool
- func (q *Queue) Get(number int64, items []interface{}) (int64, error)
- func (q *Queue) Len() int64
- func (q *Queue) Peek() (interface{}, error)
- func (q *Queue) Poll(number int64, items []interface{}, timeout time.Duration) (int64, error)
- func (q *Queue) Put(items ...interface{}) error
- func (q *Queue) PutOrUpdate(cmp func(interface{}, interface{}) bool, item interface{}) error
- type Runner
- func (s *Runner) AddNamedWorker(name string) (uint64, error)
- func (s *Runner) IsNamedWorkerBusy(worker string) bool
- func (s *Runner) RunCancelableTask(task func(context.Context)) (uint64, error)
- func (s *Runner) RunJob(desc string, task func() error) error
- func (s *Runner) RunJobWithNamedWorker(desc, worker string, task func() error) error
- func (s *Runner) RunJobWithNamedWorkerWithCB(desc, worker string, task func() error, cb func(*Job)) error
- func (s *Runner) RunTask(task func()) error
- func (s *Runner) Stop() error
- func (s *Runner) StopCancelableTask(id uint64) error
- type Unmarshal
Constants ¶
const ( // Pending job is waiting to run Pending = JobState(0) // Running job is running Running = JobState(1) // Cancelling job is being cancelled Cancelling = JobState(2) // Cancelled job is cancelled Cancelled = JobState(3) // Finished job is complete Finished = JobState(4) // Failed job failed during execution Failed = JobState(5) )
const MaxInt = int(MaxUint >> 1)
const MaxUint = ^uint(0)
https://stackoverflow.com/questions/6878590/the-maximum-value-for-an-int-type-in-go
const MinInt = -MaxInt - 1
const MinUint = 0
Variables ¶
var ( // ErrDisposed is returned when an operation is performed on a disposed // queue. ErrDisposed = errors.New(`queue: disposed`) // ErrTimeout is returned when an applicable queue operation times out. ErrTimeout = errors.New(`queue: poll timed out`) // ErrEmptyQueue is returned when an non-applicable queue operation was called // due to the queue's empty item state ErrEmptyQueue = errors.New(`queue: empty queue`) )
var ( // ErrJobCancelled error job cancelled ErrJobCancelled = errors.New("Job cancelled") )
Functions ¶
func DefaultTimeoutWheel ¶
func DefaultTimeoutWheel() *goetty.TimeoutWheel
DefaultTimeoutWheel returns default timeout wheel
func ExecuteInParallel ¶
func ExecuteInParallel(q *Queue, fn func(interface{}))
ExecuteInParallel will (in parallel) call the provided function with each item in the queue until the queue is exhausted. When the queue is exhausted execution is complete and all goroutines will be killed. This means that the queue will be disposed so cannot be used again.
func FileMarshal ¶
FileMarshal marshals the given object to file.
func FileMmap ¶
FileMmap mmaps the given file. https://medium.com/@arpith/adventures-with-mmap-463b33405223
func FileUnmarshal ¶
FileUnmarshal unmarshals the given file to object.
func Float32ToSortableUint64 ¶
Float32ToSortableUint64 converts a float32 string to sortable uint64.
Refers to: github.com/apache/lucene-solr/lucene/core/src/java/org/apache/lucene/util/NumericUtils.java,
public static int floatToSortableInt(float value); public static long doubleToSortableLong(double value);
https://en.wikipedia.org/wiki/Single-precision_floating-point_format https://en.wikipedia.org/wiki/Double-precision_floating-point_format
func Float64ToSortableUint64 ¶
Float64ToSortableUint64 converts a float64 string to sortable uint64.
func FormatFloat64ToBytes ¶
FormatFloat64ToBytes float64 -> string
func FormatInt64ToBytes ¶
FormatInt64ToBytes int64 -> string
func GetIntValue ¶
GetIntValue returns value, or defaultValue if value is 0.
func GetStringValue ¶
GetStringValue returns value, or defaultValue if value is "".
func GetUint64Value ¶
GetUint64Value returns value, or defaultValue if value is 0.
func MustMarshalTo ¶
MustMarshalTo panics if marshaling fails.
func MustUnmarshal ¶
MustUnmarshal panics if unmarshaling fails.
func ReplaceFpExt ¶
func SliceToString ¶
SliceToString converts a slice to a string without copying the data.
func StringToSlice ¶
StringToSlice converts a string to a slice without copying the data.
func Uint64Convert ¶
Uint64Convert returns the hash crc64 result value, must use `ReleaseConvertBytes` to release
Types ¶
type CellItem ¶
type CellItem struct {
// contains filtered or unexported fields
}
CellItem is the cell btree item
type CellTree ¶
CellTree is the btree for cell
func (*CellTree) AscendRange ¶
AscendRange iterates over the tree in ascending order within the range [start, end) until fn returns false.
func (*CellTree) Remove ¶
Remove removes a cell if the cell is in the tree. It does nothing if it cannot find the cell or if the found cell is not the same as the given cell.
type Item ¶
type Item interface {
// Compare returns a int that can be used to determine
// ordering in the priority queue. Assuming the queue
// is in ascending order, this should return > logic.
// Return 1 to indicate this object is greater than
// the other logic, 0 to indicate equality, and -1 to indicate
// less than other.
Compare(other Item) int
}
Item is an item that can be added to the priority queue.
type Job ¶
Job is a unit of work that carries a state.
func (*Job) IsCancelled ¶
IsCancelled returns true if job state is Cancelled
func (*Job) IsCancelling ¶
IsCancelling returns true if job state is Cancelling
func (*Job) IsComplete ¶
IsComplete returns true if the job is complete.
func (*Job) IsFinished ¶
IsFinished returns true if job state is Finished
func (*Job) IsNotComplete ¶
IsNotComplete returns true if the job is not complete.
type KVTree ¶
KVTree kv btree
func (*KVTree) RangeDelete ¶
RangeDelete deletes key in [start, end)
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter is a token-based limiter implementation.
type Marashal ¶
type Marashal interface {
Size() int
Marshal() ([]byte, error)
MarshalTo(data []byte) (int, error)
}
Marashal is the marshal interface.
type OffsetQueue ¶
OffsetQueue is a queue for sync.
func (*OffsetQueue) Add ¶
func (q *OffsetQueue) Add(item interface{}) uint64
Add adds an item to the queue.
func (*OffsetQueue) Get ¶
func (q *OffsetQueue) Get(offset uint64) ([]interface{}, uint64)
Get returns all the items after the offset, and removes all items before this offset.
func (*OffsetQueue) GetMaxOffset ¶
func (q *OffsetQueue) GetMaxOffset() uint64
GetMaxOffset returns the max offset in the queue
type OrderedArray ¶
type OrderedArray struct {
// contains filtered or unexported fields
}
OrderedArray is similar to size limited array except that it's to solve "get M smallest ones among N(N>M) items".
func NewOrderedArray ¶
func NewOrderedArray(capacity int) (oa *OrderedArray, err error)
NewOrderedArray is the constructor for an ordered array. capacity is the size limit of the array and must be > 0.
func (*OrderedArray) Finalize ¶
func (oa *OrderedArray) Finalize() (items []Item)
Finalize retrieves all items from the array and clears it. The returned items are sorted in ascending order.
func (*OrderedArray) Len ¶
func (oa *OrderedArray) Len() int
Len returns a number indicating how many items are in the array.
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue is similar to queue except that it takes items that implement the Item interface and adds them to the queue in priority order.
func NewPriorityQueue ¶
func NewPriorityQueue(capHint int) *PriorityQueue
NewPriorityQueue is the constructor for a priority queue.
func (*PriorityQueue) BulkGet ¶
func (pq *PriorityQueue) BulkGet(number int) (items []Item)
BulkGet retrieves items from the queue. len(items) will be max(0, min(number, len(pq.items))). The returned items are sorted in ascending order.
func (*PriorityQueue) Empty ¶
func (pq *PriorityQueue) Empty() bool
Empty returns a bool indicating if there are any items left in the queue.
func (*PriorityQueue) Get ¶
func (pq *PriorityQueue) Get() (item Item)
Get retrieves an item from the queue if non-empty, otherwise returns nil.
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
Len returns a number indicating how many items are in the queue.
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() Item
Peek will look at the next item without removing it from the queue.
func (*PriorityQueue) Put ¶
func (pq *PriorityQueue) Put(items ...Item)
Put adds items to the queue.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the struct responsible for tracking the state of the queue.
func (*Queue) Dispose ¶
func (q *Queue) Dispose() []interface{}
Dispose will dispose of this queue and returns the items disposed. Any subsequent calls to Get or Put will return an error.
func (*Queue) Disposed ¶
Disposed returns a bool indicating whether Dispose has been called on this queue.
func (*Queue) Get ¶
Get retrieves items from the queue. If there are some items in the queue, get will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue.
func (*Queue) Peek ¶
Peek returns the first item in the queue by value without modifying the queue.
func (*Queue) Poll ¶
Poll retrieves items from the queue. If there are some items in the queue, Poll will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue or the provided timeout is reached. A non-positive timeout will block until items are added. If a timeout occurs, ErrTimeout is returned.
func (*Queue) PutOrUpdate ¶
PutOrUpdate will add the specified item to the queue, update it if exists
type Runner ¶
Runner TODO
func (*Runner) AddNamedWorker ¶
AddNamedWorker adds a named worker. Each named worker has its own queue, so its jobs are executed sequentially.
func (*Runner) IsNamedWorkerBusy ¶
IsNamedWorkerBusy returns true if named queue is not empty
func (*Runner) RunCancelableTask ¶
RunCancelableTask runs a task that can be cancelled. Example:
err := s.RunCancelableTask(func(ctx context.Context) {
select {
case <-ctx.Done():
// cancelled
case <-time.After(time.Second):
// do something
}
})
if err != nil {
// handle error
return
}
func (*Runner) RunJobWithNamedWorker ¶
RunJobWithNamedWorker runs a job in a named worker.
func (*Runner) RunJobWithNamedWorkerWithCB ¶
func (s *Runner) RunJobWithNamedWorkerWithCB(desc, worker string, task func() error, cb func(*Job)) error
RunJobWithNamedWorkerWithCB runs a job in a named worker.
func (*Runner) Stop ¶
Stop stops all tasks. After Stop, RunTask fails with an error. Stop waits for tasks that are already executing to complete, and cancels tasks that have not started.
func (*Runner) StopCancelableTask ¶
StopCancelableTask stops the specified cancelable task.