diskcache

Version: v2.0.0

Warning: This package is not in the latest version of its module.
Published: Apr 22, 2026 License: MIT Imports: 19 Imported by: 0

README

diskcache

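diskcache is a WAL-like disk cache with the following features:

  • Concurrent reads and writes
  • Configurable shard (data file) size
  • Configurable size limit for a single data entry
  • Configurable disk usage limit (FIFO eviction)

Limitations:

  • No random reads: data can only be consumed in FIFO order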

Implementation

Always put data to this file.
 |
 |   
 v   
data 
 |    Rotate: if `data` full, move to tail[data.0000000(n+1)]
 `-----------------------------------------------------------.
                                                             |
| data.00000000 | data.00000001 | data.00000002 | ....  <----`
      ^
      `----------------- Always read from this file(the file with smallest number)
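  • The file currently being written, data, is not consumed in real time. If nothing has been written to it recently (within 3s), a read rotates data and then consumes it.
  • Consumption (Get) starts from the file with the smallest sequence number (data.00000000 in the diagram above). If the queue has no data available, Get returns ErrNoData.
  • When data is full, it is appended to the tail of the queue as a new file, and a new data file is created for writing.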

Usage

Basic usage is as follows:

import (
	"log"

	"github.com/GuanceCloud/diskcache"
	"github.com/prometheus/client_golang/prometheus"
)

// Create a new cache under /some/path. Options can be combined:
// here the batch (data file) size is 4MB and the capacity is 1GB.
c, err := diskcache.Open(
	diskcache.WithPath("/some/path"),
	diskcache.WithBatchSize(4*1024*1024),
	diskcache.WithCapacity(1024*1024*1024),
)
if err != nil {
	log.Printf("open diskcache: %v", err)
	return
}
defer c.Close()

// Put data into the cache.
data := []byte("my app data...")
if err := c.Put(data); err != nil {
	log.Printf("put: %v", err)
	return
}

// Get the oldest data and pass it to the callback.
if err := c.Get(func(x []byte) error {
	// Do something with the cached data...
	return nil
}); err != nil {
	log.Printf("get: %v", err)
	return
}

// Register the package's metrics collectors with Prometheus.
prometheus.MustRegister(diskcache.Metrics()...)

The cache can be used concurrently as is: callers do not need their own mutual exclusion around the diskcache object c.

Configuring cache options via ENV

The following environment variables override the default cache configuration:

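Environment variable | Unit | Description
--- | --- | ---
ENV_DISKCACHE_BATCH_SIZE | byte | Size of a single on-disk data file, in bytes. Default 64MB
ENV_DISKCACHE_MAX_DATA_SIZE | byte | Maximum size of a single write, in bytes, to guard against unexpectedly large writes. Unlimited by default
ENV_DISKCACHE_CAPACITY | byte | Maximum disk space the cache may use; once usage exceeds it, the oldest data is removed. Unlimited by default
ENV_DISKCACHE_NO_SYNC | N/A | Disables sync on disk writes (sync is on by default). Disabling it may cause on-disk data loss
ENV_DISKCACHE_NO_LOCK | N/A | Disables the directory lock (locked by default). Without the lock, opening (Open) the same directory more than once may corrupt files
ENV_DISKCACHE_NO_POS | N/A | Disables recording of the read position (recorded by default). Without it, a restart causes some data to be consumed (Get) again
ENV_DISKCACHE_NO_FALLBACK_ON_ERROR | N/A | Disables the error fallback mechanism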

Prometheus metrics

The optional labels used across all metrics are:

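Label | Values | Description
--- | --- | ---
no_fallback_on_error | true/false | Whether error fallback is disabled (i.e. after a failed Get() callback, the old data is not read again)
no_lock | true/false | Whether locking is disabled (i.e. one cache directory may be Open()ed multiple times at once)
no_pos | true/false | Whether the pos feature is disabled
no_sync | true/false | Whether synchronous writes are disabled
path | cache directory on disk | Directory where the cache resides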

The metrics are:

TYPE | NAME | LABELS | HELP
--- | --- | --- | ---
SUMMARY | diskcache_dropped_data | path,reason | Dropped data during Put() when capacity reached.
COUNTER | diskcache_rotate_total | path | Cache rotate count, mean file rotate from data to data.0000xxx
COUNTER | diskcache_remove_total | path | Removed file count, if some file read EOF, remove it from un-read list
COUNTER | diskcache_wakeup_total | path | Wakeup count on sleeping write file
COUNTER | diskcache_seek_back_total | path | Seek back when Get() got any error
GAUGE | diskcache_capacity | path | Current capacity(in bytes)
GAUGE | diskcache_max_data | path | Max data to Put(in bytes), default 0
GAUGE | diskcache_batch_size | path | Data file size(in bytes)
GAUGE | diskcache_size | path | Current cache size(in bytes)
GAUGE | diskcache_open_time | no_fallback_on_error,no_lock,no_pos,no_sync,path | Current cache Open time in unix timestamp(second)
GAUGE | diskcache_last_close_time | path | Current cache last Close time in unix timestamp(second)
GAUGE | diskcache_datafiles | path | Current un-read data files
SUMMARY | diskcache_stream_put | path | Stream put times
SUMMARY | diskcache_get_latency | path | Get() cost seconds
SUMMARY | diskcache_put_latency | path | Put() cost seconds
SUMMARY | diskcache_put_bytes | path | Cache Put() bytes
SUMMARY | diskcache_get_bytes | path | Cache Get() bytes

Performance estimates

Test environment:

  • Model Name : MacBook Pro
  • Model Identifier : MacBookPro18,1
  • Chip : Apple M1 Pro
  • Total Number of Cores : 10 (8 performance and 2 efficiency)
  • Memory : 16 GB

See the test case TestConcurrentPutGetPerf for details.

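Each write is between 100KB and 1MB. Performance was measured for single-threaded writes, multi-threaded writes, and multi-threaded reads and writes:

Scenario | Workers | Throughput (bytes/ms)
--- | --- | ---
Single-threaded writes | 1 | 119708
Multi-threaded writes | 10 | 118920
Multi-threaded reads and writes | 10+10 | 118920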

Overall, with either multi-threaded or single-threaded reads and writes, IO throughput on this hardware exceeds 100MB/s.
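The conversion from the table's bytes/ms figures to per-second throughput is straightforward (MB = 10^6 bytes, MiB = 2^20 bytes):

```latex
\begin{aligned}
119708\ \tfrac{\text{bytes}}{\text{ms}} \times 1000\ \tfrac{\text{ms}}{\text{s}} &= 119{,}708{,}000\ \tfrac{\text{bytes}}{\text{s}} \approx 119.7\ \text{MB/s} \approx 114.2\ \text{MiB/s} \\
118920\ \tfrac{\text{bytes}}{\text{ms}} \times 1000\ \tfrac{\text{ms}}{\text{s}} &= 118{,}920{,}000\ \tfrac{\text{bytes}}{\text{s}} \approx 118.9\ \text{MB/s} \approx 113.4\ \text{MiB/s}
\end{aligned}
```

Both figures are above 100MB/s in either unit.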

TODO

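  • Support Get()/Put() of multiple items per call, to increase data throughput per lock acquisition
  • Support retries when Get() fails (WithErrorRetry(n))
  • Let the executable (cmd/diskcache) inspect the storage state of an existing diskcache (possibly in use by another process)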

Documentation

Overview

Package diskcache is a simple local-disk cache implementation.

The diskcache package is a local-disk cache that implements the following features:

  1. Concurrent Put()/Get().
  2. Recoverable last-read-position on restart.
  3. Exclusive Open() on same path.
  4. Errors during Get() are retryable.
  5. Auto-rotate on batch size.
  6. Drop in FIFO policy when max capacity reached.
  7. Various settings can be configured through environment variables, without modifying option code.


Constants

const (

	// EOFHint labels a file's end.
	EOFHint = uint32(0xdeadbeef)
)

Variables

var (
	// Invalid read size.
	ErrUnexpectedReadSize = errors.New("unexpected read size")

	ErrTooSmallReadBuf = errors.New("too small read buffer")

	// Data sent to Put() exceeds maxDataSize.
	ErrTooLargeData = errors.New("too large data")

	// Get on a cache with no data.
	ErrNoData = errors.New("no data")

	// Diskcache is full; no data can be written now.
	ErrCacheFull = errors.New("cache full")

	ErrInvalidStreamSize = errors.New("invalid stream size")

	// Invalid cache filename.
	ErrInvalidDataFileName       = errors.New("invalid datafile name")
	ErrInvalidDataFileNameSuffix = errors.New("invalid datafile name suffix")

	// Invalid file header.
	ErrBadHeader = errors.New("bad header")
)

Generic diskcache errors.

Functions

func GetErrorContext

func GetErrorContext(err error) map[string]interface{}

GetErrorContext extracts useful context information from errors.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if an error is retryable based on its type and context.

func Metrics

func Metrics() []prometheus.Collector

func ResetMetrics

func ResetMetrics()

ResetMetrics cleans up the existing diskcache metrics.

Types

type BufFunc

type BufFunc func() []byte

type CacheError

type CacheError struct {
	Operation Operation
	Path      string
	File      string
	Details   string
	Err       error
	Caller    string
}

CacheError represents an enhanced error with operation context and details.

func NewCacheError

func NewCacheError(op Operation, err error, details string) *CacheError

NewCacheError creates a new CacheError with enhanced context.

func WrapCloseError

func WrapCloseError(err error, path string, fdType string) *CacheError

WrapCloseError wraps errors from Close operations.

func WrapFileOperationError

func WrapFileOperationError(op Operation, err error, path, file string) *CacheError

WrapFileOperationError wraps errors from generic file operations.

func WrapGetError

func WrapGetError(err error, path string, file string) *CacheError

WrapGetError wraps errors from Get operations.

func WrapLockError

func WrapLockError(err error, path string, pid int) *CacheError

WrapLockError wraps errors from locking operations.

func WrapOpenError

func WrapOpenError(err error, path string) *CacheError

WrapOpenError wraps errors from Open operations.

func WrapPosError

func WrapPosError(err error, path string, seek int64) *CacheError

WrapPosError wraps errors from position operations.

func WrapPutError

func WrapPutError(err error, path string, dataSize int) *CacheError

WrapPutError wraps errors from Put operations.

func WrapRotateError

func WrapRotateError(err error, path string, oldFile, newFile string) *CacheError

WrapRotateError wraps errors from Rotate operations.

func (*CacheError) Error

func (e *CacheError) Error() string

Error implements the error interface.

func (*CacheError) Unwrap

func (e *CacheError) Unwrap() error

Unwrap returns the underlying error for compatibility with errors.Is/As.

func (*CacheError) WithDetails

func (e *CacheError) WithDetails(details string) *CacheError

WithDetails adds additional details to the error.

func (*CacheError) WithFile

func (e *CacheError) WithFile(file string) *CacheError

WithFile adds file context to the error.

func (*CacheError) WithPath

func (e *CacheError) WithPath(path string) *CacheError

WithPath adds path context to the error.

type CacheOption

type CacheOption func(c *DiskCache)

A CacheOption sets an option on a DiskCache.

func WithBatchSize

func WithBatchSize(size int64) CacheOption

WithBatchSize sets the data file size (default 64MB).

func WithCapacity

func WithCapacity(size int64) CacheOption

WithCapacity sets the cache capacity (default unlimited).

func WithDirPermission

func WithDirPermission(perms os.FileMode) CacheOption

WithDirPermission sets the permission mode of the cache directory.

func WithExtraCapacity

func WithExtraCapacity(size int64) CacheOption

WithExtraCapacity adds extra capacity to an existing cache.

func WithFILODrop

func WithFILODrop(on bool) CacheOption

WithFILODrop sets the drop policy used by Put() when the cache's size approaches its capacity. With FILO drop, Put() fails immediately with an error.

The default drop policy is FIFO: every Put() succeeds and the cache drops old data automatically.

func WithFilePermission

func WithFilePermission(perms os.FileMode) CacheOption

WithFilePermission sets the permission mode of cache files.

func WithMaxDataSize

func WithMaxDataSize(size int32) CacheOption

WithMaxDataSize sets the maximum size of a single data entry (default 32MB).

func WithNoDrop

func WithNoDrop(on bool) CacheOption

WithNoDrop sets the no-drop policy used by Put() when the cache's size approaches its capacity. With no-drop, Put() fails immediately with an error.

func WithNoFallbackOnError

func WithNoFallbackOnError(on bool) CacheOption

WithNoFallbackOnError disables fallback when fn() returns an error.

During Get(fn(data []byte) error {...}), if fn() fails with an error, the next Get returns the same data from the cache again. If fallback is disabled, the next Get reads new data from the cache, and the previously failed data is skipped (and eventually dropped).

func WithNoLock

func WithNoLock(on bool) CacheOption

WithNoLock turns the .lock file off (on = true) or on.

The '.lock' file is used to prevent more than one Open() on the same path.

func WithNoPos

func WithNoPos(on bool) CacheOption

WithNoPos turns the .pos file off (on = true) or on.

The '.pos' file records the last Get() position. Without '.pos', after a process restart, some data that was already retrieved by Get() will be retrieved again by the new process, which may not be the desired behavior.

func WithNoSync

func WithNoSync(on bool) CacheOption

WithNoSync disables (on = true) or enables sync on cache writes.

NOTE: Without sync, writes are 60~80 times faster for 512KB/1MB puts, and even faster for smaller puts (4000+ times faster for 1KB).

func WithPath

func WithPath(x string) CacheOption

WithPath sets the cache directory.

func WithPosUpdate

func WithPosUpdate(cnt int, du time.Duration) CacheOption

WithPosUpdate sets the .pos update intervals.

cnt specifies how many .pos updates trigger a real disk write. Set cnt = 0 to force a .pos write on every Get.

du specifies how often a real disk write of .pos is triggered.

func WithWakeup

func WithWakeup(wakeup time.Duration) CacheOption

WithWakeup sets the wakeup duration (default 3s), after which the current write file is shifted to a ready-to-read file.

NOTE: without wakeup, the current write file may not be readable for a long time.

type DiskCache

type DiskCache struct {
	LastErr error
	// contains filtered or unexported fields
}

DiskCache is the representation of a disk cache. A DiskCache is safe for concurrent use by multiple goroutines. Do not Open the same path more than once across goroutines; share a single *DiskCache instead.

func Open

func Open(opts ...CacheOption) (*DiskCache, error)

Open initializes and creates a new disk cache. Other settings can be configured with the various options.

func (*DiskCache) BufCallbackGet

func (c *DiskCache) BufCallbackGet(bfn BufFunc, fn Fn) error

BufCallbackGet fetches new data from the disk cache and reads it into the buffer returned by bfn. If there is nothing to read, bfn is not called.

func (*DiskCache) BufGet

func (c *DiskCache) BufGet(buf []byte, fn Fn) error

BufGet fetches new data from the disk cache and reads it into buf.

func (*DiskCache) Capacity

func (c *DiskCache) Capacity() int64

Capacity returns the maximum capacity of the cache.

func (*DiskCache) Close

func (c *DiskCache) Close() error

Close releases file descriptor resources. Close is safe to call concurrently with other operations and will block until all other operations finish.

func (*DiskCache) Get

func (c *DiskCache) Get(fn Fn) error

Get fetches new data from the disk cache, then passes it to fn.

Get is safe to call concurrently with other operations and will block until all other operations finish.

func (*DiskCache) IsFull

func (c *DiskCache) IsFull(newData []byte) bool

IsFull reports whether putting newData into the cache would reach the maximum capacity.

func (*DiskCache) MaxBatchSize

func (c *DiskCache) MaxBatchSize() int64

MaxBatchSize returns the maximum size of a single data file of the cache.

With a proper data file size (see WithBatchSize for the default), rotation and garbage collection happen more quickly once every piece of data within a data file has been Get() out of it.

func (*DiskCache) MaxDataSize

func (c *DiskCache) MaxDataSize() int32

MaxDataSize returns the maximum size of a single data piece in the cache.

func (*DiskCache) Path

func (c *DiskCache) Path() string

Path returns the directory of the current diskcache.

func (*DiskCache) Pretty

func (c *DiskCache) Pretty() string

func (*DiskCache) Put

func (c *DiskCache) Put(data []byte) error

Put writes data to the disk cache; if the batch size is reached, a new batch is rotated. Put is safe to call concurrently with other operations and will block until all other operations finish.

func (*DiskCache) RawSize

func (c *DiskCache) RawSize() int64

RawSize returns the current size of the cache plus the size of the current write file (`data`).

func (*DiskCache) Rotate

func (c *DiskCache) Rotate() error

Rotate forces the diskcache to switch (rotate) from the current write file (cwf) to a new file, making cwf readable by subsequent Get() calls.

NOTE: You do not need to call Rotate() in normal usage; it is exported for test cases.

func (*DiskCache) Size

func (c *DiskCache) Size() int64

Size returns the current size of the cache.

func (*DiskCache) StreamPut

func (c *DiskCache) StreamPut(r io.Reader, size int) error

StreamPut reads size bytes from r and writes them to storage.

When the data comes from a network stream (such as an HTTP response body), StreamPut avoids an intermediate buffer for the (possibly huge) body.

func (*DiskCache) String

func (c *DiskCache) String() string

type Fn

type Fn func([]byte) error

Fn is the handler that consumes data from the diskcache.

type InstrumentedMutex

type InstrumentedMutex struct {
	// contains filtered or unexported fields
}

InstrumentedMutex wraps sync.Mutex with contention tracking.

func NewInstrumentedMutex

func NewInstrumentedMutex(lockType LockType,
	path string,
	lockWaitTime *prometheus.HistogramVec,
	contention *prometheus.CounterVec,
) *InstrumentedMutex

NewInstrumentedMutex creates a new instrumented mutex.

func (*InstrumentedMutex) Lock

func (im *InstrumentedMutex) Lock()

Lock acquires the mutex with contention tracking.

func (*InstrumentedMutex) TryLock

func (im *InstrumentedMutex) TryLock() bool

TryLock attempts to acquire mutex without blocking.

func (*InstrumentedMutex) Unlock

func (im *InstrumentedMutex) Unlock()

Unlock releases the mutex.

type InstrumentedRWMutex

type InstrumentedRWMutex struct {
	// contains filtered or unexported fields
}

InstrumentedRWMutex wraps sync.RWMutex with contention tracking.

func NewInstrumentedRWMutex

func NewInstrumentedRWMutex(path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedRWMutex

NewInstrumentedRWMutex creates a new instrumented RWMutex.

func (*InstrumentedRWMutex) Lock

func (irm *InstrumentedRWMutex) Lock()

Lock acquires write lock with contention tracking.

func (*InstrumentedRWMutex) RLock

func (irm *InstrumentedRWMutex) RLock()

RLock acquires read lock with contention tracking.

func (*InstrumentedRWMutex) RUnlock

func (irm *InstrumentedRWMutex) RUnlock()

RUnlock releases read lock.

func (*InstrumentedRWMutex) TryLock

func (irm *InstrumentedRWMutex) TryLock() bool

TryLock attempts to acquire write lock without blocking.

func (*InstrumentedRWMutex) TryRLock

func (irm *InstrumentedRWMutex) TryRLock() bool

TryRLock attempts to acquire read lock without blocking.

func (*InstrumentedRWMutex) Unlock

func (irm *InstrumentedRWMutex) Unlock()

Unlock releases write lock.

type LockType

type LockType string

LockType represents different types of locks in diskcache.

const (
	LockTypeWrite LockType = "write"
	LockTypeRead  LockType = "read"
	LockTypeRW    LockType = "rw"
)

type Operation

type Operation string

Operation type for error context.

const (
	OpOpen      Operation = "Open"
	OpClose     Operation = "Close"
	OpPut       Operation = "Put"
	OpStreamPut Operation = "StreamPut"
	OpGet       Operation = "Get"
	OpRotate    Operation = "Rotate"
	OpSwitch    Operation = "Switch"
	OpDrop      Operation = "Drop"
	OpLock      Operation = "Lock"
	OpUnlock    Operation = "Unlock"
	OpPos       Operation = "Pos"
	OpSeek      Operation = "Seek"
	OpWrite     Operation = "Write"
	OpRead      Operation = "Read"
	OpSync      Operation = "Sync"
	OpCreate    Operation = "Create"
	OpRemove    Operation = "Remove"
	OpRename    Operation = "Rename"
	OpStat      Operation = "Stat"
)
