node

package
v0.7.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2025 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Counters is the list of counter metric definitions, each pairing a metric
// name with the help text that describes it.
var Counters = []prometheus.CounterDefinition{
	{
		Name: directMessagesMetric,
		Help: "Number of direct messages this node received.",
	},
	{
		Name: topicMessagesMetric,
		Help: "Number of topic messages this node received.",
	},
	{
		Name: subscriptionsMetric,
		Help: "Number of topics this node subscribes to.",
	},
	{
		Name: messagesSentMetric,
		Help: "Number of messages sent.",
	},
	{
		Name: messagesPublishedMetric,
		Help: "Number of messages published.",
	},
	{
		Name: messagesProcessedMetric,
		Help: "Number of messages this node processed.",
	},
	{
		Name: messagesProcessedOkMetric,
		Help: "Number of messages successfully processed by the node.",
	},
	{
		Name: messagesProcessedErrMetric,
		Help: "Number of messages processed with an error.",
	},
}
View Source
// DefaultConfig is a Config whose topics, health interval and concurrency are
// all taken from the corresponding defaults of the bls package.
var DefaultConfig = Config{
	Topics:         []string{bls.DefaultTopic},
	HealthInterval: bls.DefaultHealthInterval,
	Concurrency:    bls.DefaultConcurrency,
}

DefaultConfig represents the default settings for the node core.

View Source
// DirectMessagePipeline is the Pipeline whose ID is DirectMessage; its Topic
// field is left at the zero value.
var DirectMessagePipeline = Pipeline{ID: DirectMessage}
View Source
// Gauges is the list of gauge metric definitions. It holds a single entry,
// named by NodeInfoMetric.
var Gauges = []prometheus.GaugeDefinition{
	{
		Name: NodeInfoMetric,
		Help: "Information about the b7s node.",
	},
}
View Source
// NodeInfoMetric is a metric name given as its two components, "node" and "info".
var NodeInfoMetric = []string{"node", "info"}

Functions

func HandleMessage

func HandleMessage[T bls.Message](ctx context.Context, from peer.ID, payload []byte, processFunc func(ctx context.Context, from peer.ID, msg T) error) error

func NewCore

func NewCore(log zerolog.Logger, host *host.Host, opts ...Option) *core

Types

type Config

// Config holds the settings for the node core.
type Config struct {
	Topics         []string      // Topics to subscribe to.
	HealthInterval time.Duration // How often the health ping is emitted.
	Concurrency    uint          // How many requests the node processes in parallel.
}

type Core

// Core is the composite node interface: it exposes the node ID and embeds the
// Logger, Network, Telemetry and NodeOps interfaces.
type Core interface {
	// ID returns the node ID.
	ID() string

	Logger
	Network
	Telemetry
	NodeOps
}

type Logger

// Logger provides access to a zerolog logger.
type Logger interface {
	// Log returns a pointer to a zerolog.Logger.
	Log() *zerolog.Logger
}

type Messaging

// Messaging declares the message-passing methods: sends addressed to one or
// more peer IDs, and topic operations (join, subscribe, publish).
type Messaging interface {
	// Send takes a single peer ID and a message.
	Send(context.Context, peer.ID, bls.Message) error
	// SendToMany takes a list of peer IDs and a message.
	// NOTE(review): the meaning of the trailing bool is not visible from this
	// declaration - confirm against the implementation.
	SendToMany(context.Context, []peer.ID, bls.Message, bool) error

	// JoinTopic and Subscribe each take a string (the topic name, judging by
	// the method names); only Subscribe takes a context.
	JoinTopic(string) error
	Subscribe(context.Context, string) error
	// Publish takes only a message, while PublishToTopic also takes an explicit
	// string (presumably Publish targets a default topic - TODO confirm).
	Publish(context.Context, bls.Message) error
	PublishToTopic(context.Context, string, bls.Message) error
}

type Network

// Network exposes the underlying host and a per-peer boolean query, and embeds
// the Messaging interface.
type Network interface {
	// Host returns a pointer to the host.Host.
	Host() *host.Host
	// Connected returns a boolean for the given peer ID (presumably whether
	// that peer is currently connected - confirm against the implementation).
	Connected(peer.ID) bool
	Messaging
}

type NodeOps

// NodeOps declares the node's Run method.
type NodeOps interface {
	// Run takes a context and a callback; the callback receives a context, a
	// peer ID, a string and a byte slice, and returns an error.
	// NOTE(review): presumably the callback is invoked per incoming message,
	// with the string identifying the message type and the bytes carrying its
	// payload - confirm against the implementation.
	Run(context.Context, func(context.Context, peer.ID, string, []byte) error) error
}

type Option

// Option is a function that receives a pointer to the Config it should adjust.
type Option func(*Config)

Option can be used to set Node configuration options.

func Concurrency

func Concurrency(n uint) Option

Concurrency specifies how many requests the node should process in parallel.

func HealthInterval

func HealthInterval(d time.Duration) Option

HealthInterval specifies how often we should emit the health signal.

func Topics

func Topics(topics []string) Option

Topics specifies the p2p topics to which the node should subscribe.

type Pipeline

// Pipeline describes where a message came from: the ID of the pipeline it was
// received on and, optionally, the topic it was published on.
type Pipeline struct {
	ID    PipelineID // ID of the pipeline on which the message was received.
	Topic string     // Optional: topic on which this message was published.
}

func PubSubPipeline

func PubSubPipeline(topic string) Pipeline

func (Pipeline) String

func (p Pipeline) String() string

type PipelineID

// PipelineID is the integer identifier of a message pipeline.
type PipelineID int

// Known pipeline identifiers. Numbering starts at 1, so the zero value of
// PipelineID matches neither constant.
const (
	// PubSub has the value 1.
	PubSub = PipelineID(iota + 1)
	// DirectMessage has the value 2.
	DirectMessage
)

func (PipelineID) String

func (i PipelineID) String() string

type Telemetry

// Telemetry provides access to the tracing and metrics handles.
type Telemetry interface {
	// Tracer returns a pointer to the tracing.Tracer.
	Tracer() *tracing.Tracer
	// Metrics returns a pointer to the metrics.Metrics.
	Metrics() *metrics.Metrics
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL