Documentation
¶
Index ¶
- Constants
- Variables
- func Append(buf []byte, more ...byte) []byte
- func AppendString(buf []byte, more string) []byte
- func AsyncExecute(f func())
- func AsyncResponse() bool
- func BatchRecv() bool
- func BatchSend() bool
- func BeforeRecv(h func(net.Conn) error)
- func BeforeSend(h func(net.Conn) error)
- func Free(buf []byte)
- func Handle(m string, h HandlerFunc, argv ...interface{})
- func HandleConnected(onConnected func(*Client))
- func HandleDisconnected(onDisConnected func(*Client))
- func HandleMalloc(f func(int) []byte)
- func HandleNotFound(h HandlerFunc)
- func HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
- func LogDebugInfo()
- func Malloc(size int) []byte
- func MaxBodyLen() int
- func ReadTimeout() time.Duration
- func Realloc(buf []byte, size int) []byte
- func RecvBufferSize() int
- func SendBufferSize() int
- func SendQueueSize() int
- func SetAsyncExecutor(executor func(f func()))
- func SetAsyncResponse(async bool)
- func SetBatchRecv(batch bool)
- func SetBatchSend(batch bool)
- func SetDebug(enable bool)
- func SetHandler(h Handler)
- func SetLogTag(tag string)
- func SetMaxBodyLen(l int)
- func SetReadTimeout(timeout time.Duration)
- func SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)
- func SetRecvBufferSize(size int)
- func SetSendBufferSize(size int)
- func SetSendQueueSize(size int)
- func SetWriteTimeout(timeout time.Duration)
- func Use(h HandlerFunc)
- func UseCoder(coder MessageCoder)
- func WriteTimeout() time.Duration
- type Allocator
- type BufferPool
- func (bp *BufferPool) Append(buf []byte, more ...byte) []byte
- func (bp *BufferPool) AppendString(buf []byte, more string) []byte
- func (bp *BufferPool) Free(buf []byte)
- func (bp *BufferPool) LogDebugInfo()
- func (bp *BufferPool) Malloc(size int) []byte
- func (bp *BufferPool) Realloc(buf []byte, size int) []byte
- type Client
- func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, ...) error
- func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, ...) error
- func (c *Client) CheckState() error
- func (c *Client) Delete(key interface{})
- func (c *Client) Get(key interface{}) (interface{}, bool)
- func (c *Client) Keepalive(interval time.Duration)
- func (c *Client) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (c *Client) Ping()
- func (c *Client) Pong()
- func (c *Client) PushMsg(msg *Message, timeout time.Duration) error
- func (c *Client) Restart() error
- func (c *Client) Set(key interface{}, value interface{})
- func (c *Client) SetState(running bool)
- func (c *Client) Stop()
- type ClientPool
- type Context
- func (ctx *Context) Abort()
- func (ctx *Context) Arg() interface{}
- func (ctx *Context) Bind(v interface{}) error
- func (ctx *Context) Deadline() (deadline time.Time, ok bool)
- func (ctx *Context) Done() <-chan struct{}
- func (ctx *Context) Err() error
- func (ctx *Context) Error(v interface{}) error
- func (ctx *Context) Get(key interface{}) (interface{}, bool)
- func (ctx *Context) Next()
- func (ctx *Context) Release()
- func (ctx *Context) Set(key interface{}, value interface{})
- func (ctx *Context) Value(key interface{}) interface{}
- func (ctx *Context) Values() map[interface{}]interface{}
- func (ctx *Context) Write(v interface{}) error
- func (ctx *Context) WriteWithTimeout(v interface{}, timeout time.Duration) error
- type DialerFunc
- type Handler
- type HandlerFunc
- type Header
- type Message
- func (m *Message) BodyLen() int
- func (m *Message) Cmd() byte
- func (m *Message) Data() []byte
- func (m *Message) Error() error
- func (m *Message) Get(key interface{}) (interface{}, bool)
- func (m *Message) IsAsync() bool
- func (m *Message) IsError() bool
- func (m *Message) IsFlagBitSet(index int) bool
- func (m *Message) Len() int
- func (m *Message) Method() string
- func (m *Message) MethodLen() int
- func (m *Message) Payback()
- func (m *Message) Release()
- func (m *Message) ResetAttrs()
- func (m *Message) Seq() uint64
- func (m *Message) Set(key interface{}, value interface{})
- func (m *Message) SetAsync(isAsync bool)
- func (m *Message) SetBodyLen(l int)
- func (m *Message) SetCmd(cmd byte)
- func (m *Message) SetError(isError bool)
- func (m *Message) SetFlagBit(index int, value bool) error
- func (m *Message) SetMethodLen(l int)
- func (m *Message) SetSeq(seq uint64)
- func (m *Message) Values() map[interface{}]interface{}
- type MessageCoder
- type NativeAllocator
- type Server
- func (s *Server) ForEach(h func(*Client))
- func (s *Server) ForEachWithFilter(h func(*Client), filter func(*Client) bool)
- func (s *Server) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (s *Server) Run(addr string) error
- func (s *Server) Serve(ln net.Listener) error
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) Stop() error
- type WebsocketConn
Constants ¶
const ( // TimeZero represents zero time. TimeZero time.Duration = 0 // TimeForever represents forever time. TimeForever time.Duration = 1<<63 - 1 )
const ( // CmdNone is invalid CmdNone byte = 0 // CmdRequest the other side should response to a request message CmdRequest byte = 1 // CmdResponse the other side should not response to a request message CmdResponse byte = 2 // CmdNotify the other side should not response to a request message CmdNotify byte = 3 // CmdPing . CmdPing byte = 4 // CmdPong . CmdPong byte = 5 )
const ( // HeaderIndexBodyLenBegin . HeaderIndexBodyLenBegin = 0 // HeaderIndexBodyLenEnd . HeaderIndexBodyLenEnd = 4 // HeaderIndexReserved . HeaderIndexReserved = 4 // HeaderIndexCmd . HeaderIndexCmd = 5 // HeaderIndexFlag . HeaderIndexFlag = 6 // HeaderIndexMethodLen . HeaderIndexMethodLen = 7 // HeaderIndexSeqBegin . HeaderIndexSeqBegin = 8 // HeaderIndexSeqEnd . HeaderIndexSeqEnd = 16 // HeaderFlagMaskError . HeaderFlagMaskError byte = 0x01 // HeaderFlagMaskAsync . HeaderFlagMaskAsync byte = 0x02 )
const ( // HeadLen represents Message head length. HeadLen int = 16 // MaxMethodLen limits Message method length. MaxMethodLen int = 127 // DefaultMaxBodyLen limits Message body length. DefaultMaxBodyLen int = 1024*1024*64 - 16 )
Variables ¶
var ( // ErrClientTimeout represents a timeout error because of timer or context. ErrClientTimeout = errors.New("timeout") // ErrClientInvalidTimeoutZero represents an error of 0 time parameter. ErrClientInvalidTimeoutZero = errors.New("invalid timeout, should not be 0") // ErrClientInvalidTimeoutLessThanZero represents an error of less than 0 time parameter. ErrClientInvalidTimeoutLessThanZero = errors.New("invalid timeout, should not be < 0") // ErrClientInvalidTimeoutZeroWithNonNilCallback represents an error with 0 time parameter but with non-nil callback. ErrClientInvalidTimeoutZeroWithNonNilCallback = errors.New("invalid timeout 0 with non-nil callback") // ErrClientOverstock represents an error of Client's send queue is full. ErrClientOverstock = errors.New("timeout: rpc Client's send queue is full") // ErrClientReconnecting represents an error that Client is reconnecting. ErrClientReconnecting = errors.New("client reconnecting") // ErrClientStopped represents an error that Client is stopped. ErrClientStopped = errors.New("client stopped") // ErrClientInvalidPoolDialers represents an error of empty dialer array. ErrClientInvalidPoolDialers = errors.New("invalid dialers: empty array") )
client error
var ( // ErrInvalidRspMessage represents an error of invalid message CMD. ErrInvalidRspMessage = errors.New("invalid response message cmd") // ErrMethodNotFound represents an error of method not found. ErrMethodNotFound = errors.New("method not found") // ErrInvalidFlagBitIndex represents an error of invlaid flag bit index. ErrInvalidFlagBitIndex = errors.New("invalid index, should be 0-7") )
message error
var ( // PingMessage . PingMessage = newMessage(CmdPing, "", nil, false, false, 0, nil, nil, nil) // PongMessage . PongMessage = newMessage(CmdPong, "", nil, false, false, 0, nil, nil, nil) )
var ( // ErrContextResponseToNotify represents an error that response to a notify message. ErrContextResponseToNotify = errors.New("should not response to a context with notify message") )
context error
var ( // ErrTimeout represents an error of timeout. ErrTimeout = errors.New("timeout") )
general errors
Functions ¶
func AppendString ¶
AppendString exports default package method.
func BeforeRecv ¶
BeforeRecv registers default handler which will be called before Recv.
func BeforeSend ¶
BeforeSend registers default handler which will be called before Send.
func Handle ¶
func Handle(m string, h HandlerFunc, argv ...interface{})
Handle registers default method/router handler.
If pass a Boolean value of "true", the handler will be called asynchronously in a new goroutine, Else the handler will be called synchronously in the client's reading goroutine one by one.
func HandleConnected ¶
func HandleConnected(onConnected func(*Client))
HandleConnected registers default handler which will be called when client connected.
func HandleDisconnected ¶
func HandleDisconnected(onDisConnected func(*Client))
HandleDisconnected registers default handler which will be called when client disconnected.
func HandleMalloc ¶
HandleMalloc registers default buffer maker.
func HandleNotFound ¶
func HandleNotFound(h HandlerFunc)
HandleNotFound registers default "" method/router handler, It will be called when mothod/router is not found.
func HandleSessionMiss ¶
HandleSessionMiss registers default handler which will be called when async message seq not found.
func MaxBodyLen ¶
func MaxBodyLen() int
func RecvBufferSize ¶
func RecvBufferSize() int
RecvBufferSize returns default client's read buffer size.
func SendBufferSize ¶
func SendBufferSize() int
SendBufferSize returns default client's read buffer size.
func SendQueueSize ¶
func SendQueueSize() int
SendQueueSize returns default client's send queue channel capacity.
func SetAsyncExecutor ¶
func SetAsyncExecutor(executor func(f func()))
SetAsyncExecutor sets executor. AsyncExecute executes a func
func SetAsyncResponse ¶
func SetAsyncResponse(async bool)
SetAsyncResponse sets default AsyncResponse flag.
func SetMaxBodyLen ¶
func SetMaxBodyLen(l int)
func SetReadTimeout ¶
SetReadTimeout sets client's read timeout.
func SetReaderWrapper ¶
SetReaderWrapper registers default reader wrapper for net.Conn.
func SetRecvBufferSize ¶
func SetRecvBufferSize(size int)
SetRecvBufferSize sets default client's read buffer size.
func SetSendBufferSize ¶
func SetSendBufferSize(size int)
SetSendBufferSize sets default client's read buffer size.
func SetSendQueueSize ¶
func SetSendQueueSize(size int)
SetSendQueueSize sets default client's send queue channel capacity.
func SetWriteTimeout ¶
SetWriteTimeout sets client's write timeout.
func UseCoder ¶
func UseCoder(coder MessageCoder)
UseCoder registers default message coding middleware, coder.Encode will be called before message send, coder.Decode will be called after message recv.
Types ¶
type Allocator ¶
type BufferPool ¶
type BufferPool struct {
Debug bool
// contains filtered or unexported fields
}
BufferPool .
func (*BufferPool) AppendString ¶
func (bp *BufferPool) AppendString(buf []byte, more string) []byte
AppendString .
func (*BufferPool) LogDebugInfo ¶
func (bp *BufferPool) LogDebugInfo()
type Client ¶
type Client struct {
Conn net.Conn
Codec codec.Codec
Handler Handler
Reader io.Reader
Dialer DialerFunc
// contains filtered or unexported fields
}
Client represents an arpc Client. There may be multiple outstanding Calls or Notifys associated with a single Client, and a Client may be used by multiple goroutines simultaneously.
func NewClient ¶
func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error)
NewClient creates a Client.
func (*Client) Call ¶
func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, args ...interface{}) error
Call makes an rpc call with a timeout. Call will block waiting for the server's response until timeout.
func (*Client) CallWith ¶
func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error
CallWith uses context to make rpc call. CallWith blocks to wait for a response from the server until it times out.
func (*Client) NewMessage ¶
NewMessage creates a Message by client's seq, handler and codec.
func (*Client) Set ¶
func (c *Client) Set(key interface{}, value interface{})
Set sets key-value pair.
type ClientPool ¶
type ClientPool struct {
// contains filtered or unexported fields
}
ClientPool represents an arpc Client Pool.
func NewClientPool ¶
func NewClientPool(dialer DialerFunc, size int, args ...interface{}) (*ClientPool, error)
NewClientPool creates a ClientPool.
func NewClientPoolFromDialers ¶
func NewClientPoolFromDialers(dialers []DialerFunc, args ...interface{}) (*ClientPool, error)
NewClientPoolFromDialers creates a ClientPool by multiple dialers.
func (*ClientPool) Get ¶
func (pool *ClientPool) Get(index int) *Client
Get returns a Client by index.
func (*ClientPool) Next ¶
func (pool *ClientPool) Next() *Client
Next returns a Client by round robin.
type Context ¶
type Context struct {
Client *Client
// contains filtered or unexported fields
}
Context represents an arpc Call's context.
func (*Context) Abort ¶
func (ctx *Context) Abort()
Abort stops the one-by-one-calling of middlewares and method/router handler.
func (*Context) Bind ¶
Bind parses the body data and stores the result in the value pointed to by v.
func (*Context) Next ¶
func (ctx *Context) Next()
Next calls next middleware or method/router handler.
func (*Context) Set ¶
func (ctx *Context) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Context) Value ¶
func (ctx *Context) Value(key interface{}) interface{}
Value returns the value associated with this context for key, implements stdlib's Context.
func (*Context) Values ¶
func (ctx *Context) Values() map[interface{}]interface{}
Values returns values.
type DialerFunc ¶
DialerFunc defines the dialer used by arpc Client to connect to the server.
type Handler ¶
type Handler interface {
// Clone returns a copy of Handler.
Clone() Handler
// LogTag returns log tag value.
LogTag() string
// SetLogTag sets log tag.
SetLogTag(tag string)
// HandleConnected registers handler which will be called when client connected.
HandleConnected(onConnected func(*Client))
// OnConnected will be called when client is connected.
OnConnected(c *Client)
// HandleDisconnected registers handler which will be called when client is disconnected.
HandleDisconnected(onDisConnected func(*Client))
// OnDisconnected will be called when client is disconnected.
OnDisconnected(c *Client)
// MaxReconnectTimes returns client's max reconnect times.
MaxReconnectTimes() int
// SetMaxReconnectTimes sets client's max reconnect times for.
SetMaxReconnectTimes(n int)
// HandleSessionMiss registers handler which will be called when async message seq not found.
HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
// OnSessionMiss will be called when async message seq not found.
OnSessionMiss(c *Client, m *Message)
// HandleContextDone registers handler which will be called when message dropped.
HandleContextDone(onContextDone func(ctx *Context))
// OnContextDone will be called when message is dropped.
OnContextDone(ctx *Context)
// BeforeRecv registers handler which will be called before Recv.
BeforeRecv(h func(net.Conn) error)
// BeforeSend registers handler which will be called before Send.
BeforeSend(h func(net.Conn) error)
// BatchRecv returns BatchRecv flag.
BatchRecv() bool
// SetBatchRecv sets BatchRecv flag.
SetBatchRecv(batch bool)
// BatchSend returns BatchSend flag.
BatchSend() bool
// SetBatchSend sets BatchSend flag.
SetBatchSend(batch bool)
// AsyncWrite returns AsyncWrite flag.
AsyncWrite() bool
// SetAsyncWrite sets AsyncWrite flag.
SetAsyncWrite(async bool)
// AsyncResponse returns AsyncResponse flag.
AsyncResponse() bool
// SetAsyncResponse sets AsyncResponse flag.
SetAsyncResponse(async bool)
// WrapReader wraps net.Conn to Read data with io.Reader.
WrapReader(conn net.Conn) io.Reader
// SetReaderWrapper registers reader wrapper for net.Conn.
SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)
// Send writes buffer data to a connection.
Send(c net.Conn, buffer []byte) (int, error)
// SendN writes multiple buffer data to a connection.
SendN(conn net.Conn, buffers net.Buffers) (int, error)
// RecvBufferSize returns client's recv buffer size.
RecvBufferSize() int
// SetRecvBufferSize sets client's recv buffer size.
SetRecvBufferSize(size int)
// SendBufferSize returns client's send buffer size.
SendBufferSize() int
// SetSendBufferSize sets client's send buffer size.
SetSendBufferSize(size int)
// ReadTimeout returns client's read timeout.
ReadTimeout() time.Duration
// SetReadTimeout sets client's read timeout.
SetReadTimeout(timeout time.Duration)
// WriteTimeout returns client's write timeout.
WriteTimeout() time.Duration
// SetWriteTimeout sets client's write timeout.
SetWriteTimeout(timeout time.Duration)
// SendQueueSize returns client's send queue channel capacity.
SendQueueSize() int
// SetSendQueueSize sets client's send queue channel capacity.
SetSendQueueSize(size int)
// MaxBodyLen returns max body length of a message.
MaxBodyLen() int
// SetMaxBodyLen sets max body length of a message.
SetMaxBodyLen(l int)
// Use registers method/router handler middleware.
Use(h HandlerFunc)
// UseCoder registers message coding middleware,
// coder.Encode will be called before message send,
// coder.Decode will be called after message recv.
UseCoder(coder MessageCoder)
// Coders returns coding middlewares.
Coders() []MessageCoder
// Handle registers method/router handler.
//
// If pass a Boolean value of "true", the handler will be called asynchronously in a new goroutine,
// Else the handler will be called synchronously in the client's reading goroutine one by one.
Handle(m string, h HandlerFunc, argv ...interface{})
// HandleNotFound registers "" method/router handler,
// It will be called when mothod/router is not found.
HandleNotFound(h HandlerFunc)
// OnMessage finds method/router middlewares and handler, then call them one by one.
OnMessage(c *Client, m *Message)
// Malloc makes a buffer by size.
Malloc(size int) []byte
// HandleMalloc registers buffer maker.
HandleMalloc(f func(size int) []byte)
// Append append bytes to buffer.
Append(b []byte, more ...byte) []byte
// HandleAppend registers buffer appender.
HandleAppend(f func(b []byte, more ...byte) []byte)
// Free release a buffer.
Free([]byte)
// HandleFree registers buffer releaser.
HandleFree(f func(buf []byte))
Context() (context.Context, context.CancelFunc)
SetContext(ctx context.Context, cancel context.CancelFunc)
Cancel()
// NewMessage creates a Message.
NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message
// NewMessageWithBuffer creates a message with the buffer and manage the message by the pool.
// The buffer arg should be managed by a pool if EnablePool(true) .
NewMessageWithBuffer(buffer []byte) *Message
// SetAsyncExecutor sets executor.
SetAsyncExecutor(executor func(f func()))
// AsyncExecute executes a func
AsyncExecute(f func())
// SetAsyncExecutor sets whether communication type is RPC only.
SetRpcOnly(b bool)
// SetAsyncExecutor returns whether communication type is RPC only.
RpcOnly() bool
}
Handler defines net message handler interface.
var DefaultHandler Handler = NewHandler()
DefaultHandler is the default Handler used by arpc
type HandlerFunc ¶
type HandlerFunc func(*Context)
HandlerFunc defines message handler of arpc middleware and method/router.
type Message ¶
type Message struct {
Buffer []byte
// contains filtered or unexported fields
}
Message represents an arpc Message.
func NewMessage ¶
func NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, h Handler, codec codec.Codec, values map[interface{}]interface{}) *Message
NewMessage creates a Message.
func (*Message) IsFlagBitSet ¶
IsFlagBitSet returns flag bit value.
func (*Message) Release ¶
func (m *Message) Release()
Release decrement the reference count and returns the current value.
func (*Message) ResetAttrs ¶
func (m *Message) ResetAttrs()
ResetAttrs resets reserved/cmd/flag/methodLen to 0.
func (*Message) Set ¶
func (m *Message) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Message) SetFlagBit ¶
SetFlagBit sets flag bit value by index.
func (*Message) SetMethodLen ¶
SetMethodLen sets method length.
type MessageCoder ¶
type MessageCoder interface {
// Encode wrap message before send to client
Encode(*Client, *Message) *Message
// Decode unwrap message between recv and handle
Decode(*Client, *Message) *Message
}
MessageCoder defines Message coding middleware interface.
type Server ¶
type Server struct {
Accepted int64
CurrLoad int64
MaxLoad int64
Codec codec.Codec
Handler Handler
Listener net.Listener
// contains filtered or unexported fields
}
Server represents an arpc Server.
func (*Server) ForEachWithFilter ¶
func (*Server) NewMessage ¶
NewMessage creates a Message.
type WebsocketConn ¶
type WebsocketConn interface {
HandleWebsocket(func())
}
WebsocketConn defines websocket-conn interface.