Documentation
¶
Index ¶
- func InitMetrics(registry *prometheus.Registry)
- func NewGrpcMessageClient(senderID NodeID, config *MessageClientConfig) *grpcMessageClient
- func NewMessageRouter(selfID NodeID, credentials *security.Credential, ...) *messageRouterImpl
- func NewMessageRouterWithLocalClient(selfID NodeID, credentials *security.Credential, ...) *messageRouterImpl
- type MessageClient
- type MessageClientConfig
- type MessageClientStream
- type MessageEntry
- type MessageRouter
- type MessageServer
- func (m *MessageServer) AddHandler(ctx context.Context, topic string, tpi typeInformation, ...) (chan struct{}, <-chan error, error)
- func (m *MessageServer) RemoveHandler(ctx context.Context, topic string) (chan struct{}, error)
- func (m *MessageServer) Run(ctx context.Context, localCh <-chan RawMessageEntry) error
- func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error
- func (m *MessageServer) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
- func (m *MessageServer) SyncAddHandler(ctx context.Context, topic string, tpi typeInformation, ...) (<-chan error, error)
- func (m *MessageServer) SyncRemoveHandler(ctx context.Context, topic string) error
- type MessageServerConfig
- type MessageServerStream
- type MockCluster
- type MockNode
- type NodeID
- type RawMessageEntry
- type Seq
- type Serializable
- type ServerWrapper
- type Topic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics initializes metrics used by pkg/p2p
func NewGrpcMessageClient ¶
func NewGrpcMessageClient(senderID NodeID, config *MessageClientConfig) *grpcMessageClient
NewGrpcMessageClient creates a new MessageClient senderID is an identifier for the local node.
func NewMessageRouter ¶
func NewMessageRouter(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl
NewMessageRouter creates a new MessageRouter
func NewMessageRouterWithLocalClient ¶
func NewMessageRouterWithLocalClient(selfID NodeID, credentials *security.Credential, clientConfig *MessageClientConfig) *messageRouterImpl
NewMessageRouterWithLocalClient creates a new MessageRouter with a local client.
Types ¶
type MessageClient ¶
type MessageClient interface {
// Run should be executed in a dedicated goroutine and it would block unless an irrecoverable error has been encountered.
Run(ctx context.Context, network string, addr string, receiverID NodeID, credential *security.Credential) (ret error)
// SendMessage sends a message of a given topic. It would block if the inner channel is congested.
SendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error)
// TrySendMessage tries to send a message of a given topic. It will return an error if the inner channel is congested.
TrySendMessage(ctx context.Context, topic Topic, value interface{}) (seq Seq, ret error)
// CurrentAck is used to query the latest sequence number for a topic that is acknowledged by the server.
// Note: currently only used for test.
CurrentAck(topic Topic) (Seq, bool)
}
MessageClient is an interface for sending messages to a remote peer.
type MessageClientConfig ¶
type MessageClientConfig struct {
// The size of the sending channel used to buffer
// messages before they go to gRPC.
SendChannelSize int
// The maximum duration for which messages wait to be batched.
BatchSendInterval time.Duration
// The maximum size in bytes of a batch.
MaxBatchBytes int
// The maximum number of messages in a batch.
MaxBatchCount int
// The limit of the rate at which the connection to the server is retried.
RetryRateLimitPerSecond float64
// The dial timeout for the gRPC client
DialTimeout time.Duration
// The advertised address of this node. Used for logging and monitoring purposes.
AdvertisedAddr string
// The version of the client for compatibility check.
// It should be in semver format. Empty string means no check.
ClientVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int
}
MessageClientConfig is used to configure MessageClient
type MessageClientStream ¶
type MessageClientStream = proto.CDCPeerToPeer_SendMessageClient
MessageClientStream is an alias for the protobuf-generated interface for the message service.
type MessageEntry ¶
type MessageEntry = *proto.MessageEntry
MessageEntry is an alias for the protobuf-generated type for a message.
type MessageRouter ¶
type MessageRouter interface {
// AddPeer should be invoked when a new peer is discovered.
AddPeer(id NodeID, addr string)
// RemovePeer should be invoked when a peer is determined to
// be permanently unavailable.
RemovePeer(id NodeID)
// GetClient returns a MessageClient for `target`. It returns
// nil if the target peer does not exist. The returned client
// is canceled if RemovePeer is called on `target`.
GetClient(target NodeID) MessageClient
// GetLocalChannel returns a channel that can be used for intra-node communication.
GetLocalChannel() <-chan RawMessageEntry
// Close cancels all clients maintained internally and waits for all clients to exit.
Close()
// Err returns a channel to receive errors from.
Err() <-chan error
}
MessageRouter is used to maintain clients to all the peers in the cluster that the local node needs to communicate with.
type MessageServer ¶
type MessageServer struct {
// contains filtered or unexported fields
}
MessageServer is an implementation of the gRPC server for the peer-to-peer system
func NewMessageServer ¶
func NewMessageServer(serverID NodeID, config *MessageServerConfig) *MessageServer
NewMessageServer creates a new MessageServer
func (*MessageServer) AddHandler ¶
func (m *MessageServer) AddHandler( ctx context.Context, topic string, tpi typeInformation, fn func(string, interface{}) error, ) (chan struct{}, <-chan error, error)
AddHandler registers a handler for messages in a given topic.
func (*MessageServer) RemoveHandler ¶
func (m *MessageServer) RemoveHandler(ctx context.Context, topic string) (chan struct{}, error)
RemoveHandler removes the registered handler for the given topic.
func (*MessageServer) Run ¶
func (m *MessageServer) Run(ctx context.Context, localCh <-chan RawMessageEntry) error
Run starts the MessageServer's worker goroutines. It must be running to provide the gRPC service.
func (*MessageServer) ScheduleDeregisterPeerTask ¶
func (m *MessageServer) ScheduleDeregisterPeerTask(ctx context.Context, peerID string) error
ScheduleDeregisterPeerTask schedules a task to deregister a peer.
func (*MessageServer) SendMessage ¶
func (m *MessageServer) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
SendMessage implements the gRPC call SendMessage.
func (*MessageServer) SyncAddHandler ¶
func (m *MessageServer) SyncAddHandler( ctx context.Context, topic string, tpi typeInformation, fn func(string, interface{}) error, ) (<-chan error, error)
SyncAddHandler registers a handler for messages in a given topic and waits for the operation to complete.
func (*MessageServer) SyncRemoveHandler ¶
func (m *MessageServer) SyncRemoveHandler(ctx context.Context, topic string) error
SyncRemoveHandler removes the registered handler for the given topic and wait for the operation to complete.
type MessageServerConfig ¶
type MessageServerConfig struct {
// The maximum number of entries to be cached for topics with no handler registered
MaxPendingMessageCountPerTopic int
// The maximum number of unhandled internal tasks for the main thread.
MaxPendingTaskCount int
// The size of the channel for pending messages before sending them to gRPC.
SendChannelSize int
// The interval between ACKs.
AckInterval time.Duration
// The size of the goroutine pool for running the handlers.
WorkerPoolSize int
// The maximum send rate per stream (per peer).
SendRateLimitPerStream float64
// The maximum number of peers acceptable by this server
MaxPeerCount int
// Semver of the server. Empty string means no version check.
ServerVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int
// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
KeepAliveTime time.Duration
// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
KeepAliveTimeout time.Duration
// The maximum time duration to wait before forcefully removing a handler.
//
// waitUnregisterHandleTimeout specifies how long to wait for
// the topic handler to consume all pending messages before
// forcefully unregister the handler.
// For a correct implementation of the handler, the time it needs
// to consume these messages is minimal, as the handler is not
// expected to block on channels, etc.
WaitUnregisterHandleTimeoutThreshold time.Duration
}
MessageServerConfig stores configurations for the MessageServer
type MessageServerStream ¶
type MessageServerStream = proto.CDCPeerToPeer_SendMessageServer
MessageServerStream is an alias for the protobuf-generated interface for the message service.
type MockCluster ¶
MockCluster mocks the whole peer-messaging cluster.
func NewMockCluster ¶
func NewMockCluster(t *testing.T, nodeCount int) *MockCluster
NewMockCluster creates a mock cluster.
type MockNode ¶
type MockNode struct {
Addr string
ID NodeID
Server *MessageServer
Router MessageRouter
// contains filtered or unexported fields
}
MockNode represents one mock node.
type NodeID ¶
type NodeID = string
NodeID represents the identifier of a sender node. Using IP address is not enough because of possible restarts.
type RawMessageEntry ¶
type RawMessageEntry struct {
// contains filtered or unexported fields
}
RawMessageEntry is an alias for the protobuf-generated type for a message.
type Serializable ¶
Serializable is an interface for defining custom serialization methods for peer messages.
type ServerWrapper ¶
type ServerWrapper struct {
// contains filtered or unexported fields
}
ServerWrapper implements a CDCPeerToPeerServer, and it maintains an inner CDCPeerToPeerServer instance that can be replaced as needed.
func NewServerWrapper ¶
func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper
NewServerWrapper creates a new ServerWrapper
func (*ServerWrapper) Reset ¶
func (s *ServerWrapper) Reset(inner p2p.CDCPeerToPeerServer)
Reset resets the inner server object in the ServerWrapper
func (*ServerWrapper) SendMessage ¶
func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error
SendMessage implements p2p.CDCPeerToPeerServer
func (*ServerWrapper) ServerOptions ¶
func (s *ServerWrapper) ServerOptions() []grpc.ServerOption
ServerOptions returns server option for creating grpc servers.