znet

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2022 License: GPL-3.0 Imports: 14 Imported by: 1

README

znet

A powerful network framework for Go.

Features

  • High-performance Event-Loop under multi-threads model
  • Supporting multiple protocols: TCP,Websocket
  • Supporting reactor event-notification mechanisms: epoll in Linux/Windows and kqueue in FreeBSD
  • Supporting safe goroutines worker pool
  • Supporting two content types: JSON and Protobuf
  • Supporting a router service that maps different operate codes to handler functions

Quick start

  • install
go get -u github.com/ebar-go/znet
  • go run server.go
// server.go
package main

import (
	"github.com/ebar-go/ego/utils/runtime/signal"
	"github.com/ebar-go/znet"
)

// OperateFoo is the operate code of the sample "foo" route.
const OperateFoo = 1

// main starts a znet server that listens for TCP connections on :8081 and
// websocket connections on :8082, and answers OperateFoo requests.
func main() {
	// Create the instance; no option setters are passed.
	instance := znet.New()

	// Listen on both supported transports.
	instance.ListenTCP(":8081")
	instance.ListenWebsocket(":8082")

	// Register the handler for the OperateFoo operate code.
	instance.Router().Route(OperateFoo, func(ctx *znet.Context) (any, error) {
		// The returned value is the response sent to the client.
		return map[string]any{"val": "bar"}, nil
	})

	// Run the instance; it stops gracefully on ctrl-c. Run returns an error,
	// so surface it instead of silently discarding it.
	if err := instance.Run(signal.SetupSignalHandler()); err != nil {
		panic(err)
	}
}

  • go run client.go
// client.go
package main

import (
  "github.com/ebar-go/znet/codec"
  "log"
  "net"
  "time"
)

// main connects to the TCP server on localhost:8081, logs every response it
// receives, and sends one JSON-encoded packet per second until a send fails.
func main() {
  conn, err := net.Dial("tcp", "localhost:8081")
  if err != nil {
    log.Fatalf("unexpected error: %v", err)
  }
  // Release the socket once the send loop ends.
  defer conn.Close()

  // NOTE(review): 4 presumably matches the 4-byte packetLength header field
  // described in the Protocol section — confirm against codec.NewDecoder.
  decoder := codec.NewDecoder(4)

  // Reader goroutine: decode responses from the connection and log them.
  // It exits on the first decode error.
  go func() {
    for {
      resp, err := decoder.Decode(conn)
      if err != nil {
        return
      }
      // Log the whole decoded message; the original sliced it with an
      // undefined variable n, which did not compile.
      log.Println("receive response: ", string(resp))
    }
  }()

  packet := codec.NewPacket(codec.NewJsonCodec())
  packet.Action = 1
  packet.Seq = 1

  // The results of Marshal and Pack are ignored to keep the example short;
  // real code should check them.
  _ = packet.Marshal(map[string]any{"foo": "bar"})
  payload, _ := packet.Pack()

  // Send the same packed message once per second until a write fails.
  for {
    n, err := decoder.Encode(conn, payload)
    if err != nil {
      break
    }
    log.Println("send message length=", n)
    time.Sleep(time.Second)
  }
}

Architecture

  • Framework Framework
  • Engine Start Sequence Diagram
    Sequence Diagram

Protocol

  • The protocol is designed to solve the TCP sticky-packet (message framing) problem
  • ByteOrder: big endian
|-------------- header --------------|-------- body --------|
|packetLength| action |      seq     |-------- body --------|
|     4      |    2   |       2      |          n           |

Benchmark

goos: linux
goarch: amd64
pkg: github.com/ebar-go/znet
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz

|-----------------------------------|
| connections  |  memory |    cpu   |
|-----------------------------------|
|     10000    |   50MB  |          |
|-----------------------------------|

Documentation

Index

Constants

View Source
// Content-type names; presumably the values accepted by
// ThreadOptions.ContentType — confirm against NewCodec.
const (
	// ContentTypeJson is the content-type name for JSON.
	ContentTypeJson  = "json"
	// ContentTypeProto is the content-type name for Protobuf.
	ContentTypeProto = "protobuf"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Action added in v0.1.3

type Action[Request, Response any] func(ctx *Context, request *Request) (*Response, error)

Action is a generic, user-friendly handler function.

type Callback

type Callback struct {
	// contains filtered or unexported fields
}

Callback manage connection callback handlers.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents client connection

func NewConnection

func NewConnection(conn net.Conn, fd int) *Connection

NewConnection returns a new Connection instance

func (*Connection) AddBeforeCloseHook

func (conn *Connection) AddBeforeCloseHook(hooks ...func(conn *Connection))

AddBeforeCloseHook adds hooks to be called before the connection is closed

func (*Connection) Close

func (conn *Connection) Close()

Close closes the connection

func (*Connection) ID

func (conn *Connection) ID() string

ID returns the uuid of the connection

func (*Connection) IP added in v0.1.4

func (conn *Connection) IP() string

func (*Connection) Property

func (conn *Connection) Property() *internal.Container[string, any]

Property returns the properties container

func (*Connection) Push

func (conn *Connection) Push(p []byte)

Push sends a message to the connection

func (*Connection) Read

func (conn *Connection) Read(p []byte) (int, error)

Read reads message from the connection

func (*Connection) Write

func (conn *Connection) Write(p []byte) (int, error)

Write writes message to the connection

type ConnectionHandler

type ConnectionHandler func(conn *Connection)

ConnectionHandler represents a connection handler

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context represents a context for request

func (*Context) Abort

func (ctx *Context) Abort()

Abort stops invoking the remaining handlers

func (*Context) Conn

func (ctx *Context) Conn() *Connection

Conn returns the Connection instance

func (*Context) Next

func (ctx *Context) Next()

Next invokes the next handler

func (*Context) Packet added in v0.1.4

func (ctx *Context) Packet() *codec.Packet

type ContextEngine added in v0.1.5

type ContextEngine struct {
	// contains filtered or unexported fields
}

func NewContextEngine added in v0.1.5

func NewContextEngine() *ContextEngine

func (*ContextEngine) AcquireAndResetContext added in v0.1.5

func (e *ContextEngine) AcquireAndResetContext(conn *Connection, packet *codec.Packet) *Context

func (*ContextEngine) AcquireContext added in v0.1.5

func (e *ContextEngine) AcquireContext() *Context

func (*ContextEngine) ReleaseContext added in v0.1.5

func (e *ContextEngine) ReleaseContext(ctx *Context)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine implements the Instance interface

func (*Engine) ListenTCP added in v0.1.4

func (eng *Engine) ListenTCP(addr string)

ListenTCP listens for tcp connections

func (*Engine) ListenWebsocket added in v0.1.4

func (eng *Engine) ListenWebsocket(addr string)

ListenWebsocket listens for websocket connections

func (*Engine) Router added in v0.1.4

func (eng *Engine) Router() *Router

Router returns the Router instance

func (*Engine) Run added in v0.1.4

func (eng *Engine) Run(stopCh <-chan struct{}) error

Run starts the event-loop

type HandleFunc

type HandleFunc func(ctx *Context)

HandleFunc represents a handler function for Context

type Handler

type Handler func(ctx *Context) (any, error)

Handler is a handler for operation

func StandardHandler

func StandardHandler[Request, Response any](action Action[Request, Response]) Handler

StandardHandler converts a generic Action into a standard Handler.

type Instance

// Instance is the interface returned by New; Engine implements it.
type Instance interface {
	// Router returns the router instance.
	Router() *Router

	// ListenTCP listens for TCP connections on addr.
	ListenTCP(addr string)

	// ListenWebsocket listens for websocket connections on addr.
	ListenWebsocket(addr string)

	// Run runs the instance. stopCh is the stop-signal channel; the README
	// example passes the channel returned by signal.SetupSignalHandler().
	Run(stopCh <-chan struct{}) error
}

Instance represents the engine interface

func New

func New(setters ...Option) Instance

New returns a new engine instance

type Option

type Option func(options *Options)

type Options

// Options holds the application options that Option setters modify.
type Options struct {
	// Debug enables debug logging.
	Debug bool
	// OnOpen is a callback function that is called when the connection is established.
	OnOpen ConnectionHandler

	// OnClose is a callback function that is called when the connection is closed.
	OnClose ConnectionHandler

	// Middlewares is the list of handler functions that are called when a
	// connection sends a new message.
	Middlewares []HandleFunc

	// Reactor holds the reactor options (see ReactorOptions).
	Reactor ReactorOptions

	// Thread holds the thread options (see ThreadOptions).
	Thread ThreadOptions
}

Options represents app options

func (*Options) NewReactorOrDie added in v0.1.4

func (options *Options) NewReactorOrDie() *Reactor

func (*Options) NewRouter added in v0.1.3

func (options *Options) NewRouter() *Router

func (*Options) NewThread added in v0.1.3

func (options *Options) NewThread() *Thread

func (*Options) Validate added in v0.1.1

func (options *Options) Validate() error

Validate validates the options parameter

type Reactor

type Reactor struct {
	// contains filtered or unexported fields
}

Reactor represents the epoll model for listen connections.

func NewReactor added in v0.1.4

func NewReactor(options ReactorOptions) (reactor *Reactor, err error)

NewReactor returns a new main reactor instance

func (*Reactor) Run

func (reactor *Reactor) Run(stopCh <-chan struct{}, onRequest ConnectionHandler)

Run runs the Reactor with the given signal.

type ReactorOptions

// ReactorOptions holds the options for the reactor and its sub-reactors.
type ReactorOptions struct {
	// EpollBufferSize is the size of the active-connections buffer for each
	// polling interval; the default is 100.
	EpollBufferSize int

	// ThreadQueueCapacity is the capacity of the thread queue; the default is 100.
	ThreadQueueCapacity int

	// SubReactorShardCount is the number of sub-reactor shards; the default is 32.
	// If the parameter is zero, the number of sub-reactors will be 1.
	SubReactorShardCount int
}

ReactorOptions represents the options for the reactor

func (ReactorOptions) NewSubReactor added in v0.1.5

func (options ReactorOptions) NewSubReactor() SubReactor

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router represents router instance

func NewRouter

func NewRouter() *Router

func (*Router) OnError

func (router *Router) OnError(handler func(ctx *Context, err error)) *Router

OnError is called when an error is encountered while processing a request

func (*Router) OnNotFound

func (router *Router) OnNotFound(handler HandleFunc) *Router

OnNotFound is called when operation is not found

func (*Router) Route

func (router *Router) Route(operate int16, handler Handler) *Router

Route registers a handler for the given operate code

type ShardSubReactor

type ShardSubReactor struct {
	// contains filtered or unexported fields
}

func NewShardSubReactor

func NewShardSubReactor(shardCount, bufferSize int) *ShardSubReactor

func (*ShardSubReactor) GetConnection

func (shard *ShardSubReactor) GetConnection(fd int) *Connection

func (*ShardSubReactor) Offer

func (shard *ShardSubReactor) Offer(fds ...int)

func (*ShardSubReactor) Polling

func (shard *ShardSubReactor) Polling(stopCh <-chan struct{}, callback func(int))

func (*ShardSubReactor) RegisterConnection

func (shard *ShardSubReactor) RegisterConnection(conn *Connection)

func (*ShardSubReactor) UnregisterConnection

func (shard *ShardSubReactor) UnregisterConnection(conn *Connection)

type SingleSubReactor added in v0.1.3

type SingleSubReactor struct {
	// contains filtered or unexported fields
}

SingleSubReactor represents sub reactor

func NewSingleSubReactor added in v0.1.3

func NewSingleSubReactor(bufferSize int) *SingleSubReactor

NewSingleSubReactor creates an instance of a SingleSubReactor

func (*SingleSubReactor) GetConnection added in v0.1.3

func (sub *SingleSubReactor) GetConnection(fd int) *Connection

GetConnection returns a connection by fd

func (*SingleSubReactor) Offer added in v0.1.3

func (sub *SingleSubReactor) Offer(fds ...int)

Offer pushes the fds of active connections onto the queue

func (*SingleSubReactor) Polling added in v0.1.3

func (sub *SingleSubReactor) Polling(stopCh <-chan struct{}, callback func(int))

Polling polls with the given callback function

func (*SingleSubReactor) RegisterConnection added in v0.1.3

func (sub *SingleSubReactor) RegisterConnection(conn *Connection)

RegisterConnection registers a new connection to the epoll listener

func (*SingleSubReactor) UnregisterConnection added in v0.1.3

func (sub *SingleSubReactor) UnregisterConnection(conn *Connection)

UnregisterConnection removes the connection from the epoll listener

type SubReactor

// SubReactor is the sub-reactor interface; the method sets listed for
// SingleSubReactor and ShardSubReactor both match it.
type SubReactor interface {
	// RegisterConnection registers a new connection with the sub-reactor.
	RegisterConnection(conn *Connection)
	// UnregisterConnection removes the connection from the sub-reactor.
	UnregisterConnection(conn *Connection)
	// GetConnection returns a connection by fd.
	GetConnection(fd int) *Connection
	// Offer pushes the fds of active connections onto the queue.
	Offer(fds ...int)
	// Polling polls with the given callback function.
	// NOTE(review): presumably runs until stopCh is closed — confirm.
	Polling(stopCh <-chan struct{}, callback func(int))
}

type Thread added in v0.1.3

type Thread struct {
	// contains filtered or unexported fields
}

Thread represents context manager

func NewThread added in v0.1.3

func NewThread(options ThreadOptions) *Thread

NewThread returns a new Thread instance

func (*Thread) HandleRequest added in v0.1.3

func (thread *Thread) HandleRequest(conn *Connection)

HandleRequest handles a new request for the connection

func (*Thread) Use added in v0.1.3

func (thread *Thread) Use(handler ...HandleFunc)

Use registers middleware

type ThreadOptions added in v0.1.3

// ThreadOptions holds the options used to create a Thread.
type ThreadOptions struct {
	// WorkerPoolSize is the size of the worker pool, default is 1000
	WorkerPoolSize int
	// MaxReadBufferSize is the size of the max read buffer, default is 512
	MaxReadBufferSize int

	// ContentType names the content type; presumably one of ContentTypeJson
	// or ContentTypeProto — confirm against NewCodec.
	ContentType string
	// contains filtered or unexported fields
}

func (ThreadOptions) NewCodec added in v0.1.5

func (options ThreadOptions) NewCodec() (cc codec.Codec)

Directories

Path Synopsis
examples
chat module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL