Documentation
¶
Index ¶
- Variables
- func EventIDString(id *eventpb.EventId) string
- func MustGenerateEventID() *eventpb.EventId
- type Bus
- type Forwarder
- type Handler
- type HandlerFunc
- type ProtoEventStream
- type Rendezvous
- type Server
- func (s *Server) ForwardEvents(ctx context.Context, req *eventpb.ForwardEventsRequest) (*eventpb.ForwardEventsResponse, error)
- func (s *Server) ForwardUserEvents(ctx context.Context, events ...*eventpb.UserEvent) error
- func (s *Server) OnEvent(userID *commonpb.UserId, e *eventpb.Event)
- func (s *Server) StreamEvents(...) error
- type Store
- type Stream
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrRendezvousExists = errors.New("rendezvous already exists") ErrRendezvousNotFound = errors.New("rendezvous not found") )
Functions ¶
func EventIDString ¶
func MustGenerateEventID ¶
Types ¶
type Bus ¶
type Bus[Key, Event any] struct { // contains filtered or unexported fields }
func (*Bus[Key, Event]) AddHandler ¶
type HandlerFunc ¶
type HandlerFunc[Key, Event any] func(Key, Event)
HandlerFunc is an adapter to allow the use of ordinary functions as Handlers.
func (HandlerFunc[Key, Event]) OnEvent ¶
func (f HandlerFunc[Key, Event]) OnEvent(key Key, e Event)
OnEvent calls f(key, e).
type ProtoEventStream ¶
type ProtoEventStream[E any, P proto.Message] struct { sync.Mutex // contains filtered or unexported fields }
func NewProtoEventStream ¶
func (*ProtoEventStream[E, P]) Channel ¶
func (s *ProtoEventStream[E, P]) Channel() <-chan P
func (*ProtoEventStream[E, P]) Close ¶
func (s *ProtoEventStream[E, P]) Close()
func (*ProtoEventStream[E, P]) ID ¶
func (s *ProtoEventStream[E, P]) ID() string
type Rendezvous ¶
func (*Rendezvous) Clone ¶
func (r *Rendezvous) Clone() *Rendezvous
type Server ¶
type Server struct {
eventpb.UnimplementedEventStreamingServer
// contains filtered or unexported fields
}
func (*Server) ForwardEvents ¶
func (s *Server) ForwardEvents(ctx context.Context, req *eventpb.ForwardEventsRequest) (*eventpb.ForwardEventsResponse, error)
func (*Server) ForwardUserEvents ¶
TODO: batch events by receiver to optimize internal forwarding RPC calls.
func (*Server) StreamEvents ¶
func (s *Server) StreamEvents(stream grpc.BidiStreamingServer[eventpb.StreamEventsRequest, eventpb.StreamEventsResponse]) error
type Store ¶
type Store interface {
// CreateRendezvous creates a new rendezvous for an event stream
CreateRendezvous(ctx context.Context, rendezvous *Rendezvous) error
// GetRendezvous gets an event stream rendezvous for a given key
GetRendezvous(ctx context.Context, key string) (*Rendezvous, error)
// ExtendRendezvousExpiry extends a rendezvous' expiry for a given key and address
ExtendRendezvousExpiry(ctx context.Context, key, address string, expiresAt time.Time) error
// DeleteRendezvous deletes an event stream rendezvous for a given key and address
DeleteRendezvous(ctx context.Context, key, address string) error
}
Click to show internal directories.
Click to hide internal directories.