Documentation
¶
Index ¶
- type ACLResolver
- type Backend
- type BidirectionalStream
- type Config
- type HandleStreamRequest
- type MaterializedViewStore
- type MockACLResolver
- type MockClient
- func (c *MockClient) Close()
- func (c *MockClient) DrainStream(t *testing.T)
- func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error)
- func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error)
- func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error
- type MockStream
- func (s *MockStream) Context() context.Context
- func (s *MockStream) Recv() (*pbpeerstream.ReplicationMessage, error)
- func (s *MockStream) RecvMsg(m interface{}) error
- func (s *MockStream) Send(r *pbpeerstream.ReplicationMessage) error
- func (s *MockStream) SendHeader(metadata.MD) error
- func (s *MockStream) SendMsg(m interface{}) error
- func (s *MockStream) SetHeader(metadata.MD) error
- func (s *MockStream) SetTrailer(metadata.MD)
- type MutableStatus
- func (s *MutableStatus) Done() <-chan struct{}
- func (s *MutableStatus) GetExportedServicesCount() int
- func (s *MutableStatus) GetImportedServicesCount() int
- func (s *MutableStatus) GetStatus() Status
- func (s *MutableStatus) IsConnected() bool
- func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)
- func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)
- func (s *MutableStatus) TrackAck()
- func (s *MutableStatus) TrackConnected()
- func (s *MutableStatus) TrackDisconnectedDueToError(error string)
- func (s *MutableStatus) TrackDisconnectedGracefully()
- func (s *MutableStatus) TrackNack(msg string)
- func (s *MutableStatus) TrackRecvError(error string)
- func (s *MutableStatus) TrackRecvHeartbeat()
- func (s *MutableStatus) TrackRecvResourceSuccess()
- func (s *MutableStatus) TrackSendError(error string)
- func (s *MutableStatus) TrackSendSuccess()
- type Server
- func (s *Server) ConnectedStreams() map[string]chan struct{}
- func (s *Server) DrainStream(req HandleStreamRequest)
- func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error)
- func (s *Server) HandleStream(streamReq HandleStreamRequest) error
- func (s *Server) Register(grpcServer *grpc.Server)
- func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error
- func (s *Server) StreamStatus(peerID string) (resp Status, found bool)
- type StateStore
- type Status
- type Subscriber
- type SubscriptionBackend
- type Tracker
- func (t *Tracker) Connected(id string) (*MutableStatus, error)
- func (t *Tracker) ConnectedStreams() map[string]chan struct{}
- func (t *Tracker) DeleteStatus(id string)
- func (t *Tracker) DisconnectedDueToError(id string, error string)
- func (t *Tracker) DisconnectedGracefully(id string)
- func (t *Tracker) IsHealthy(s Status) bool
- func (t *Tracker) Register(id string) (*MutableStatus, error)
- func (t *Tracker) StreamStatus(id string) (resp Status, found bool)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ACLResolver ¶
type ACLResolver interface {
ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error)
}
type Backend ¶
type Backend interface {
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
// IsLeader indicates whether the consul server is in a leader state or not.
IsLeader() bool
// SetLeaderAddress is called on a raft.LeaderObservation in a go routine
// in the consul server; see trackLeaderChanges()
SetLeaderAddress(string)
// GetLeaderAddress provides the best hint for the current address of the
// leader. There is no guarantee that this is the actual address of the
// leader.
GetLeaderAddress() string
ValidateProposedPeeringSecret(id string) (bool, error)
PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
CatalogRegister(req *structs.RegisterRequest) error
CatalogDeregister(req *structs.DeregisterRequest) error
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
}
type BidirectionalStream ¶
type BidirectionalStream interface {
Send(*pbpeerstream.ReplicationMessage) error
Recv() (*pbpeerstream.ReplicationMessage, error)
Context() context.Context
}
type Config ¶
type Config struct {
Backend Backend
GetStore func() StateStore
Logger hclog.Logger
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
ACLResolver ACLResolver
// Datacenter of the Consul server this gRPC server is hosted on
Datacenter string
ConnectEnabled bool
// contains filtered or unexported fields
}
type HandleStreamRequest ¶
type HandleStreamRequest struct {
// LocalID is the UUID for the peering in the local Consul datacenter.
LocalID string
// RemoteID is the UUID for the peering from the perspective of the peer.
RemoteID string
// PeerName is the name of the peering.
PeerName string
// Partition is the local partition associated with the peer.
Partition string
// Stream is the open stream to the peer cluster.
Stream BidirectionalStream
}
func (HandleStreamRequest) IsAcceptor ¶
func (r HandleStreamRequest) IsAcceptor() bool
type MaterializedViewStore ¶
type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}
type MockACLResolver ¶
MockACLResolver is an autogenerated mock type for the ACLResolver type
func NewMockACLResolver ¶
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver
NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
func (*MockACLResolver) ResolveTokenAndDefaultMeta ¶
func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.EnterpriseMeta, _a2 *acl.AuthorizerContext) (resolver.Result, error)
ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2
type MockClient ¶
type MockClient struct {
ErrCh chan error
ReplicationStream *MockStream
// contains filtered or unexported fields
}
func NewMockClient ¶
func NewMockClient(ctx context.Context) *MockClient
func (*MockClient) Close ¶
func (c *MockClient) Close()
func (*MockClient) DrainStream ¶
func (c *MockClient) DrainStream(t *testing.T)
DrainStream reads messages from the stream until both the exported service list and trust bundle messages have been read. We do this because their ordering is nondeterministic.
func (*MockClient) Recv ¶
func (c *MockClient) Recv() (*pbpeerstream.ReplicationMessage, error)
func (*MockClient) RecvWithTimeout ¶
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error)
func (*MockClient) Send ¶
func (c *MockClient) Send(r *pbpeerstream.ReplicationMessage) error
type MockStream ¶
type MockStream struct {
// contains filtered or unexported fields
}
MockStream mocks pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) Context ¶
func (s *MockStream) Context() context.Context
Context implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Recv ¶
func (s *MockStream) Recv() (*pbpeerstream.ReplicationMessage, error)
Recv implements pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) RecvMsg ¶
func (s *MockStream) RecvMsg(m interface{}) error
RecvMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) Send ¶
func (s *MockStream) Send(r *pbpeerstream.ReplicationMessage) error
Send implements pbpeerstream.PeerStreamService_StreamResourcesServer
func (*MockStream) SendHeader ¶
func (s *MockStream) SendHeader(metadata.MD) error
SendHeader implements grpc.ServerStream
func (*MockStream) SendMsg ¶
func (s *MockStream) SendMsg(m interface{}) error
SendMsg implements grpc.ServerStream and grpc.ClientStream
func (*MockStream) SetHeader ¶
func (s *MockStream) SetHeader(metadata.MD) error
SetHeader implements grpc.ServerStream
func (*MockStream) SetTrailer ¶
func (s *MockStream) SetTrailer(metadata.MD)
SetTrailer implements grpc.ServerStream
type MutableStatus ¶
type MutableStatus struct {
Status
// contains filtered or unexported fields
}
func (*MutableStatus) Done ¶
func (s *MutableStatus) Done() <-chan struct{}
func (*MutableStatus) GetExportedServicesCount ¶
func (s *MutableStatus) GetExportedServicesCount() int
func (*MutableStatus) GetImportedServicesCount ¶
func (s *MutableStatus) GetImportedServicesCount() int
func (*MutableStatus) GetStatus ¶
func (s *MutableStatus) GetStatus() Status
func (*MutableStatus) IsConnected ¶
func (s *MutableStatus) IsConnected() bool
func (*MutableStatus) SetExportedServices ¶
func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)
func (*MutableStatus) SetImportedServices ¶
func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)
func (*MutableStatus) TrackAck ¶
func (s *MutableStatus) TrackAck()
func (*MutableStatus) TrackConnected ¶
func (s *MutableStatus) TrackConnected()
func (*MutableStatus) TrackDisconnectedDueToError ¶
func (s *MutableStatus) TrackDisconnectedDueToError(error string)
TrackDisconnectedDueToError tracks when the stream was disconnected due to an error. For example the heartbeat timed out, or we couldn't send into the stream.
func (*MutableStatus) TrackDisconnectedGracefully ¶
func (s *MutableStatus) TrackDisconnectedGracefully()
TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected. For example, we got a terminated message, or we terminated the stream ourselves.
func (*MutableStatus) TrackNack ¶
func (s *MutableStatus) TrackNack(msg string)
func (*MutableStatus) TrackRecvError ¶
func (s *MutableStatus) TrackRecvError(error string)
func (*MutableStatus) TrackRecvHeartbeat ¶
func (s *MutableStatus) TrackRecvHeartbeat()
TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
func (*MutableStatus) TrackRecvResourceSuccess ¶
func (s *MutableStatus) TrackRecvResourceSuccess()
TrackRecvResourceSuccess tracks receiving a replicated resource.
func (*MutableStatus) TrackSendError ¶
func (s *MutableStatus) TrackSendError(error string)
func (*MutableStatus) TrackSendSuccess ¶
func (s *MutableStatus) TrackSendSuccess()
type Server ¶
func (*Server) ConnectedStreams ¶
ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.
func (*Server) DrainStream ¶
func (s *Server) DrainStream(req HandleStreamRequest)
DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message. Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.
func (*Server) ExchangeSecret ¶
func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeSecretRequest) (*pbpeerstream.ExchangeSecretResponse, error)
ExchangeSecret exchanges the one-time secret embedded in a peering token for a long-lived secret for use with the peering stream handler. This secret exchange prevents peering tokens from being reused.
Note that if the peering secret exchange fails, a peering token may need to be re-generated, since the one-time initiation secret may have been invalidated.
func (*Server) HandleStream ¶
func (s *Server) HandleStream(streamReq HandleStreamRequest) error
func (*Server) StreamResources ¶
func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamResourcesServer) error
StreamResources handles incoming streaming connections.
type StateStore ¶
type StateStore interface {
PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
PeeringSecretsRead(ws memdb.WatchSet, peerID string) (*pbpeering.PeeringSecrets, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error)
CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error)
AbandonCh() <-chan struct{}
}
StateStore provides a read-only interface for querying Peering data.
type Status ¶
type Status struct {
// Connected is true when there is an open stream for the peer.
Connected bool
// NeverConnected is true for peerings that have never connected, false otherwise.
NeverConnected bool
// DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully.
// If the stream is connected or it disconnected gracefully it will be empty.
DisconnectErrorMessage string
// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
DisconnectTime *time.Time
// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
LastAck *time.Time
// LastNack tracks the time we received the last NACK for a resource replicated to the peer.
LastNack *time.Time
// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
LastNackMessage string
// LastSendError tracks the time of the last error sending into the stream.
LastSendError *time.Time
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
LastSendSuccess *time.Time
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
LastRecvHeartbeat *time.Time
// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
LastRecvResourceSuccess *time.Time
// LastRecvError tracks either:
// - The time we failed to store a resource replicated FROM the peer.
// - The time of the last error when receiving from the stream.
LastRecvError *time.Time
// LastRecvErrorMessage tracks the last error message when receiving from the stream.
LastRecvErrorMessage string
// TODO(peering): consider keeping track of imported and exported services thru raft
// ImportedServices keeps track of which service names are imported for the peer
ImportedServices []string
// ExportedServices keeps track of which service names a peer asks to export
ExportedServices []string
}
Status contains information about the replication stream to a peer cluster. TODO(peering): There are a lot of fields here...
func (*Status) GetExportedServicesCount ¶
func (*Status) GetImportedServicesCount ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}
type SubscriptionBackend ¶
type SubscriptionBackend interface {
Subscriber
}
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker contains a map of (PeerID -> MutableStatus). As streams are opened and closed we track details about their status.
func NewTracker ¶
func (*Tracker) Connected ¶
func (t *Tracker) Connected(id string) (*MutableStatus, error)
Connected registers a stream for a given peer, and marks it as connected. It also enforces that there is only one active stream for a peer.
func (*Tracker) ConnectedStreams ¶
func (*Tracker) DeleteStatus ¶
func (*Tracker) DisconnectedDueToError ¶
DisconnectedDueToError marks the peer id's stream status as disconnected due to an error.
func (*Tracker) DisconnectedGracefully ¶
DisconnectedGracefully marks the peer id's stream status as disconnected gracefully.
func (*Tracker) IsHealthy ¶
IsHealthy calculates the health of a peering status. We define a peering as unhealthy if its status has been in any of the following states for longer than the configured incomingHeartbeatTimeout.
- If it is disconnected
- If the last received Nack is newer than last received Ack
- If the last received error is newer than last received success
If none of these conditions apply, we call the peering healthy.