znet

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2022 License: GPL-3.0 Imports: 14 Imported by: 1

README

znet

golang powerful network framework

Features

  • High-performance Event-Loop under multi-threads model
  • Supporting multiple protocols: TCP,Websocket
  • Supporting reactor event-notification mechanisms: epoll in Linux/Windows and kqueue in FreeBSD
  • Supporting safe goroutines worker pool
  • Supporting two contentType: JSON/Protobuf
  • Supporting router service for different operate and handle functions

Quick start

  • install
go get -u github.com/ebar-go/znet
  • go run server.go
// server.go
package main

import (
	"github.com/ebar-go/ego/utils/runtime/signal"
	"github.com/ebar-go/znet"
)

const(
	OperateFoo = 1 // define a foo operate
)

func main() {
	// new instance
	instance := znet.New()

	// listen tcp and websocket
	instance.ListenTCP(":8081")
	instance.ListenWebsocket(":8082")
    
	// register a router for some operate
	instance.Router().Route(OperateFoo, func(ctx *znet.Context) (any, error) {
		// return response to the client
		return map[string]any{"val": "bar"}, nil
	})
	
	// run the instance, graceful stop by ctrl-c.
	instance.Run(signal.SetupSignalHandler())
}

  • go run client.go
// client.go
package main

import (
	"context"
	"github.com/ebar-go/znet/codec"
	"log"
	"net"
	"time"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8081")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	go func() {
		for {
			bytes := make([]byte, 512)
			n, err := conn.Read(bytes)
			if err != nil {
				return
			}
			log.Println("receive response: ", string(bytes[:n]))
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			default:
				
			}
			packet := codec.Factory().NewWithHeader(codec.Header{Operate: OperateFoo, ContentType: codec.ContentTypeJSON})
			bytes, err := packet.Pack(map[string]any{"key": "foo"})
			if err != nil {
				return
			}
			conn.Write(bytes)
			time.Sleep(time.Second)

		}
	}()
	<-ctx.Done()
}

Architecture

  • Framework Framework
  • Engine Start Sequence Diagram
    Sequence Diagram

Protocol

  • We design the protocol for solve TCP sticky packet problem
  • ByteOrder: big endian
|-------------- header ------------- |-------- body --------|
|packetLength|operate|contentType|seq|-------- body --------|
|     4      |   2   |      2    | 2 |          n           |

Benchmark

goos: linux
goarch: amd64
pkg: github.com/ebar-go/znet
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz

|-----------------------------------|
| connections  |  memory |    cpu   |
|-----------------------------------|
|     10000    |   50MB  |          |
|-----------------------------------|

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Action added in v0.1.3

type Action[Request, Response any] func(ctx *Context, request *Request) (*Response, error)

Action isa generic function that is friendly to user

type Callback

type Callback struct {
	// contains filtered or unexported fields
}

Callback manage connection callback handlers.

func (*Callback) OnConnect

func (callback *Callback) OnConnect(conn *Connection)

OnConnect is called when the connection is established

func (*Callback) OnDisconnect

func (callback *Callback) OnDisconnect(conn *Connection)

OnDisconnect is called when the connection is closed

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents client connection

func NewConnection

func NewConnection(conn net.Conn, fd int) *Connection

NewConnection returns a new Connection instance

func (*Connection) AddBeforeCloseHook

func (conn *Connection) AddBeforeCloseHook(hooks ...func(conn *Connection))

AddBeforeCloseHook adds a hook to the connection before closed

func (*Connection) Close

func (conn *Connection) Close()

Close closes the connection

func (*Connection) ID

func (conn *Connection) ID() string

ID returns the uuid of the connection

func (*Connection) IP added in v0.1.4

func (conn *Connection) IP() string

func (*Connection) Property

func (conn *Connection) Property() *internal.Container[string, any]

Property return properties container

func (*Connection) Push

func (conn *Connection) Push(p []byte)

Push send message to the connection

func (*Connection) Read

func (conn *Connection) Read(p []byte) (int, error)

Read reads message from the connection

func (*Connection) Write

func (conn *Connection) Write(p []byte) (int, error)

Write writes message to the connection

type ConnectionHandler

type ConnectionHandler func(conn *Connection)

ConnectionHandler represents a connection handler

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context represents a context for request

func (*Context) Abort

func (ctx *Context) Abort()

Abort stop invoke handler

func (*Context) Conn

func (ctx *Context) Conn() *Connection

Conn return instance of Connection

func (*Context) Next

func (ctx *Context) Next()

Next invoke next handler

func (*Context) Packet added in v0.1.4

func (ctx *Context) Packet() *codec.Packet

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func (*Engine) ListenTCP added in v0.1.4

func (eng *Engine) ListenTCP(addr string)

ListenTCP listens for tcp connections

func (*Engine) ListenWebsocket added in v0.1.4

func (eng *Engine) ListenWebsocket(addr string)

ListenWebsocket listens for websocket connections

func (*Engine) Router added in v0.1.4

func (eng *Engine) Router() *Router

Router return instance of Router

func (*Engine) Run added in v0.1.4

func (eng *Engine) Run(stopCh <-chan struct{}) error

Run starts the event-loop

type HandleFunc

type HandleFunc func(ctx *Context)

HandleFunc represents a handler function for Context

type Handler

type Handler func(ctx *Context) (any, error)

Handler is a handler for operation

func StandardHandler

func StandardHandler[Request, Response any](action Action[Request, Response]) Handler

StandardHandler is a function to convert standard handler.

type Instance

type Instance interface {
	// Router return the router instance
	Router() *Router

	// ListenTCP listen tcp server
	ListenTCP(addr string)

	// ListenWebsocket listen websocket server
	ListenWebsocket(addr string)

	// Run runs the instance with the given signal handler
	Run(stopCh <-chan struct{}) error
}

Instance represents an eng interface

func New

func New(setters ...Option) Instance

New returns a new eng instance

type Option

type Option func(options *Options)

type Options

type Options struct {
	// Debug enables debug logging
	Debug bool
	// OnConnect is a callback function that is called when the connection is established
	OnConnect ConnectionHandler

	// OnDisconnect is a callback function that is called when the connection is closed
	OnDisconnect ConnectionHandler

	// Middlewares is a lot of callback functions that are called when the connection send new message
	Middlewares []HandleFunc

	Reactor ReactorOptions

	Thread ThreadOptions
}

Options represents app options

func (*Options) NewReactorOrDie added in v0.1.4

func (options *Options) NewReactorOrDie() *Reactor

func (*Options) NewRouter added in v0.1.3

func (options *Options) NewRouter() *Router

func (*Options) NewThread added in v0.1.3

func (options *Options) NewThread() *Thread

func (*Options) Validate added in v0.1.1

func (options *Options) Validate() error

Validate validates the options parameter

type Reactor

type Reactor struct {
	// contains filtered or unexported fields
}

Reactor represents the epoll model for listen connections.

func NewReactor added in v0.1.4

func NewReactor(options ReactorOptions) (*Reactor, error)

NewReactor return a new main reactor instance

func (*Reactor) Run

func (reactor *Reactor) Run(stopCh <-chan struct{}, onRequest ConnectionHandler)

Run runs the Reactor with the given signal.

type ReactorOptions

type ReactorOptions struct {
	// EpollBufferSize is the size of the active connections in every duration,default is 100
	EpollBufferSize int

	// ThreadQueueCapacity is the cap of the thread queue, default is 100
	ThreadQueueCapacity int

	// SubReactorShardCount is the number of sub-reactor shards, default is 32
	// if the parameter is zero, the number of sub-reactor will be 1
	SubReactorShardCount int
}

ReactorOptions represents the options for the reactor

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router represents router instance

func NewRouter

func NewRouter() *Router

func (*Router) OnError

func (router *Router) OnError(handler func(ctx *Context, err error)) *Router

OnError is called when an error is encountered while processing a request

func (*Router) OnNotFound

func (router *Router) OnNotFound(handler HandleFunc) *Router

OnNotFound is called when operation is not found

func (*Router) Route

func (router *Router) Route(operate int16, handler Handler) *Router

Route register handler for operate

type ShardSubReactor

type ShardSubReactor struct {
	// contains filtered or unexported fields
}

func NewShardSubReactor

func NewShardSubReactor(shardCount, bufferSize int) *ShardSubReactor

func (*ShardSubReactor) GetConnection

func (shard *ShardSubReactor) GetConnection(fd int) *Connection

func (*ShardSubReactor) Offer

func (shard *ShardSubReactor) Offer(fds ...int)

func (*ShardSubReactor) Polling

func (shard *ShardSubReactor) Polling(stopCh <-chan struct{}, callback func(int))

func (*ShardSubReactor) RegisterConnection

func (shard *ShardSubReactor) RegisterConnection(conn *Connection)

func (*ShardSubReactor) UnregisterConnection

func (shard *ShardSubReactor) UnregisterConnection(conn *Connection)

type SingleSubReactor added in v0.1.3

type SingleSubReactor struct {
	// contains filtered or unexported fields
}

SingleSubReactor represents sub reactor

func NewSingleSubReactor added in v0.1.3

func NewSingleSubReactor(bufferSize int) *SingleSubReactor

NewSingleSubReactor creates an instance of a SingleSubReactor

func (*SingleSubReactor) GetConnection added in v0.1.3

func (sub *SingleSubReactor) GetConnection(fd int) *Connection

GetConnection returns a connection by fd

func (*SingleSubReactor) Offer added in v0.1.3

func (sub *SingleSubReactor) Offer(fds ...int)

Offer push the active connections fd to the queue

func (*SingleSubReactor) Polling added in v0.1.3

func (sub *SingleSubReactor) Polling(stopCh <-chan struct{}, callback func(int))

Polling poll with callback function

func (*SingleSubReactor) RegisterConnection added in v0.1.3

func (sub *SingleSubReactor) RegisterConnection(conn *Connection)

RegisterConnection registers a new connection to the epoll listener

func (*SingleSubReactor) UnregisterConnection added in v0.1.3

func (sub *SingleSubReactor) UnregisterConnection(conn *Connection)

UnregisterConnection removes the connection from the epoll listener

type SubReactor

type SubReactor interface {
	RegisterConnection(conn *Connection)
	UnregisterConnection(conn *Connection)
	GetConnection(fd int) *Connection
	Offer(fds ...int)
	Polling(stopCh <-chan struct{}, callback func(int))
}

type Thread added in v0.1.3

type Thread struct {
	// contains filtered or unexported fields
}

Thread represents context manager

func NewThread added in v0.1.3

func NewThread(options ThreadOptions) *Thread

NewThread returns a new Thread instance

func (*Thread) HandleRequest added in v0.1.3

func (e *Thread) HandleRequest(conn *Connection)

HandleRequest handle new request for connection

func (*Thread) Use added in v0.1.3

func (e *Thread) Use(handler ...HandleFunc)

Use registers middleware

type ThreadOptions added in v0.1.3

type ThreadOptions struct {
	// WorkerPollSize is the size of the worker pool, default is 1000
	WorkerPoolSize int
	// MaxReadBufferSize is the size of the max read buffer, default is 512
	MaxReadBufferSize int
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples
chat module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL