data

package
v0.2.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: AGPL-3.0 Imports: 38 Imported by: 0

README

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ConcurNum = 5 // 并发处理的数量

)
View Source
var (
	KlineParallelNum = 6 // 抓取K线时的同时并发数
)

* ******************************* Spider 爬虫部分 ****************************

Functions

func Build1mWithTicks

func Build1mWithTicks(args *config.CmdArgs) *errs.Error

func CalcFilePerfs

func CalcFilePerfs(args *config.CmdArgs) *errs.Error

CalcFilePerfs calc sharpe/sortino ratio for input data

func DownEmitHourKlines added in v0.2.28

func DownEmitHourKlines(dp *LiveProvider, endsMap map[int32]int64)

func FindPathNames

func FindPathNames(inPath, suffix string) ([]string, *errs.Error)

FindPathNames Finds all file paths of the specified type for a given path

An array of paths is returned, with the first being the parent directory followed by the relative child paths 查找给定路径所有指定类型的文件路径

返回路径数组,第一个是父目录,后续是相对子路径

func ReadZipCSVs

func ReadZipCSVs(inPath string, pBar *utils.PrgBar, handle FuncReadZipItem, arg interface{}) *errs.Error

ReadZipCSVs arg is for handle

func RunFormatTick

func RunFormatTick(args *config.CmdArgs) *errs.Error

func RunHistFeeders

func RunHistFeeders(makeFeeders func() []IHistFeeder, versions chan int, pBar *utils.PrgBar) *errs.Error

RunHistFeeders run hist feeders for historical data

versions: When an integer greater than the previous value is received, makeFeeders will be called to re-acquire and continue running; when a negative number is received, exit immediately

pBar: optional, used to display a progress bar

func RunSpider

func RunSpider(addr string) *errs.Error

func StrWarmLacks added in v0.1.29

func StrWarmLacks(skips map[string][2]int) string

Types

type Batch added in v0.2.23

type Batch interface {
	TimeMS() int64
}

type DBKlineFeeder

type DBKlineFeeder struct {
	KlineFeeder
	*TfKlineLoader
	TradeTimes [][2]int64 // Trading time 可交易时间
}

DBKlineFeeder Historical data feedback device. Is the base class for file feedback and database feedback.

Backtest mode: Read 3K bars each time, and backtest triggers in sequence according to nextMS size. 历史数据反馈器。是文件反馈器和数据库反馈器的基类。

回测模式:每次读取3K个bar,按nextMS大小依次回测触发。

func NewDBKlineFeeder

func NewDBKlineFeeder(exs *orm.ExSymbol, callBack FnPairKline, showLog bool) (*DBKlineFeeder, *errs.Error)

func (*DBKlineFeeder) CallNext added in v0.2.9

func (f *DBKlineFeeder) CallNext()

func (*DBKlineFeeder) GetBatch added in v0.2.23

func (f *DBKlineFeeder) GetBatch() Batch

func (*DBKlineFeeder) RunBatch added in v0.2.23

func (f *DBKlineFeeder) RunBatch(batch Batch) *errs.Error

func (*DBKlineFeeder) SetEndMS added in v0.2.9

func (f *DBKlineFeeder) SetEndMS(ms int64)

func (*DBKlineFeeder) SetSeek

func (f *DBKlineFeeder) SetSeek(since int64)

func (*DBKlineFeeder) SubTfs added in v0.2.9

func (f *DBKlineFeeder) SubTfs(timeFrames []string, delOther bool) []string

SubTfs Add monitoring to States and return the newly added TimeFrames 添加监听到States中,返回新增的TimeFrames

func (*DBKlineFeeder) Type added in v0.2.23

func (f *DBKlineFeeder) Type() string

type Feeder

type Feeder struct {
	*orm.ExSymbol
	States []*PairTFCache

	WaitBar  *banexg.Kline
	CallBack FnPairKline
	OnEnvEnd FuncEnvEnd // If the futures main force switches or the stock is ex-rights, the position needs to be closed first 期货主力切换或股票除权,需先平仓
	// contains filtered or unexported fields
}

Feeder Each Feeder corresponds to a trading pair. Can contain multiple time dimensions.

Supports dynamic addition of time dimension. Backtest mode: Call execution callbacks in sequence according to the next update time of the Feeder. Real mode: Subscribe to new data for this trading pair's time period and execute a callback when it is awakened. Support warm-up data. Each strategy + trading pair is preheated independently throughout the entire process, and cross-preheating is not allowed to avoid btime contamination. LiveFeeder requires preheating for both new trading pairs and new cycles; HistFeeder only requires preheating for new cycles. 每个Feeder对应一个交易对。可包含多个时间维度。

支持动态添加时间维度。
回测模式:根据Feeder的下次更新时间,按顺序调用执行回调。
实盘模式:订阅此交易对时间周期的新数据,被唤起时执行回调。
支持预热数据。每个策略+交易对全程单独预热,不可交叉预热,避免btime被污染。
LiveFeeder新交易对和新周期都需要预热;HistFeeder仅新周期需要预热

func (*Feeder) SubTfs

func (f *Feeder) SubTfs(timeFrames []string, delOther bool) []string

SubTfs Add monitoring to States and return the newly added TimeFrames 添加监听到States中,返回新增的TimeFrames

type FetchJob

type FetchJob struct {
	PairTFCache
	Pair      string
	CheckSecs int
	Since     int64
	NextRun   int64
}

type FnGetInt64 added in v0.2.0

type FnGetInt64 = func() int64

type FnPairKline

type FnPairKline = func(bar *orm.InfoKline)

type FuncConvert

type FuncConvert func(inPath string, file *zip.File, writer *zip.Writer) *errs.Error

type FuncEnvEnd

type FuncEnvEnd = func(bar *banexg.PairTFKline, adj *orm.AdjInfo)

type FuncReadZipItem

type FuncReadZipItem func(inPath string, fid int, file *zip.File, arg interface{}) *errs.Error

type FuncTickBar

type FuncTickBar func(inPath string, row []string) (string, int64, [5]float64)

type HistProvider

type HistProvider struct {
	Provider[IHistKlineFeeder]
	// contains filtered or unexported fields
}

func NewHistProvider

func NewHistProvider(callBack FnPairKline, envEnd FuncEnvEnd, getEnd FnGetInt64, showLog bool, pBar *utils.StagedPrg) *HistProvider

func (*HistProvider) LoopMain

func (p *HistProvider) LoopMain() *errs.Error

func (*HistProvider) SubWarmPairs

func (p *HistProvider) SubWarmPairs(items map[string]map[string]int, delOther bool) *errs.Error

func (*HistProvider) Terminate

func (p *HistProvider) Terminate()

func (*HistProvider) UnSubPairs

func (p *HistProvider) UnSubPairs(pairs ...string) *errs.Error

type IHistFeeder added in v0.2.23

type IHistFeeder interface {
	SetSeek(since int64)
	SetEndMS(ms int64)
	GetBatch() Batch
	RunBatch(bar Batch) *errs.Error
	CallNext()
	Type() string
	// contains filtered or unexported methods
}

func SortFeeders

func SortFeeders(holds []IHistFeeder, hold IHistFeeder, insert bool) []IHistFeeder

type IHistKlineFeeder

type IHistKlineFeeder interface {
	IKlineFeeder
	IHistFeeder
	// DownIfNeed Download the entire range of K lines, which needs to be called before SetSeek  下载整个范围的K线,需在SetSeek前调用
	DownIfNeed(sess *orm.Queries, exchange banexg.BanExchange, pBar *utils.PrgBar) *errs.Error
}

type IKlineFeeder

type IKlineFeeder interface {

	/*
		SubTfs Subscribe to data for a specified time period for the current target. Multiple 为当前标的订阅指定时间周期的数据,可多个
	*/
	SubTfs(timeFrames []string, delOther bool) []string
	/*
		WarmTfs The preheating time period gives the number of K lines to the specified time. 预热时间周期给定K线数量到指定时间
	*/
	WarmTfs(curMS int64, tfNums map[string]int, pBar *utils.PrgBar) (int64, map[string][2]int, *errs.Error)
	// contains filtered or unexported methods
}

type IProvider

type IProvider interface {
	LoopMain() *errs.Error
	SubWarmPairs(items map[string]map[string]int, delOther bool) *errs.Error
	UnSubPairs(pairs ...string) *errs.Error
	SetDirty()
}

type KLineMsg

type KLineMsg struct {
	NotifyKLines
	ExgName string // The name of the exchange 交易所名称
	Market  string // market 市场
	Pair    string // symbol  币种
}

type KLineState added in v0.2.18

type KLineState struct {
	Sid      int32
	ExpectMS int64 // next bar start time
	PrevBar  *banexg.Kline
}

type KLineWatcher

type KLineWatcher struct {
	*utils.ClientIO

	OnKLineMsg func(msg *KLineMsg) // 收到爬虫K线消息
	OnTrades   func(exgName, market, pair string, trades []*banexg.Trade)
	OnDepth    func(dep *banexg.OrderBook)
	// contains filtered or unexported fields
}

func NewKlineWatcher

func NewKlineWatcher(addr string) (*KLineWatcher, *errs.Error)

func (*KLineWatcher) GetJob added in v0.2.28

func (w *KLineWatcher) GetJob(msgType, symbol string) *PairTFCache

func (*KLineWatcher) GetJobs added in v0.2.28

func (w *KLineWatcher) GetJobs(msgType string) map[string]*PairTFCache

func (*KLineWatcher) SendMsg

func (w *KLineWatcher) SendMsg(action string, data interface{}) *errs.Error

func (*KLineWatcher) UnWatchJobs

func (w *KLineWatcher) UnWatchJobs(exgName, marketType, jobType string, pairs []string) *errs.Error

func (*KLineWatcher) WatchJobs

func (w *KLineWatcher) WatchJobs(exgName, marketType, jobType string, jobs ...WatchJob) *errs.Error

WatchJobs Subscribe data from crawlers. 从爬虫订阅数据。ohlcv/uohlcv/trade/depth

type KlineFeeder

type KlineFeeder struct {
	Feeder
	PreFire float64 // Ratio of triggering bar early 提前触发bar的比率
	// contains filtered or unexported fields
}

KlineFeeder Each Feeder corresponds to a trading pair. Can contain multiple time dimensions. Real use.

Supports dynamic addition of time dimension. Supports returning preheating data. Each strategy + trading pair is preheated independently throughout the entire process, and cross-preheating is not allowed to avoid btime contamination.

Backtest mode: Use derived structure: DbKlineFeeder

Real mode: Subscribe to new data for this trading pair's time period and execute a callback when it is awakened. Check whether this trading pair has been refreshed in the spider monitor. If not, send a message to the crawler monitor. 每个Feeder对应一个交易对。可包含多个时间维度。实盘使用。

支持动态添加时间维度。
支持返回预热数据。每个策略+交易对全程单独预热,不可交叉预热,避免btime被污染。

回测模式:使用派生结构体:DbKlineFeeder

实盘模式:订阅此交易对时间周期的新数据,被唤起时执行回调。
检查此交易对是否已在spider监听刷新,如没有则发消息给爬虫监听。

func NewKlineFeeder

func NewKlineFeeder(exs *orm.ExSymbol, callBack FnPairKline, showLog bool) (*KlineFeeder, *errs.Error)

func (*KlineFeeder) WarmTfs

func (f *KlineFeeder) WarmTfs(curMS int64, tfNums map[string]int, pBar *utils.PrgBar) (int64, map[string][2]int, *errs.Error)

type LiveProvider

type LiveProvider struct {
	Provider[IKlineFeeder]
	*KLineWatcher
	OnMinKlines func(msg *KLineMsg, bars []*banexg.Kline) *errs.Error
}

func NewLiveProvider

func NewLiveProvider(callBack FnPairKline, envEnd FuncEnvEnd) (*LiveProvider, *errs.Error)

func (*LiveProvider) LoopMain

func (p *LiveProvider) LoopMain() *errs.Error

func (*LiveProvider) SubWarmPairs

func (p *LiveProvider) SubWarmPairs(items map[string]map[string]int, delOther bool) *errs.Error

func (*LiveProvider) UnSubPairs

func (p *LiveProvider) UnSubPairs(pairs ...string) *errs.Error

type LiveSpider

type LiveSpider struct {
	*utils.ServerIO
	// contains filtered or unexported fields
}
var (
	Spider *LiveSpider
)

type Miner

type Miner struct {
	ExgName string
	Market  string

	Fetchs       map[string]*FetchJob
	KLineApis    *PairSubs
	KLines       *PairSubs
	Trades       *PairSubs
	Depths       *PairSubs
	IsWatchPrice bool
	IsLoopKline  bool
	// contains filtered or unexported fields
}

func (*Miner) SubPairs

func (m *Miner) SubPairs(jobType string, pairs ...string) *errs.Error

func (*Miner) UnSubPairs

func (m *Miner) UnSubPairs(jobType string, pairs ...string) *errs.Error

type NotifyKLines

type NotifyKLines struct {
	TFSecs   int
	Interval int // 推送更新间隔, <= TFSecs
	Arr      []*banexg.Kline
}

type PairSubs added in v0.2.16

type PairSubs struct {
	Status int // 0 not subscribed, 1 subscribing, 2 subscribed
	// contains filtered or unexported fields
}

func NewPairSubs added in v0.2.16

func NewPairSubs() *PairSubs

func (*PairSubs) GetNewSubs added in v0.2.16

func (s *PairSubs) GetNewSubs(pairs []string) []string

GetNewSubs get pairs need to be subscribed

func (*PairSubs) KeyMap added in v0.2.18

func (s *PairSubs) KeyMap() map[string]bool

func (*PairSubs) Keys added in v0.2.16

func (s *PairSubs) Keys() []string

func (*PairSubs) Len added in v0.2.16

func (s *PairSubs) Len() int

func (*PairSubs) Remove added in v0.2.16

func (s *PairSubs) Remove(pairs ...string) []string

func (*PairSubs) Set added in v0.2.16

func (s *PairSubs) Set(pairs ...string) []string

type PairTFCache

type PairTFCache struct {
	TimeFrame  string
	TFSecs     int
	SubNextMS  int64         // Record the start timestamp of the next bar expected to be received. If it is inconsistent, the bar is missing and needs to be queried and updated. 记录子周期K线下一个期待收到的bar起始时间戳,如果不一致,则出现了bar缺失,需查询更新。
	NextMS     int64         // 当前周期下一个K线期望的时间戳
	WaitBar    *banexg.Kline // Record unfinished bars. Should be set to nil when completed 记录尚未完成的bar。已完成时应置为nil
	Latest     *banexg.Kline // Record the latest bar data, which may not be completed or may be completed 记录最新bar数据,可能未完成,可能已完成
	AlignOffMS int64
}

type Provider

type Provider[T IKlineFeeder] struct {
	// contains filtered or unexported fields
}

func (*Provider[IKlineFeeder]) SetDirty

func (p *Provider[IKlineFeeder]) SetDirty()

func (*Provider[IKlineFeeder]) SubWarmPairs

func (p *Provider[IKlineFeeder]) SubWarmPairs(items map[string]map[string]int, delOther bool, pBar *utils.StagedPrg) ([]IKlineFeeder, map[string]int64, []string, *errs.Error)

SubWarmPairs Add new trading pair subscription from data provider.

items: pair[timeFrame]warmNum Return the trading pairs with the smallest period change (new/old pairs new period), warm-up tasks 从数据提供者添加新的交易对订阅。

items: pair[timeFrame]warmNum
返回最小周期变化的交易对(新增/旧对新周期)、预热任务

func (*Provider[IKlineFeeder]) UnSubPairs

func (p *Provider[IKlineFeeder]) UnSubPairs(pairs ...string) []string

type SaveKline

type SaveKline struct {
	Sid       int32
	TimeFrame string
	Arr       []*banexg.Kline
	MsgAction string
	ReceiveAt int64
}

type TfKlineLoader added in v0.2.9

type TfKlineLoader struct {
	*orm.ExSymbol
	Timeframe string
	TFMSecs   int64

	EndMS     int64
	FirstRead bool
	// contains filtered or unexported fields
}

TfKlineLoader 用于分批加载某个品种的指定周期K线,然后逐个读取的场景

func NewTfKlineLoader added in v0.2.9

func NewTfKlineLoader(exs *orm.ExSymbol, tf string) *TfKlineLoader

func (*TfKlineLoader) DownIfNeed added in v0.2.9

func (f *TfKlineLoader) DownIfNeed(sess *orm.Queries, exchange banexg.BanExchange, pBar *utils.PrgBar) *errs.Error

DownIfNeed Download data for a specified interval pBar is used for progress update, the total is 1000, and the amount is updated each time 下载指定区间的数据 pBar 用于进度更新,总和为1000,每次更新此次的量

func (*TfKlineLoader) GetBar added in v0.2.9

func (f *TfKlineLoader) GetBar() *banexg.Kline

func (*TfKlineLoader) ReadTo added in v0.2.9

func (f *TfKlineLoader) ReadTo(end int64, force bool) []*banexg.Kline

func (*TfKlineLoader) Reset added in v0.2.9

func (f *TfKlineLoader) Reset(since int64)

func (*TfKlineLoader) SetNext added in v0.2.9

func (f *TfKlineLoader) SetNext()

func (*TfKlineLoader) SetSeek added in v0.2.9

func (f *TfKlineLoader) SetSeek(since int64)

func (*TfKlineLoader) SetTimeFrame added in v0.2.9

func (f *TfKlineLoader) SetTimeFrame(tf string)

type TradeBatch added in v0.2.23

type TradeBatch []*banexg.Trade

func (TradeBatch) TimeMS added in v0.2.23

func (b TradeBatch) TimeMS() int64

type TradeFeeder added in v0.2.23

type TradeFeeder struct {
	*orm.ExSymbol
	// contains filtered or unexported fields
}

TradeFeeder feeds trade data from files for backtesting

func NewTradeFeeder added in v0.2.23

func NewTradeFeeder(exs *orm.ExSymbol, l *WsDataLoader) *TradeFeeder

NewTradeFeeder creates a new trade data feeder

func (*TradeFeeder) CallNext added in v0.2.23

func (f *TradeFeeder) CallNext()

func (*TradeFeeder) GetBatch added in v0.2.23

func (f *TradeFeeder) GetBatch() Batch

func (*TradeFeeder) RunBatch added in v0.2.23

func (f *TradeFeeder) RunBatch(batch Batch) *errs.Error

func (*TradeFeeder) SetEndMS added in v0.2.23

func (f *TradeFeeder) SetEndMS(ms int64)

func (*TradeFeeder) SetSeek added in v0.2.23

func (f *TradeFeeder) SetSeek(since int64)

func (*TradeFeeder) Type added in v0.2.23

func (f *TradeFeeder) Type() string

type WarmJob

type WarmJob struct {
	// contains filtered or unexported fields
}

type WatchJob

type WatchJob struct {
	Symbol    string
	TimeFrame string
	Since     int64
}

type WsDataLoader added in v0.2.23

type WsDataLoader struct {
	// contains filtered or unexported fields
}

func NewWsDataLoader added in v0.2.23

func NewWsDataLoader() (*WsDataLoader, *errs.Error)

func (*WsDataLoader) GetCachePath added in v0.2.23

func (l *WsDataLoader) GetCachePath(info *WsSymbol, tryRoot bool) (string, bool)

GetCachePath returns the cache path for a specific symbol, date and hour

func (*WsDataLoader) LoadTrades added in v0.2.23

func (l *WsDataLoader) LoadTrades(info *WsSymbol) ([]*banexg.Trade, *errs.Error)

LoadTrades loads trades for a specific hour from cache

func (*WsDataLoader) SplitBigZip added in v0.2.23

func (l *WsDataLoader) SplitBigZip(zipPath string, info *WsSymbol) *errs.Error

SplitBigZip extracts and splits a zip file into hourly

type WsSymbol added in v0.2.23

type WsSymbol struct {
	ExgId     string
	Market    string
	WsType    string
	Symbol    string
	RawSymbol string
	Date      string
	Hour      int
	// contains filtered or unexported fields
}

func (*WsSymbol) DownUrl added in v0.2.23

func (info *WsSymbol) DownUrl() string

func (*WsSymbol) FillDefaults added in v0.2.23

func (info *WsSymbol) FillDefaults() *errs.Error

func (*WsSymbol) MidPath added in v0.2.23

func (info *WsSymbol) MidPath() string

func (*WsSymbol) String added in v0.2.23

func (info *WsSymbol) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL