cluster

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: MIT Imports: 39 Imported by: 0

Documentation

Overview

Package cluster implements rendezvous hashing (a.k.a. highest random weight hashing). See http://en.wikipedia.org/wiki/Rendezvous_hashing for more information. This is based off of https://github.com/tysonmote/rendezvous

Index

Constants

View Source
const (
	ReasonLargeUploadQueue     = "LargeUploadQueue"
	ReasonLargeTransferQueue   = "LargeTransferQueue"
	ReasonMaxSegmentsExceeded  = "MaxSegmentsExceeded"
	ReasonMaxDiskUsageExceeded = "MaxDiskUsageExceeded"
)
View Source
const DefaultMaxSegmentCount = 25

DefaultMaxSegmentCount is the default maximum number of segments to include in a batch before creating a new batch.

Variables

View Source
var ErrPeerOverloaded error = &PeerOverloadedError{}
View Source
var ErrSegmentExists error = &SegmentExistsError{}
View Source
var ErrSegmentLocked error = &SegmentLockedError{}

Functions

func GetOutboundIP

func GetOutboundIP() (net.IP, error)

GetOutboundIP returns the preferred outbound IP address of this machine.

func IsInitReady

func IsInitReady(pod *v1.Pod) bool

IsInitReady returns true if all init containers are in a ready state

func IsPodReady

func IsPodReady(pod *v1.Pod) bool

IsPodReady returns true if all containers in a pod are in a ready state

Types

type Batch

type Batch struct {
	Segments []wal.SegmentInfo
	Database string
	Table    string
	Prefix   string
	// contains filtered or unexported fields
}

func (*Batch) IsReleased

func (b *Batch) IsReleased() bool

func (*Batch) IsRemoved

func (b *Batch) IsRemoved() bool

func (*Batch) Paths

func (b *Batch) Paths() []string

func (*Batch) Release

func (b *Batch) Release()

Release releases the segments in the batch so they can be processed again.

func (*Batch) Remove

func (b *Batch) Remove() error

Remove removes the segments in the batch from disk.

type Batcher

type Batcher interface {
	service.Component
	BatchSegments() error
	UploadQueueSize() int
	TransferQueueSize() int
	SegmentsTotal() int64
	SegmentsSize() int64

	Release(batch *Batch)
	Remove(batch *Batch) error
	MaxSegmentAge() time.Duration
}

func NewBatcher

func NewBatcher(opts BatcherOpts) (Batcher, error)

type BatcherOpts

type BatcherOpts struct {
	StorageDir       string
	MinUploadSize    int64
	MaxSegmentAge    time.Duration
	MaxTransferSize  int64
	MaxTransferAge   time.Duration
	MaxBatchSegments int

	Partitioner MetricPartitioner
	Segmenter   Segmenter

	UploadQueue        chan *Batch
	TransferQueue      chan *Batch
	PeerHealthReporter PeerHealthReporter

	TransfersDisabled bool

	// Metrics for observability
	SegmentsCountMetric     prometheus.Gauge
	SegmentsSizeBytesMetric prometheus.Gauge
	SegmentsMaxAgeMetric    prometheus.Gauge
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ClientOpts) (*Client, error)

func (*Client) Write

func (c *Client) Write(ctx context.Context, endpoint string, filename string, body io.Reader) error

Write writes the given paths to the given endpoint. If multiple paths are given, they are merged into the first file at the destination. This ensures we transfer the full batch atomically.

type ClientOpts

type ClientOpts struct {
	// Close controls whether the client closes the connection after each request.
	Close bool

	// Timeout is the timeout for the http client and the http request.
	Timeout time.Duration

	// InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection
	// will remain idle before closing itself.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is the amount of time to wait for a server's response headers
	// after fully writing the request (including its body, if any).
	ResponseHeaderTimeout time.Duration

	// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
	MaxIdleConns int

	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) connections to keep per host.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost, if non-zero, controls the maximum connections per host.
	MaxConnsPerHost int

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableHTTP2 controls whether the client disables HTTP/2 support.
	DisableHTTP2 bool

	// DisableKeepAlives controls whether the client disables HTTP keep-alives.
	DisableKeepAlives bool

	// DisableGzip controls whether the client disables gzip compression.
	DisableGzip bool
}

func (ClientOpts) WithDefaults

func (c ClientOpts) WithDefaults() ClientOpts

type Coordinator

type Coordinator interface {
	MetricPartitioner
	service.Component

	// IsLeader returns true if the current node is the leader.
	IsLeader() bool
}

func NewCoordinator

func NewCoordinator(opts *CoordinatorOpts) (Coordinator, error)

type CoordinatorOpts

type CoordinatorOpts struct {
	WriteTimeSeriesFn TimeSeriesWriter
	K8sCli            kubernetes.Interface

	// Namespace is the namespace used to discover peers.  If not specified, the coordinator will
	// try to use the namespace of the current pod.
	Namespace string

	// Hostname is the hostname of the current node.  This should be in the StatefulSet hostname format
	// in order to discover peers correctly.
	Hostname string

	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// PartitionSize is the max size of the group of nodes forming a partition.  A partition is a set of nodes where
	// keys are distributed.
	// NOTE: This is not used in the current implementation.
	PartitionSize int
}

type ErrBadRequest

type ErrBadRequest struct {
	Msg string
	// contains filtered or unexported fields
}

func (*ErrBadRequest) Error

func (e *ErrBadRequest) Error() string

func (*ErrBadRequest) Is

func (e *ErrBadRequest) Is(target error) bool

func (*ErrBadRequest) WithRequestId added in v0.2.0

func (e *ErrBadRequest) WithRequestId(id string) error

type FakeReplicator

type FakeReplicator struct {
	// contains filtered or unexported fields
}

func NewFakeReplicator

func NewFakeReplicator() *FakeReplicator

func (*FakeReplicator) Close

func (f *FakeReplicator) Close() error

func (*FakeReplicator) Open

func (f *FakeReplicator) Open(ctx context.Context) error

func (*FakeReplicator) TransferQueue

func (f *FakeReplicator) TransferQueue() chan *Batch

type Hash

type Hash struct {
	// contains filtered or unexported fields
}

func NewRendezvous

func NewRendezvous(nodes ...string) *Hash

NewRendezvous returns a new Hash ready for use with the given nodes.

func (*Hash) Add

func (h *Hash) Add(nodes ...string)

Add adds additional nodes to the Hash.

func (*Hash) Get

func (h *Hash) Get(key string) string

Get returns the node with the highest score for the given key. If this Hash has no nodes, an empty string is returned.

func (*Hash) GetN

func (h *Hash) GetN(n int, key string) []string

GetN returns no more than n nodes for the given key, ordered by descending score. GetN is not goroutine-safe.

type Health

type Health struct {
	QueueSizer QueueSizer
	// contains filtered or unexported fields
}

Health tracks the health of peers in the cluster. If a peer is overloaded, it will be marked as unhealthy, which will cause the service to stop sending writes to that peer for the timeout period. Similarly, the health of the current peer is tracked here, and if it is unhealthy, the service will stop accepting writes.

func NewHealth

func NewHealth(opts HealthOpts) *Health

func (*Health) Close

func (h *Health) Close() error

func (*Health) IsHealthy

func (h *Health) IsHealthy() bool

func (*Health) IsPeerHealthy

func (h *Health) IsPeerHealthy(peer string) bool

func (*Health) MaxSegmentAge added in v0.2.0

func (h *Health) MaxSegmentAge() time.Duration

func (*Health) Open

func (h *Health) Open(ctx context.Context) error

func (*Health) SegmentsSize

func (h *Health) SegmentsSize() int64

func (*Health) SegmentsTotal

func (h *Health) SegmentsTotal() int64

func (*Health) SetPeerHealthy

func (h *Health) SetPeerHealthy(peer string)

func (*Health) SetPeerUnhealthy

func (h *Health) SetPeerUnhealthy(peer string)

func (*Health) TransferQueueSize

func (h *Health) TransferQueueSize() int

func (*Health) UnhealthyReason

func (h *Health) UnhealthyReason() string

func (*Health) UploadQueueSize

func (h *Health) UploadQueueSize() int

type HealthOpts

type HealthOpts struct {
	// UnhealthyTimeout is the amount of time to wait before marking a peer as healthy.
	UnhealthyTimeout time.Duration

	QueueSizer      QueueSizer
	MaxSegmentCount int64
	MaxDiskUsage    int64
}

type HealthStatus

type HealthStatus struct {
	Healthy   bool
	NextCheck time.Time
}

type MetricPartitioner

type MetricPartitioner interface {
	Owner([]byte) (string, string)
}

type OTLPLogsWriter

type OTLPLogsWriter func(ctx context.Context, database, table string, logs *otlp.Logs) error

type Partitioner

type Partitioner struct {
	// contains filtered or unexported fields
}

Partitioner manages the distribution of metrics across nodes. It uses rendezvous hashing to distribute metrics roughly evenly. When nodes are added or removed, the distribution of metrics will change, but only by a proportional amount for each node. For example, if four nodes exist and a fifth is added, only 20% of the metrics will be reassigned to the new node.

func NewPartition

func NewPartition(nodes map[string]string) (*Partitioner, error)

func (*Partitioner) Owner

func (p *Partitioner) Owner(b []byte) (string, string)

Owner returns the hostname and address of the node that owns the given key.

type PeerHealthReporter

type PeerHealthReporter interface {
	IsPeerHealthy(peer string) bool
	SetPeerUnhealthy(peer string)
	SetPeerHealthy(peer string)
}

type PeerOverloadedError added in v0.2.0

type PeerOverloadedError struct {
	// contains filtered or unexported fields
}

PeerOverloadedError represents a peer overloaded error with optional requestId.

func (*PeerOverloadedError) Error added in v0.2.0

func (e *PeerOverloadedError) Error() string

func (*PeerOverloadedError) Is added in v0.2.0

func (e *PeerOverloadedError) Is(target error) bool

func (*PeerOverloadedError) WithRequestId added in v0.2.0

func (e *PeerOverloadedError) WithRequestId(id string) error

type QueueSizer

type QueueSizer interface {
	TransferQueueSize() int
	UploadQueueSize() int
	SegmentsTotal() int64
	SegmentsSize() int64
	MaxSegmentAge() time.Duration
}

type Replicator

type Replicator interface {
	service.Component
	// TransferQueue returns a channel that can be used to transfer files to other nodes.
	TransferQueue() chan *Batch
}

Replicator manages the transfer of local segments to other nodes.

func NewReplicator

func NewReplicator(opts ReplicatorOpts) (Replicator, error)

type ReplicatorOpts

type ReplicatorOpts struct {
	// Partitioner is used to determine which node owns a given metric.
	Partitioner MetricPartitioner

	// Health is used to report the health of the peer replication.
	Health PeerHealthReporter

	// SegmentRemover is used to remove segments after they have been replicated.
	SegmentRemover SegmentRemover

	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool

	// Hostname is the name of the current node.
	Hostname string

	// MaxTransferConcurrency is the maximum number of concurrent transfer requests in flight at a time.
	// Default is 5.
	MaxTransferConcurrency int

	// DisableGzip controls whether the client uses gzip compression for transfer requests.
	DisableGzip bool
}

type SegmentExistsError added in v0.2.0

type SegmentExistsError struct {
	// contains filtered or unexported fields
}

SegmentExistsError represents a segment already exists error with optional requestId.

func (*SegmentExistsError) Error added in v0.2.0

func (e *SegmentExistsError) Error() string

func (*SegmentExistsError) Is added in v0.2.0

func (e *SegmentExistsError) Is(target error) bool

func (*SegmentExistsError) WithRequestId added in v0.2.0

func (e *SegmentExistsError) WithRequestId(id string) error

type SegmentLockedError added in v0.2.0

type SegmentLockedError struct {
	// contains filtered or unexported fields
}

SegmentLockedError represents a segment locked error with optional requestId.

func (*SegmentLockedError) Error added in v0.2.0

func (e *SegmentLockedError) Error() string

func (*SegmentLockedError) Is added in v0.2.0

func (e *SegmentLockedError) Is(target error) bool

func (*SegmentLockedError) WithRequestId added in v0.2.0

func (e *SegmentLockedError) WithRequestId(id string) error

type SegmentRemover

type SegmentRemover interface {
	Remove(path string) error
}

type Segmenter

type Segmenter interface {
	Get(infos []wal.SegmentInfo, prefix string) []wal.SegmentInfo
	PrefixesByAge() []string
	Remove(si wal.SegmentInfo)
	TotalSegments() int
	TotalSize() int64
	OldestSegmentAge() time.Duration
}

type TimeSeriesWriter

type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL