cluster_test_tool

package
v0.0.0-...-e952b3c Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2025 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Overview

Package cluster_test_tool offers utilities for cluster-related tests.

Index

Constants

View Source
const (
	// PubSubSubscriberKind is the cluster kind used for regular subscribers.
	PubSubSubscriberKind = "Subscriber"
	// PubSubTimeoutSubscriberKind is a subscriber kind that intentionally times out.
	PubSubTimeoutSubscriberKind = "TimeoutSubscriber"
)
View Source
const DefaultWaitTimeout = time.Second * 5

DefaultWaitTimeout is the default duration to wait for test conditions.

View Source
const InvalidIdentity string = "invalid"

InvalidIdentity represents a non-existing actor identity used in tests.

Variables

View Source
var File_pubsub_cluster_proto protoreflect.FileDescriptor

Functions

func WaitUntil

func WaitUntil(t testing.TB, cond func() bool, errorMsg string, timeout time.Duration)

WaitUntil repeatedly checks cond until it returns true or the timeout is reached.

Types

type BaseClusterFixture

type BaseClusterFixture struct {
	// contains filtered or unexported fields
}

BaseClusterFixture implements common functionality for cluster fixtures.

func NewBaseClusterFixture

func NewBaseClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture

NewBaseClusterFixture creates a BaseClusterFixture with the given cluster size.

func NewBaseInMemoryClusterFixture

func NewBaseInMemoryClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture

NewBaseInMemoryClusterFixture creates a new in-memory cluster fixture with the given cluster size.

func (*BaseClusterFixture) GetClusterSize

func (b *BaseClusterFixture) GetClusterSize() int

GetClusterSize returns the expected size of the cluster.

func (*BaseClusterFixture) GetMembers

func (b *BaseClusterFixture) GetMembers() []*cluster.Cluster

GetMembers returns the current cluster members.

func (*BaseClusterFixture) Initialize

func (b *BaseClusterFixture) Initialize()

Initialize initializes the cluster fixture.

func (*BaseClusterFixture) RemoveNode

func (b *BaseClusterFixture) RemoveNode(node *cluster.Cluster, graceful bool)

RemoveNode removes the given member from the cluster.

func (*BaseClusterFixture) ShutDown

func (b *BaseClusterFixture) ShutDown()

ShutDown disposes the fixture and stops all members.

func (*BaseClusterFixture) SpawnNode

func (b *BaseClusterFixture) SpawnNode() *cluster.Cluster

SpawnNode adds a new member to the cluster and returns it.

type ClusterFixture

type ClusterFixture interface {
	GetMembers() []*cluster.Cluster
	GetClusterSize() int
	SpawnNode() *cluster.Cluster
	RemoveNode(node *cluster.Cluster, graceful bool)
	ShutDown()
}

ClusterFixture defines operations for managing a test cluster.

type ClusterFixtureConfig

type ClusterFixtureConfig struct {
	GetClusterKinds    func() []*cluster.Kind
	GetClusterProvider func() cluster.ClusterProvider
	Configure          func(*cluster.Config) *cluster.Config
	GetIdentityLookup  func(clusterName string) cluster.IdentityLookup
	OnDeposing         func()
}

ClusterFixtureConfig holds configuration for building test clusters.

type ClusterFixtureOption

type ClusterFixtureOption func(*ClusterFixtureConfig)

ClusterFixtureOption configures a ClusterFixtureConfig.

func WithClusterConfigure

func WithClusterConfigure(configure func(*cluster.Config) *cluster.Config) ClusterFixtureOption

WithClusterConfigure sets the cluster configure function for the cluster fixture.

func WithGetClusterKinds

func WithGetClusterKinds(getKinds func() []*cluster.Kind) ClusterFixtureOption

WithGetClusterKinds sets the function that returns the cluster kinds for the cluster fixture.

func WithGetClusterProvider

func WithGetClusterProvider(getProvider func() cluster.ClusterProvider) ClusterFixtureOption

WithGetClusterProvider sets the function that returns the cluster provider for the cluster fixture.

func WithGetIdentityLookup

func WithGetIdentityLookup(identityLookup func(clusterName string) cluster.IdentityLookup) ClusterFixtureOption

WithGetIdentityLookup sets the identity lookup function for the cluster fixture.

func WithOnDeposing

func WithOnDeposing(onDeposing func()) ClusterFixtureOption

WithOnDeposing sets the OnDeposing callback for the cluster fixture.

type DataPublished

type DataPublished struct {
	Data int32 `protobuf:"varint,1,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*DataPublished) Descriptor deprecated

func (*DataPublished) Descriptor() ([]byte, []int)

Deprecated: Use DataPublished.ProtoReflect.Descriptor instead.

func (*DataPublished) GetData

func (x *DataPublished) GetData() int32

func (*DataPublished) ProtoMessage

func (*DataPublished) ProtoMessage()

func (*DataPublished) ProtoReflect

func (x *DataPublished) ProtoReflect() protoreflect.Message

func (*DataPublished) Reset

func (x *DataPublished) Reset()

func (*DataPublished) String

func (x *DataPublished) String() string

type Delivery

type Delivery struct {
	Identity string
	Data     int
}

Delivery describes a message delivered to a subscriber.

type InMemorySubscribersStore

type InMemorySubscribersStore[T any] struct {
	// contains filtered or unexported fields
}

InMemorySubscribersStore provides a simple concurrent map-based storage.

func NewInMemorySubscriberStore

func NewInMemorySubscriberStore() *InMemorySubscribersStore[*cluster.Subscribers]

NewInMemorySubscriberStore returns an in-memory key-value store for subscribers.

func (*InMemorySubscribersStore[T]) Clear

func (i *InMemorySubscribersStore[T]) Clear(_ context.Context, key string) error

Clear removes the value associated with the given key.

func (*InMemorySubscribersStore[T]) Get

func (i *InMemorySubscribersStore[T]) Get(_ context.Context, key string) (T, error)

Get retrieves the value for the given key.

func (*InMemorySubscribersStore[T]) Set

func (i *InMemorySubscribersStore[T]) Set(_ context.Context, key string, value T) error

Set stores the value for the given key.

type PubSubClusterFixture

type PubSubClusterFixture struct {
	*BaseClusterFixture

	Deliveries     []Delivery
	DeliveriesLock *sync.RWMutex
	// contains filtered or unexported fields
}

PubSubClusterFixture simplifies setting up clusters for PubSub testing.

func NewPubSubClusterFixture

func NewPubSubClusterFixture(t testing.TB, clusterSize int, useDefaultTopicRegistration bool, opts ...ClusterFixtureOption) *PubSubClusterFixture

NewPubSubClusterFixture creates a new fixture with the given cluster size.

func (*PubSubClusterFixture) AppendDelivery

func (p *PubSubClusterFixture) AppendDelivery(delivery Delivery)

AppendDelivery appends a delivery to the Deliveries slice.

func (*PubSubClusterFixture) ClearDeliveries

func (p *PubSubClusterFixture) ClearDeliveries()

ClearDeliveries clears the recorded deliveries.

func (*PubSubClusterFixture) GetSubscribersForTopic

func (p *PubSubClusterFixture) GetSubscribersForTopic(topic string) (*cluster.Subscribers, error)

GetSubscribersForTopic returns the subscribers for the given topic.

func (*PubSubClusterFixture) PublishData

func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.PublishResponse, error)

PublishData publishes the given data to the given topic.

func (*PubSubClusterFixture) PublishDataBatch

func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error)

PublishDataBatch publishes the given batch of data to the given topic.

func (*PubSubClusterFixture) RandomMember

func (p *PubSubClusterFixture) RandomMember() *cluster.Cluster

RandomMember returns a random cluster member from the fixture.

func (*PubSubClusterFixture) SubscribeAllTo

func (p *PubSubClusterFixture) SubscribeAllTo(topic string, subscriberIds []string)

SubscribeAllTo subscribes all the given subscribers to the given topic.

func (*PubSubClusterFixture) SubscribeTo

func (p *PubSubClusterFixture) SubscribeTo(topic, identity, kind string)

SubscribeTo subscribes the given subscriber to the given topic.

func (*PubSubClusterFixture) SubscriberIds

func (p *PubSubClusterFixture) SubscriberIds(prefix string, count int) []string

SubscriberIds returns the subscriber ids.

func (*PubSubClusterFixture) UnSubscribeAllFrom

func (p *PubSubClusterFixture) UnSubscribeAllFrom(topic string, subscriberIds []string)

UnSubscribeAllFrom unsubscribes all the given subscribers from the given topic.

func (*PubSubClusterFixture) UnSubscribeTo

func (p *PubSubClusterFixture) UnSubscribeTo(topic, identity, kind string)

UnSubscribeTo unsubscribes the given subscriber from the given topic.

func (*PubSubClusterFixture) VerifyAllSubscribersGotAllTheData

func (p *PubSubClusterFixture) VerifyAllSubscribersGotAllTheData(subscriberIds []string, numMessages int)

VerifyAllSubscribersGotAllTheData verifies that all subscribers got all the data.

type Response

type Response struct {
	// contains filtered or unexported fields
}

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL