bigqueue

Version: v1.0.0
Published: Jun 30, 2020 | License: Apache-2.0 | Imports: 15 | Imported by: 0

README

BigQueue-go

BigQueue-go is a pure Go implementation of a big, fast, persistent queue based on memory-mapped files. Its on-disk storage format is fully compatible with [BigQueue](https://github.com/bulldog2011/bigqueue).

Feature highlights:

  1. Fast: enqueue and dequeue are both close to O(1) memory accesses, so throughput approaches that of direct memory access.
  2. Big: the total size of the queue is limited only by the available disk space.
  3. Persistent: all data in the queue is persisted on disk and is crash resistant.
  4. Reliable: the OS is responsible for persisting produced messages even if your process crashes.
  5. Real-time: messages produced by producer threads are immediately visible to consumer threads.
  6. Memory-efficient: paging and swapping are automatic, and only the most recently accessed data is kept in memory.
  7. Thread-safe: multiple threads can enqueue and dequeue concurrently without data corruption.
  8. Simple and light: implemented in pure Go without any third-party library.

Quick Start

Installing

To start using BigQueue-Go, install Go and run go get:

$ go get github.com/jhunters/bigqueue

Importing bigqueue

To use bigqueue as a file-backed queue, import it as follows:


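package main

import (
	"fmt"

	"github.com/jhunters/bigqueue"
)

func main() {
	var queue = new(bigqueue.FileQueue)

	// Open the queue "testqueue" in the current directory with nil options.
	err := queue.Open(".", "testqueue", nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer queue.Close()

	data := []byte("hello xiemalin")

	// Enqueue returns the index assigned to the new item.
	i, err := queue.Enqueue(data)
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("Enqueued index=", i, string(data))
	}

	// Dequeue returns the index and the payload of the dequeued item.
	index, bb, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("Dequeue data:", index, string(bb))
}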

The Big Picture

[Figure: design]

Benchmark test

$ go test -bench=. -benchtime=3s -run=^$
goos: linux
goarch: amd64
pkg: github.com/bigqueue
Benchmark_EnqueueOnly-8                  2319403              1479 ns/op
Benchmark_DequeueOnly-8                  4704715               743 ns/op
Benchmark_EnqueueDequeue-8               1536244              2303 ns/op
Benchmark_ParallelEnqueueDequeue-8       1254315              2760 ns/op
PASS
ok      github.com/bigqueue     40.028s

License

BigQueue-Go is Apache 2.0 licensed.

Documentation

Overview

Package bigqueue is a pure Go implementation of a big, fast, persistent queue based on memory-mapped files.

Index

Constants

const (

	// data file size
	DefaultDataPageSize = 128 * 1024 * 1024

	DefaultIndexItemsPerPage = 17

	MaxInt64 = 0x7fffffffffffffff

	IndexFileName = "index"
	DataFileName  = "data"
	MetaFileName  = "meta_data"
	FrontFileName = "front_index"
)
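
As a quick check of what the default sizes work out to, the sketch below evaluates the two numeric defaults. The values are copied from the constants above; the helper name itemsPerIndexPage is hypothetical, and the reading of IndexItemsPerPage as a shift count follows the comment on the Options type ("1 << IndexItemsPerPage").

```go
package main

import "fmt"

// Values copied from the constants listed in this documentation.
const (
	defaultDataPageSize      = 128 * 1024 * 1024
	defaultIndexItemsPerPage = 17
)

// itemsPerIndexPage is a hypothetical helper: it treats bits as a shift
// count and returns 1 << bits.
func itemsPerIndexPage(bits int) int64 {
	return int64(1) << uint(bits)
}

func main() {
	fmt.Println("data page size in MiB:", defaultDataPageSize/(1024*1024))
	fmt.Println("items per index page:", itemsPerIndexPage(defaultIndexItemsPerPage))
}
```

With the defaults, a data page is 128 MiB and an index page holds 131072 items.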

Variables

var (
	ErrEnqueueDataNull = errors.New("Enqueue data can not be null")

	IndexOutOfBoundTH = errors.New("Index is valid which should between tail and head index")
)

These errors can be returned when opening or calling methods on a DB.

var DefaultOptions = &Options{
	DataPageSize:      DefaultDataPageSize,
	indexPageSize:     defaultIndexPageSize,
	IndexItemsPerPage: DefaultIndexItemsPerPage,
	itemsPerPage:      defaultItemsPerPage,
	GcLock:            false,
}

Functions

func Assert

func Assert(condition bool, message string, v ...interface{})

Assert panics with the given formatted message if the given condition is false.

func BytesToInt

func BytesToInt(b []byte) int64

BytesToInt converts bytes to an integer.

func BytesToInt32

func BytesToInt32(b []byte) int32

func GetFileName

func GetFileName(prefix string, suffix string, index int64) string

func GetFiles

func GetFiles(pathname string) (*list.List, error)

func IntToBytes

func IntToBytes(n int64) []byte

IntToBytes converts an integer to bytes.
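
The documentation does not state the byte layout used by IntToBytes and BytesToInt. The following is a minimal sketch of one way such a round trip could work, assuming a fixed 8-byte big-endian encoding; both the width and the byte order are assumptions, and the lowercase function names are hypothetical stand-ins rather than the package's own code.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// intToBytes is a hypothetical stand-in for IntToBytes. It encodes n as
// 8 bytes in big-endian order (an assumption, not confirmed by the docs).
func intToBytes(n int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(n))
	return b
}

// bytesToInt is a hypothetical stand-in for BytesToInt. It decodes the
// first 8 bytes of b using the same assumed byte order.
func bytesToInt(b []byte) int64 {
	return int64(binary.BigEndian.Uint64(b))
}

func main() {
	encoded := intToBytes(258)
	fmt.Println("encoded:", encoded)
	fmt.Println("decoded:", bytesToInt(encoded))
}
```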

func Mod

func Mod(val int64, bits int) int64
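
Mod carries no description here. One plausible reading, given that page sizes in this package are expressed as shift counts, is that it reduces val modulo 2^bits. The sketch below shows that operation with a bit mask; the function name mod and the interpretation itself are assumptions, not the package's documented behavior.

```go
package main

import "fmt"

// mod is a hypothetical helper that returns val modulo 2^bits for a
// non-negative val by keeping only the low `bits` bits.
func mod(val int64, bits int) int64 {
	mask := int64(1)<<uint(bits) - 1
	return val & mask
}

func main() {
	// With 17 bits a page holds 131072 items, so item 131075 would sit
	// at offset 3 within its page under this reading.
	fmt.Println(mod(131075, 17))
}
```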

func PathExists

func PathExists(path string) (bool, error)
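
PathExists is undocumented, but its (bool, error) signature matches a common Go idiom built on os.Stat. The sketch below shows that idiom under the assumption that a missing path is reported as (false, nil) and any other failure as an error; the lowercase pathExists is a hypothetical stand-in, not the package's implementation.

```go
package main

import (
	"fmt"
	"os"
)

// pathExists is a hypothetical stand-in for PathExists. It reports
// whether path exists, and returns an error only when the check itself
// fails for a reason other than the path being absent.
func pathExists(path string) (bool, error) {
	_, err := os.Stat(path)
	if err == nil {
		return true, nil
	}
	if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

func main() {
	ok, err := pathExists(".")
	fmt.Println("current directory exists:", ok, err)
}
```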

func RemoveFiles

func RemoveFiles(pathname string) error

Types

type DB

type DB struct {
	// If you want to read the entire database fast, you can set MmapFlag to
	// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
	MmapFlags int

	InitialMmapSize int
	// contains filtered or unexported fields
}

DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return an ErrDatabaseNotOpen if accessed before Open() is called.

func (*DB) Close

func (db *DB) Close() error

Close releases all resources.

func (*DB) GoString

func (db *DB) GoString() string

GoString returns the Go string representation of the database.

func (*DB) Open

func (db *DB) Open(mode os.FileMode) error

func (*DB) Path

func (db *DB) Path() string

Path returns the path to currently open database file.

type DBFactory

type DBFactory struct {
	InitialMmapSize int
	// contains filtered or unexported fields
}

func (*DBFactory) Close

func (f *DBFactory) Close() error

type FileQueue

type FileQueue struct {
	// front index of the big queue
	FrontIndex int64

	// head index of the array; this is the read/write barrier.
	// readers can only read items before this index, and writers can write at this index or after
	HeadIndex int64

	// tail index of the array;
	// readers can't read items before this tail
	TailIndex int64
	// contains filtered or unexported fields
}

func (*FileQueue) Close

func (q *FileQueue) Close() error

func (*FileQueue) Dequeue

func (q *FileQueue) Dequeue() (int64, []byte, error)

func (*FileQueue) Enqueue

func (q *FileQueue) Enqueue(data []byte) (int64, error)

func (*FileQueue) EnqueueAsync

func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))
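
EnqueueAsync takes a callback instead of returning a result. How the package schedules the work is not documented here; the sketch below only illustrates the callback shape, assuming the enqueue runs on another goroutine and fn receives the same (index, error) pair that Enqueue would return. The helper enqueueAsync and the in-memory counter are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// enqueueAsync is a hypothetical helper. It runs enqueue on a new
// goroutine and passes the result to fn, if fn is non-nil.
func enqueueAsync(enqueue func([]byte) (int64, error), data []byte, fn func(int64, error)) {
	go func() {
		idx, err := enqueue(data)
		if fn != nil {
			fn(idx, err)
		}
	}()
}

func main() {
	var mu sync.Mutex
	var next int64

	// A stand-in enqueue function that only hands out increasing indexes.
	enqueue := func(data []byte) (int64, error) {
		mu.Lock()
		defer mu.Unlock()
		idx := next
		next++
		return idx, nil
	}

	var wg sync.WaitGroup
	wg.Add(1)
	enqueueAsync(enqueue, []byte("hello"), func(idx int64, err error) {
		defer wg.Done()
		fmt.Println("callback received:", idx, err)
	})
	wg.Wait() // wait for the callback before exiting
}
```

Because the callback may run on a different goroutine, the caller has to synchronize on it (here with a sync.WaitGroup) before relying on the result.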

func (*FileQueue) Gc

func (q *FileQueue) Gc() error

func (*FileQueue) IsEmpty

func (q *FileQueue) IsEmpty() bool

func (*FileQueue) Open

func (q *FileQueue) Open(dir string, queueName string, options *Options) error

func (*FileQueue) Peek

func (q *FileQueue) Peek() (int64, []byte, error)

func (*FileQueue) Size

func (q *FileQueue) Size() int64

func (*FileQueue) Skip

func (q *FileQueue) Skip(count int64) error

type Options

type Options struct {

	// size in bytes of a data page
	DataPageSize int

	// if true enable write lock on gc action
	GcLock bool

	// the number of index items per page is 1 << IndexItemsPerPage
	IndexItemsPerPage int
	// contains filtered or unexported fields
}

type Queue

type Queue interface {
	Open(dir string, queueName string, options *Options) error

	// Determines whether the queue is empty;
	// returns true if empty, false otherwise
	IsEmpty() bool

	// returns the available queue size
	Size() int64

	// Appends an item to the queue and returns its index;
	// if any error occurs, a non-nil error is returned
	Enqueue(data []byte) (int64, error)

	EnqueueAsync(data []byte, fn func(int64, error))

	Dequeue() (int64, []byte, error)

	Peek() (int64, []byte, error)

	// Skips the given number of items without dequeuing them
	Skip(count int64) error

	Close() error
}
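
To make the method shapes concrete, the sketch below uses a small in-memory stand-in that mirrors several methods of the Queue interface. It is not part of bigqueue and stores nothing on disk; the type memQueue, the error errEmpty, and the choice to return an error on an empty queue are all assumptions made for illustration, since the behavior of FileQueue on an empty queue is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errEmpty is a hypothetical error for reads on an empty queue.
var errEmpty = errors.New("queue is empty")

// memQueue is a hypothetical in-memory stand-in for the Queue interface.
type memQueue struct {
	mu    sync.Mutex
	items [][]byte
	front int64 // index of the next item to dequeue
}

// Enqueue appends data and returns the index assigned to it.
func (q *memQueue) Enqueue(data []byte) (int64, error) {
	if data == nil {
		return -1, errors.New("enqueue data can not be nil")
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, data)
	return int64(len(q.items) - 1), nil
}

// Peek returns the front item without removing it.
func (q *memQueue) Peek() (int64, []byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.front >= int64(len(q.items)) {
		return -1, nil, errEmpty
	}
	return q.front, q.items[q.front], nil
}

// Dequeue returns the front item and advances past it.
func (q *memQueue) Dequeue() (int64, []byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.front >= int64(len(q.items)) {
		return -1, nil, errEmpty
	}
	idx := q.front
	q.front++
	return idx, q.items[idx], nil
}

// Skip advances the front by count items, stopping at the end.
func (q *memQueue) Skip(count int64) error {
	if count < 0 {
		return errors.New("skip count must not be negative")
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	q.front += count
	if n := int64(len(q.items)); q.front > n {
		q.front = n
	}
	return nil
}

// Size returns the number of items not yet dequeued or skipped.
func (q *memQueue) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return int64(len(q.items)) - q.front
}

// IsEmpty reports whether no items remain.
func (q *memQueue) IsEmpty() bool { return q.Size() == 0 }

func main() {
	q := &memQueue{}
	for _, s := range []string{"a", "b", "c"} {
		idx, _ := q.Enqueue([]byte(s))
		fmt.Println("enqueued", s, "at index", idx)
	}
	idx, data, _ := q.Peek()
	fmt.Println("peek:", idx, string(data))
	_ = q.Skip(1)
	idx, data, _ = q.Dequeue()
	fmt.Println("dequeue after skip:", idx, string(data))
	fmt.Println("remaining:", q.Size(), "empty:", q.IsEmpty())
}
```

In this stand-in, Peek leaves the front index unchanged, while Skip and Dequeue both advance it; only Dequeue returns the data.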
