wsev

package module
v0.0.0-...-41452ad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2025 License: MIT Imports: 22 Imported by: 0

README

wsev Go Reference Go Report Card Build Status

An event based websocket server implementation based on epoll, designed for ease of use and high connection concurrency.

Some parts for reading and writing websocket headers have been derived from the excellent github.com/gobwas/ws and lightly modified to support buffer reuse.

Features

  • Epoll based websocket handler
  • SO_REUSEPORT for multiple epoll listeners on the same port
  • Pooled write buffers for efficient memory usage
  • Passes the autobahn testsuite
  • Only depends on golang.org/x/sys
  • Detect connection timeouts

Setup

go get github.com/purehyperbole/wsev

Usage

package main

import (
    "log"
    "runtime"
    "time"

    "github.com/purehyperbole/wsev"
)

func main() {
    h := &wsev.Handler{
        OnConnect: func(conn *wsev.Conn) {
            // client has connected
        },
        OnDisconnect: func(conn *wsev.Conn, err error) {
            // client has disconnected
        },
        OnPing: func(conn *wsev.Conn) {
            // client has sent pong
        },
        OnMessage: func(conn *wsev.Conn, msg []byte) {
            // client has sent a binary/text event
        },
        OnError: func(err error, isFatal bool) {
            // server has experienced an error
        }
    }

    // will start a new websocket server on port 9000
    err := wsev.New(
        h, 
        // the number of listener goroutines that will be started
        // defaults to GOMAXPROCS
        wsev.WithListeners(count int),
        // the deadline that used when reading from sockets that have data
        wsev.WithReadDeadline(time.Millisecond*100),
        // the deadline that data will be flushed to the underlying connection 
        // when the data in the buffer has not exceeded the buffer size
        wsev.WithWriteBufferDeadline(time.Millisecond * 100),
        // the size of the write buffer for a connection. these buffers are 
        // only allocated and used when there is data ready for writing to
        // the connection. Once the buffer has been flushed, the buffer is
        // returned to a pool for reuse. 
        wsev.WithWriteBufferSize(1<<14),
        // sets the size of the read buffer for reading from the connection.
        // a read buffer is allocated per event loop
        wsev.WithReadBufferSize(1<<14),
    ).Serve(9000)

    if err != nil {
        log.Fatal(err)
    }

    runtime.Goexit()
}

Example - Chat Room

package main

import (
    "io"
    "log"
    "net"
    "runtime"
    "sync"

    "github.com/purehyperbole/wsev"
)

// main runs a minimal chat room on port 8000: every message received from
// a client is relayed to all currently connected clients.
func main() {
    // keep a list of all connected members
    var connections []io.Writer
    var lock sync.Mutex

    h := &wsev.Handler{
        OnConnect: func(conn *wsev.Conn) {
            // client has connected, so add it to the list
            lock.Lock()
            defer lock.Unlock()

            connections = append(connections, conn)
        },
        OnDisconnect: func(conn *wsev.Conn, err error) {
            // client has disconnected, so remove it from the list
            lock.Lock()
            defer lock.Unlock()

            // iterate backwards so removing an element does not shift
            // the entries that are still to be visited
            for i := len(connections) - 1; i >= 0; i-- {
                if connections[i] == conn {
                    connections = append(connections[:i], connections[i+1:]...)
                }
            }
        },
        OnMessage: func(conn *wsev.Conn, msg []byte) {
            // a message has been received, broadcast it to every
            // connection in the list (the sender included)
            lock.Lock()
            defer lock.Unlock()

            // write to each connection individually: io.MultiWriter stops
            // at the first writer that returns an error, which would let a
            // single broken client block delivery to everyone after it
            for _, c := range connections {
                if _, err := c.Write(msg); err != nil {
                    log.Printf("failed to write to connections: %s", err.Error())
                }
            }
        },
    }

    err := wsev.New(
        h,
    ).Serve(8000)

    if err != nil {
        log.Fatal(err)
    }

    // Goexit ends the main goroutine without returning from main, so the
    // process stays alive while the other goroutines keep running.
    runtime.Goexit()
}

Development

Running tests:

go test -v -race

Running with autobahn testsuite (requires Docker)

AUTOBAHN=1 go test -v -race

Documentation

Index

Constants

View Source
// NOTE(review): both values are negative; presumably they are sentinel
// index values for entries tracked in an internal heap (already removed /
// not yet assigned) — confirm against the heap implementation.
const (
	HeapRemoved    = -1
	HeapUnassigned = -2
)
View Source
// Default values. NOTE(review): presumably used when the corresponding
// option is not supplied to New — confirm against the option handling.
const (
	// DefaultBufferSize is 1 << 14 (16384); presumably a size in bytes.
	DefaultBufferSize          = 1 << 14
	// DefaultBufferFlushDeadline is 50 milliseconds.
	DefaultBufferFlushDeadline = time.Millisecond * 50
	// DefaultReadDeadline is one second.
	DefaultReadDeadline        = time.Second
)

Variables

View Source
// Errors describing an invalid length encoding in a frame header.
var (
	// ErrHeaderLengthMSB reports that the most significant bit was set
	// where it must be 0.
	ErrHeaderLengthMSB        = fmt.Errorf("header error: the most significant bit must be 0")
	// ErrHeaderLengthUnexpected reports unexpected payload length bits.
	ErrHeaderLengthUnexpected = fmt.Errorf("header error: unexpected payload length bits")
)
View Source
// Sentinel errors. The first two are plain errors; the rest are *closeError
// values that carry a websocket close Status and a Reason string. Some also
// set the unexported immediate and shutdown flags, whose exact semantics are
// defined by closeError and are not visible here.
var (
	// ErrDataNeeded reports that there is not enough data to continue.
	ErrDataNeeded              = errors.New("not enough data to continue")
	// ErrConnectionAlreadyClosed reports that the connection was already closed.
	ErrConnectionAlreadyClosed = errors.New("connection already closed")
	// ErrConnectionTimeout carries CloseStatusAbnormalClosure. Unlike the
	// errors below it is declared without an explicit error type, so the
	// variable's static type is *closeError.
	ErrConnectionTimeout       = &closeError{
		Status: CloseStatusAbnormalClosure,
		Reason: "connection timeout",
	}
	// ErrInvalidRSVBits is a protocol error for non-zero RSV bits.
	ErrInvalidRSVBits error = &closeError{
		Status: CloseStatusProtocolError,
		Reason: "non-zero rsv bits not supported",
	}
	// ErrInvalidOpCode is a protocol error for an invalid or reserved op code.
	ErrInvalidOpCode error = &closeError{
		Status: CloseStatusProtocolError,
		Reason: "invalid or reserved op code",
	}
	// ErrInvalidContinuation is a protocol error for an invalid continuation.
	ErrInvalidContinuation error = &closeError{
		Status: CloseStatusProtocolError,
		Reason: "invalid continuation",
	}
	// ErrInvalidPayloadEncoding carries CloseStatusInvalidFramePayloadData.
	// NOTE(review): its Reason is the same "close frame reason invalid" text
	// used by ErrInvalidCloseReason and ErrInvalidCloseEncoding — confirm
	// this is intended rather than a copy/paste leftover.
	ErrInvalidPayloadEncoding error = &closeError{
		Status:    CloseStatusInvalidFramePayloadData,
		Reason:    "close frame reason invalid",
		immediate: true,
	}
	// ErrInvalidControlLength is a protocol error for a control frame whose
	// payload exceeds 125 bytes.
	ErrInvalidControlLength error = &closeError{
		Status:    CloseStatusProtocolError,
		Reason:    "control frame payload exceeds 125 bytes",
		immediate: true,
		shutdown:  true,
	}
	// ErrInvalidCloseReason is a protocol error for an invalid close frame reason.
	ErrInvalidCloseReason error = &closeError{
		Status:    CloseStatusProtocolError,
		Reason:    "close frame reason invalid",
		immediate: true,
		shutdown:  true,
	}
	// ErrInvalidCloseEncoding carries CloseStatusInvalidFramePayloadData for
	// an invalid close frame reason.
	ErrInvalidCloseEncoding error = &closeError{
		Status:    CloseStatusInvalidFramePayloadData,
		Reason:    "close frame reason invalid",
		immediate: true,
		shutdown:  true,
	}
)

Functions

func WithListeners

func WithListeners(count int) option

WithListeners sets the number of listeners, defaults to GOMAXPROCS

func WithReadBufferSize

func WithReadBufferSize(size int) option

WithReadBufferSize sets the size of read buffer used for a connection

func WithReadDeadline

func WithReadDeadline(deadline time.Duration) option

WithReadDeadline sets the read deadline option when reading from sockets that have data available

func WithWriteBufferDeadline

func WithWriteBufferDeadline(deadline time.Duration) option

WithWriteBufferDeadline sets the timeout for flushing the buffer to the underlying connection

func WithWriteBufferSize

func WithWriteBufferSize(size int) option

WithWriteBufferSize sets the size of the write buffer used for a connection. When the buffer exceeds this size, it will be flushed.

Types

type CloseStatus

// CloseStatus is a numeric websocket close status code.
type CloseStatus uint64

// Named close status codes. The names and values correspond to the standard
// websocket close codes (RFC 6455 section 7.4.1 and the IANA WebSocket Close
// Code Number registry). Codes 1004 and 1014 are not defined here.
const (
	CloseStatusNormalClosure           CloseStatus = 1000
	CloseStatusGoingAway               CloseStatus = 1001
	CloseStatusProtocolError           CloseStatus = 1002
	CloseStatusUnsupportedData         CloseStatus = 1003
	CloseStatusNoStatusReceived        CloseStatus = 1005
	CloseStatusAbnormalClosure         CloseStatus = 1006
	CloseStatusInvalidFramePayloadData CloseStatus = 1007
	CloseStatusPolicyViolation         CloseStatus = 1008
	CloseStatusMessageTooBig           CloseStatus = 1009
	CloseStatusMandatoryExtension      CloseStatus = 1010
	CloseStatusInternalServerErr       CloseStatus = 1011
	CloseStatusServiceRestart          CloseStatus = 1012
	CloseStatusTryAgainLater           CloseStatus = 1013
	CloseStatusTLSHandshake            CloseStatus = 1015
)

type Conn

type Conn struct {
	net.Conn
	// contains filtered or unexported fields
}

A wrapped net.Conn with on demand buffers that implements the net.Conn interface

func (*Conn) CloseImmediatelyWith

func (c *Conn) CloseImmediatelyWith(status CloseStatus, reason string, disconnect bool) error

CloseImmediatelyWith sends a close frame to the connection immediately, discarding any buffered state. If disconnect is true, the underlying connection will be closed immediately.

func (*Conn) CloseWith

func (c *Conn) CloseWith(status CloseStatus, reason string, disconnect bool) error

CloseWith writes all existing buffered state and then sends a close frame to the connection. If disconnect is true, the underlying connection will be closed immediately.

func (*Conn) Get

func (c *Conn) Get() any

Get gets a user specified value

func (*Conn) Set

func (c *Conn) Set(value any)

Set sets a user specified value

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

Write writes binary data to the connection via a buffer. Write can be made to time out and return an error after a fixed time limit; see SetDeadline and SetWriteDeadline.

func (*Conn) WriteText

func (c *Conn) WriteText(s string) (int, error)

WriteText writes text data to the connection via a buffer. WriteText can be made to time out and return an error after a fixed time limit; see SetDeadline and SetWriteDeadline.

type Handler

// Handler is the set of callbacks invoked for connection and server events.
// Every field is a plain func value.
// NOTE(review): how OnMessage interacts with OnBinary/OnText when more than
// one of them is set is not visible here — confirm against the dispatch code.
type Handler struct {
	// OnConnect is invoked upon a new connection
	OnConnect func(conn *Conn)
	// OnDisconnect is invoked when an existing connection disconnects
	OnDisconnect func(conn *Conn, err error)
	// OnPing is invoked when a connection sends a websocket ping frame
	OnPing func(conn *Conn)
	// OnPong is invoked when a connection sends a websocket pong frame
	OnPong func(conn *Conn)
	// OnMessage is invoked when a connection sends either a text or
	// binary message. the msg buffer that is passed is only safe for
	// use until the callback returns
	OnMessage func(conn *Conn, msg []byte)
	// OnBinary is invoked when a connection sends a binary message.
	// the msg buffer that is passed is only safe for use until the
	// callback returns
	OnBinary func(conn *Conn, msg []byte)
	// OnText is invoked when a connection sends a text message.
	// the msg buffer that is passed is only safe for use until the
	// callback returns
	OnText func(conn *Conn, msg string)
	// OnError is invoked when an error occurs
	OnError func(err error, isFatal bool)
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func New

func New(handler *Handler, opts ...option) *Server

func (*Server) Close

func (s *Server) Close() error

Close closes all of the http servers

func (*Server) Serve

func (s *Server) Serve(port int) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL