Documentation
¶
Index ¶
- Constants
- Variables
- func MarshalMessage(msg *Message) *mboxrpc.MailboxMessage
- func UseLogger(logger btclog.Logger)
- type Client
- func (c *Client) SendMessage(ctx context.Context, receiverKey btcec.PublicKey, encryptedPayload []byte, ...) (uint64, error)
- func (c *Client) Start() error
- func (c *Client) StartAccountSubscription(ctx context.Context, msgChan chan<- *ReceivedMessages, ...) (ReceiveSubscription, error)
- func (c *Client) Stop() error
- type ClientConfig
- type Message
- type MessageFilter
- type MockMsgStore
- func (s *MockMsgStore) FetchMessage(_ context.Context, id uint64) (*Message, error)
- func (s *MockMsgStore) FetchMessageByOutPoint(ctx context.Context, claimedOp wire.OutPoint) (*Message, error)
- func (s *MockMsgStore) NumMessages(context.Context) uint64
- func (s *MockMsgStore) QueryMessages(_ context.Context, filter MessageFilter) ([]*Message, error)
- func (s *MockMsgStore) StoreMessage(_ context.Context, txProof proof.TxProof, msg *Message) (uint64, error)
- type MockServer
- type MsgStore
- type MultiSubscription
- type ReceiveSubscription
- type ReceivedMessages
- type Server
- func (s *Server) MailboxInfo(ctx context.Context, _ *mboxrpc.MailboxInfoRequest) (*mboxrpc.MailboxInfoResponse, error)
- func (s *Server) ReceiveMessages(grpcStream serverStream) error
- func (s *Server) RegisterSubscriber(receiver *fn.EventReceiver[[]*Message], deliverExisting bool, ...) error
- func (s *Server) RegisterWithGrpcServer(registrar grpc.ServiceRegistrar) error
- func (s *Server) RegisterWithRestProxy(restCtx context.Context, restMux *proxy.ServeMux, ...) error
- func (s *Server) RemoveSubscriber(subscriber *fn.EventReceiver[[]*Message]) error
- func (s *Server) SendMessage(ctx context.Context, req *mboxrpc.SendMessageRequest) (*mboxrpc.SendMessageResponse, error)
- func (s *Server) Start(cfg *ServerConfig) error
- func (s *Server) Stop() error
- type ServerConfig
Constants ¶
const (
// MsgMaxSize is the maximum size of a message in bytes.
MsgMaxSize = 65536
)
const Subsystem = "AMBX"
Subsystem defines the sub system name of this package.
Variables ¶
var (
// ErrServerShutdown is the error returned if the mailbox server signals
// it's going to shut down.
ErrServerShutdown = errors.New("server shutting down")
// ErrServerInternal is the error returned if the mailbox server sends
// back an error instead of a proper message.
ErrServerInternal = errors.New("server sent unexpected error")
// ErrClientShutdown is the error returned if the mailbox client itself
// is shutting down.
ErrClientShutdown = errors.New("client shutting down")
// ErrAuthCanceled is returned if the authentication process of a single
// mailbox subscription is aborted.
ErrAuthCanceled = errors.New("authentication was canceled")
)
var (
// ErrMessageTooLong is returned when a message exceeds the maximum
// allowed length.
ErrMessageTooLong = fmt.Errorf("message too long, max %d bytes", MsgMaxSize)
// ErrMessageNotFound is returned when a message with the given ID or
// outpoint cannot be found in the mailbox.
ErrMessageNotFound = fmt.Errorf("message not found")
)
Functions ¶
func MarshalMessage ¶
func MarshalMessage(msg *Message) *mboxrpc.MailboxMessage
MarshalMessage converts a Message to its gRPC representation.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client performs the client side part of mailbox message exchange.
func NewClient ¶
func NewClient(cfg *ClientConfig) *Client
NewClient returns a new instance to initiate mailbox connections with.
func (*Client) SendMessage ¶
func (c *Client) SendMessage(ctx context.Context, receiverKey btcec.PublicKey, encryptedPayload []byte, txProof proof.TxProof) (uint64, error)
SendMessage sends a message to the mailbox server. The receiverKey is the public key of the receiver, encryptedPayload is the encrypted message payload and txProof is the transaction proof that accompanies the message. On success, the returned uint64 is the ID of the message.
func (*Client) StartAccountSubscription ¶
func (c *Client) StartAccountSubscription(ctx context.Context, msgChan chan<- *ReceivedMessages, receiverKey keychain.KeyDescriptor, filter MessageFilter) (ReceiveSubscription, error)
StartAccountSubscription opens a stream to the server and subscribes to all updates that concern the given account, including all orders that spend from that account. Only a single stream is ever open to the server, so a second call to this method will send a second subscription over the same stream, multiplexing all messages into the same connection. A stream can be long-lived, so this can be called for every account as soon as it's confirmed open. This method will return as soon as the authentication was successful. Messages sent from the server can then be received on the provided msgChan channel.
type ClientConfig ¶
type ClientConfig struct {
// ServerAddress is the domain:port of the mailbox server.
ServerAddress string
// ProxyAddress is the SOCKS proxy that should be used to establish the
// connection.
ProxyAddress string
// Insecure signals that no TLS should be used if set to true.
Insecure bool
// SkipTlsVerify signals that the TLS certificate of the mailbox server
// should not be verified. This is only needed if the server uses a
// self-signed certificate.
SkipTlsVerify bool
// TLSPathServer is the path to a local file that holds the mailbox
// server's TLS certificate. This is only needed if the server is using
// a self-signed cert.
TLSPathServer string
// DialOpts is a list of additional options that should be used when
// dialing the gRPC connection.
DialOpts []grpc.DialOption
// Signer is the signing interface used to sign messages during the
// authentication handshake with the mailbox server.
Signer lndclient.SignerClient
// MinBackoff is the minimum time waited before the next re-connect
// attempt is made. After each try the backoff is doubled until
// MaxBackoff is reached.
MinBackoff time.Duration
// MaxBackoff is the maximum time waited between connection attempts.
MaxBackoff time.Duration
}
ClientConfig holds the configuration options for the mailbox client.
type Message ¶
type Message struct {
// ID is the unique identifier for this message, assigned by the
// mailbox server.
ID uint64
// ReceiverKey is the intended recipient of the message. This is the
// public key of the receiver.
ReceiverKey btcec.PublicKey
// EncryptedPayload is the encrypted message payload. This is the actual
// content of the message, encrypted with ECIES.
EncryptedPayload []byte
// ArrivalTimestamp is the time when the message was received and
// validated by the mailbox server.
ArrivalTimestamp time.Time
// ProofBlockHeight is the block height of the block that was used as
// the tx proof for this message.
ProofBlockHeight uint32
}
Message represents a message in the mailbox.
type MessageFilter ¶
type MessageFilter struct {
// ReceiverKey is the message receiver's public key.
ReceiverKey btcec.PublicKey
// After is the time after which the message was received. If set, the
// filter will cause only messages that arrived after this time to be
// returned (exclusive).
After time.Time
// AfterID is the ID of the message after which the message was
// received. If set, the filter will cause only messages that arrived
// after this ID to be returned (exclusive).
AfterID uint64
// StartBlock is the block height at or after which the message was
// received. If set, the filter will cause only messages that arrived at
// this block or later to be returned (inclusive).
StartBlock uint32
}
MessageFilter is used to filter messages based on certain criteria.
func (*MessageFilter) DeliverExisting ¶
func (f *MessageFilter) DeliverExisting() bool
DeliverExisting returns true if the filter is set to deliver existing messages. This is the case if any of the fields other than the receiver key are set.
type MockMsgStore ¶
type MockMsgStore struct {
// contains filtered or unexported fields
}
func NewMockStore ¶
func NewMockStore() *MockMsgStore
func (*MockMsgStore) FetchMessage ¶
func (*MockMsgStore) FetchMessageByOutPoint ¶
func (*MockMsgStore) NumMessages ¶
func (s *MockMsgStore) NumMessages(context.Context) uint64
func (*MockMsgStore) QueryMessages ¶
func (s *MockMsgStore) QueryMessages(_ context.Context, filter MessageFilter) ([]*Message, error)
func (*MockMsgStore) StoreMessage ¶
type MockServer ¶
type MockServer struct {
ListenAddr string
// contains filtered or unexported fields
}
func NewMockServer ¶
func NewMockServer(t *testing.T) *MockServer
func NewMockServerWithSigner ¶
func NewMockServerWithSigner(t *testing.T, signer lndclient.SignerClient) *MockServer
func (*MockServer) PublishMessage ¶
func (m *MockServer) PublishMessage(msg *Message)
func (*MockServer) Start ¶
func (m *MockServer) Start(t *testing.T)
func (*MockServer) Stop ¶
func (m *MockServer) Stop(t *testing.T)
type MsgStore ¶
type MsgStore interface {
// StoreMessage stores a message in the mailbox, referencing the claimed
// outpoint of the transaction that was used to prove the message's
// authenticity. If a message with the same outpoint already exists,
// it returns proof.ErrTxMerkleProofExists.
StoreMessage(ctx context.Context, proof proof.TxProof,
msg *Message) (uint64, error)
// FetchMessage retrieves a message from the mailbox by its ID.
FetchMessage(ctx context.Context, id uint64) (*Message, error)
// FetchMessageByOutPoint retrieves a message from the mailbox by its
// claimed outpoint of the TX proof that was used to send it. If no
// message with the given outpoint exists, it returns
// ErrMessageNotFound.
FetchMessageByOutPoint(ctx context.Context,
claimedOp wire.OutPoint) (*Message, error)
// QueryMessages retrieves messages based on a query.
QueryMessages(ctx context.Context, filter MessageFilter) ([]*Message,
error)
// NumMessages returns the number of messages currently in the mailbox.
// Implementations should make sure that this can be calculated quickly
// and efficiently (e.g., by caching the result), as it might be queried
// often.
NumMessages(ctx context.Context) uint64
}
MsgStore is an interface for storing and retrieving messages in the mailbox.
type MultiSubscription ¶
MultiSubscription is a subscription manager that can handle multiple mailbox clients, allowing subscriptions to different accounts across different mailbox servers. It manages subscriptions and message queues for each client and provides a unified interface for receiving messages.
func NewMultiSubscription ¶
func NewMultiSubscription(baseClientConfig ClientConfig) *MultiSubscription
NewMultiSubscription creates a new MultiSubscription instance.
func (*MultiSubscription) MessageChan ¶
func (m *MultiSubscription) MessageChan() <-chan *ReceivedMessages
MessageChan returns a channel that can be used to receive messages from all subscriptions across all mailbox clients. This channel will receive ReceivedMessages, which contain the messages and their associated metadata, such as the sender and receiver keys.
func (*MultiSubscription) Stop ¶
func (m *MultiSubscription) Stop() error
Stop stops all active subscriptions and mailbox clients. It cancels all active subscription contexts and waits for all clients to stop gracefully.
func (*MultiSubscription) Subscribe ¶
func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL, receiverKey keychain.KeyDescriptor, filter MessageFilter) error
Subscribe adds a new subscription for the specified server URL and receiver key. It starts a new mailbox client if one does not already exist for the given URL. The subscription will receive messages that match the provided filter and will send them to the shared message queue.
type ReceiveSubscription ¶
type ReceiveSubscription interface {
// IsSubscribed returns true if the subscription is active and the
// stream to the server is open. This might flip to false if the server
// connection is lost. As long as the subscription isn't stopped, it
// will try to reconnect to the server automatically and indefinitely.
IsSubscribed() bool
// Stop can be used to terminate the subscription. This will close the
// stream to the server.
Stop() error
}
ReceiveSubscription is the interface returned from a client to the caller for receiving messages from the server that are intended for a specific receiver.
type ReceivedMessages ¶
type ReceivedMessages struct {
// Receiver is the key descriptor of the receiver that these messages
// are intended for.
Receiver keychain.KeyDescriptor
// Messages is the list of messages that were received from the server.
Messages []*mboxrpc.MailboxMessage
}
ReceivedMessages holds the messages received from the mailbox server for a specific receiver. This is used to return the messages to the caller after the 3-way authentication handshake is complete. The receiver is included to make it easier to multiplex multiple subscriptions on the same channel.
type Server ¶
type Server struct {
mboxrpc.UnimplementedMailboxServer
*lfn.ContextGuard
// contains filtered or unexported fields
}
Server is the mailbox server that handles incoming messages from clients and sends them to the appropriate subscribers. It also handles the authentication process for clients and manages the connected streams.
func NewServer ¶
func NewServer() *Server
NewServer creates a new mailbox server. The server is not configured at construction time; its configuration is passed to Start.
func (*Server) MailboxInfo ¶
func (s *Server) MailboxInfo(ctx context.Context, _ *mboxrpc.MailboxInfoRequest) (*mboxrpc.MailboxInfoResponse, error)
MailboxInfo returns basic server information.
func (*Server) ReceiveMessages ¶
ReceiveMessages initiates a bidirectional stream to receive messages for a specific receiver. This stream implements the challenge-response handshake required for receiver authentication before messages are delivered. Expected flow:
- Client -> Server: ReceiveMessagesRequest(init = InitReceive{...})
- Server -> Client: ReceiveMessagesResponse(challenge = Challenge{...})
- Client -> Server: ReceiveMessagesRequest(auth_sig = AuthSignature{...})
- Server -> Client: [Stream of ReceiveMessagesResponse( message = MailboxMessage{...} )]
- Server -> Client: ReceiveMessagesResponse(eos = EndOfStream{}) or ReceiveMessagesResponse(error = ReceiveError{...})
func (*Server) RegisterSubscriber ¶
func (s *Server) RegisterSubscriber(receiver *fn.EventReceiver[[]*Message], deliverExisting bool, deliverFrom MessageFilter) error
RegisterSubscriber adds a new subscriber to the set of subscribers that will be notified of any new messages.
func (*Server) RegisterWithGrpcServer ¶
func (s *Server) RegisterWithGrpcServer(registrar grpc.ServiceRegistrar) error
RegisterWithGrpcServer registers the rpcServer with the passed root gRPC server.
func (*Server) RegisterWithRestProxy ¶
func (s *Server) RegisterWithRestProxy(restCtx context.Context, restMux *proxy.ServeMux, restDialOpts []grpc.DialOption, restProxyDest string) error
RegisterWithRestProxy registers the RPC server with the given rest proxy.
func (*Server) RemoveSubscriber ¶
func (s *Server) RemoveSubscriber( subscriber *fn.EventReceiver[[]*Message]) error
RemoveSubscriber removes a subscriber from the set of message subscribers.
func (*Server) SendMessage ¶
func (s *Server) SendMessage(ctx context.Context, req *mboxrpc.SendMessageRequest) (*mboxrpc.SendMessageResponse, error)
SendMessage sends a single message to a receiver's mailbox. Requires a valid, unused Bitcoin transaction outpoint as proof of work/stake.
func (*Server) Start ¶
func (s *Server) Start(cfg *ServerConfig) error
Start signals that the RPC server starts accepting requests.
type ServerConfig ¶
type ServerConfig struct {
// AuthTimeout is the maximum time the server will wait for the client
// to authenticate before closing the connection.
AuthTimeout time.Duration
// Signer is the lndclient.SignerClient used to verify the
// authentication signature.
Signer lndclient.SignerClient
// HeaderVerifier is the proof.HeaderVerifier used to verify the TX
// proofs sent by clients to prove the rate limit of their messages.
HeaderVerifier proof.HeaderVerifier
// MerkleVerifier is the proof.MerkleVerifier used to verify the
// TX proofs sent by clients to prove the rate limit of their messages.
MerkleVerifier proof.MerkleVerifier
// MsgStore is the message store used to store and retrieve messages
// sent to the mailbox server.
MsgStore MsgStore
}
ServerConfig is the configuration struct for the mailbox server. It contains all the dependencies needed to run the server.