protosessions

package
v0.18.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MetaOriginalAuthoritativeClientIDKey = "original_authoritative_client_id"
	MetaMarkedForEvictionKey             = "marked_for_eviction"
	MetaIsolatedClientIDKey              = "isolated_client_id"
	MetaIsolatedSessionStampKey          = "isolated_session_stamp"
	MetaIsolatedUserIDStampKey           = "isolated_user_id_stamp"
)
View Source
const (
	// NextBucketKeyPrefix is the key prefix for storing the next bucket to process.
	NextBucketKeyPrefix = "timingwheel.next"
)

Variables

This section is empty.

Functions

func BatchedIOBackendTestSuite added in v0.13.0

func BatchedIOBackendTestSuite(t *testing.T, factory func() BatchedIOBackend)

BatchedIOBackendTestSuite tests BatchedIOBackend implementations

func EvictWholeProtosessionStrategy added in v0.13.0

func EvictWholeProtosessionStrategy(
	hit *hits.Hit,
	conflict *IdentifierConflictResponse,
	hitsToBeSaved *[]*hits.Hit,
	protosessionsForEviction map[hits.ClientID][]*hits.Hit,
)

EvictWholeProtosessionStrategy evicts entire protosession and re-queues it

func GetIsolatedClientID added in v0.18.3

func GetIsolatedClientID(hit *hits.Hit) hits.ClientID

func GetIsolatedSessionStamp added in v0.18.3

func GetIsolatedSessionStamp(hit *hits.Hit) (string, bool)

func GetIsolatedUserIDStamp added in v0.18.3

func GetIsolatedUserIDStamp(hit *hits.Hit) (string, bool)

func GetOriginalAuthoritativeClientID added in v0.13.0

func GetOriginalAuthoritativeClientID(hit *hits.Hit) (hits.ClientID, bool)

func Handler

func Handler(
	ctx context.Context,
	backend BatchedIOBackend,
	tickerStateBackend TimingWheelStateBackend,
	closer Closer,
	requeuer receiver.Storage,
	settingsRegistry properties.SettingsRegistry,
	options ...OrchestratorOptionsFunc,
) func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error

Handler returns a function that processes hit processing tasks.

func IsMarkedForEviction added in v0.13.0

func IsMarkedForEviction(hit *hits.Hit) bool

func MarkForEviction added in v0.13.0

func MarkForEviction(hit *hits.Hit, targetClientID hits.ClientID)

func RewriteIDAndUpdateInPlaceStrategy added in v0.13.0

func RewriteIDAndUpdateInPlaceStrategy(
	hit *hits.Hit,
	conflict *IdentifierConflictResponse,
	hitsToBeSaved *[]*hits.Hit,
	protosessionsForEviction map[hits.ClientID][]*hits.Hit,
)

RewriteIDAndUpdateInPlaceStrategy rewrites the hit's ID and updates it in place

func SendPingWithTime added in v0.13.0

func SendPingWithTime(
	f func(_ map[string]string, h *hits.HitProcessingTask) *worker.Error,
	t time.Time,
) error

SendPingWithTime sends a ping with the given time, advancing the worker time.

func SetIsolatedClientID added in v0.18.3

func SetIsolatedClientID(hit *hits.Hit, isolatedID hits.ClientID)

func SetIsolatedSessionStamp added in v0.18.3

func SetIsolatedSessionStamp(hit *hits.Hit, stamp string)

func SetIsolatedUserIDStamp added in v0.18.3

func SetIsolatedUserIDStamp(hit *hits.Hit, stamp string)

Types

type AppendHitsToProtoSessionRequest added in v0.13.0

type AppendHitsToProtoSessionRequest struct {
	ProtoSessionID hits.ClientID
	Hits           []*hits.Hit
}

AppendHitsToProtoSessionRequest represents a request to append hits to a proto-session

func NewAppendHitsToProtoSessionRequest added in v0.13.0

func NewAppendHitsToProtoSessionRequest(
	protoSessionID hits.ClientID,
	hits []*hits.Hit,
) *AppendHitsToProtoSessionRequest

NewAppendHitsToProtoSessionRequest creates a new append hits request

type AppendHitsToProtoSessionResponse added in v0.13.0

type AppendHitsToProtoSessionResponse struct {
	Err error
}

AppendHitsToProtoSessionResponse represents the result of appending hits

func NewAppendHitsToProtoSessionResponse added in v0.13.0

func NewAppendHitsToProtoSessionResponse(err error) *AppendHitsToProtoSessionResponse

NewAppendHitsToProtoSessionResponse creates a new append hits response

type BatchedIOBackend added in v0.13.0

type BatchedIOBackend interface {
	GetIdentifierConflicts(
		ctx context.Context,
		requests []*IdentifierConflictRequest,
	) []*IdentifierConflictResponse
	HandleBatch(
		ctx context.Context,
		appendHitsRequests []*AppendHitsToProtoSessionRequest,
		getProtoSessionHitsRequests []*GetProtoSessionHitsRequest,
		markProtoSessionClosingForGivenBucketRequests []*MarkProtoSessionClosingForGivenBucketRequest,
	) (
		[]*AppendHitsToProtoSessionResponse,
		[]*GetProtoSessionHitsResponse,
		[]*MarkProtoSessionClosingForGivenBucketResponse,
	)
	GetAllProtosessionsForBucket(
		ctx context.Context,
		requests []*GetAllProtosessionsForBucketRequest,
	) []*GetAllProtosessionsForBucketResponse
	Cleanup(
		ctx context.Context,
		hitsRequests []*RemoveProtoSessionHitsRequest,
		metadataRequests []*RemoveAllHitRelatedMetadataRequest,
		bucketMetadataRequests []*RemoveBucketMetadataRequest,
	) (
		[]*RemoveProtoSessionHitsResponse,
		[]*RemoveAllHitRelatedMetadataResponse,
		[]*RemoveBucketMetadataResponse,
	)
	// Stops the backend, terminates all the gorutines, frees all the resources
	Stop(context.Context) error
}

BatchedIOBackend provides batched I/O operations for proto-sessions

func NewDeduplicatingBatchedIOBackend added in v0.13.0

func NewDeduplicatingBatchedIOBackend(underlying BatchedIOBackend) BatchedIOBackend

NewDeduplicatingBatchedIOBackend wraps a BatchedIOBackend and deduplicates requests

func NewTestBatchedIOBackend added in v0.13.0

func NewTestBatchedIOBackend(opts ...TestBatchedIOBackendOption) BatchedIOBackend

NewTestBatchedIOBackend creates a test backend with success-like defaults

type BucketProcessorFunc added in v0.13.0

type BucketProcessorFunc func(ctx context.Context, bucketNumber int64) error

BucketProcessorFunc processes a single bucket. Returns nil on success (wheel advances), or an error on failure (wheel does not advance).

type Closer

type Closer interface {
	Close(protosession [][]*hits.Hit) error
}

Closer defines an interface for closing and processing hit sessions

func NewPrintingCloser

func NewPrintingCloser() Closer

NewPrintingCloser creates a new Closer implementation that prints the hits to stdout

func NewShardingCloser added in v0.13.0

func NewShardingCloser(n int, factory func(shardIndex int) Closer) Closer

NewShardingCloser distributes batches of proto-sessions across N child Closers

func NewTestCloser added in v0.13.0

func NewTestCloser(opts ...TestCloserOption) Closer

NewTestCloser creates a test closer with success-like defaults

type EvictionStrategy added in v0.13.0

type EvictionStrategy func(
	hit *hits.Hit,
	conflict *IdentifierConflictResponse,
	hitsToBeSaved *[]*hits.Hit,
	protosessionsForEviction map[hits.ClientID][]*hits.Hit,
)

EvictionStrategy processes conflicting hits and determines eviction behavior

type GetAllProtosessionsForBucketRequest added in v0.13.0

type GetAllProtosessionsForBucketRequest struct {
	BucketID int64
}

GetAllProtosessionsForBucketRequest represents a request to get all proto-sessions for a bucket

func NewGetAllProtosessionsForBucketRequest added in v0.13.0

func NewGetAllProtosessionsForBucketRequest(bucketID int64) *GetAllProtosessionsForBucketRequest

NewGetAllProtosessionsForBucketRequest creates a new get all protosessions for bucket request

type GetAllProtosessionsForBucketResponse added in v0.13.0

type GetAllProtosessionsForBucketResponse struct {
	ProtoSessions [][]*hits.Hit
	Err           error
}

GetAllProtosessionsForBucketResponse represents all proto-sessions for a bucket

func NewGetAllProtosessionsForBucketResponse added in v0.13.0

func NewGetAllProtosessionsForBucketResponse(
	protoSessions [][]*hits.Hit,
	err error,
) *GetAllProtosessionsForBucketResponse

NewGetAllProtosessionsForBucketResponse creates a new get all protosessions for bucket response

type GetProtoSessionHitsRequest added in v0.13.0

type GetProtoSessionHitsRequest struct {
	ProtoSessionID hits.ClientID
}

GetProtoSessionHitsRequest represents a request to get hits for a proto-session

func NewGetProtoSessionHitsRequest added in v0.13.0

func NewGetProtoSessionHitsRequest(protoSessionID hits.ClientID) *GetProtoSessionHitsRequest

NewGetProtoSessionHitsRequest creates a new get proto-session hits request

type GetProtoSessionHitsResponse added in v0.13.0

type GetProtoSessionHitsResponse struct {
	Hits []*hits.Hit
	Err  error
}

GetProtoSessionHitsResponse represents the result of getting proto-session hits

func NewGetProtoSessionHitsResponse added in v0.13.0

func NewGetProtoSessionHitsResponse(hits []*hits.Hit, err error) *GetProtoSessionHitsResponse

NewGetProtoSessionHitsResponse creates a new get proto-session hits response

type IdentifierConflictRequest added in v0.13.0

type IdentifierConflictRequest struct {
	Hit               *hits.Hit
	IdentifierType    string
	ExtractIdentifier func(*hits.Hit) string
}

IdentifierConflictRequest represents a request to check for identifier conflicts

func GetConflictCheckRequests added in v0.13.0

func GetConflictCheckRequests(
	hit *hits.Hit,
	settings *properties.Settings,
) []*IdentifierConflictRequest

GetConflictCheckRequests returns a list of identifier conflict requests for a given hit and settings

func NewIdentifierConflictRequest added in v0.13.0

func NewIdentifierConflictRequest(
	hit *hits.Hit,
	identifierType string,
	extractIdentifier func(*hits.Hit) string,
) *IdentifierConflictRequest

NewIdentifierConflictRequest creates a new identifier conflict request

type IdentifierConflictResponse added in v0.13.0

type IdentifierConflictResponse struct {
	Err           error
	HasConflict   bool
	ConflictsWith hits.ClientID
	Request       *IdentifierConflictRequest
}

IdentifierConflictResponse represents the result of identifier conflict check

func NewIdentifierConflictResponse added in v0.13.0

func NewIdentifierConflictResponse(
	request *IdentifierConflictRequest,
	err error,
	hasConflict bool,
	conflictsWith hits.ClientID,
) *IdentifierConflictResponse

NewIdentifierConflictResponse creates a new identifier conflict response

type IdentifierIsolationGuard added in v0.18.3

type IdentifierIsolationGuard interface {
	IsolatedClientID(hit *hits.Hit) hits.ClientID
	IsolatedSessionStamp(hit *hits.Hit) string
	IsolatedUserID(hit *hits.Hit) string
}

func NewNoIsolationGuard added in v0.18.3

func NewNoIsolationGuard() IdentifierIsolationGuard

type IdentifierIsolationGuardFactory added in v0.18.3

type IdentifierIsolationGuardFactory interface {
	New(settings *properties.Settings) IdentifierIsolationGuard
}

func NewDefaultIdentifierIsolationGuardFactory added in v0.18.3

func NewDefaultIdentifierIsolationGuardFactory() IdentifierIsolationGuardFactory

func NewNoIsolationGuardFactory added in v0.18.3

func NewNoIsolationGuardFactory() IdentifierIsolationGuardFactory

NewNoIsolationGuardFactory creates a new no isolation guard factory - DANGER - mixes data from different properties in the same proto-session!

type MarkProtoSessionClosingForGivenBucketRequest added in v0.13.0

type MarkProtoSessionClosingForGivenBucketRequest struct {
	ProtoSessionID hits.ClientID
	BucketID       int64
}

MarkProtoSessionClosingForGivenBucketRequest marks a proto-session for closing in a bucket

func NewMarkProtoSessionClosingForGivenBucketRequest added in v0.13.0

func NewMarkProtoSessionClosingForGivenBucketRequest(
	protoSessionID hits.ClientID,
	bucketID int64,
) *MarkProtoSessionClosingForGivenBucketRequest

NewMarkProtoSessionClosingForGivenBucketRequest creates a new mark closing request

type MarkProtoSessionClosingForGivenBucketResponse added in v0.13.0

type MarkProtoSessionClosingForGivenBucketResponse struct {
	Err error
}

MarkProtoSessionClosingForGivenBucketResponse represents the result of marking closing

func NewMarkProtoSessionClosingForGivenBucketResponse added in v0.13.0

func NewMarkProtoSessionClosingForGivenBucketResponse(err error) *MarkProtoSessionClosingForGivenBucketResponse

NewMarkProtoSessionClosingForGivenBucketResponse creates a new mark closing response

type Middleware

type Middleware interface {
	Handle(
		ctx context.Context,
		hits []*hits.Hit,
	) error
}

Middleware defines an interface for task processing middleware

type Orchestrator added in v0.13.0

type Orchestrator struct {
	// contains filtered or unexported fields
}

func NewOrchestrator added in v0.13.0

func NewOrchestrator(
	ctx context.Context,
	backend BatchedIOBackend,
	tickerStateBackend TimingWheelStateBackend,
	closer Closer,
	requeuer receiver.Storage,
	settingsRegistry properties.SettingsRegistry,
	options ...OrchestratorOptionsFunc,
) *Orchestrator

type OrchestratorOptionsFunc added in v0.18.3

type OrchestratorOptionsFunc func(*Orchestrator)

func WithEvictionStrategy added in v0.18.3

func WithEvictionStrategy(
	evictionStrategy EvictionStrategy,
) OrchestratorOptionsFunc

func WithIdentifierIsolationGuardFactory added in v0.18.3

func WithIdentifierIsolationGuardFactory(
	identifierIsolationGuardFactory IdentifierIsolationGuardFactory,
) OrchestratorOptionsFunc

type PerBucketMutexes added in v0.13.0

type PerBucketMutexes map[int64]*sync.Mutex

func (PerBucketMutexes) Drop added in v0.13.0

func (m PerBucketMutexes) Drop(bucketNumber int64)

func (PerBucketMutexes) Lock added in v0.13.0

func (m PerBucketMutexes) Lock(bucketNumber int64)

func (PerBucketMutexes) TryLock added in v0.13.0

func (m PerBucketMutexes) TryLock(bucketNumber int64) bool

type ProtosessionError added in v0.13.0

type ProtosessionError interface {
	Error() string
	IsRetryable() bool
}

func NewErrorCausingTaskDrop added in v0.17.0

func NewErrorCausingTaskDrop(err error) ProtosessionError

func NewErrorCausingTaskRetry added in v0.17.0

func NewErrorCausingTaskRetry(err error) ProtosessionError

type RemoveAllHitRelatedMetadataRequest added in v0.13.0

type RemoveAllHitRelatedMetadataRequest struct {
	IdentifierType    string
	ExtractIdentifier func(*hits.Hit) string
	Hit               *hits.Hit
}

RemoveAllHitRelatedMetadataRequest represents a request to remove all hit-related metadata

func GetRemoveHitRelatedMetadataRequests added in v0.13.0

func GetRemoveHitRelatedMetadataRequests(
	protoSession []*hits.Hit,
	settings *properties.Settings,
) []*RemoveAllHitRelatedMetadataRequest

GetRemoveHitRelatedMetadataRequests returns a list of remove all hit related metadata requests for a given proto session and settings

func NewRemoveAllHitRelatedMetadataRequest added in v0.13.0

func NewRemoveAllHitRelatedMetadataRequest(
	hit *hits.Hit,
	identifierType string,
	extractIdentifier func(*hits.Hit) string,
) *RemoveAllHitRelatedMetadataRequest

NewRemoveAllHitRelatedMetadataRequest creates a new remove hit-related metadata request

type RemoveAllHitRelatedMetadataResponse added in v0.13.0

type RemoveAllHitRelatedMetadataResponse struct {
	Err error
}

RemoveAllHitRelatedMetadataResponse represents the result of removing all hit-related metadata

func NewRemoveAllHitRelatedMetadataResponse added in v0.13.0

func NewRemoveAllHitRelatedMetadataResponse(err error) *RemoveAllHitRelatedMetadataResponse

NewRemoveAllHitRelatedMetadataResponse creates a new remove hit-related metadata response

type RemoveBucketMetadataRequest added in v0.13.0

type RemoveBucketMetadataRequest struct {
	BucketID int64
}

RemoveBucketMetadataRequest represents a request to remove bucket metadata

func NewRemoveBucketMetadataRequest added in v0.13.0

func NewRemoveBucketMetadataRequest(bucketID int64) *RemoveBucketMetadataRequest

NewRemoveBucketMetadataRequest creates a new remove bucket metadata request

type RemoveBucketMetadataResponse added in v0.13.0

type RemoveBucketMetadataResponse struct {
	Err error
}

RemoveBucketMetadataResponse represents the result of removing bucket metadata

func NewRemoveBucketMetadataResponse added in v0.13.0

func NewRemoveBucketMetadataResponse(err error) *RemoveBucketMetadataResponse

NewRemoveBucketMetadataResponse creates a new remove bucket metadata response

type RemoveProtoSessionHitsRequest added in v0.13.0

type RemoveProtoSessionHitsRequest struct {
	ProtoSessionID hits.ClientID
}

RemoveProtoSessionHitsRequest represents a request to remove proto-session hits

func NewRemoveProtoSessionHitsRequest added in v0.13.0

func NewRemoveProtoSessionHitsRequest(protoSessionID hits.ClientID) *RemoveProtoSessionHitsRequest

NewRemoveProtoSessionHitsRequest creates a new remove proto-session hits request

type RemoveProtoSessionHitsResponse added in v0.13.0

type RemoveProtoSessionHitsResponse struct {
	Err error
}

RemoveProtoSessionHitsResponse represents the result of removing proto-session hits

func NewRemoveProtoSessionHitsResponse added in v0.13.0

func NewRemoveProtoSessionHitsResponse(err error) *RemoveProtoSessionHitsResponse

NewRemoveProtoSessionHitsResponse creates a new remove proto-session hits response

type TestBatchedIOBackend added in v0.13.0

type TestBatchedIOBackend struct {
	// contains filtered or unexported fields
}

TestBatchedIOBackend is a configurable test implementation of BatchedIOBackend

func (*TestBatchedIOBackend) Cleanup added in v0.13.0

Cleanup implements BatchedIOBackend

func (*TestBatchedIOBackend) GetAllProtosessionsForBucket added in v0.13.0

GetAllProtosessionsForBucket implements BatchedIOBackend

func (*TestBatchedIOBackend) GetIdentifierConflicts added in v0.13.0

func (t *TestBatchedIOBackend) GetIdentifierConflicts(
	_ context.Context,
	requests []*IdentifierConflictRequest,
) []*IdentifierConflictResponse

GetIdentifierConflicts implements BatchedIOBackend

func (*TestBatchedIOBackend) HandleBatch added in v0.13.0

HandleBatch implements BatchedIOBackend

func (*TestBatchedIOBackend) Stop added in v0.13.0

Stop implements BatchedIOBackend

type TestBatchedIOBackendOption added in v0.13.0

type TestBatchedIOBackendOption func(*TestBatchedIOBackend)

TestBatchedIOBackendOption configures TestBatchedIOBackend

func WithAppendHitsHandler added in v0.13.0

WithAppendHitsHandler sets custom handler for append hits requests

func WithCleanupMachineryHandler added in v0.13.0

func WithCleanupMachineryHandler(handler func() error) TestBatchedIOBackendOption

WithCleanupMachineryHandler sets custom handler for cleanup

func WithGetAllProtosessionsForBucketHandler added in v0.13.0

WithGetAllProtosessionsForBucketHandler sets custom handler for bucket proto sessions requests

func WithGetProtoSessionHitsHandler added in v0.13.0

func WithGetProtoSessionHitsHandler(
	h func(*GetProtoSessionHitsRequest) *GetProtoSessionHitsResponse,
) TestBatchedIOBackendOption

WithGetProtoSessionHitsHandler sets custom handler for get proto session hits requests

func WithIdentifierConflictHandler added in v0.13.0

func WithIdentifierConflictHandler(
	h func(*IdentifierConflictRequest) *IdentifierConflictResponse,
) TestBatchedIOBackendOption

WithIdentifierConflictHandler sets custom handler for identifier conflict requests

func WithMarkProtoSessionClosingHandler added in v0.13.0

WithMarkProtoSessionClosingHandler sets custom handler for mark closing requests

func WithRemoveAllHitRelatedMetadataHandler added in v0.13.0

WithRemoveAllHitRelatedMetadataHandler sets custom handler for remove metadata requests

func WithRemoveBucketMetadataHandler added in v0.13.0

func WithRemoveBucketMetadataHandler(
	h func(*RemoveBucketMetadataRequest) *RemoveBucketMetadataResponse,
) TestBatchedIOBackendOption

WithRemoveBucketMetadataHandler sets custom handler for remove bucket metadata requests

func WithRemoveProtoSessionHitsHandler added in v0.13.0

func WithRemoveProtoSessionHitsHandler(
	h func(*RemoveProtoSessionHitsRequest) *RemoveProtoSessionHitsResponse,
) TestBatchedIOBackendOption

WithRemoveProtoSessionHitsHandler sets custom handler for remove hits requests

type TestCloser added in v0.13.0

type TestCloser struct {
	// contains filtered or unexported fields
}

TestCloser is a configurable test implementation of Closer

func (*TestCloser) Close added in v0.13.0

func (c *TestCloser) Close(protosessions [][]*hits.Hit) error

Close implements Closer

type TestCloserOption added in v0.13.0

type TestCloserOption func(*TestCloser)

TestCloserOption configures TestCloser

func WithCloseHandler added in v0.13.0

func WithCloseHandler(handler func([][]*hits.Hit) error) TestCloserOption

WithCloseHandler sets custom handler for closing proto-sessions

type TimingWheel added in v0.13.0

type TimingWheel struct {
	// contains filtered or unexported fields
}

TimingWheel implements a timing wheel for scheduling protosession closures.

func NewTimingWheel added in v0.13.0

func NewTimingWheel(
	backend TimingWheelStateBackend,
	tickInterval time.Duration,
	processor BucketProcessorFunc,
) *TimingWheel

NewTimingWheel creates a timing wheel with the given tick interval.

func (*TimingWheel) BucketNumber added in v0.13.0

func (tw *TimingWheel) BucketNumber(theTime time.Time) int64

func (*TimingWheel) CurrentTime added in v0.13.0

func (tw *TimingWheel) CurrentTime() time.Time

CurrentTime returns the timing wheel's current time.

func (*TimingWheel) Start added in v0.13.0

func (tw *TimingWheel) Start(ctx context.Context)

Start begins the timing wheel loop in a goroutine.

func (*TimingWheel) Stop added in v0.13.0

func (tw *TimingWheel) Stop()

Stop signals the timing wheel to stop and waits for it to finish.

func (*TimingWheel) UpdateTime added in v0.13.0

func (tw *TimingWheel) UpdateTime(t time.Time)

UpdateTime updates the timing wheel's current time if the new time is after the existing time.

type TimingWheelStateBackend added in v0.13.0

type TimingWheelStateBackend interface {
	// GetNextBucket returns the next bucket to process.
	// Returns -1 if no bucket has been processed yet (first run).
	GetNextBucket(ctx context.Context) (int64, error)

	// SaveNextBucket persists the next bucket number to process.
	SaveNextBucket(ctx context.Context, bucketNumber int64) error
}

TimingWheelStateBackend provides abstract storage for timing wheel state.

func NewGenericKVTimingWheelBackend added in v0.13.0

func NewGenericKVTimingWheelBackend(
	name string,
	kv storage.KV,
) TimingWheelStateBackend

NewGenericKVTimingWheelBackend creates a TickerStateBackend using generic storage interfaces.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL