dhammer

package module
v0.0.0-...-c34bbd5 Latest
Published: Jan 25, 2023 License: Apache-2.0 Imports: 5 Imported by: 18

README

StreamingFast Hammer Library

This is a helper library for serializing concurrent and grouped task results. It is useful when you want to debounce network calls that may or may not be grouped in a batch call. It is part of StreamingFast.

Usage

See example usage in EOSIO on StreamingFast.

Contributing

Issues and PRs in this repo should relate strictly to the dhammer library.

Report any protocol-specific issues in their respective repositories.

If you wish to contribute to this code base, please first refer to the general StreamingFast contribution guide.

This codebase uses unit tests extensively; please write and run tests.

License

Apache 2.0

Documentation

Types

type Hammer

type Hammer struct {
	*shutter.Shutter
	In  chan interface{}
	Out chan interface{}
	// contains filtered or unexported fields
}

Hammer is a tool that batches and parallelizes tasks from the 'In' channel and writes results to the 'Out' channel. It can optimize performance in two ways:

  1. It calls your HammerFunc with a maximum of `batchSize` values taken from the 'In' channel (batching)
  2. It calls your HammerFunc a maximum of `maxConcurrency` times in parallel (debouncing)

Either approach gives good results on its own, but combining them gives the best results, especially a large batch size with small debouncing (a low `maxConcurrency`).

Canceling the context shuts down the batcher immediately. Calling `Close` closes the `In` channel and lets processing finish, after which the Hammer closes the `Out` channel and shuts down.

func NewHammer

func NewHammer(batchSize, maxConcurrency int, hammerFunc HammerFunc, options ...HammerOption) *Hammer

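NewHammer returns a single-use batcher. startSingle will force the batcher to run the first batch with a single object in it. (The signature above has no `startSingle` parameter; the `FirstBatchUnitary` option appears to serve this purpose.)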

func (*Hammer) Close

func (h *Hammer) Close()

func (*Hammer) Start

func (h *Hammer) Start(ctx context.Context)

type HammerFunc

type HammerFunc func(context.Context, []interface{}) ([]interface{}, error)

type HammerOption

type HammerOption = func(h *Hammer)

func FirstBatchUnitary

func FirstBatchUnitary() HammerOption

func HammerLogger

func HammerLogger(logger *zap.Logger) HammerOption

func HammerTracer

func HammerTracer(tracer logging.Tracer) HammerOption

func SetInChanSize

func SetInChanSize(size int) HammerOption
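
Since `HammerOption` is a function that receives the `*Hammer`, the options above follow the functional options pattern. The sketch below shows that pattern in isolation. The `worker` type, its fields, its default value, and the `with...` constructors are all hypothetical stand-ins, since Hammer's unexported fields are not shown here.

```go
package main

import "fmt"

// worker is a hypothetical stand-in for Hammer; its fields are illustrative only.
type worker struct {
	inChanSize   int
	firstUnitary bool
}

// option has the same shape as HammerOption: a function that mutates the target.
type option = func(w *worker)

// withInChanSize returns an option that sets the input channel size.
func withInChanSize(size int) option {
	return func(w *worker) { w.inChanSize = size }
}

// withFirstUnitary returns an option that flags the first batch as single-item.
func withFirstUnitary() option {
	return func(w *worker) { w.firstUnitary = true }
}

// newWorker applies each option, in order, on top of an assumed default.
func newWorker(options ...option) *worker {
	w := &worker{inChanSize: 1} // assumed default, for illustration only
	for _, opt := range options {
		opt(w)
	}
	return w
}

func main() {
	w := newWorker(withInChanSize(64), withFirstUnitary())
	fmt.Println(w.inChanSize, w.firstUnitary) // prints "64 true"
}
```

Options are applied in the order they are passed, so a later option overrides an earlier one that sets the same field.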

type Nailer

type Nailer[T any, R any] struct {
	*shutter.Shutter

	Out chan R
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/streamingfast/dhammer"
)

func main() {
	nailer := dhammer.NewNailer(2, func(ctx context.Context, i int) (string, error) {
		return strconv.FormatInt(int64(i*2), 10), nil
	})

	ctx := context.Background()
	nailer.Start(ctx)

	done := make(chan bool, 1)
	go func() {
		for out := range nailer.Out {
			fmt.Println("Received", out)
		}

		done <- true
	}()

	for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
		nailer.Push(ctx, i)
	}

	nailer.Close()
	<-done

	fmt.Println("Completed")
}
Output:
Received 2
Received 4
Received 6
Received 8
Received 10
Received 12
Received 14
Received 16
Completed

func NewNailer

func NewNailer[T any, R any](maxConcurrency int, nailerFunc NailerFunc[T, R], options ...NailerOption) *Nailer[T, R]

func (*Nailer[T, R]) Close

func (n *Nailer[T, R]) Close()

Close should be called when the nailer is no longer needed; it stops all started goroutines.

func (*Nailer[T, R]) Drain

func (n *Nailer[T, R]) Drain()

Drain fully consumes the output channel of the nailer, discarding each result right away.

func (*Nailer[T, R]) ExecuteAll

func (n *Nailer[T, R]) ExecuteAll(ctx context.Context, ins []T)

ExecuteAll is going to [Start] the nailer, push all `ins` into it and then call [Close] to stop the nailer process. It's a shortcut method if you want to execute pre-made jobs rapidly.

func (*Nailer[T, R]) Push

func (n *Nailer[T, R]) Push(ctx context.Context, in T)

func (*Nailer[T, R]) PushAll

func (n *Nailer[T, R]) PushAll(ctx context.Context, ins []T)

Deprecated: Renamed to ExecuteAll; PushAll is going to be replaced by a different implementation at a later time.

func (*Nailer[T, R]) Start

func (n *Nailer[T, R]) Start(ctx context.Context)

Start should be called prior to sending any job to the nailer; it **must** be started, otherwise nothing will work properly.

func (*Nailer[T, R]) WaitUntilEmpty

func (n *Nailer[T, R]) WaitUntilEmpty(ctx context.Context)

WaitUntilEmpty blocks until there is no more pending input and no active in-flight operation. This method is only useful if you need to wait until all output has been sent to `nailer.Out` and want to re-use the same nailer. If you simply need to wait until all elements have been processed, you should call `Close()` and wait until your last element has been processed:

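```
nailer := dhammer.NewNailer(4, func(ctx context.Context, i int) (int, error) {
	return i * 2, nil
})

// Start requires a context; the same one is reused for Push below.
ctx := context.Background()
nailer.Start(ctx)

done := make(chan bool, 1)

go func() {
	for out := range nailer.Out {
		// Do something with out
		_ = out
	}

	// nailer.Out was closed, so every element has been consumed.
	done <- true
}()

// Range over values, not indices.
for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8} {
	nailer.Push(ctx, i)
}

nailer.Close()
<-done
```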

**Important** `WaitUntilEmpty` only knows when the last output has been sent to the `nailer.Out` channel for consumption, **not** when the output processor reading `nailer.Out` has finished processing the last element of the channel! If you need to wait until the queue is empty **and** your output processor has fully processed the last item, you should add a waiting barrier that is lifted once your last element has been properly processed.

This method works only if output is consumed.
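
One way to build such a waiting barrier is a `sync.WaitGroup` that the output processor releases only after it has handled each element. The sketch below uses a plain channel in place of `nailer.Out` so that it stays self-contained. `processAll` is a hypothetical helper, not part of dhammer.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll is a hypothetical helper. It sends every item through a channel
// and returns only after handle has finished running for the last item.
func processAll(items []int, handle func(int)) {
	out := make(chan int) // stands in for nailer.Out
	var processed sync.WaitGroup

	// Output processor: the barrier is lifted per item, after handling it.
	go func() {
		for v := range out {
			handle(v)
			processed.Done()
		}
	}()

	for _, v := range items {
		processed.Add(1) // register the item before it becomes visible to the consumer
		out <- v
	}

	// A successful send only means the consumer received the item.
	// Wait returns once the consumer has finished handling every item.
	processed.Wait()
	close(out)
}

func main() {
	sum := 0
	processAll([]int{1, 2, 3}, func(v int) { sum += v })
	fmt.Println(sum) // prints "6"
}
```

Reading `sum` after `processAll` returns is safe because each `Done` call happens before `Wait` returns.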

type NailerFunc

type NailerFunc[T any, R any] func(context.Context, T) (R, error)

type NailerOption

type NailerOption func(config *nailerConfig)

NailerOption represents a configuration option that can be passed to NewNailer.

func NailerDiscardAll

func NailerDiscardAll() NailerOption

func NailerLogger

func NailerLogger(logger *zap.Logger) NailerOption

func NailerTracer

func NailerTracer(tracer logging.Tracer) NailerOption
