utils

package
v0.0.0-...-3b18725 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var GrpcReconnectConfig = struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	Jitter       float64
}{
	InitialDelay: 1 * time.Second,
	MaxDelay:     10 * time.Second,
	Multiplier:   2.0,
	Jitter:       0.2,
}

Functions

func CoinSelect

func CoinSelect(
	boardingUtxos []types.Utxo, vtxos []types.VtxoWithTapTree,
	outputs []types.Receiver, dust uint64, withoutExpirySorting bool,
	feeEstimator *arkfee.Estimator,
) ([]types.Utxo, []types.VtxoWithTapTree, uint64, error)

CoinSelect selects among boarding utxos and vtxos to cover the total amount of the outputs it includes fee computation of the input and output thanks to feeEstimator the change is expressed in btc sats

func CoinSelectAsset

func CoinSelectAsset(
	vtxos []types.VtxoWithTapTree, amount uint64,
	assetID string, withoutExpirySorting bool,
) ([]types.VtxoWithTapTree, uint64, error)

CoinSelectAsset selects a set of vtxos holding a specific asset amount the change is expressed in asset sats

func DecryptAES256

func DecryptAES256(encrypted, password []byte) ([]byte, error)

func EncryptAES256

func EncryptAES256(privateKey, password []byte) ([]byte, error)

func FilterVtxosByExpiry

func FilterVtxosByExpiry(vtxos []types.Vtxo, expiryThreshold int64) []types.Vtxo

func GenerateRandomPrivateKey

func GenerateRandomPrivateKey() (*btcec.PrivateKey, error)

func HashPassword

func HashPassword(password []byte) []byte

func IsOnchainOnly

func IsOnchainOnly(receivers []types.Receiver) bool

func ListenToJSONStream

func ListenToJSONStream(url string, chunkCh chan ChunkJSONStream)

func NetworkFromString

func NetworkFromString(net string) arklib.Network

func ParseBitcoinAddress

func ParseBitcoinAddress(addr string, net chaincfg.Params) (
	bool, []byte, error,
)

func ShouldReconnect

func ShouldReconnect(err error) (bool, time.Duration)

func SortVtxosByExpiry

func SortVtxosByExpiry(vtxos []types.Vtxo) []types.Vtxo

func StartReconnectingStream

func StartReconnectingStream[S grpcClientStream, R any, E any](
	ctx context.Context,
	cfg ReconnectingStreamConfig[S, R, E],
) (<-chan E, func(), error)

StartReconnectingStream opens a stream, continuously receives messages and emits mapped events to a channel until context cancelation or a terminal error.

Retryable receive/open errors are handled with backoff and reopen attempts. When ConnectionEvent is provided, DISCONNECTED/RECONNECTED lifecycle events are emitted around reconnect windows.

It returns the event channel, a close function, and an initialization error if the initial stream open fails.

Type parameters:

  • S: concrete stream type used by Open/Recv.
  • R: raw response type read from the stream.
  • E: output event type written to the returned channel.

Example:

eventsCh, closeFn, err := StartReconnectingStream(
	ctx,
	ReconnectingStreamConfig[
		arkv1.ArkService_GetTransactionsStreamClient,
		*arkv1.GetTransactionsStreamResponse,
		client.TransactionEvent,
	]{
		Open: func(ctx context.Context) (arkv1.ArkService_GetTransactionsStreamClient, error) {
			return svc.GetTransactionsStream(ctx, &arkv1.GetTransactionsStreamRequest{})
		},
		Recv: func(s arkv1.ArkService_GetTransactionsStreamClient) (*arkv1.GetTransactionsStreamResponse, error) {
			return s.Recv()
		},
		HandleResp: func(
			ctx context.Context,
			out chan<- client.TransactionEvent,
			resp *arkv1.GetTransactionsStreamResponse,
		) error {
			// map response and send out one or more domain events
			return nil
		},
		ErrorEvent: func(err error) client.TransactionEvent {
			return client.TransactionEvent{Err: err}
		},
	},
)
if err != nil {
	return err
}
defer closeFn()

for ev := range eventsCh {
	_ = ev
}

func ToBitcoinNetwork

func ToBitcoinNetwork(net arklib.Network) chaincfg.Params

Types

type Broadcaster

type Broadcaster[T any] struct {
	// contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster[T any]() *Broadcaster[T]

func (*Broadcaster[T]) Close

func (l *Broadcaster[T]) Close()

func (*Broadcaster[T]) Publish

func (l *Broadcaster[T]) Publish(v T) int

func (*Broadcaster[T]) Subscribe

func (l *Broadcaster[T]) Subscribe(buf int) <-chan T

func (*Broadcaster[T]) Unsubscribe

func (l *Broadcaster[T]) Unsubscribe(ch <-chan T)

type Cache

type Cache[V any] struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache[V any]() *Cache[V]

func (Cache[V]) Get

func (c Cache[V]) Get(key string) (V, bool)

func (Cache[V]) Set

func (c Cache[V]) Set(key string, value V)

type ChunkJSONStream

type ChunkJSONStream struct {
	Msg []byte
	Err error
}

type ClientFactory

type ClientFactory func(string, bool) (client.TransportClient, error)

type IndexerFactory

type IndexerFactory func(string, bool) (indexer.Indexer, error)

type ReconnectingStreamConfig

type ReconnectingStreamConfig[S grpcClientStream, R any, E any] struct {
	// Connect creates a new stream instance. Called once at startup
	Connect func(context.Context) (S, error)
	// Reconnect creates a new stream instance after retryable failures while reconnecting.
	Reconnect func(context.Context) (S, error)
	// Recv reads one response from the current stream instance.
	Recv func(S) (*R, error)
	// HandleResp maps one response into domain events and writes them to eventsCh.
	// Returning an error terminates the stream and emits ErrorEvent.
	HandleResp func(context.Context, chan<- E, R) error
	// ErrorEvent maps terminal errors into the stream event type.
	ErrorEvent func(error) E
	// ConnectionEvent maps DISCONNECTED/RECONNECTED lifecycle transitions into
	// the stream event type. Optional.
	ConnectionEvent func(ReconnectingStreamStateEvent) E

	// OnServerClosed runs when Recv returns io.EOF.
	OnServerClosed func()
	// OnDisconnect runs for every retryable receive error before sleep.
	OnDisconnect func(error)
	// OnReconnectSuccess runs after a stream reopen succeeds and the first
	// message has been received from the new stream.
	OnReconnectSuccess func(R)
}

ReconnectingStreamConfig defines how to open, read and map a server stream into domain events while handling reconnects in a shared generic loop.

Required fields: Open, Recv, HandleResp, ErrorEvent. Optional callbacks are used for connection lifecycle events, hooks and logs.

Type parameters:

  • S is the concrete gRPC client stream type (must implement CloseSend), for example arkv1.ArkService_GetEventStreamClient.
  • R is the raw message type returned by stream.Recv(), for example *arkv1.GetEventStreamResponse.
  • E is the final event type emitted on the output channel, for example client.BatchEventChannel.

type ReconnectingStreamState

type ReconnectingStreamState string
const (
	ReconnectingStreamStateDisconnected ReconnectingStreamState = "DISCONNECTED"
	ReconnectingStreamStateReconnected  ReconnectingStreamState = "RECONNECTED"
	ReconnectingStreamStateReady        ReconnectingStreamState = "READY"
)

type ReconnectingStreamStateEvent

type ReconnectingStreamStateEvent struct {
	State          ReconnectingStreamState
	At             time.Time
	DisconnectedAt time.Time
	Err            error
}

type SupportedType

type SupportedType[V any] map[string]V

func (SupportedType[V]) String

func (t SupportedType[V]) String() string

func (SupportedType[V]) Supports

func (t SupportedType[V]) Supports(typeStr string) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL