Documentation
¶
Overview ¶
Package cluster implements rendezvous hashing (a.k.a. highest random weight hashing). See http://en.wikipedia.org/wiki/Rendezvous_hashing for more information. This is based off of https://github.com/tysonmote/rendezvous
Index ¶
- Constants
- Variables
- func GetOutboundIP() (net.IP, error)
- func IsInitReady(pod *v1.Pod) bool
- func IsPodReady(pod *v1.Pod) bool
- type Batch
- type Batcher
- type BatcherOpts
- type Client
- type ClientOpts
- type Coordinator
- type CoordinatorOpts
- type ErrBadRequest
- type FakeReplicator
- type Hash
- type Health
- func (h *Health) Close() error
- func (h *Health) IsHealthy() bool
- func (h *Health) IsPeerHealthy(peer string) bool
- func (h *Health) MaxSegmentAge() time.Duration
- func (h *Health) Open(ctx context.Context) error
- func (h *Health) SegmentsSize() int64
- func (h *Health) SegmentsTotal() int64
- func (h *Health) SetPeerHealthy(peer string)
- func (h *Health) SetPeerUnhealthy(peer string)
- func (h *Health) TransferQueueSize() int
- func (h *Health) UnhealthyReason() string
- func (h *Health) UploadQueueSize() int
- type HealthOpts
- type HealthStatus
- type MetricPartitioner
- type OTLPLogsWriter
- type Partitioner
- type PeerHealthReporter
- type PeerOverloadedError
- type QueueSizer
- type Replicator
- type ReplicatorOpts
- type SegmentExistsError
- type SegmentLockedError
- type SegmentRemover
- type Segmenter
- type TimeSeriesWriter
Constants ¶
const ( ReasonLargeUploadQueue = "LargeUploadQueue" ReasonLargeTransferQueue = "LargeTransferQueue" ReasonMaxSegmentsExceeded = "MaxSegmentsExceeded" ReasonMaxDiskUsageExceeded = "MaxDiskUsageExceeded" )
const DefaultMaxSegmentCount = 25
DefaultMaxSegmentCount is the default maximum number of segments to include in a batch before creating a new batch.
Variables ¶
var ErrPeerOverloaded error = &PeerOverloadedError{}
var ErrSegmentExists error = &SegmentExistsError{}
var ErrSegmentLocked error = &SegmentLockedError{}
Functions ¶
func IsInitReady ¶
IsInitReady returns true if all init containers are in a ready state
func IsPodReady ¶
IsPodReady returns true if all containers in a pod are in a ready state
Types ¶
type Batch ¶
type Batch struct {
Segments []wal.SegmentInfo
Database string
Table string
Prefix string
// contains filtered or unexported fields
}
func (*Batch) IsReleased ¶
type Batcher ¶
type Batcher interface {
service.Component
BatchSegments() error
UploadQueueSize() int
TransferQueueSize() int
SegmentsTotal() int64
SegmentsSize() int64
Release(batch *Batch)
Remove(batch *Batch) error
MaxSegmentAge() time.Duration
}
func NewBatcher ¶
func NewBatcher(opts BatcherOpts) (Batcher, error)
type BatcherOpts ¶
type BatcherOpts struct {
StorageDir string
MinUploadSize int64
MaxSegmentAge time.Duration
MaxTransferSize int64
MaxTransferAge time.Duration
MaxBatchSegments int
Partitioner MetricPartitioner
Segmenter Segmenter
UploadQueue chan *Batch
TransferQueue chan *Batch
PeerHealthReporter PeerHealthReporter
TransfersDisabled bool
// Metrics for observability
SegmentsCountMetric prometheus.Gauge
SegmentsSizeBytesMetric prometheus.Gauge
SegmentsMaxAgeMetric prometheus.Gauge
}
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(opts ClientOpts) (*Client, error)
type ClientOpts ¶
type ClientOpts struct {
// Close controls whether the client closes the connection after each request.
Close bool
// Timeout is the timeout for the http client and the http request.
Timeout time.Duration
// InsecureSkipVerify controls whether the client verifies the server's certificate chain and host name.
InsecureSkipVerify bool
// IdleConnTimeout is the maximum amount of time an idle (keep-alive) connection
// will remain idle before closing itself.
IdleConnTimeout time.Duration
// ResponseHeaderTimeout is the amount of time to wait for a server's response headers
// after fully writing the request (including its body, if any).
ResponseHeaderTimeout time.Duration
// MaxIdleConns controls the maximum number of idle (keep-alive) connections across all hosts.
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle (keep-alive) per host.
MaxIdleConnsPerHost int
// MaxConnsPerHost, if non-zero, controls the maximum connections per host.
MaxConnsPerHost int
// TLSHandshakeTimeout specifies the maximum amount of time to
// wait for a TLS handshake. Zero means no timeout.
TLSHandshakeTimeout time.Duration
// DisableHTTP2 controls whether the client disables HTTP/2 support.
DisableHTTP2 bool
// DisableKeepAlives controls whether the client disables HTTP keep-alives.
DisableKeepAlives bool
// DisableGzip controls whether the client disables gzip compression.
DisableGzip bool
}
func (ClientOpts) WithDefaults ¶
func (c ClientOpts) WithDefaults() ClientOpts
type Coordinator ¶
type Coordinator interface {
MetricPartitioner
service.Component
// IsLeader returns true if the current node is the leader.
IsLeader() bool
}
func NewCoordinator ¶
func NewCoordinator(opts *CoordinatorOpts) (Coordinator, error)
type CoordinatorOpts ¶
type CoordinatorOpts struct {
WriteTimeSeriesFn TimeSeriesWriter
K8sCli kubernetes.Interface
// Namespace is the namespace used to discover peers. If not specified, the coordinator will
// try to use the namespace of the current pod.
Namespace string
// Hostname is the hostname of the current node. This should be the statefulset hostname format
// in order to discover peers correctly
Hostname string
// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
InsecureSkipVerify bool
// PartitionSize is the max size of the group of nodes forming a partition. A partition is a set of nodes where
// keys are distributed.
// NOTE: This is not used in the current implementation.
PartitionSize int
}
type ErrBadRequest ¶
type ErrBadRequest struct {
Msg string
// contains filtered or unexported fields
}
func (*ErrBadRequest) Error ¶
func (e *ErrBadRequest) Error() string
func (*ErrBadRequest) Is ¶
func (e *ErrBadRequest) Is(target error) bool
func (*ErrBadRequest) WithRequestId ¶ added in v0.2.0
func (e *ErrBadRequest) WithRequestId(id string) error
type FakeReplicator ¶
type FakeReplicator struct {
// contains filtered or unexported fields
}
func NewFakeReplicator ¶
func NewFakeReplicator() *FakeReplicator
func (*FakeReplicator) Close ¶
func (f *FakeReplicator) Close() error
func (*FakeReplicator) TransferQueue ¶
func (f *FakeReplicator) TransferQueue() chan *Batch
type Hash ¶
type Hash struct {
// contains filtered or unexported fields
}
func NewRendezvous ¶
New returns a new Hash ready for use with the given nodes.
type Health ¶
type Health struct {
QueueSizer QueueSizer
// contains filtered or unexported fields
}
Health tracks the health of peers in the cluster. If a peer is overloaded, it will be marked as unhealthy which will cause the service to stop sending writes to that peer for timeout period. Similarly, the of the current peer is tracked here and if it is unhealthy, the service will stop accepting writes.
func NewHealth ¶
func NewHealth(opts HealthOpts) *Health
func (*Health) IsPeerHealthy ¶
func (*Health) MaxSegmentAge ¶ added in v0.2.0
func (*Health) SegmentsSize ¶
func (*Health) SegmentsTotal ¶
func (*Health) SetPeerHealthy ¶
func (*Health) SetPeerUnhealthy ¶
func (*Health) TransferQueueSize ¶
func (*Health) UnhealthyReason ¶
func (*Health) UploadQueueSize ¶
type HealthOpts ¶
type HealthOpts struct {
// UnhealthyTimeout is the amount of time to wait before marking a peer as healthy.
UnhealthyTimeout time.Duration
QueueSizer QueueSizer
MaxSegmentCount int64
MaxDiskUsage int64
}
type HealthStatus ¶
type MetricPartitioner ¶
type OTLPLogsWriter ¶
type Partitioner ¶
type Partitioner struct {
// contains filtered or unexported fields
}
Partitioner manages the distribution of metrics across nodes. It uses rendezvous hashing to distribute metrics roughly evenly. When nodes are added or removed, the distribution of metrics will change, but only by a proportional amount for each node. For example, if four nodes exists and a fifth is added, only 20% of the metrics will be reassigned to the new node.
func NewPartition ¶
func NewPartition(nodes map[string]string) (*Partitioner, error)
type PeerHealthReporter ¶
type PeerOverloadedError ¶ added in v0.2.0
type PeerOverloadedError struct {
// contains filtered or unexported fields
}
PeerOverloadedError represents a peer overloaded error with optional requestId.
func (*PeerOverloadedError) Error ¶ added in v0.2.0
func (e *PeerOverloadedError) Error() string
func (*PeerOverloadedError) Is ¶ added in v0.2.0
func (e *PeerOverloadedError) Is(target error) bool
func (*PeerOverloadedError) WithRequestId ¶ added in v0.2.0
func (e *PeerOverloadedError) WithRequestId(id string) error
type QueueSizer ¶
type Replicator ¶
type Replicator interface {
service.Component
// TransferQueue returns a channel that can be used to transfer files to other nodes.
TransferQueue() chan *Batch
}
Replicator manages the transfer of local segments to other nodes.
func NewReplicator ¶
func NewReplicator(opts ReplicatorOpts) (Replicator, error)
type ReplicatorOpts ¶
type ReplicatorOpts struct {
// Partitioner is used to determine which node owns a given metric.
Partitioner MetricPartitioner
// Health is used to report the health of the peer replication.
Health PeerHealthReporter
// SegmentRemover is used to remove segments after they have been replicated.
SegmentRemover SegmentRemover
// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
InsecureSkipVerify bool
// Hostname is the name of the current node.
Hostname string
// MaxTransferConcurrency is the maximum number of concurrent transfer requests to in flight at a time.
// Default is 5.
MaxTransferConcurrency int
// DisableGzip controls whether the client uses gzip compression for transfer requests.
DisableGzip bool
}
type SegmentExistsError ¶ added in v0.2.0
type SegmentExistsError struct {
// contains filtered or unexported fields
}
SegmentExistsError represents a segment already exists error with optional requestId.
func (*SegmentExistsError) Error ¶ added in v0.2.0
func (e *SegmentExistsError) Error() string
func (*SegmentExistsError) Is ¶ added in v0.2.0
func (e *SegmentExistsError) Is(target error) bool
func (*SegmentExistsError) WithRequestId ¶ added in v0.2.0
func (e *SegmentExistsError) WithRequestId(id string) error
type SegmentLockedError ¶ added in v0.2.0
type SegmentLockedError struct {
// contains filtered or unexported fields
}
SegmentLockedError represents a segment locked error with optional requestId.
func (*SegmentLockedError) Error ¶ added in v0.2.0
func (e *SegmentLockedError) Error() string
func (*SegmentLockedError) Is ¶ added in v0.2.0
func (e *SegmentLockedError) Is(target error) bool
func (*SegmentLockedError) WithRequestId ¶ added in v0.2.0
func (e *SegmentLockedError) WithRequestId(id string) error
type SegmentRemover ¶
type Segmenter ¶
type Segmenter interface {
Get(infos []wal.SegmentInfo, prefix string) []wal.SegmentInfo
PrefixesByAge() []string
Remove(si wal.SegmentInfo)
TotalSegments() int
TotalSize() int64
OldestSegmentAge() time.Duration
}
type TimeSeriesWriter ¶
type TimeSeriesWriter func(ctx context.Context, ts []*prompb.TimeSeries) error