Documentation
¶
Index ¶
- Constants
- Variables
- func DeleteLease(tx *bdb.Tx) error
- func DeleteParticipant(tx *bdb.Tx, pid uint32) error
- func NewSRPCCoordinatorServiceHandler(impl SRPCCoordinatorServiceServer, serviceID string) srpc.Handler
- func NewSRPCParticipantServiceHandler(impl SRPCParticipantServiceServer, serviceID string) srpc.Handler
- func PutLease(tx *bdb.Tx, rec *LeaseRecord) error
- func PutParticipant(tx *bdb.Tx, rec *ParticipantRecord) error
- func SRPCRegisterCoordinatorService(mux srpc.Mux, impl SRPCCoordinatorServiceServer) error
- func SRPCRegisterParticipantService(mux srpc.Mux, impl SRPCParticipantServiceServer) error
- type Coordinator
- func (c *Coordinator) CountParticipants() (int, error)
- func (c *Coordinator) GetElection() *Election
- func (c *Coordinator) GetMesh() *Mesh
- func (c *Coordinator) GetRegistry() *Registry
- func (c *Coordinator) GetWatcher() *ParticipantWatcher
- func (c *Coordinator) Role() ParticipantRole
- func (c *Coordinator) Run(ctx context.Context) error
- func (c *Coordinator) WaitRole(ctx context.Context) (ParticipantRole, error)
- type Election
- func (e *Election) CurrentLeader() (*LeaseRecord, error)
- func (e *Election) IsLeader() (bool, error)
- func (e *Election) ReleaseLease() error
- func (e *Election) RenewLease() (bool, error)
- func (e *Election) RunLeaseRenewal(ctx context.Context) error
- func (e *Election) TryClaimLeadership() (bool, error)
- func (e *Election) TryReelect() (bool, error)
- type EngineFactory
- type FollowerEngine
- func (f *FollowerEngine) AccessWorldState(ctx context.Context, ref *bucket.ObjectRef, ...) error
- func (f *FollowerEngine) BuildStorageCursor(ctx context.Context) (*bucket_lookup.Cursor, error)
- func (f *FollowerEngine) GetSeqno(ctx context.Context) (uint64, error)
- func (f *FollowerEngine) NewTransaction(ctx context.Context, write bool) (world.Tx, error)
- func (f *FollowerEngine) Run(ctx context.Context) error
- func (f *FollowerEngine) WaitSeqno(ctx context.Context, value uint64) (uint64, error)
- type ForwardingWorldState
- type GetParticipantInfoRequest
- func (m *GetParticipantInfoRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *GetParticipantInfoRequest) CloneVT() *GetParticipantInfoRequest
- func (this *GetParticipantInfoRequest) EqualMessageVT(thatMsg any) bool
- func (this *GetParticipantInfoRequest) EqualVT(that *GetParticipantInfoRequest) bool
- func (x *GetParticipantInfoRequest) MarshalJSON() ([]byte, error)
- func (x *GetParticipantInfoRequest) MarshalProtoJSON(s *json.MarshalState)
- func (x *GetParticipantInfoRequest) MarshalProtoText() string
- func (m *GetParticipantInfoRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *GetParticipantInfoRequest) MarshalToVT(dAtA []byte) (int, error)
- func (m *GetParticipantInfoRequest) MarshalVT() (dAtA []byte, err error)
- func (*GetParticipantInfoRequest) ProtoMessage()
- func (x *GetParticipantInfoRequest) Reset()
- func (m *GetParticipantInfoRequest) SizeVT() (n int)
- func (x *GetParticipantInfoRequest) String() string
- func (x *GetParticipantInfoRequest) UnmarshalJSON(b []byte) error
- func (x *GetParticipantInfoRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *GetParticipantInfoRequest) UnmarshalVT(dAtA []byte) error
- type GetParticipantInfoResponse
- func (m *GetParticipantInfoResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *GetParticipantInfoResponse) CloneVT() *GetParticipantInfoResponse
- func (this *GetParticipantInfoResponse) EqualMessageVT(thatMsg any) bool
- func (this *GetParticipantInfoResponse) EqualVT(that *GetParticipantInfoResponse) bool
- func (x *GetParticipantInfoResponse) GetCapabilities() []string
- func (x *GetParticipantInfoResponse) GetPid() uint32
- func (x *GetParticipantInfoResponse) GetRole() ParticipantRole
- func (x *GetParticipantInfoResponse) MarshalJSON() ([]byte, error)
- func (x *GetParticipantInfoResponse) MarshalProtoJSON(s *json.MarshalState)
- func (x *GetParticipantInfoResponse) MarshalProtoText() string
- func (m *GetParticipantInfoResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *GetParticipantInfoResponse) MarshalToVT(dAtA []byte) (int, error)
- func (m *GetParticipantInfoResponse) MarshalVT() (dAtA []byte, err error)
- func (*GetParticipantInfoResponse) ProtoMessage()
- func (x *GetParticipantInfoResponse) Reset()
- func (m *GetParticipantInfoResponse) SizeVT() (n int)
- func (x *GetParticipantInfoResponse) String() string
- func (x *GetParticipantInfoResponse) UnmarshalJSON(b []byte) error
- func (x *GetParticipantInfoResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *GetParticipantInfoResponse) UnmarshalVT(dAtA []byte) error
- type LeaseRecord
- func (m *LeaseRecord) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *LeaseRecord) CloneVT() *LeaseRecord
- func (this *LeaseRecord) EqualMessageVT(thatMsg any) bool
- func (this *LeaseRecord) EqualVT(that *LeaseRecord) bool
- func (x *LeaseRecord) GetLeaderPid() uint32
- func (x *LeaseRecord) GetLeaderSocketPath() string
- func (x *LeaseRecord) GetLeaseTimestampNanos() int64
- func (x *LeaseRecord) MarshalJSON() ([]byte, error)
- func (x *LeaseRecord) MarshalProtoJSON(s *json.MarshalState)
- func (x *LeaseRecord) MarshalProtoText() string
- func (m *LeaseRecord) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *LeaseRecord) MarshalToVT(dAtA []byte) (int, error)
- func (m *LeaseRecord) MarshalVT() (dAtA []byte, err error)
- func (*LeaseRecord) ProtoMessage()
- func (x *LeaseRecord) Reset()
- func (m *LeaseRecord) SizeVT() (n int)
- func (x *LeaseRecord) String() string
- func (x *LeaseRecord) UnmarshalJSON(b []byte) error
- func (x *LeaseRecord) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *LeaseRecord) UnmarshalVT(dAtA []byte) error
- type Mesh
- func (m *Mesh) Close()
- func (m *Mesh) Connect(ctx context.Context, pid uint32, socketPath string) (srpc.Client, error)
- func (m *Mesh) Disconnect(pid uint32)
- func (m *Mesh) GetClient(pid uint32) srpc.Client
- func (m *Mesh) Listen(dir string) error
- func (m *Mesh) Mux() srpc.Mux
- func (m *Mesh) Serve(ctx context.Context) error
- func (m *Mesh) SocketPath() string
- type ParticipantRecord
- func (m *ParticipantRecord) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *ParticipantRecord) CloneVT() *ParticipantRecord
- func (this *ParticipantRecord) EqualMessageVT(thatMsg any) bool
- func (this *ParticipantRecord) EqualVT(that *ParticipantRecord) bool
- func (x *ParticipantRecord) GetCapabilities() []string
- func (x *ParticipantRecord) GetLastHeartbeatNanos() int64
- func (x *ParticipantRecord) GetPid() uint32
- func (x *ParticipantRecord) GetRole() ParticipantRole
- func (x *ParticipantRecord) GetSrpcSocketPath() string
- func (x *ParticipantRecord) GetStartTimeNanos() int64
- func (x *ParticipantRecord) MarshalJSON() ([]byte, error)
- func (x *ParticipantRecord) MarshalProtoJSON(s *json.MarshalState)
- func (x *ParticipantRecord) MarshalProtoText() string
- func (m *ParticipantRecord) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *ParticipantRecord) MarshalToVT(dAtA []byte) (int, error)
- func (m *ParticipantRecord) MarshalVT() (dAtA []byte, err error)
- func (*ParticipantRecord) ProtoMessage()
- func (x *ParticipantRecord) Reset()
- func (m *ParticipantRecord) SizeVT() (n int)
- func (x *ParticipantRecord) String() string
- func (x *ParticipantRecord) UnmarshalJSON(b []byte) error
- func (x *ParticipantRecord) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *ParticipantRecord) UnmarshalVT(dAtA []byte) error
- type ParticipantRole
- func (x ParticipantRole) Enum() *ParticipantRole
- func (x ParticipantRole) MarshalJSON() ([]byte, error)
- func (x ParticipantRole) MarshalProtoJSON(s *json.MarshalState)
- func (x ParticipantRole) MarshalProtoText() string
- func (x ParticipantRole) MarshalText() ([]byte, error)
- func (x ParticipantRole) String() string
- func (x *ParticipantRole) UnmarshalJSON(b []byte) error
- func (x *ParticipantRole) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (x *ParticipantRole) UnmarshalText(b []byte) error
- type ParticipantWatcher
- type ReadHeadRefFunc
- type Registry
- type RoleChangeHandler
- type SRPCCoordinatorServiceClient
- type SRPCCoordinatorServiceHandler
- func (SRPCCoordinatorServiceHandler) GetMethodIDs() []string
- func (d *SRPCCoordinatorServiceHandler) GetServiceID() string
- func (d *SRPCCoordinatorServiceHandler) InvokeMethod(serviceID, methodID string, strm srpc.Stream) (bool, error)
- func (SRPCCoordinatorServiceHandler) InvokeMethod_SubmitWorldOp(impl SRPCCoordinatorServiceServer, strm srpc.Stream) error
- func (SRPCCoordinatorServiceHandler) InvokeMethod_WatchWorldSeqno(impl SRPCCoordinatorServiceServer, strm srpc.Stream) error
- type SRPCCoordinatorServiceServer
- type SRPCCoordinatorService_SubmitWorldOpStream
- type SRPCCoordinatorService_WatchWorldSeqnoClient
- type SRPCCoordinatorService_WatchWorldSeqnoStream
- type SRPCParticipantServiceClient
- type SRPCParticipantServiceHandler
- func (SRPCParticipantServiceHandler) GetMethodIDs() []string
- func (d *SRPCParticipantServiceHandler) GetServiceID() string
- func (d *SRPCParticipantServiceHandler) InvokeMethod(serviceID, methodID string, strm srpc.Stream) (bool, error)
- func (SRPCParticipantServiceHandler) InvokeMethod_GetParticipantInfo(impl SRPCParticipantServiceServer, strm srpc.Stream) error
- type SRPCParticipantServiceServer
- type SRPCParticipantService_GetParticipantInfoStream
- type SubmitWorldOpRequest
- func (m *SubmitWorldOpRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *SubmitWorldOpRequest) CloneVT() *SubmitWorldOpRequest
- func (this *SubmitWorldOpRequest) EqualMessageVT(thatMsg any) bool
- func (this *SubmitWorldOpRequest) EqualVT(that *SubmitWorldOpRequest) bool
- func (x *SubmitWorldOpRequest) GetOpData() []byte
- func (x *SubmitWorldOpRequest) MarshalJSON() ([]byte, error)
- func (x *SubmitWorldOpRequest) MarshalProtoJSON(s *json.MarshalState)
- func (x *SubmitWorldOpRequest) MarshalProtoText() string
- func (m *SubmitWorldOpRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *SubmitWorldOpRequest) MarshalToVT(dAtA []byte) (int, error)
- func (m *SubmitWorldOpRequest) MarshalVT() (dAtA []byte, err error)
- func (*SubmitWorldOpRequest) ProtoMessage()
- func (x *SubmitWorldOpRequest) Reset()
- func (m *SubmitWorldOpRequest) SizeVT() (n int)
- func (x *SubmitWorldOpRequest) String() string
- func (x *SubmitWorldOpRequest) UnmarshalJSON(b []byte) error
- func (x *SubmitWorldOpRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *SubmitWorldOpRequest) UnmarshalVT(dAtA []byte) error
- type SubmitWorldOpResponse
- func (m *SubmitWorldOpResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *SubmitWorldOpResponse) CloneVT() *SubmitWorldOpResponse
- func (this *SubmitWorldOpResponse) EqualMessageVT(thatMsg any) bool
- func (this *SubmitWorldOpResponse) EqualVT(that *SubmitWorldOpResponse) bool
- func (x *SubmitWorldOpResponse) GetError() string
- func (x *SubmitWorldOpResponse) GetSeqno() uint64
- func (x *SubmitWorldOpResponse) MarshalJSON() ([]byte, error)
- func (x *SubmitWorldOpResponse) MarshalProtoJSON(s *json.MarshalState)
- func (x *SubmitWorldOpResponse) MarshalProtoText() string
- func (m *SubmitWorldOpResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *SubmitWorldOpResponse) MarshalToVT(dAtA []byte) (int, error)
- func (m *SubmitWorldOpResponse) MarshalVT() (dAtA []byte, err error)
- func (*SubmitWorldOpResponse) ProtoMessage()
- func (x *SubmitWorldOpResponse) Reset()
- func (m *SubmitWorldOpResponse) SizeVT() (n int)
- func (x *SubmitWorldOpResponse) String() string
- func (x *SubmitWorldOpResponse) UnmarshalJSON(b []byte) error
- func (x *SubmitWorldOpResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *SubmitWorldOpResponse) UnmarshalVT(dAtA []byte) error
- type WatchWorldSeqnoRequest
- func (m *WatchWorldSeqnoRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *WatchWorldSeqnoRequest) CloneVT() *WatchWorldSeqnoRequest
- func (this *WatchWorldSeqnoRequest) EqualMessageVT(thatMsg any) bool
- func (this *WatchWorldSeqnoRequest) EqualVT(that *WatchWorldSeqnoRequest) bool
- func (x *WatchWorldSeqnoRequest) GetLastSeenSeqno() uint64
- func (x *WatchWorldSeqnoRequest) MarshalJSON() ([]byte, error)
- func (x *WatchWorldSeqnoRequest) MarshalProtoJSON(s *json.MarshalState)
- func (x *WatchWorldSeqnoRequest) MarshalProtoText() string
- func (m *WatchWorldSeqnoRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *WatchWorldSeqnoRequest) MarshalToVT(dAtA []byte) (int, error)
- func (m *WatchWorldSeqnoRequest) MarshalVT() (dAtA []byte, err error)
- func (*WatchWorldSeqnoRequest) ProtoMessage()
- func (x *WatchWorldSeqnoRequest) Reset()
- func (m *WatchWorldSeqnoRequest) SizeVT() (n int)
- func (x *WatchWorldSeqnoRequest) String() string
- func (x *WatchWorldSeqnoRequest) UnmarshalJSON(b []byte) error
- func (x *WatchWorldSeqnoRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *WatchWorldSeqnoRequest) UnmarshalVT(dAtA []byte) error
- type WatchWorldSeqnoResponse
- func (m *WatchWorldSeqnoResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
- func (m *WatchWorldSeqnoResponse) CloneVT() *WatchWorldSeqnoResponse
- func (this *WatchWorldSeqnoResponse) EqualMessageVT(thatMsg any) bool
- func (this *WatchWorldSeqnoResponse) EqualVT(that *WatchWorldSeqnoResponse) bool
- func (x *WatchWorldSeqnoResponse) GetSeqno() uint64
- func (x *WatchWorldSeqnoResponse) MarshalJSON() ([]byte, error)
- func (x *WatchWorldSeqnoResponse) MarshalProtoJSON(s *json.MarshalState)
- func (x *WatchWorldSeqnoResponse) MarshalProtoText() string
- func (m *WatchWorldSeqnoResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
- func (m *WatchWorldSeqnoResponse) MarshalToVT(dAtA []byte) (int, error)
- func (m *WatchWorldSeqnoResponse) MarshalVT() (dAtA []byte, err error)
- func (*WatchWorldSeqnoResponse) ProtoMessage()
- func (x *WatchWorldSeqnoResponse) Reset()
- func (m *WatchWorldSeqnoResponse) SizeVT() (n int)
- func (x *WatchWorldSeqnoResponse) String() string
- func (x *WatchWorldSeqnoResponse) UnmarshalJSON(b []byte) error
- func (x *WatchWorldSeqnoResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
- func (m *WatchWorldSeqnoResponse) UnmarshalVT(dAtA []byte) error
- type WorldRoleHandler
Constants ¶
const LeaseRenewalInterval = 250 * time.Millisecond
LeaseRenewalInterval is the default interval between lease renewals.
const LeaseStaleThreshold = time.Second
LeaseStaleThreshold is the duration after which a lease is considered stale.
const SRPCCoordinatorServiceServiceID = "coord.CoordinatorService"
const SRPCParticipantServiceServiceID = "coord.ParticipantService"
Variables ¶
var ( ParticipantRole_name = map[int32]string{ 0: "ParticipantRole_UNKNOWN", 1: "ParticipantRole_LEADER", 2: "ParticipantRole_FOLLOWER", } ParticipantRole_value = map[string]int32{ "ParticipantRole_UNKNOWN": 0, "ParticipantRole_LEADER": 1, "ParticipantRole_FOLLOWER": 2, } )
Enum value maps for ParticipantRole.
Functions ¶
func DeleteParticipant ¶
DeleteParticipant removes a participant record by PID from a bbolt write transaction.
func NewSRPCCoordinatorServiceHandler ¶
func NewSRPCCoordinatorServiceHandler(impl SRPCCoordinatorServiceServer, serviceID string) srpc.Handler
NewSRPCCoordinatorServiceHandler constructs a new RPC handler. serviceID: if empty, uses default: coord.CoordinatorService
func NewSRPCParticipantServiceHandler ¶
func NewSRPCParticipantServiceHandler(impl SRPCParticipantServiceServer, serviceID string) srpc.Handler
NewSRPCParticipantServiceHandler constructs a new RPC handler. serviceID: if empty, uses default: coord.ParticipantService
func PutLease ¶
func PutLease(tx *bdb.Tx, rec *LeaseRecord) error
PutLease writes the leader lease record to the coordination bucket.
func PutParticipant ¶
func PutParticipant(tx *bdb.Tx, rec *ParticipantRecord) error
PutParticipant writes a participant record to the registry within a bbolt write transaction. The record is keyed by PID.
func SRPCRegisterCoordinatorService ¶
func SRPCRegisterCoordinatorService(mux srpc.Mux, impl SRPCCoordinatorServiceServer) error
SRPCRegisterCoordinatorService registers the implementation with the mux. Uses the default serviceID: coord.CoordinatorService
func SRPCRegisterParticipantService ¶
func SRPCRegisterParticipantService(mux srpc.Mux, impl SRPCParticipantServiceServer) error
SRPCRegisterParticipantService registers the implementation with the mux. Uses the default serviceID: coord.ParticipantService
Types ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator manages the full lifecycle: participant registry, leader election, heartbeat, SRPC mesh, and role change notifications.
func NewCoordinator ¶
func NewCoordinator( le *logrus.Entry, db *bdb.DB, dir string, caps []string, handler RoleChangeHandler, ) *Coordinator
NewCoordinator creates a new coordinator. The dir is used for the SRPC socket (coord-{pid}.sock). Call Run to start the lifecycle.
func (*Coordinator) CountParticipants ¶
func (c *Coordinator) CountParticipants() (int, error)
CountParticipants returns the number of alive participants in the registry. Useful after becoming leader to determine if this is a fresh session (only ourselves) or a re-election (other participants alive).
func (*Coordinator) GetElection ¶
func (c *Coordinator) GetElection() *Election
GetElection returns the election manager.
func (*Coordinator) GetMesh ¶
func (c *Coordinator) GetMesh() *Mesh
GetMesh returns the SRPC mesh for registering services and obtaining clients to remote participants.
func (*Coordinator) GetRegistry ¶
func (c *Coordinator) GetRegistry() *Registry
GetRegistry returns the participant registry.
func (*Coordinator) GetWatcher ¶
func (c *Coordinator) GetWatcher() *ParticipantWatcher
GetWatcher returns the participant watcher.
func (*Coordinator) Role ¶
func (c *Coordinator) Role() ParticipantRole
Role returns the current role.
func (*Coordinator) Run ¶
func (c *Coordinator) Run(ctx context.Context) error
Run executes the coordinator lifecycle. Blocks until ctx is cancelled.
func (*Coordinator) WaitRole ¶
func (c *Coordinator) WaitRole(ctx context.Context) (ParticipantRole, error)
WaitRole waits until the coordinator has determined this process's role (leader or follower). Returns the role once known, or an error if ctx is cancelled before the role is determined.
type Election ¶
type Election struct {
// contains filtered or unexported fields
}
Election manages leader election via bbolt lease records.
func NewElection ¶
NewElection creates a new election manager.
func (*Election) CurrentLeader ¶
func (e *Election) CurrentLeader() (*LeaseRecord, error)
CurrentLeader returns the current lease record, or nil if none exists.
func (*Election) ReleaseLease ¶
ReleaseLease deletes the lease record if this process is the leader. It is called during graceful shutdown so that followers detect the missing lease and trigger an immediate election without waiting for the staleness timeout.
func (*Election) RenewLease ¶
RenewLease updates the lease timestamp for the current leader. Returns false if this process is no longer the leader.
func (*Election) RunLeaseRenewal ¶
RunLeaseRenewal renews the lease periodically until ctx is cancelled or this process loses leadership. Returns nil on context cancellation.
func (*Election) TryClaimLeadership ¶
TryClaimLeadership attempts to claim leadership if no valid lease exists. Returns true if this process became the leader.
func (*Election) TryReelect ¶
TryReelect attempts to claim leadership when the current leader appears unreachable. Only succeeds if the lease is stale (timestamp older than LeaseStaleThreshold) or the leader PID is dead. The bbolt per-txn fcntl lock serializes competing re-election attempts from multiple followers.
type EngineFactory ¶
EngineFactory creates a world engine when this process becomes leader. The ctx is cancelled when leadership is lost.
type FollowerEngine ¶
type FollowerEngine struct {
// contains filtered or unexported fields
}
FollowerEngine provides a read-only world.Engine for follower processes. It watches the bbolt commit counter and calls SetRootRef on the underlying world_block.Engine when the leader commits changes.
func NewFollowerEngine ¶
func NewFollowerEngine( ctx context.Context, le *logrus.Entry, db *bdb.DB, baseCursor *bucket_lookup.Cursor, readHead ReadHeadRefFunc, lookupOp world.LookupOp, ) (*FollowerEngine, error)
NewFollowerEngine creates a follower engine. The baseCursor provides block storage access (same bbolt bucket/volume as the leader). readHead reads the current HEAD reference from the object store. lookupOp resolves operation types for world state construction.
func (*FollowerEngine) AccessWorldState ¶
func (f *FollowerEngine) AccessWorldState( ctx context.Context, ref *bucket.ObjectRef, cb func(*bucket_lookup.Cursor) error, ) error
AccessWorldState builds a bucket lookup cursor for reading world state.
func (*FollowerEngine) BuildStorageCursor ¶
func (f *FollowerEngine) BuildStorageCursor(ctx context.Context) (*bucket_lookup.Cursor, error)
BuildStorageCursor builds a cursor to the world storage.
func (*FollowerEngine) GetSeqno ¶
func (f *FollowerEngine) GetSeqno(ctx context.Context) (uint64, error)
GetSeqno returns the current world state sequence number.
func (*FollowerEngine) NewTransaction ¶
NewTransaction returns a read-only transaction. Write transactions are not supported on followers; use the leader's SubmitWorldOp SRPC.
type ForwardingWorldState ¶
type ForwardingWorldState struct {
world.WorldState
// contains filtered or unexported fields
}
ForwardingWorldState wraps a read-only WorldState and routes write operations (ApplyWorldOp) through the leader's SubmitWorldOp SRPC. Read operations go through the local follower engine directly.
func NewForwardingWorldState ¶
func NewForwardingWorldState(ws world.WorldState, client SRPCCoordinatorServiceClient) *ForwardingWorldState
NewForwardingWorldState creates a forwarding world state. The underlying WorldState should be backed by the follower engine (read-only). The client is used to forward write operations to the leader.
func (*ForwardingWorldState) ApplyWorldOp ¶
func (f *ForwardingWorldState) ApplyWorldOp( ctx context.Context, op world.Operation, sender peer.ID, ) (uint64, bool, error)
ApplyWorldOp serializes the operation and forwards it to the leader via SRPC. Returns the new seqno and any error from the leader.
func (*ForwardingWorldState) GetReadOnly ¶
func (f *ForwardingWorldState) GetReadOnly() bool
GetReadOnly returns false since this state supports writes (via forwarding).
type GetParticipantInfoRequest ¶
type GetParticipantInfoRequest struct {
// contains filtered or unexported fields
}
GetParticipantInfoRequest is the request for GetParticipantInfo.
func (*GetParticipantInfoRequest) CloneMessageVT ¶
func (m *GetParticipantInfoRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*GetParticipantInfoRequest) CloneVT ¶
func (m *GetParticipantInfoRequest) CloneVT() *GetParticipantInfoRequest
func (*GetParticipantInfoRequest) EqualMessageVT ¶
func (this *GetParticipantInfoRequest) EqualMessageVT(thatMsg any) bool
func (*GetParticipantInfoRequest) EqualVT ¶
func (this *GetParticipantInfoRequest) EqualVT(that *GetParticipantInfoRequest) bool
func (*GetParticipantInfoRequest) MarshalJSON ¶
func (x *GetParticipantInfoRequest) MarshalJSON() ([]byte, error)
MarshalJSON marshals the GetParticipantInfoRequest to JSON.
func (*GetParticipantInfoRequest) MarshalProtoJSON ¶
func (x *GetParticipantInfoRequest) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the GetParticipantInfoRequest message to JSON.
func (*GetParticipantInfoRequest) MarshalProtoText ¶
func (x *GetParticipantInfoRequest) MarshalProtoText() string
func (*GetParticipantInfoRequest) MarshalToSizedBufferVT ¶
func (m *GetParticipantInfoRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*GetParticipantInfoRequest) MarshalToVT ¶
func (m *GetParticipantInfoRequest) MarshalToVT(dAtA []byte) (int, error)
func (*GetParticipantInfoRequest) MarshalVT ¶
func (m *GetParticipantInfoRequest) MarshalVT() (dAtA []byte, err error)
func (*GetParticipantInfoRequest) ProtoMessage ¶
func (*GetParticipantInfoRequest) ProtoMessage()
func (*GetParticipantInfoRequest) Reset ¶
func (x *GetParticipantInfoRequest) Reset()
func (*GetParticipantInfoRequest) SizeVT ¶
func (m *GetParticipantInfoRequest) SizeVT() (n int)
func (*GetParticipantInfoRequest) String ¶
func (x *GetParticipantInfoRequest) String() string
func (*GetParticipantInfoRequest) UnmarshalJSON ¶
func (x *GetParticipantInfoRequest) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the GetParticipantInfoRequest from JSON.
func (*GetParticipantInfoRequest) UnmarshalProtoJSON ¶
func (x *GetParticipantInfoRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the GetParticipantInfoRequest message from JSON.
func (*GetParticipantInfoRequest) UnmarshalVT ¶
func (m *GetParticipantInfoRequest) UnmarshalVT(dAtA []byte) error
type GetParticipantInfoResponse ¶
type GetParticipantInfoResponse struct {
// Pid is the participant's process ID.
Pid uint32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"`
// Role is the participant's current role.
Role ParticipantRole `protobuf:"varint,2,opt,name=role,proto3" json:"role,omitempty"`
// Capabilities is the participant's capability set.
Capabilities []string `protobuf:"bytes,3,rep,name=capabilities,proto3" json:"capabilities,omitempty"`
// contains filtered or unexported fields
}
GetParticipantInfoResponse is the response for GetParticipantInfo.
func (*GetParticipantInfoResponse) CloneMessageVT ¶
func (m *GetParticipantInfoResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*GetParticipantInfoResponse) CloneVT ¶
func (m *GetParticipantInfoResponse) CloneVT() *GetParticipantInfoResponse
func (*GetParticipantInfoResponse) EqualMessageVT ¶
func (this *GetParticipantInfoResponse) EqualMessageVT(thatMsg any) bool
func (*GetParticipantInfoResponse) EqualVT ¶
func (this *GetParticipantInfoResponse) EqualVT(that *GetParticipantInfoResponse) bool
func (*GetParticipantInfoResponse) GetCapabilities ¶
func (x *GetParticipantInfoResponse) GetCapabilities() []string
func (*GetParticipantInfoResponse) GetPid ¶
func (x *GetParticipantInfoResponse) GetPid() uint32
func (*GetParticipantInfoResponse) GetRole ¶
func (x *GetParticipantInfoResponse) GetRole() ParticipantRole
func (*GetParticipantInfoResponse) MarshalJSON ¶
func (x *GetParticipantInfoResponse) MarshalJSON() ([]byte, error)
MarshalJSON marshals the GetParticipantInfoResponse to JSON.
func (*GetParticipantInfoResponse) MarshalProtoJSON ¶
func (x *GetParticipantInfoResponse) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the GetParticipantInfoResponse message to JSON.
func (*GetParticipantInfoResponse) MarshalProtoText ¶
func (x *GetParticipantInfoResponse) MarshalProtoText() string
func (*GetParticipantInfoResponse) MarshalToSizedBufferVT ¶
func (m *GetParticipantInfoResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*GetParticipantInfoResponse) MarshalToVT ¶
func (m *GetParticipantInfoResponse) MarshalToVT(dAtA []byte) (int, error)
func (*GetParticipantInfoResponse) MarshalVT ¶
func (m *GetParticipantInfoResponse) MarshalVT() (dAtA []byte, err error)
func (*GetParticipantInfoResponse) ProtoMessage ¶
func (*GetParticipantInfoResponse) ProtoMessage()
func (*GetParticipantInfoResponse) Reset ¶
func (x *GetParticipantInfoResponse) Reset()
func (*GetParticipantInfoResponse) SizeVT ¶
func (m *GetParticipantInfoResponse) SizeVT() (n int)
func (*GetParticipantInfoResponse) String ¶
func (x *GetParticipantInfoResponse) String() string
func (*GetParticipantInfoResponse) UnmarshalJSON ¶
func (x *GetParticipantInfoResponse) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the GetParticipantInfoResponse from JSON.
func (*GetParticipantInfoResponse) UnmarshalProtoJSON ¶
func (x *GetParticipantInfoResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the GetParticipantInfoResponse message from JSON.
func (*GetParticipantInfoResponse) UnmarshalVT ¶
func (m *GetParticipantInfoResponse) UnmarshalVT(dAtA []byte) error
type LeaseRecord ¶
type LeaseRecord struct {
// LeaderPid is the PID of the current leader process.
LeaderPid uint32 `protobuf:"varint,1,opt,name=leader_pid,json=leaderPid,proto3" json:"leaderPid,omitempty"`
// LeaseTimestampNanos is the last lease renewal time in nanoseconds.
LeaseTimestampNanos int64 `protobuf:"varint,2,opt,name=lease_timestamp_nanos,json=leaseTimestampNanos,proto3" json:"leaseTimestampNanos,omitempty"`
// LeaderSocketPath is the SRPC socket path of the leader.
LeaderSocketPath string `protobuf:"bytes,3,opt,name=leader_socket_path,json=leaderSocketPath,proto3" json:"leaderSocketPath,omitempty"`
// contains filtered or unexported fields
}
LeaseRecord is the leader election lease stored at _coord/leader.
func GetLease ¶
func GetLease(tx *bdb.Tx) (*LeaseRecord, error)
GetLease reads the leader lease record. Returns nil if no lease exists.
func (*LeaseRecord) CloneMessageVT ¶
func (m *LeaseRecord) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*LeaseRecord) CloneVT ¶
func (m *LeaseRecord) CloneVT() *LeaseRecord
func (*LeaseRecord) EqualMessageVT ¶
func (this *LeaseRecord) EqualMessageVT(thatMsg any) bool
func (*LeaseRecord) EqualVT ¶
func (this *LeaseRecord) EqualVT(that *LeaseRecord) bool
func (*LeaseRecord) GetLeaderPid ¶
func (x *LeaseRecord) GetLeaderPid() uint32
func (*LeaseRecord) GetLeaderSocketPath ¶
func (x *LeaseRecord) GetLeaderSocketPath() string
func (*LeaseRecord) GetLeaseTimestampNanos ¶
func (x *LeaseRecord) GetLeaseTimestampNanos() int64
func (*LeaseRecord) MarshalJSON ¶
func (x *LeaseRecord) MarshalJSON() ([]byte, error)
MarshalJSON marshals the LeaseRecord to JSON.
func (*LeaseRecord) MarshalProtoJSON ¶
func (x *LeaseRecord) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the LeaseRecord message to JSON.
func (*LeaseRecord) MarshalProtoText ¶
func (x *LeaseRecord) MarshalProtoText() string
func (*LeaseRecord) MarshalToSizedBufferVT ¶
func (m *LeaseRecord) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*LeaseRecord) MarshalToVT ¶
func (m *LeaseRecord) MarshalToVT(dAtA []byte) (int, error)
func (*LeaseRecord) MarshalVT ¶
func (m *LeaseRecord) MarshalVT() (dAtA []byte, err error)
func (*LeaseRecord) ProtoMessage ¶
func (*LeaseRecord) ProtoMessage()
func (*LeaseRecord) Reset ¶
func (x *LeaseRecord) Reset()
func (*LeaseRecord) SizeVT ¶
func (m *LeaseRecord) SizeVT() (n int)
func (*LeaseRecord) String ¶
func (x *LeaseRecord) String() string
func (*LeaseRecord) UnmarshalJSON ¶
func (x *LeaseRecord) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the LeaseRecord from JSON.
func (*LeaseRecord) UnmarshalProtoJSON ¶
func (x *LeaseRecord) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the LeaseRecord message from JSON.
func (*LeaseRecord) UnmarshalVT ¶
func (m *LeaseRecord) UnmarshalVT(dAtA []byte) error
type Mesh ¶
type Mesh struct {
// contains filtered or unexported fields
}
Mesh manages the cross-process SRPC mesh. Each participant listens on a Unix domain socket and connects to other participants discovered via the participant watcher.
func NewMesh ¶
NewMesh creates a new SRPC mesh. The mux is used for the local server and can have additional services registered on it.
func (*Mesh) Close ¶
func (m *Mesh) Close()
Close shuts down the mesh: closes all client connections, the listener, and removes the socket file.
func (*Mesh) Connect ¶
Connect dials a remote participant's Unix socket and establishes a Yamux + starpc client connection. Caches the connection by PID.
func (*Mesh) Disconnect ¶
Disconnect closes and removes the connection to a participant.
func (*Mesh) Listen ¶
Listen creates a Unix domain socket listener in the given directory. The socket is named coord-{pid}.sock.
func (*Mesh) SocketPath ¶
SocketPath returns the path of the local Unix socket listener.
type ParticipantRecord ¶
type ParticipantRecord struct {
// Pid is the OS process ID.
Pid uint32 `protobuf:"varint,1,opt,name=pid,proto3" json:"pid,omitempty"`
// Role is the participant's current role (leader or follower).
Role ParticipantRole `protobuf:"varint,2,opt,name=role,proto3" json:"role,omitempty"`
// StartTimeNanos is the monotonic start time in nanoseconds.
StartTimeNanos int64 `protobuf:"varint,3,opt,name=start_time_nanos,json=startTimeNanos,proto3" json:"startTimeNanos,omitempty"`
// Capabilities is the set of capabilities this participant advertises.
Capabilities []string `protobuf:"bytes,4,rep,name=capabilities,proto3" json:"capabilities,omitempty"`
// SrpcSocketPath is the Unix socket path for this participant's SRPC server.
SrpcSocketPath string `protobuf:"bytes,5,opt,name=srpc_socket_path,json=srpcSocketPath,proto3" json:"srpcSocketPath,omitempty"`
// LastHeartbeatNanos is the last heartbeat timestamp in nanoseconds.
LastHeartbeatNanos int64 `protobuf:"varint,6,opt,name=last_heartbeat_nanos,json=lastHeartbeatNanos,proto3" json:"lastHeartbeatNanos,omitempty"`
// contains filtered or unexported fields
}
ParticipantRecord is a participant entry in the coordination registry. Stored in the _coord/participants bbolt bucket keyed by PID string.
func GetParticipant ¶
func GetParticipant(tx *bdb.Tx, pid uint32) (*ParticipantRecord, error)
GetParticipant reads a participant record by PID from a bbolt transaction. Returns nil if the record does not exist.
func ListParticipants ¶
func ListParticipants(tx *bdb.Tx) ([]*ParticipantRecord, error)
ListParticipants returns all participant records in the registry.
func (*ParticipantRecord) CloneMessageVT ¶
func (m *ParticipantRecord) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*ParticipantRecord) CloneVT ¶
func (m *ParticipantRecord) CloneVT() *ParticipantRecord
func (*ParticipantRecord) EqualMessageVT ¶
func (this *ParticipantRecord) EqualMessageVT(thatMsg any) bool
func (*ParticipantRecord) EqualVT ¶
func (this *ParticipantRecord) EqualVT(that *ParticipantRecord) bool
func (*ParticipantRecord) GetCapabilities ¶
func (x *ParticipantRecord) GetCapabilities() []string
func (*ParticipantRecord) GetLastHeartbeatNanos ¶
func (x *ParticipantRecord) GetLastHeartbeatNanos() int64
func (*ParticipantRecord) GetPid ¶
func (x *ParticipantRecord) GetPid() uint32
func (*ParticipantRecord) GetRole ¶
func (x *ParticipantRecord) GetRole() ParticipantRole
func (*ParticipantRecord) GetSrpcSocketPath ¶
func (x *ParticipantRecord) GetSrpcSocketPath() string
func (*ParticipantRecord) GetStartTimeNanos ¶
func (x *ParticipantRecord) GetStartTimeNanos() int64
func (*ParticipantRecord) MarshalJSON ¶
func (x *ParticipantRecord) MarshalJSON() ([]byte, error)
MarshalJSON marshals the ParticipantRecord to JSON.
func (*ParticipantRecord) MarshalProtoJSON ¶
func (x *ParticipantRecord) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the ParticipantRecord message to JSON.
func (*ParticipantRecord) MarshalProtoText ¶
func (x *ParticipantRecord) MarshalProtoText() string
func (*ParticipantRecord) MarshalToSizedBufferVT ¶
func (m *ParticipantRecord) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*ParticipantRecord) MarshalToVT ¶
func (m *ParticipantRecord) MarshalToVT(dAtA []byte) (int, error)
func (*ParticipantRecord) MarshalVT ¶
func (m *ParticipantRecord) MarshalVT() (dAtA []byte, err error)
func (*ParticipantRecord) ProtoMessage ¶
func (*ParticipantRecord) ProtoMessage()
func (*ParticipantRecord) Reset ¶
func (x *ParticipantRecord) Reset()
func (*ParticipantRecord) SizeVT ¶
func (m *ParticipantRecord) SizeVT() (n int)
func (*ParticipantRecord) String ¶
func (x *ParticipantRecord) String() string
func (*ParticipantRecord) UnmarshalJSON ¶
func (x *ParticipantRecord) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the ParticipantRecord from JSON.
func (*ParticipantRecord) UnmarshalProtoJSON ¶
func (x *ParticipantRecord) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the ParticipantRecord message from JSON.
func (*ParticipantRecord) UnmarshalVT ¶
func (m *ParticipantRecord) UnmarshalVT(dAtA []byte) error
type ParticipantRole ¶
type ParticipantRole int32
ParticipantRole is the role of a participant in the coordination group.
const (
// ParticipantRole_UNKNOWN is the default unknown role.
ParticipantRole_ParticipantRole_UNKNOWN ParticipantRole = 0
// ParticipantRole_LEADER is the leader role.
ParticipantRole_ParticipantRole_LEADER ParticipantRole = 1
// ParticipantRole_FOLLOWER is the follower role.
ParticipantRole_ParticipantRole_FOLLOWER ParticipantRole = 2
)
func (ParticipantRole) Enum ¶
func (x ParticipantRole) Enum() *ParticipantRole
func (ParticipantRole) MarshalJSON ¶
func (x ParticipantRole) MarshalJSON() ([]byte, error)
MarshalJSON marshals the ParticipantRole to JSON.
func (ParticipantRole) MarshalProtoJSON ¶
func (x ParticipantRole) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the ParticipantRole to JSON.
func (ParticipantRole) MarshalProtoText ¶
func (x ParticipantRole) MarshalProtoText() string
func (ParticipantRole) MarshalText ¶
func (x ParticipantRole) MarshalText() ([]byte, error)
MarshalText marshals the ParticipantRole to text.
func (ParticipantRole) String ¶
func (x ParticipantRole) String() string
func (*ParticipantRole) UnmarshalJSON ¶
func (x *ParticipantRole) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the ParticipantRole from JSON.
func (*ParticipantRole) UnmarshalProtoJSON ¶
func (x *ParticipantRole) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the ParticipantRole from JSON.
func (*ParticipantRole) UnmarshalText ¶
func (x *ParticipantRole) UnmarshalText(b []byte) error
UnmarshalText unmarshals the ParticipantRole from text.
type ParticipantWatcher ¶
type ParticipantWatcher struct {
// contains filtered or unexported fields
}
ParticipantWatcher polls the participant registry keyed off commitCounter changes and maintains an in-memory snapshot of active participants.
func NewParticipantWatcher ¶
func NewParticipantWatcher(db *bdb.DB) *ParticipantWatcher
NewParticipantWatcher creates a new watcher.
func (*ParticipantWatcher) GetParticipants ¶
func (w *ParticipantWatcher) GetParticipants() []*ParticipantRecord
GetParticipants returns the current participant snapshot. Must be called inside bcast.HoldLock.
func (*ParticipantWatcher) Run ¶
func (w *ParticipantWatcher) Run(ctx context.Context) error
Run polls the participant registry using commitCounter for wake-up. Blocks until ctx is cancelled.
func (*ParticipantWatcher) WaitParticipants ¶
func (w *ParticipantWatcher) WaitParticipants(ctx context.Context, match func([]*ParticipantRecord) bool) ([]*ParticipantRecord, error)
WaitParticipants waits for the participant list to satisfy the given predicate. Returns the matching snapshot.
type ReadHeadRefFunc ¶
ReadHeadRefFunc reads the current HEAD reference for the world state. Called by the follower to refresh its view after a commit counter change.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages participant records in bbolt.
func NewRegistry ¶
NewRegistry creates a new participant registry.
func (*Registry) Deregister ¶
Deregister removes the local participant record from the registry.
type RoleChangeHandler ¶
type RoleChangeHandler interface {
// OnBecomeLeader is called when this process becomes the leader.
// ctx is cancelled when leadership is lost.
OnBecomeLeader(ctx context.Context) error
// OnBecomeFollower is called when this process becomes a follower.
// leaderSocketPath is the SRPC socket of the current leader.
OnBecomeFollower(ctx context.Context, leaderSocketPath string) error
}
RoleChangeHandler is called when this process's role changes. The handler should start or stop services accordingly.
type SRPCCoordinatorServiceClient ¶
type SRPCCoordinatorServiceClient interface {
// SRPCClient returns the underlying SRPC client.
SRPCClient() srpc.Client
// SubmitWorldOp submits a serialized world operation to the leader for
// execution. Returns the new sequence number after commit.
SubmitWorldOp(ctx context.Context, in *SubmitWorldOpRequest) (*SubmitWorldOpResponse, error)
// WatchWorldSeqno streams world sequence number updates to followers.
WatchWorldSeqno(ctx context.Context, in *WatchWorldSeqnoRequest) (SRPCCoordinatorService_WatchWorldSeqnoClient, error)
}
func NewSRPCCoordinatorServiceClient ¶
func NewSRPCCoordinatorServiceClient(cc srpc.Client) SRPCCoordinatorServiceClient
func NewSRPCCoordinatorServiceClientWithServiceID ¶
func NewSRPCCoordinatorServiceClientWithServiceID(cc srpc.Client, serviceID string) SRPCCoordinatorServiceClient
type SRPCCoordinatorServiceHandler ¶
type SRPCCoordinatorServiceHandler struct {
// contains filtered or unexported fields
}
func (SRPCCoordinatorServiceHandler) GetMethodIDs ¶
func (SRPCCoordinatorServiceHandler) GetMethodIDs() []string
func (*SRPCCoordinatorServiceHandler) GetServiceID ¶
func (d *SRPCCoordinatorServiceHandler) GetServiceID() string
func (*SRPCCoordinatorServiceHandler) InvokeMethod ¶
func (SRPCCoordinatorServiceHandler) InvokeMethod_SubmitWorldOp ¶
func (SRPCCoordinatorServiceHandler) InvokeMethod_SubmitWorldOp(impl SRPCCoordinatorServiceServer, strm srpc.Stream) error
func (SRPCCoordinatorServiceHandler) InvokeMethod_WatchWorldSeqno ¶
func (SRPCCoordinatorServiceHandler) InvokeMethod_WatchWorldSeqno(impl SRPCCoordinatorServiceServer, strm srpc.Stream) error
type SRPCCoordinatorServiceServer ¶
type SRPCCoordinatorServiceServer interface {
// SubmitWorldOp submits a serialized world operation to the leader for
// execution. Returns the new sequence number after commit.
SubmitWorldOp(context.Context, *SubmitWorldOpRequest) (*SubmitWorldOpResponse, error)
// WatchWorldSeqno streams world sequence number updates to followers.
WatchWorldSeqno(*WatchWorldSeqnoRequest, SRPCCoordinatorService_WatchWorldSeqnoStream) error
}
func NewCoordinatorServiceServer ¶
func NewCoordinatorServiceServer(handler *WorldRoleHandler, lookupOp world.LookupOp) SRPCCoordinatorServiceServer
NewCoordinatorServiceServer creates a CoordinatorService SRPC server. The handler provides access to the world engine. The lookupOp resolves operation types when executing submitted transactions.
type SRPCCoordinatorService_WatchWorldSeqnoClient ¶
type SRPCCoordinatorService_WatchWorldSeqnoClient interface {
srpc.Stream
Recv() (*WatchWorldSeqnoResponse, error)
RecvTo(*WatchWorldSeqnoResponse) error
}
type SRPCCoordinatorService_WatchWorldSeqnoStream ¶
type SRPCCoordinatorService_WatchWorldSeqnoStream interface {
srpc.Stream
Send(*WatchWorldSeqnoResponse) error
SendAndClose(*WatchWorldSeqnoResponse) error
}
type SRPCParticipantServiceClient ¶
type SRPCParticipantServiceClient interface {
// SRPCClient returns the underlying SRPC client.
SRPCClient() srpc.Client
// GetParticipantInfo returns this participant's role and capabilities.
GetParticipantInfo(ctx context.Context, in *GetParticipantInfoRequest) (*GetParticipantInfoResponse, error)
}
func NewSRPCParticipantServiceClient ¶
func NewSRPCParticipantServiceClient(cc srpc.Client) SRPCParticipantServiceClient
func NewSRPCParticipantServiceClientWithServiceID ¶
func NewSRPCParticipantServiceClientWithServiceID(cc srpc.Client, serviceID string) SRPCParticipantServiceClient
type SRPCParticipantServiceHandler ¶
type SRPCParticipantServiceHandler struct {
// contains filtered or unexported fields
}
func (SRPCParticipantServiceHandler) GetMethodIDs ¶
func (SRPCParticipantServiceHandler) GetMethodIDs() []string
func (*SRPCParticipantServiceHandler) GetServiceID ¶
func (d *SRPCParticipantServiceHandler) GetServiceID() string
func (*SRPCParticipantServiceHandler) InvokeMethod ¶
func (SRPCParticipantServiceHandler) InvokeMethod_GetParticipantInfo ¶
func (SRPCParticipantServiceHandler) InvokeMethod_GetParticipantInfo(impl SRPCParticipantServiceServer, strm srpc.Stream) error
type SRPCParticipantServiceServer ¶
type SRPCParticipantServiceServer interface {
// GetParticipantInfo returns this participant's role and capabilities.
GetParticipantInfo(context.Context, *GetParticipantInfoRequest) (*GetParticipantInfoResponse, error)
}
type SubmitWorldOpRequest ¶
type SubmitWorldOpRequest struct {
// OpData is the serialized world operation.
OpData []byte `protobuf:"bytes,1,opt,name=op_data,json=opData,proto3" json:"opData,omitempty"`
// contains filtered or unexported fields
}
SubmitWorldOpRequest is the request to submit a world operation.
func (*SubmitWorldOpRequest) CloneMessageVT ¶
func (m *SubmitWorldOpRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*SubmitWorldOpRequest) CloneVT ¶
func (m *SubmitWorldOpRequest) CloneVT() *SubmitWorldOpRequest
func (*SubmitWorldOpRequest) EqualMessageVT ¶
func (this *SubmitWorldOpRequest) EqualMessageVT(thatMsg any) bool
func (*SubmitWorldOpRequest) EqualVT ¶
func (this *SubmitWorldOpRequest) EqualVT(that *SubmitWorldOpRequest) bool
func (*SubmitWorldOpRequest) GetOpData ¶
func (x *SubmitWorldOpRequest) GetOpData() []byte
func (*SubmitWorldOpRequest) MarshalJSON ¶
func (x *SubmitWorldOpRequest) MarshalJSON() ([]byte, error)
MarshalJSON marshals the SubmitWorldOpRequest to JSON.
func (*SubmitWorldOpRequest) MarshalProtoJSON ¶
func (x *SubmitWorldOpRequest) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the SubmitWorldOpRequest message to JSON.
func (*SubmitWorldOpRequest) MarshalProtoText ¶
func (x *SubmitWorldOpRequest) MarshalProtoText() string
func (*SubmitWorldOpRequest) MarshalToSizedBufferVT ¶
func (m *SubmitWorldOpRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*SubmitWorldOpRequest) MarshalToVT ¶
func (m *SubmitWorldOpRequest) MarshalToVT(dAtA []byte) (int, error)
func (*SubmitWorldOpRequest) MarshalVT ¶
func (m *SubmitWorldOpRequest) MarshalVT() (dAtA []byte, err error)
func (*SubmitWorldOpRequest) ProtoMessage ¶
func (*SubmitWorldOpRequest) ProtoMessage()
func (*SubmitWorldOpRequest) Reset ¶
func (x *SubmitWorldOpRequest) Reset()
func (*SubmitWorldOpRequest) SizeVT ¶
func (m *SubmitWorldOpRequest) SizeVT() (n int)
func (*SubmitWorldOpRequest) String ¶
func (x *SubmitWorldOpRequest) String() string
func (*SubmitWorldOpRequest) UnmarshalJSON ¶
func (x *SubmitWorldOpRequest) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the SubmitWorldOpRequest from JSON.
func (*SubmitWorldOpRequest) UnmarshalProtoJSON ¶
func (x *SubmitWorldOpRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the SubmitWorldOpRequest message from JSON.
func (*SubmitWorldOpRequest) UnmarshalVT ¶
func (m *SubmitWorldOpRequest) UnmarshalVT(dAtA []byte) error
type SubmitWorldOpResponse ¶
type SubmitWorldOpResponse struct {
// Seqno is the new world sequence number after the operation.
Seqno uint64 `protobuf:"varint,1,opt,name=seqno,proto3" json:"seqno,omitempty"`
// Error is set if the operation failed.
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
// contains filtered or unexported fields
}
SubmitWorldOpResponse is the response after a world operation is committed.
func (*SubmitWorldOpResponse) CloneMessageVT ¶
func (m *SubmitWorldOpResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*SubmitWorldOpResponse) CloneVT ¶
func (m *SubmitWorldOpResponse) CloneVT() *SubmitWorldOpResponse
func (*SubmitWorldOpResponse) EqualMessageVT ¶
func (this *SubmitWorldOpResponse) EqualMessageVT(thatMsg any) bool
func (*SubmitWorldOpResponse) EqualVT ¶
func (this *SubmitWorldOpResponse) EqualVT(that *SubmitWorldOpResponse) bool
func (*SubmitWorldOpResponse) GetError ¶
func (x *SubmitWorldOpResponse) GetError() string
func (*SubmitWorldOpResponse) GetSeqno ¶
func (x *SubmitWorldOpResponse) GetSeqno() uint64
func (*SubmitWorldOpResponse) MarshalJSON ¶
func (x *SubmitWorldOpResponse) MarshalJSON() ([]byte, error)
MarshalJSON marshals the SubmitWorldOpResponse to JSON.
func (*SubmitWorldOpResponse) MarshalProtoJSON ¶
func (x *SubmitWorldOpResponse) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the SubmitWorldOpResponse message to JSON.
func (*SubmitWorldOpResponse) MarshalProtoText ¶
func (x *SubmitWorldOpResponse) MarshalProtoText() string
func (*SubmitWorldOpResponse) MarshalToSizedBufferVT ¶
func (m *SubmitWorldOpResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*SubmitWorldOpResponse) MarshalToVT ¶
func (m *SubmitWorldOpResponse) MarshalToVT(dAtA []byte) (int, error)
func (*SubmitWorldOpResponse) MarshalVT ¶
func (m *SubmitWorldOpResponse) MarshalVT() (dAtA []byte, err error)
func (*SubmitWorldOpResponse) ProtoMessage ¶
func (*SubmitWorldOpResponse) ProtoMessage()
func (*SubmitWorldOpResponse) Reset ¶
func (x *SubmitWorldOpResponse) Reset()
func (*SubmitWorldOpResponse) SizeVT ¶
func (m *SubmitWorldOpResponse) SizeVT() (n int)
func (*SubmitWorldOpResponse) String ¶
func (x *SubmitWorldOpResponse) String() string
func (*SubmitWorldOpResponse) UnmarshalJSON ¶
func (x *SubmitWorldOpResponse) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the SubmitWorldOpResponse from JSON.
func (*SubmitWorldOpResponse) UnmarshalProtoJSON ¶
func (x *SubmitWorldOpResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the SubmitWorldOpResponse message from JSON.
func (*SubmitWorldOpResponse) UnmarshalVT ¶
func (m *SubmitWorldOpResponse) UnmarshalVT(dAtA []byte) error
type WatchWorldSeqnoRequest ¶
type WatchWorldSeqnoRequest struct {
// LastSeenSeqno is the last seqno the follower has seen.
LastSeenSeqno uint64 `protobuf:"varint,1,opt,name=last_seen_seqno,json=lastSeenSeqno,proto3" json:"lastSeenSeqno,omitempty"`
// contains filtered or unexported fields
}
WatchWorldSeqnoRequest is the request to watch world seqno changes.
func (*WatchWorldSeqnoRequest) CloneMessageVT ¶
func (m *WatchWorldSeqnoRequest) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*WatchWorldSeqnoRequest) CloneVT ¶
func (m *WatchWorldSeqnoRequest) CloneVT() *WatchWorldSeqnoRequest
func (*WatchWorldSeqnoRequest) EqualMessageVT ¶
func (this *WatchWorldSeqnoRequest) EqualMessageVT(thatMsg any) bool
func (*WatchWorldSeqnoRequest) EqualVT ¶
func (this *WatchWorldSeqnoRequest) EqualVT(that *WatchWorldSeqnoRequest) bool
func (*WatchWorldSeqnoRequest) GetLastSeenSeqno ¶
func (x *WatchWorldSeqnoRequest) GetLastSeenSeqno() uint64
func (*WatchWorldSeqnoRequest) MarshalJSON ¶
func (x *WatchWorldSeqnoRequest) MarshalJSON() ([]byte, error)
MarshalJSON marshals the WatchWorldSeqnoRequest to JSON.
func (*WatchWorldSeqnoRequest) MarshalProtoJSON ¶
func (x *WatchWorldSeqnoRequest) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the WatchWorldSeqnoRequest message to JSON.
func (*WatchWorldSeqnoRequest) MarshalProtoText ¶
func (x *WatchWorldSeqnoRequest) MarshalProtoText() string
func (*WatchWorldSeqnoRequest) MarshalToSizedBufferVT ¶
func (m *WatchWorldSeqnoRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*WatchWorldSeqnoRequest) MarshalToVT ¶
func (m *WatchWorldSeqnoRequest) MarshalToVT(dAtA []byte) (int, error)
func (*WatchWorldSeqnoRequest) MarshalVT ¶
func (m *WatchWorldSeqnoRequest) MarshalVT() (dAtA []byte, err error)
func (*WatchWorldSeqnoRequest) ProtoMessage ¶
func (*WatchWorldSeqnoRequest) ProtoMessage()
func (*WatchWorldSeqnoRequest) Reset ¶
func (x *WatchWorldSeqnoRequest) Reset()
func (*WatchWorldSeqnoRequest) SizeVT ¶
func (m *WatchWorldSeqnoRequest) SizeVT() (n int)
func (*WatchWorldSeqnoRequest) String ¶
func (x *WatchWorldSeqnoRequest) String() string
func (*WatchWorldSeqnoRequest) UnmarshalJSON ¶
func (x *WatchWorldSeqnoRequest) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the WatchWorldSeqnoRequest from JSON.
func (*WatchWorldSeqnoRequest) UnmarshalProtoJSON ¶
func (x *WatchWorldSeqnoRequest) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the WatchWorldSeqnoRequest message from JSON.
func (*WatchWorldSeqnoRequest) UnmarshalVT ¶
func (m *WatchWorldSeqnoRequest) UnmarshalVT(dAtA []byte) error
type WatchWorldSeqnoResponse ¶
type WatchWorldSeqnoResponse struct {
// Seqno is the current world sequence number.
Seqno uint64 `protobuf:"varint,1,opt,name=seqno,proto3" json:"seqno,omitempty"`
// contains filtered or unexported fields
}
WatchWorldSeqnoResponse is a seqno update pushed to followers.
func (*WatchWorldSeqnoResponse) CloneMessageVT ¶
func (m *WatchWorldSeqnoResponse) CloneMessageVT() protobuf_go_lite.CloneMessage
func (*WatchWorldSeqnoResponse) CloneVT ¶
func (m *WatchWorldSeqnoResponse) CloneVT() *WatchWorldSeqnoResponse
func (*WatchWorldSeqnoResponse) EqualMessageVT ¶
func (this *WatchWorldSeqnoResponse) EqualMessageVT(thatMsg any) bool
func (*WatchWorldSeqnoResponse) EqualVT ¶
func (this *WatchWorldSeqnoResponse) EqualVT(that *WatchWorldSeqnoResponse) bool
func (*WatchWorldSeqnoResponse) GetSeqno ¶
func (x *WatchWorldSeqnoResponse) GetSeqno() uint64
func (*WatchWorldSeqnoResponse) MarshalJSON ¶
func (x *WatchWorldSeqnoResponse) MarshalJSON() ([]byte, error)
MarshalJSON marshals the WatchWorldSeqnoResponse to JSON.
func (*WatchWorldSeqnoResponse) MarshalProtoJSON ¶
func (x *WatchWorldSeqnoResponse) MarshalProtoJSON(s *json.MarshalState)
MarshalProtoJSON marshals the WatchWorldSeqnoResponse message to JSON.
func (*WatchWorldSeqnoResponse) MarshalProtoText ¶
func (x *WatchWorldSeqnoResponse) MarshalProtoText() string
func (*WatchWorldSeqnoResponse) MarshalToSizedBufferVT ¶
func (m *WatchWorldSeqnoResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
func (*WatchWorldSeqnoResponse) MarshalToVT ¶
func (m *WatchWorldSeqnoResponse) MarshalToVT(dAtA []byte) (int, error)
func (*WatchWorldSeqnoResponse) MarshalVT ¶
func (m *WatchWorldSeqnoResponse) MarshalVT() (dAtA []byte, err error)
func (*WatchWorldSeqnoResponse) ProtoMessage ¶
func (*WatchWorldSeqnoResponse) ProtoMessage()
func (*WatchWorldSeqnoResponse) Reset ¶
func (x *WatchWorldSeqnoResponse) Reset()
func (*WatchWorldSeqnoResponse) SizeVT ¶
func (m *WatchWorldSeqnoResponse) SizeVT() (n int)
func (*WatchWorldSeqnoResponse) String ¶
func (x *WatchWorldSeqnoResponse) String() string
func (*WatchWorldSeqnoResponse) UnmarshalJSON ¶
func (x *WatchWorldSeqnoResponse) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals the WatchWorldSeqnoResponse from JSON.
func (*WatchWorldSeqnoResponse) UnmarshalProtoJSON ¶
func (x *WatchWorldSeqnoResponse) UnmarshalProtoJSON(s *json.UnmarshalState)
UnmarshalProtoJSON unmarshals the WatchWorldSeqnoResponse message from JSON.
func (*WatchWorldSeqnoResponse) UnmarshalVT ¶
func (m *WatchWorldSeqnoResponse) UnmarshalVT(dAtA []byte) error
type WorldRoleHandler ¶
type WorldRoleHandler struct {
// contains filtered or unexported fields
}
WorldRoleHandler implements RoleChangeHandler to manage the world engine lifecycle based on the coordinator's leader/follower role.
func NewWorldRoleHandler ¶
func NewWorldRoleHandler(le *logrus.Entry, factory EngineFactory) *WorldRoleHandler
NewWorldRoleHandler creates a new WorldRoleHandler.
func (*WorldRoleHandler) GetEngine ¶
func (h *WorldRoleHandler) GetEngine() world.Engine
GetEngine returns the current world engine, or nil if not leader.
func (*WorldRoleHandler) OnBecomeFollower ¶
func (h *WorldRoleHandler) OnBecomeFollower(ctx context.Context, leaderSocketPath string) error
OnBecomeFollower blocks until the follower context is cancelled.
func (*WorldRoleHandler) OnBecomeLeader ¶
func (h *WorldRoleHandler) OnBecomeLeader(ctx context.Context) error
OnBecomeLeader creates the world engine and blocks until leadership is lost.
func (*WorldRoleHandler) WaitEngine ¶
WaitEngine waits for the world engine to become available.