Documentation
¶
Overview ¶
Package bigqueue is a pure Go implementation of a big, fast, and persistent queue based on memory-mapped files.
Index ¶
- Constants
- Variables
- func Assert(condition bool, message string, v ...interface{})
- func BytesToInt(b []byte) int64
- func BytesToInt32(b []byte) int32
- func GetFileName(prefix string, suffix string, index int64) string
- func GetFiles(pathname string) (*list.List, error)
- func IntToBytes(n int64) []byte
- func Mod(val int64, bits int) int64
- func PathExists(path string) (bool, error)
- func RemoveFiles(pathname string) error
- type DB
- type DBFactory
- type FanOutQueue
- type FileFanoutQueue
- func (q *FileFanoutQueue) Close()
- func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
- func (q *FileFanoutQueue) FreeAllSubscribe()
- func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
- func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
- func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) Size(fanoutID int64) int64
- func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error
- func (q *FileFanoutQueue) Subscribe(fanoutID int64, fn func(int64, []byte, error)) error
- type FileQueue
- func (q *FileQueue) Close() error
- func (q *FileQueue) Dequeue() (int64, []byte, error)
- func (q *FileQueue) Enqueue(data []byte) (int64, error)
- func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))
- func (q *FileQueue) FreeSubscribe()
- func (q *FileQueue) Gc() error
- func (q *FileQueue) IsEmpty() bool
- func (q *FileQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileQueue) Peek() (int64, []byte, error)
- func (q *FileQueue) Size() int64
- func (q *FileQueue) Skip(count int64) error
- func (q *FileQueue) Subscribe(fn func(int64, []byte, error)) error
- type Options
- type Queue
- type QueueFront
Constants ¶
const ( // DefaultDataPageSize data file size DefaultDataPageSize = 128 * 1024 * 1024 // DefaultIndexItemsPerPage items numbers in one page DefaultIndexItemsPerPage = 17 // MaxInt64 max value of int64 MaxInt64 = 0x7fffffffffffffff // IndexFileName file name IndexFileName = "index" // DataFileName file name DataFileName = "data" // MetaFileName file name MetaFileName = "meta_data" // FrontFileName file name FrontFileName = "front_index" )
const (
// FanoutFrontFileName Fanout FrontFileName file name
FanoutFrontFileName = "front_index_"
)
Variables ¶
var ( ErrEnqueueDataNull = errors.New("Enqueue data can not be null") IndexOutOfBoundTH = errors.New("Index is valid which should between tail and head index") // repeat call Subscriber method SubscribeExistErr = errors.New("Subscriber alread set, can not repeat set.") // Subscribe should call after queue Open method SubscribeFailedNoOpenErr = errors.New("Subscriber method only support after queue opened.") )
These errors can be returned when opening or calling methods on a DB.
var DefaultOptions = &Options{ DataPageSize: DefaultDataPageSize, indexPageSize: defaultIndexPageSize, IndexItemsPerPage: DefaultIndexItemsPerPage, itemsPerPage: defaultItemsPerPage, }
DefaultOptions is the default set of options.
Functions ¶
func Assert ¶
Assert panics with the given formatted message if the given condition is false.
func GetFileName ¶
GetFileName returns the joined file name.
func PathExists ¶
PathExists checks whether the target path exists. It returns true if the path exists and false otherwise.
func RemoveFiles ¶
RemoveFiles removes all files from the current directory. It does not include any subdirectories.
Types ¶
type DB ¶
type DB struct {
// If you want to read the entire database fast, you can set MmapFlag to
// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
MmapFlags int
InitialMmapSize int
// contains filtered or unexported fields
}
DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return an ErrDatabaseNotOpen if accessed before Open() is called.
type DBFactory ¶
type DBFactory struct {
InitialMmapSize int
// contains filtered or unexported fields
}
DBFactory is used to manipulate multiple data files by index number.
type FanOutQueue ¶ added in v1.0.2
type FanOutQueue interface {
Open(dir string, queueName string, options *Options) error
// IsEmpty Determines whether a queue is empty
//fanoutId queue index
// return ture if empty, false otherwise
IsEmpty(fanoutID int64) bool
// Size return avaiable queue size
Size(fanoutID int64) int64
// Enqueue Append an item to the queue and return index no
// if any error ocurres a non-nil error returned
Enqueue(data []byte) (int64, error)
// EnqueueAsync Append an item to the queue async way
EnqueueAsync(data []byte, fn func(int64, error))
Dequeue(fanoutID int64) (int64, []byte, error)
Peek(fanoutID int64) (int64, []byte, error)
// To skip deqeue target number of items
Skip(fanoutID int64, count int64) error
Close() error
// Set to asynchous subscribe
Subscribe(fanoutID int64, fn func(int64, []byte, error)) error
// to free asynchous subscribe
FreeSubscribe(fanoutID int64)
// FreeAllSubscribe to free all asynchous subscribe
FreeAllSubscribe()
}
FanOutQueue is a queue that supports a pub-sub feature.
type FileFanoutQueue ¶ added in v1.0.2
type FileFanoutQueue struct {
// contains filtered or unexported fields
}
FileFanoutQueue is the file fanout queue implementation.
func (*FileFanoutQueue) Close ¶ added in v1.0.2
func (q *FileFanoutQueue) Close()
Close frees the resource.
func (*FileFanoutQueue) Dequeue ¶ added in v1.0.2
func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
Dequeue dequeues data for the target fanoutID.
func (*FileFanoutQueue) Enqueue ¶ added in v1.0.2
func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
Enqueue appends an item to the queue and returns its index number.
func (*FileFanoutQueue) FreeAllSubscribe ¶ added in v1.0.2
func (q *FileFanoutQueue) FreeAllSubscribe()
FreeAllSubscribe frees all subscribers.
func (*FileFanoutQueue) FreeSubscribe ¶ added in v1.0.2
func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
FreeSubscribe frees the subscriber for the target fanout ID.
func (*FileFanoutQueue) IsEmpty ¶ added in v1.0.2
func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
IsEmpty tests whether the queue for the target fanoutID is empty.
func (*FileFanoutQueue) Open ¶ added in v1.0.2
func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
Open the queue files
func (*FileFanoutQueue) Peek ¶ added in v1.0.2
func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
Peek peeks at the head item for the target fanoutID.
func (*FileFanoutQueue) Size ¶ added in v1.0.2
func (q *FileFanoutQueue) Size(fanoutID int64) int64
Size returns the item size for the target fanoutID.
type FileQueue ¶
type FileQueue struct {
// contains filtered or unexported fields
}
FileQueue is a queue implementation backed by a memory-mapped file.
func (*FileQueue) EnqueueAsync ¶
EnqueueAsync adds an item to the queue, and HeadIndex will increase. In asynchronous mode, the fn function is called back with the result.
func (*FileQueue) FreeSubscribe ¶ added in v1.0.2
func (q *FileQueue) FreeSubscribe()
FreeSubscribe frees the subscriber.
func (*FileQueue) Gc ¶
Gc deletes all used data files to free disk space.
BigQueue persists enqueued data in disk files. These data files remain even after the data in them has been dequeued, so your application is responsible for periodically calling this method to delete all used data files and free disk space.
func (*FileQueue) Peek ¶
Peek retrieves the item at the front of the queue. If an item exists, it returns the index ID and the item data.
type Queue ¶
type Queue interface {
Open(dir string, queueName string, options *Options) error
// Determines whether a queue is empty
// return ture if empty, false otherwise
IsEmpty() bool
// return avaiable queue size
Size() int64
// Append an item to the queue and return index no
// if any error ocurres a non-nil error returned
Enqueue(data []byte) (int64, error)
EnqueueAsync(data []byte, fn func(int64, error))
Dequeue() (int64, []byte, error)
Peek() (int64, []byte, error)
// To skip deqeue target number of items
Skip(count int64) error
Close() error
// Delete all used data files to free disk space.
Gc() error
// Set to asynchous subscribe
Subscribe(fn func(int64, []byte, error)) error
// to free asynchous subscribe
FreeSubscribe()
}
type QueueFront ¶ added in v1.0.2
type QueueFront struct {
// contains filtered or unexported fields
}
QueueFront is the queue front struct.


