webds

Published: Apr 8, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

README

webds

Distributed System with websocket

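Design

  • Overall structure

The Server layer is the core service layer; one central node is chosen from it by election.
SubServer instances cannot be elected as the central node, but they can be deployed across network segments; otherwise they are equivalent to server nodes.
Client-layer nodes can connect to any server node.
Every server node maintains superior/lateral master lists that record its upper-level and same-level nodes.
Each node has at most one active outbound connection to a lateral or superior node, which keeps data broadcasts eventually consistent.
Lateral connections share their superior/lateral master lists.
The outbound connection shares the list of first-level topic subscriptions.
An outbound connection is treated as a special inbound connection. A node with no outbound connection is the core master node of the whole broadcast system; if the outbound connection drops, the node picks a new connection according to the election algorithm.
  • Communication method

      websocket
    
  • Communication protocol

      Built mainly around multi-level topics.
      Following iris, messages are serialized to a byte stream in the format: prefix;target_topic;source_topic;random_tag;type;msg
      source_topic: records the source id at every forwarding/broadcast hop
      random_tag: 5 bytes
      msg: JSON or protobuf
      The following first-level topics are reserved; all other topics are used for distribution:
          /sys   system commands; messages are not broadcast; used between two nodes to maintain connection state and share information
          /inner handled directly by the Server the client is connected to and not broadcast; the handler is registered with conn.On
          /self  marks the message source; one level is added per forwarding hop
                 e.g. /self/master_id0/master_id1/.../client_id
                 each topic under /self identifies exactly one node; combined with random_tag it is used to reply to one specific message
          /srv   for registering synchronous services; not yet developed
    
  • Command-line tool design

      Modeled on the ROS command-line tools, for example:
          - webds topic list/pub/sub
          - webds node list/stop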
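
The wire format and the reserved first-level topics described under the communication protocol can be sketched in Go as follows. This is only an illustration of the documented layout, not the package's own parser: the `Frame` type, `Encode`, `Parse`, `firstLevel`, and `isReserved` are hypothetical names, the `"0"` type value is made up, and the sketch assumes that only the trailing msg field may contain `;`.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strings"
)

// Frame mirrors the six fields of the README wire format.
// The struct and its field names are hypothetical, not taken from webds.
type Frame struct {
	Prefix      string
	TargetTopic string
	SourceTopic string
	RandomTag   string // 5 bytes according to the README
	Type        string
	Msg         []byte // JSON or protobuf payload
}

var errMalformed = errors.New("malformed frame")

// Encode joins the fields with ';' in the documented order.
func (f Frame) Encode() []byte {
	head := strings.Join([]string{
		f.Prefix, f.TargetTopic, f.SourceTopic, f.RandomTag, f.Type,
	}, ";")
	return append([]byte(head+";"), f.Msg...)
}

// Parse splits at most six fields so that the payload may itself contain ';'.
// Assumption: the first five fields never contain ';'.
func Parse(data []byte) (Frame, error) {
	parts := bytes.SplitN(data, []byte(";"), 6)
	if len(parts) != 6 || len(parts[3]) != 5 {
		return Frame{}, errMalformed
	}
	return Frame{
		Prefix:      string(parts[0]),
		TargetTopic: string(parts[1]),
		SourceTopic: string(parts[2]),
		RandomTag:   string(parts[3]),
		Type:        string(parts[4]),
		Msg:         parts[5],
	}, nil
}

// firstLevel returns the first-level topic, e.g. "/self" for "/self/a/b".
func firstLevel(topic string) string {
	parts := strings.SplitN(strings.TrimPrefix(topic, "/"), "/", 2)
	return "/" + parts[0]
}

// isReserved reports whether a topic falls under one of the four
// reserved first-level topics listed in the README.
func isReserved(topic string) bool {
	switch firstLevel(topic) {
	case "/sys", "/inner", "/self", "/srv":
		return true
	}
	return false
}

func main() {
	f := Frame{
		Prefix:      "ws",
		TargetTopic: "/chat/room1",
		SourceTopic: "/self/master_id0/client_id",
		RandomTag:   "a1b2c",
		Type:        "0", // hypothetical type value
		Msg:         []byte(`{"text":"hi;there"}`),
	}
	raw := f.Encode()
	fmt.Printf("%s\n", raw)

	back, err := Parse(raw)
	fmt.Println(back.TargetTopic, string(back.Msg), err)
	fmt.Println(isReserved(back.SourceTopic), isReserved(back.TargetTopic))
}
```

Splitting with a field limit keeps the payload opaque, so a JSON body containing `;` survives a round trip unchanged.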
    

TODO

  • client/go/py/js

  • command tools

  • Distributed mode: elect a central communication node

  • Service mode (synchronous messaging)

  • Message broadcast mechanism: optimization and benchmarks

update

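  • v0.2.3 Changed the message protocol; refined the distributed design.
  • v0.2.2 Changed the OnConnection callback: if the callback returns an error, the conn is closed immediately, which can be used for authorization checks. Optimized the message parsing code. Started the initial design of the distributed structure.
  • v0.2.1 Added the webds node list/stop commands.
  • v0.2.0 Initial refactor of the server, go client, and command tool around topic subscriptions.
  • v0.1.0 Old version, just a websocket server.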

Documentation

Index

Constants

const (
	// DefaultWebsocketWriteTimeout 0, no timeout
	DefaultWebsocketWriteTimeout = 0
	// DefaultWebsocketReadTimeout 0, no timeout
	DefaultWebsocketReadTimeout = 0
	// DefaultWebsocketPongTimeout 60 * time.Second
	DefaultWebsocketPongTimeout = 60 * time.Second
	// DefaultWebsocketPingPeriod (DefaultWebsocketPongTimeout * 9) / 10
	DefaultWebsocketPingPeriod = (DefaultWebsocketPongTimeout * 9) / 10
	// DefaultWebsocketMaxMessageSize 1024
	DefaultWebsocketMaxMessageSize = 1024
	// DefaultWebsocketReadBufferSize 4096
	DefaultWebsocketReadBufferSize = 4096
	// DefaultWebsocketWriterBufferSize 4096
	DefaultWebsocketWriterBufferSize = 4096
	// DefaultEvtMessageKey is the default prefix of the underlying websocket events
	// that are established under the hood.
	//
	// Last character of the prefix should be ':'.
	DefaultEvtMessageKey = "ws"
)
const (
	Version = "v0.2.3"
)

Variables

var (
	ErrDuplicatedClient = errors.New("duplicated client")
	ErrID               = errors.New("id is not exist")
	ErrOrigin           = errors.New("error origin")
)

Functions

func DefaultIDGenerator added in v0.2.0

func DefaultIDGenerator(r *http.Request) string

DefaultIDGenerator returns a random unique ID for a new connection. It is used when config.IDGenerator is nil.

Types

type Config added in v0.2.0

type Config struct {
	// IDGenerator is used to create (and later on, set)
	// an ID for each incoming websocket connection (client).
	// The request is an input parameter which you can use to generate the ID (from headers for example).
	// If empty then the ID is generated by DefaultIDGenerator: randomString(64)
	IDGenerator func(r *http.Request) string
	// record the url address of the superior masters
	SuperiorMaster []string
	// record the url address of the lateral masters
	LateralMaster []string
	// EvtMessagePrefix is the prefix of the underlying websocket events that are established under the hood.
	// This prefix is visible only to the javascript side (code) and it has nothing to do
	// with the message that the end-user receives.
	// Do not change it unless it is absolutely necessary.
	//
	// If empty then defaults to []byte("ws").
	EvtMessagePrefix []byte
	NewConn          func(server *Server, w http.ResponseWriter, r *http.Request) (Connection, error)
	// Error is the function that will be fired if any client couldn't upgrade the HTTP connection
	// to a websocket connection, a handshake error.
	Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
	// CheckOrigin a function that is called right before the handshake,
	// if returns false then that client is not allowed to connect with the websocket server.
	CheckOrigin func(r *http.Request) bool
	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration
	// WriteTimeout time allowed to write a message to the connection.
	// 0 means no timeout.
	// Default value is 0
	WriteTimeout time.Duration
	// ReadTimeout time allowed to read a message from the connection.
	// 0 means no timeout.
	// Default value is 0
	ReadTimeout time.Duration
	// PongTimeout allowed to read the next pong message from the connection.
	// Default value is 60 * time.Second
	PongTimeout time.Duration
	// PingPeriod is the interval at which ping messages are sent to the connection. Must be less than PongTimeout.
	// Default value is (PongTimeout * 9) / 10, i.e. 54 * time.Second with the default PongTimeout
	PingPeriod time.Duration
	// MaxMessageSize max message size allowed from connection.
	// Default value is 1024
	MaxMessageSize int64
	// BinaryMessages, when true, marks messages as binary data instead of UTF-8 text.
	// Useful if you want to use the Connection's EmitMessage to send custom binary data to the client, like native server-client communication.
	// Default value is false
	BinaryMessages bool
	// ReadBufferSize is the buffer size for the connection reader.
	// Default value is 4096
	ReadBufferSize int
	// WriteBufferSize is the buffer size for the connection writer.
	// Default value is 4096
	WriteBufferSize int
	// EnableCompression specify if the server should attempt to negotiate per
	// message compression (RFC 7692). Setting this value to true does not
	// guarantee that compression will be supported. Currently only "no context
	// takeover" modes are supported.
	//
	// Defaults to false and should remain so unless there are special requirements.
	EnableCompression bool

	// Subprotocols specifies the server's supported protocols in order of
	// preference. If this field is set, then the Upgrade method negotiates a
	// subprotocol by selecting the first match in this list with a protocol
	// requested by the client.
	Subprotocols []string
}

Config is the websocket server configuration; all of its fields are optional.

func (Config) Validate added in v0.2.0

func (c Config) Validate() Config

Validate validates the configuration

type Connection added in v0.2.0

type Connection interface {
	// ID returns the connection's identifier
	ID() string

	Request() *http.Request

	// Server returns the websocket server instance
	// which this connection is listening to.
	//
	// Its connection-relative operations are safe for use.
	Server() *Server

	// OnDisconnect registers a callback which is fired when this connection is closed, by an error or manually
	OnDisconnect(DisconnectFunc)
	// OnError registers a callback which fires when an error occurs on this connection
	OnError(ErrorFunc)
	// FireOnError does nothing more than fire the OnError listeners.
	// It doesn't send anything to the client.
	FireOnError(err error)
	// Publisher selects the topic on which the server should send a message
	// and returns a Publisher to send messages with.
	// Messages are broadcast to every node subscribed to this topic.
	Publisher(string) Publisher
	// On registers a callback for a particular topic, fired when a message on this topic is received.
	// Only for topics with the '/inner' prefix.
	On(topic string, callback message.Func)

	// Echo sends only to the node behind this conn
	Echo(topic string, data interface{}) error

	// Subscribe registers this connection to a topic, creating the topic if it doesn't exist.
	// One topic can have one or more connections. One connection can subscribe to many topics.
	// All connections automatically subscribe to a topic named after their `ID`.
	// It subscribes on behalf of the node, so it can be used to control whether a node is permitted to subscribe to a topic.
	Subscribe(string)

	IsSubscribe(string) bool
	// CancelSubscribe removes this connection's subscription to a topic
	CancelSubscribe(string)
	// Wait starts the ping and the messages reader,
	// it's named as "Wait" because it should be called LAST,
	// after the "Subscribe" events IF server's `Upgrade` is used,
	// otherwise you don't have to call it because the `Handler()` does it automatically.
	Wait()
	// Disconnect disconnects the client, closes the underlying websocket conn and removes it from the conn list.
	// It returns the error, if any, from the underlying connection
	Disconnect(error) error
}

Connection is the front-end API that you will use to communicate with the client side

func NewConn added in v0.2.2

func NewConn(s *Server, w http.ResponseWriter, r *http.Request) (Connection, error)

type ConnectionFunc added in v0.2.0

type ConnectionFunc func(Connection) error

type DisconnectFunc added in v0.2.0

type DisconnectFunc func()

DisconnectFunc is the callback which is fired when a client/connection closed

type ErrorFunc added in v0.2.0

type ErrorFunc func(error)

ErrorFunc is the callback which fires whenever an error occurs

type Publisher added in v0.2.0

type Publisher interface {
	// Pub sends a message on a particular topic
	Pub(interface{}) error
}

Publisher is the message manager

type RowConn added in v0.2.2

type RowConn = connection

type Server added in v0.2.0

type Server struct {
	// contains filtered or unexported fields
}

func New added in v0.2.0

func New(cfg Config) *Server

func (*Server) Broadcast added in v0.2.0

func (s *Server) Broadcast(topic string, msg []byte) error

func (*Server) CancelAll added in v0.2.0

func (s *Server) CancelAll(id string)

func (*Server) CancelSubscribe added in v0.2.0

func (s *Server) CancelSubscribe(topic, id string)

func (*Server) Disconnect added in v0.2.0

func (s *Server) Disconnect(id string) error

func (*Server) GetConnection added in v0.2.0

func (s *Server) GetConnection(id string) *connection

func (*Server) GetConnections added in v0.2.0

func (s *Server) GetConnections() []Connection

func (*Server) GetConnectionsByTopic added in v0.2.0

func (s *Server) GetConnectionsByTopic(topic string) []Connection

func (*Server) IsConnected added in v0.2.0

func (s *Server) IsConnected(id string) bool

func (*Server) IsSubscribe added in v0.2.0

func (s *Server) IsSubscribe(topic string, id string) bool

func (*Server) LenConnections added in v0.2.0

func (s *Server) LenConnections() (n int)

func (*Server) OnConnection added in v0.2.0

func (s *Server) OnConnection(cb ConnectionFunc)

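OnConnection registers a callback that fires when a new connection is created. If the callback returns an error, the connection is closed.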

func (*Server) Subscribe added in v0.2.0

func (s *Server) Subscribe(topic string, id string)

func (*Server) Upgrade added in v0.2.0

func (s *Server) Upgrade(w http.ResponseWriter, r *http.Request) (Connection, error)

Directories

Path Synopsis
client
cmd
