Documentation
¶
Index ¶
- type Hammer
- type HammerFunc
- type HammerOption
- type Nailer
- func (n *Nailer[T, R]) Close()
- func (n *Nailer[T, R]) Drain()
- func (n *Nailer[T, R]) ExecuteAll(ctx context.Context, ins []T)
- func (n *Nailer[T, R]) Push(ctx context.Context, in T)
- func (n *Nailer[T, R]) PushAll(ctx context.Context, ins []T)
- func (n *Nailer[T, R]) Start(ctx context.Context)
- func (n *Nailer[T, R]) WaitUntilEmpty(ctx context.Context)
- type NailerFunc
- type NailerOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Hammer ¶
type Hammer struct {
*shutter.Shutter
In chan interface{}
Out chan interface{}
// contains filtered or unexported fields
}
Hammer is a tool that batches and parallelize tasks from the 'In' channel and writes results to the 'Out' channel. It can optimize performance in two ways:
- It calls your HammerFunc with a maximum of `batchSize` values taken from the 'In' channel (batching)
- It calls your HammerFunc a maximum of `maxConcurrency` times in parallel (debouncing)
Both approaches give good results, but combining them gives greatest results, especially with large batch size with small debouncing.
Closing the context will shutdown the batcher immediately. calling "Close" will close the `In` chan and finish processing until the Hammer closes the `Out` chan and shuts down
func NewHammer ¶
func NewHammer(batchSize, maxConcurrency int, hammerFunc HammerFunc, options ...HammerOption) *Hammer
NewHammer returns a single-use batcher startSingle will force batcher to run the first batch with a single object in it
type HammerFunc ¶
type HammerOption ¶
type HammerOption = func(h *Hammer)
func FirstBatchUnitary ¶
func FirstBatchUnitary() HammerOption
func HammerLogger ¶
func HammerLogger(logger *zap.Logger) HammerOption
func HammerTracer ¶
func HammerTracer(tracer logging.Tracer) HammerOption
func SetInChanSize ¶
func SetInChanSize(size int) HammerOption
type Nailer ¶
type Nailer[T any, R any] struct { *shutter.Shutter Out chan R // contains filtered or unexported fields }
Example ¶
package main
import (
"context"
"fmt"
"strconv"
"github.com/streamingfast/dhammer"
)
func main() {
nailer := dhammer.NewNailer(2, func(ctx context.Context, i int) (string, error) {
return strconv.FormatInt(int64(i*2), 10), nil
})
ctx := context.Background()
nailer.Start(ctx)
done := make(chan bool, 1)
go func() {
for out := range nailer.Out {
fmt.Println("Received", out)
}
done <- true
}()
for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
nailer.Push(ctx, i)
}
nailer.Close()
<-done
fmt.Println("Completed")
}
Output: Received 2 Received 4 Received 6 Received 8 Received 10 Received 12 Received 14 Received 16 Completed
func NewNailer ¶
func NewNailer[T any, R any](maxConcurrency int, nailerFunc NailerFunc[T, R], options ...NailerOption) *Nailer[T, R]
func (*Nailer[T, R]) Close ¶
func (n *Nailer[T, R]) Close()
Close should be called when the nailer is no more needed and closes all started goroutine.
func (*Nailer[T, R]) Drain ¶
func (n *Nailer[T, R]) Drain()
Drain fully consumes the output channel from the nailer, discarding it right away.
func (*Nailer[T, R]) ExecuteAll ¶
ExecuteAll is going to [Start] the nailer, push all `ins` into it and then call [Close] to stop the nailer process. It's a shortcut method if you want to execute pre-made jobs rapidly.
func (*Nailer[T, R]) PushAll ¶
Deprecated Renamed to ExecuteAll, going to be replaced by a different implementation at a later time.
func (*Nailer[T, R]) Start ¶
Start should be call prior sending any job to the nailer, it **must** be started otherwise nothing will work properly.
func (*Nailer[T, R]) WaitUntilEmpty ¶
WaitUntilEmpty waits until no more input nor active inflight operations is in progress blocking along the way. This method is only useful if you need to wait until all output has been sent in `nailer.Out` and want to re-use the same nailer. If you simply need to wait until all elements has been processed, you should call `Close()` and wait until your last element has been processed:
``` nailer := dhammer.NewNailer(4, func(ctx context.Context, i int) (int, error) { return i * 2, nil }) nailer.Start()
done := make(chan bool, 1)
go func() {
for out := range nailer.Out {
// Do something with out
}
done <- true
}()
for i := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
nailer.Push(ctx, i)
}
nailer.Close() <-done ```
**Important** It's really important to understand that 'WaitUntilEmpty' only knowns when last output has been sent to 'nailer.Out' channel for consumption and not **when** the output processor reading 'nailer.Out' as finished processing the last element of the channel! If you need to wait until empty the queue is empty **and** that your output processor fully processed the last item, you should add some waiting barrier based that is lifted once your last element has been properly processed.
This method works only if output is consumed.
type NailerOption ¶
type NailerOption func(config *nailerConfig)
NailerOption represents a configuration option that be passed to NewNailer.
func NailerDiscardAll ¶
func NailerDiscardAll() NailerOption
func NailerLogger ¶
func NailerLogger(logger *zap.Logger) NailerOption
func NailerTracer ¶
func NailerTracer(tracer logging.Tracer) NailerOption