Documentation
¶
Index ¶
- Variables
- type AdvanceCursor
- type Cursor
- type CursorError
- type CursorKey
- type CursorQuery
- type CursorStore
- type ResetCursor
- type Service
- func (s *Service) Advance(ctx context.Context, update AdvanceCursor) (Cursor, error)
- func (s *Service) Get(ctx context.Context, key CursorKey) (Cursor, error)
- func (s *Service) List(ctx context.Context, query CursorQuery) ([]Cursor, error)
- func (s *Service) RecordError(ctx context.Context, report CursorError) (Cursor, error)
- func (s *Service) Reset(ctx context.Context, reset ResetCursor) (Cursor, error)
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors returned by the notification cursor package. Callers should
// match them with errors.Is rather than comparing message text.
var (
	// ErrCursorNotFound reports that no durable notification cursor matched the key.
	ErrCursorNotFound = errors.New("notifications: cursor not found")

	// ErrInvalidCursor reports invalid cursor identity or cursor payload.
	ErrInvalidCursor = errors.New("notifications: invalid cursor")

	// ErrNonMonotonicCursor reports a cursor advance that would move backward
	// or fork delivery metadata.
	ErrNonMonotonicCursor = errors.New("notifications: non-monotonic cursor advance")

	// ErrResetReasonRequired reports a reset request without an explicit
	// recovery reason.
	ErrResetReasonRequired = errors.New("notifications: reset reason required")
)
Functions ¶
This section is empty.
Types ¶
type AdvanceCursor ¶
// AdvanceCursor is the request payload for recording a confirmed delivery
// position. It is the argument type of CursorStore.AdvanceCursor and
// Service.Advance.
type AdvanceCursor struct {
// Key selects the cursor this request applies to.
Key CursorKey `json:"key"`
// LastSequence is the sequence value carried by this advance.
// NOTE(review): ErrNonMonotonicCursor is documented as reporting an advance
// that would move backward; which layer enforces that is not visible here.
LastSequence int64 `json:"last_sequence"`
// LastDeliveredAt is the delivery timestamp carried by this advance.
// NOTE(review): Normalize is documented as filling missing timestamps;
// confirm whether this field is one of them.
LastDeliveredAt time.Time `json:"last_delivered_at"`
// DeliveryID is optional; it is omitted from JSON when empty.
// NOTE(review): presumably persisted as Cursor.LastDeliveryID — confirm.
DeliveryID string `json:"delivery_id,omitempty"`
// Now is the request timestamp.
// NOTE(review): presumably replaced by Normalize's fallbackNow when zero — confirm.
Now time.Time `json:"now"`
}
AdvanceCursor records a confirmed delivery position.
func (AdvanceCursor) Normalize ¶
func (a AdvanceCursor) Normalize(fallbackNow time.Time) (AdvanceCursor, error)
Normalize validates an advance request and fills missing timestamps.
type Cursor ¶
// Cursor is the record returned by every CursorStore and Service operation;
// it holds the latest confirmed delivery position for one consumer.
type Cursor struct {
// Key is the identity of this cursor.
Key CursorKey `json:"key"`
// LastSequence is the stored delivery sequence value.
LastSequence int64 `json:"last_sequence"`
// LastDeliveryID is optional; it is omitted from JSON when empty.
LastDeliveryID string `json:"last_delivery_id,omitempty"`
// LastDeliveredAt is the stored delivery timestamp.
LastDeliveredAt time.Time `json:"last_delivered_at"`
// LastError is optional diagnostic text; it is omitted from JSON when empty.
// NOTE(review): presumably written by RecordError/RecordCursorError — confirm.
LastError string `json:"last_error,omitempty"`
// UpdatedAt is a timestamp on the record.
// NOTE(review): presumably the time of the last write — confirm against the store.
UpdatedAt time.Time `json:"updated_at"`
}
Cursor stores the latest confirmed delivery position for one consumer.
type CursorError ¶
// CursorError is the request payload for recording a diagnostic against a
// cursor. It is the argument type of CursorStore.RecordCursorError and
// Service.RecordError.
type CursorError struct {
// Key selects the cursor the diagnostic is attached to.
Key CursorKey `json:"key"`
// LastError is the diagnostic text; it is always emitted in JSON (no omitempty).
// NOTE(review): the type is described as a "bounded" diagnostic; where and how
// the length is bounded is not visible here.
LastError string `json:"last_error"`
// Now is the request timestamp.
// NOTE(review): presumably replaced by Normalize's fallbackNow when zero — confirm.
Now time.Time `json:"now"`
}
CursorError records a bounded diagnostic without advancing delivery progress.
func (CursorError) Normalize ¶
func (e CursorError) Normalize(fallbackNow time.Time) (CursorError, error)
Normalize validates an error report and fills missing timestamps.
type CursorKey ¶
// CursorKey is the three-part identity of one durable delivery cursor. All
// three fields are always emitted in JSON (no omitempty).
// NOTE(review): whether every field must be non-empty is decided by validation
// code not shown here; ErrInvalidCursor is the documented error for an invalid
// cursor identity.
type CursorKey struct {
// ConsumerID is the consumer component of the key.
ConsumerID string `json:"consumer_id"`
// StreamName is the stream component of the key.
StreamName string `json:"stream_name"`
// SubjectID is the subject component of the key.
SubjectID string `json:"subject_id"`
}
CursorKey identifies one durable delivery cursor.
type CursorQuery ¶
// CursorQuery carries the filters accepted by CursorStore.ListCursors and
// Service.List. Every field is optional and omitted from JSON when empty.
// NOTE(review): presumably an empty filter field matches any value — confirm
// against the store implementation.
type CursorQuery struct {
// ConsumerID filters by the consumer component of CursorKey.
ConsumerID string `json:"consumer_id,omitempty"`
// StreamName filters by the stream component of CursorKey.
StreamName string `json:"stream_name,omitempty"`
// SubjectID filters by the subject component of CursorKey.
SubjectID string `json:"subject_id,omitempty"`
// Limit is an integer bound on the listing.
// NOTE(review): presumably the maximum number of cursors returned, with zero
// meaning "use a default" — confirm.
Limit int `json:"limit,omitempty"`
}
CursorQuery filters cursor diagnostics.
func (CursorQuery) Normalize ¶
func (q CursorQuery) Normalize() CursorQuery
Normalize trims query filters.
type CursorStore ¶
// CursorStore is the persistence interface for notification delivery cursors.
// NewService accepts a CursorStore, and Service is documented as validating
// requests before delegating persistence to it. Every method takes a
// context.Context first and returns the affected Cursor (or cursors) plus an
// error.
type CursorStore interface {
// GetCursor returns the cursor identified by key.
// NOTE(review): presumably fails with ErrCursorNotFound when no cursor matches — confirm.
GetCursor(ctx context.Context, key CursorKey) (Cursor, error)
// ListCursors returns the cursors selected by query.
ListCursors(ctx context.Context, query CursorQuery) ([]Cursor, error)
// AdvanceCursor applies an AdvanceCursor request and returns the resulting cursor.
AdvanceCursor(ctx context.Context, update AdvanceCursor) (Cursor, error)
// ResetCursor applies a ResetCursor request and returns the resulting cursor.
ResetCursor(ctx context.Context, reset ResetCursor) (Cursor, error)
// RecordCursorError applies a CursorError report and returns the resulting cursor.
RecordCursorError(ctx context.Context, report CursorError) (Cursor, error)
}
CursorStore persists confirmed notification delivery progress.
type ResetCursor ¶
// ResetCursor is the request payload for rewinding or repairing one cursor
// after an explicit recovery decision. It is the argument type of
// CursorStore.ResetCursor and Service.Reset.
type ResetCursor struct {
// Key selects the cursor to reset.
Key CursorKey `json:"key"`
// LastSequence is the sequence value carried by the reset.
LastSequence int64 `json:"last_sequence"`
// LastDeliveryID is optional; it is omitted from JSON when empty.
LastDeliveryID string `json:"last_delivery_id,omitempty"`
// LastDeliveredAt is the delivery timestamp carried by the reset.
// NOTE(review): Normalize is documented as filling missing timestamps;
// confirm whether this field is one of them.
LastDeliveredAt time.Time `json:"last_delivered_at"`
// Reason is the recovery reason for the reset. ErrResetReasonRequired is the
// documented error for a reset request without an explicit reason.
Reason string `json:"reason"`
// Now is the request timestamp.
// NOTE(review): presumably replaced by Normalize's fallbackNow when zero — confirm.
Now time.Time `json:"now"`
}
ResetCursor rewinds or repairs one cursor after an explicit recovery decision.
func (ResetCursor) Normalize ¶
func (r ResetCursor) Normalize(fallbackNow time.Time) (ResetCursor, error)
Normalize validates a reset request and fills missing timestamps.
type Service ¶
// Service is the entry point for cursor operations (Advance, Get, List,
// RecordError, Reset). It is constructed with NewService from a CursorStore
// and is documented as validating requests before delegating persistence to
// that store.
type Service struct {
// contains filtered or unexported fields
}
Service validates cursor requests before delegating persistence to the store.
func NewService ¶
func NewService(store CursorStore) *Service
NewService creates a notification cursor service.
func (*Service) RecordError ¶
func (s *Service) RecordError(ctx context.Context, report CursorError) (Cursor, error)
RecordError stores a diagnostic without moving the confirmed delivery sequence.