orm

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2026 License: AGPL-3.0 Imports: 42 Imported by: 2

README

数据库概述

需要存储到数据库的有三类:K线数据、交易数据、UI相关数据。
K线与相关元数据使用 QuestDB(PGWire)存储,并用 sranges 记录已下载/无数据区间,允许数据不连续。
为确保灵活性,交易数据(ormo)和UI相关数据(ormu)使用独立的sqlite文件存储。
ormo/ormu依赖orm,不可反向依赖,避免出现依赖环

从proto生成go代码

安装protoc的Go代码生成插件protoc-gen-go(需已安装protoc):

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

生成:

protoc --go_out=. --go_opt=paths=source_relative kdata.proto

注意:将生成后的 kdata.pb.go 中的 package __ 改为 package orm

Documentation

Index

Constants

View Source
const (
	DumpKline     = "kline"
	DumpStartUp   = "startup"
	DumpApiOrder  = "api_order"
	DumpWsMyTrade = "ws_my_trade"
)

Variables

View Source
var (
	DbTrades = "trades"
	// DbPub stores mutable relational/meta data (calendars/adj_factors/sranges/ins_kline/kline_un + ui task).
	DbPub = "banpub"
)
View Source
var (
	DebugDownKLine = false
)
View Source
var File_kdata_proto protoreflect.FileDescriptor

Functions

func AddDumpRow added in v0.2.13

func AddDumpRow(src, key string, val interface{})

func AddHourSymbol added in v0.2.28

func AddHourSymbol(exs *ExSymbol)

func AddInsJob added in v0.3.1

func AddInsJob(add AddInsKlineParams) (int64, *errs.Error)

func ApplyAdj

func ApplyAdj(adjs []*AdjInfo, klines []*banexg.Kline, adj int, cutEnd int64, limit int) []*banexg.Kline

ApplyAdj calculates the K-lines after price adjustment 计算复权后K线。 adjs: must already be in ascending order 必须已升序; cutEnd: maximum end time to cut at 截取的最大结束时间; adj: type of price adjustment 复权类型; limit: number of K-lines to return 返回数量

func BuildQuery added in v0.2.21

func BuildQuery(b *strings.Builder, params []interface{}, start int, fields []IfParam) ([]interface{}, int)

func BulkDownOHLCV

func BulkDownOHLCV(exchange banexg.BanExchange, exsList map[int32]*ExSymbol, timeFrame string, startMS, endMS int64, limit int, prg utils.PrgCB) *errs.Error

BulkDownOHLCV Batch simultaneous download of K-line 批量同时下载K线

func CalcAdjFactors

func CalcAdjFactors(args *config.CmdArgs) *errs.Error

CalcAdjFactors calculates and updates all price adjustment factors 计算更新所有复权因子

func CloseDump added in v0.2.13

func CloseDump()

func EnsureCurSymbols

func EnsureCurSymbols(symbols []string) *errs.Error

func EnsureExgSymbols

func EnsureExgSymbols(exchange banexg.BanExchange) *errs.Error

func EnsureListDates added in v0.1.21

func EnsureListDates(sess *Queries, exchange banexg.BanExchange, exsMap map[int32]*ExSymbol, exsList []*ExSymbol) *errs.Error

func EnsureSymbols

func EnsureSymbols(symbols []*ExSymbol, exchanges ...string) *errs.Error

func ExportKData added in v0.1.24

func ExportKData(configFile string, outputDir string, numWorkers int, pb *utils2.StagedPrg) *errs.Error

func FastBulkOHLCV

func FastBulkOHLCV(exchange banexg.BanExchange, symbols []string, timeFrame string,
	startMS, endMS int64, limit int, handler func(string, string, []*banexg.Kline, []*AdjInfo)) *errs.Error

FastBulkOHLCV quickly fetches K-lines in bulk. It first downloads all the required symbols, then runs batch queries and returns the results in groups. Suitable when there are many symbols, the required start and end times are the same, and most of the data has already been downloaded. For combined symbols, it returns the unadjusted K-lines together with the adjustment factors; call ApplyAdj yourself to apply price adjustment as needed. 快速批量获取K线。先下载所有需要的币种,然后批量查询再分组返回。

适用于币种较多,且需要的开始结束时间一致,且大部分已下载的情况。
对于组合品种,返回未复权的K线,和复权因子,自行根据需要调用ApplyAdj复权

func FetchApiOHLCV

func FetchApiOHLCV(ctx context.Context, exchange banexg.BanExchange, pair, timeFrame string, startMS, endMS int64, out chan []*banexg.Kline) *errs.Error

FetchApiOHLCV Download the K-line data of the trading pair according to the given time period. If you need to download from the end to the beginning, you should make startMS>endMS 按给定时间段下载交易对的K线数据。 如果需要从后往前下载,应该使startMS>endMS

func FlushDumps added in v0.2.13

func FlushDumps()

func GetAlignOff

func GetAlignOff(sid int32, toTfMSecs int64) int64

func GetAllExSymbols

func GetAllExSymbols() map[int32]*ExSymbol

GetAllExSymbols Gets all the objects that have been loaded into the cache 获取已加载到缓存的所有标的

func GetDownTF

func GetDownTF(timeFrame string) (string, *errs.Error)

GetDownTF Retrieve the download time period corresponding to the specified period. Only 1m and 1h allow downloading and writing to the super table. All other dimensions are aggregated from these two dimensions.

获取指定周期对应的下载的时间周期。
只有1m和1h允许下载并写入超表。其他维度都是由这两个维度聚合得到。

func GetExSHoles added in v0.3.1

func GetExSHoles(exchange banexg.BanExchange, exs *ExSymbol, start, stop int64, full bool) ([][2]int64, *errs.Error)

GetExSHoles retrieves all non-trading time ranges for the specified Sid within a given time period. For crypto markets, which trade around the clock (365*24) without breaks, it returns empty. 获取指定Sid在某个时间段内,所有非交易时间范围。 对于币圈365*24不休,返回空

func GetExSymbolMap added in v0.1.24

func GetExSymbolMap(exgName, market string) map[string]*ExSymbol

func GetExSymbols

func GetExSymbols(exgName, market string) map[int32]*ExSymbol

func GetHourOnlySymbols added in v0.2.28

func GetHourOnlySymbols() map[int32]*ExSymbol

func ImportData added in v0.1.24

func ImportData(dataDir string, numWorkers int, pb *utils2.StagedPrg) *errs.Error

func InitExg

func InitExg(exchange banexg.BanExchange) *errs.Error

func InitListDates

func InitListDates() *errs.Error

func LoadAllExSymbols

func LoadAllExSymbols() *errs.Error

func LoadMarkets

func LoadMarkets(exchange banexg.BanExchange, reload bool) (banexg.MarketMap, *errs.Error)

func MapExSymbols

func MapExSymbols(exchange banexg.BanExchange, symbols []string) (map[int32]*ExSymbol, *errs.Error)

func NewDbErr

func NewDbErr(code int, err_ error) *errs.Error

func PrintVerifyResults added in v0.3.1

func PrintVerifyResults(results []*VerifyTFResult)

func ResetSubSymbol added in v0.2.28

func ResetSubSymbol()

func SetDbPath added in v0.1.12

func SetDbPath(key, path string)

func SetDump added in v0.2.13

func SetDump(file *os.File)

func Setup

func Setup() *errs.Error

func Sub1mSymbol added in v0.2.28

func Sub1mSymbol(pair string)

func SyncKlineTFs

func SyncKlineTFs(args *config.CmdArgs, pb *utils.StagedPrg) *errs.Error

SyncKlineTFs checks the data consistency of each kline table. If a lower timeframe has more data than a higher timeframe, the data is aggregated and written to the higher timeframe. 检查各kline表的数据一致性,如果低维度数据比高维度多,则聚合更新到高维度

func WithBanPubTx added in v0.3.1

func WithBanPubTx(ctx context.Context, fn func(tx *sql.Tx) error) *errs.Error

Types

type AddAdjFactorsParams

type AddAdjFactorsParams struct {
	Sid     int32   `json:"sid"`
	SubID   int32   `json:"sub_id"`
	StartMs int64   `json:"start_ms"`
	Factor  float64 `json:"factor"`
}

type AddCalendarsParams

type AddCalendarsParams struct {
	Name    string `json:"name"`
	StartMs int64  `json:"start_ms"`
	StopMs  int64  `json:"stop_ms"`
}

type AddInsKlineParams added in v0.1.5

type AddInsKlineParams struct {
	Sid       int32  `json:"sid"`
	Timeframe string `json:"timeframe"`
	StartMs   int64  `json:"start_ms"`
	StopMs    int64  `json:"stop_ms"`
	Timeout   int64  `json:"timeout"` // busy_timeout in ms, default 5000
}

type AddSymbolsParams

type AddSymbolsParams struct {
	Exchange string `json:"exchange"`
	ExgReal  string `json:"exg_real"`
	Market   string `json:"market"`
	Symbol   string `json:"symbol"`
}

type AdjFactor

type AdjFactor struct {
	ID      int64   `json:"id"`
	Sid     int32   `json:"sid"`
	SubID   int32   `json:"sub_id"`
	StartMs int64   `json:"start_ms"`
	Factor  float64 `json:"factor"`
}

type AdjFactorBlock added in v0.1.24

type AdjFactorBlock struct {
	Sid     int32   `protobuf:"varint,1,opt,name=sid,proto3" json:"sid,omitempty"`                        // symbol id
	SubId   int32   `protobuf:"varint,2,opt,name=sub_id,json=subId,proto3" json:"sub_id,omitempty"`       // sub symbol id
	StartMs int64   `protobuf:"varint,3,opt,name=start_ms,json=startMs,proto3" json:"start_ms,omitempty"` // start timestamp in milliseconds
	Factor  float64 `protobuf:"fixed64,4,opt,name=factor,proto3" json:"factor,omitempty"`                 // adjustment factor
	// contains filtered or unexported fields
}

AdjFactorBlock represents adjustment factors data

func (*AdjFactorBlock) Descriptor deprecated added in v0.1.24

func (*AdjFactorBlock) Descriptor() ([]byte, []int)

Deprecated: Use AdjFactorBlock.ProtoReflect.Descriptor instead.

func (*AdjFactorBlock) GetFactor added in v0.1.24

func (x *AdjFactorBlock) GetFactor() float64

func (*AdjFactorBlock) GetSid added in v0.1.24

func (x *AdjFactorBlock) GetSid() int32

func (*AdjFactorBlock) GetStartMs added in v0.1.24

func (x *AdjFactorBlock) GetStartMs() int64

func (*AdjFactorBlock) GetSubId added in v0.1.24

func (x *AdjFactorBlock) GetSubId() int32

func (*AdjFactorBlock) ProtoMessage added in v0.1.24

func (*AdjFactorBlock) ProtoMessage()

func (*AdjFactorBlock) ProtoReflect added in v0.1.24

func (x *AdjFactorBlock) ProtoReflect() protoreflect.Message

func (*AdjFactorBlock) Reset added in v0.1.24

func (x *AdjFactorBlock) Reset()

func (*AdjFactorBlock) String added in v0.1.24

func (x *AdjFactorBlock) String() string

type AdjInfo

type AdjInfo struct {
	*ExSymbol
	Factor    float64 // Original adjacent weighting factor 原始相邻复权因子
	CumFactor float64 // Cumulative weighting factor 累计复权因子
	StartMS   int64   // start timestamp 开始时间
	StopMS    int64   // stop timestamp 结束时间
}

func AutoFetchOHLCV

func AutoFetchOHLCV(exchange banexg.BanExchange, exs *ExSymbol, timeFrame string, startMS, endMS int64,
	limit int, withUnFinish bool, pBar *utils.PrgBar) ([]*AdjInfo, []*banexg.Kline, *errs.Error)

AutoFetchOHLCV

Get K-line data for a given trading pair, a given time dimension, and a given range.
Try to read from local first, download from the exchange if it doesn't exist, and then return.
获取给定交易对,给定时间维度,给定范围的K线数据。
先尝试从本地读取,不存在时从交易所下载,然后返回。

func GetAdjs added in v0.3.1

func GetAdjs(sid int32) ([]*AdjInfo, *errs.Error)

func GetOHLCV

func GetOHLCV(exs *ExSymbol, timeFrame string, startMS, endMS int64, limit int, withUnFinish bool) ([]*AdjInfo, []*banexg.Kline, *errs.Error)

GetOHLCV gets the K-lines of a symbol; if price adjustment is needed, forward adjustment is applied automatically. 获取品种K线,如需复权自动前复权

func (*AdjInfo) Apply

func (a *AdjInfo) Apply(bars []*banexg.Kline, adj int) []*banexg.Kline

type Calendar

type Calendar struct {
	ID      int64  `json:"id"`
	Name    string `json:"name"`
	StartMs int64  `json:"start_ms"`
	StopMs  int64  `json:"stop_ms"`
}

type CalendarBlock added in v0.1.24

type CalendarBlock struct {
	Name  string  `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // calendar name
	Times []int64 `protobuf:"varint,2,rep,packed,name=times,proto3" json:"times,omitempty"`
	// contains filtered or unexported fields
}

CalendarBlock represents calendar data

func (*CalendarBlock) Descriptor deprecated added in v0.1.24

func (*CalendarBlock) Descriptor() ([]byte, []int)

Deprecated: Use CalendarBlock.ProtoReflect.Descriptor instead.

func (*CalendarBlock) GetName added in v0.1.24

func (x *CalendarBlock) GetName() string

func (*CalendarBlock) GetTimes added in v0.1.24

func (x *CalendarBlock) GetTimes() []int64

func (*CalendarBlock) ProtoMessage added in v0.1.24

func (*CalendarBlock) ProtoMessage()

func (*CalendarBlock) ProtoReflect added in v0.1.24

func (x *CalendarBlock) ProtoReflect() protoreflect.Message

func (*CalendarBlock) Reset added in v0.1.24

func (x *CalendarBlock) Reset()

func (*CalendarBlock) String added in v0.1.24

func (x *CalendarBlock) String() string

type DBTX

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
}

type DumpRow added in v0.2.13

type DumpRow struct {
	Time int64
	Type string
	Key  string
	Val  interface{}
}

type EXInfo added in v0.1.24

type EXInfo struct {
	Symbols    []*ExSymbolBlock  `protobuf:"bytes,1,rep,name=symbols,proto3" json:"symbols,omitempty"`
	KHoles     []*KHoleBlock     `protobuf:"bytes,2,rep,name=kHoles,proto3" json:"kHoles,omitempty"`
	AdjFactors []*AdjFactorBlock `protobuf:"bytes,3,rep,name=adjFactors,proto3" json:"adjFactors,omitempty"`
	Calendars  []*CalendarBlock  `protobuf:"bytes,4,rep,name=calendars,proto3" json:"calendars,omitempty"`
	// contains filtered or unexported fields
}

func (*EXInfo) Descriptor deprecated added in v0.1.24

func (*EXInfo) Descriptor() ([]byte, []int)

Deprecated: Use EXInfo.ProtoReflect.Descriptor instead.

func (*EXInfo) GetAdjFactors added in v0.1.24

func (x *EXInfo) GetAdjFactors() []*AdjFactorBlock

func (*EXInfo) GetCalendars added in v0.1.24

func (x *EXInfo) GetCalendars() []*CalendarBlock

func (*EXInfo) GetKHoles added in v0.1.24

func (x *EXInfo) GetKHoles() []*KHoleBlock

func (*EXInfo) GetSymbols added in v0.1.24

func (x *EXInfo) GetSymbols() []*ExSymbolBlock

func (*EXInfo) ProtoMessage added in v0.1.24

func (*EXInfo) ProtoMessage()

func (*EXInfo) ProtoReflect added in v0.1.24

func (x *EXInfo) ProtoReflect() protoreflect.Message

func (*EXInfo) Reset added in v0.1.24

func (x *EXInfo) Reset()

func (*EXInfo) String added in v0.1.24

func (x *EXInfo) String() string

type ExSymbol

type ExSymbol struct {
	ID       int32  `json:"id"`
	Exchange string `json:"exchange"`
	ExgReal  string `json:"exg_real"`
	Market   string `json:"market"`
	Symbol   string `json:"symbol"`
	Combined bool   `json:"combined"`
	ListMs   int64  `json:"list_ms"`
	DelistMs int64  `json:"delist_ms"`
}

func GetExSymbol

func GetExSymbol(exchange banexg.BanExchange, symbol string) (*ExSymbol, *errs.Error)

func GetExSymbol2 added in v0.2.14

func GetExSymbol2(exgName, market, symbol string) *ExSymbol

func GetExSymbolCur

func GetExSymbolCur(symbol string) (*ExSymbol, *errs.Error)

func GetSymbolByID

func GetSymbolByID(id int32) *ExSymbol

func ParseShort

func ParseShort(exgName, short string) (*ExSymbol, *errs.Error)

func (*ExSymbol) GetValidStart

func (s *ExSymbol) GetValidStart(startMS int64) int64

func (*ExSymbol) InfoBy added in v0.2.14

func (s *ExSymbol) InfoBy() string

func (*ExSymbol) ToShort

func (s *ExSymbol) ToShort() string

type ExSymbolBlock added in v0.1.24

type ExSymbolBlock struct {
	Id       int32  `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	Exchange string `protobuf:"bytes,2,opt,name=exchange,proto3" json:"exchange,omitempty"`
	ExgReal  string `protobuf:"bytes,3,opt,name=exg_real,json=exgReal,proto3" json:"exg_real,omitempty"`
	Market   string `protobuf:"bytes,4,opt,name=market,proto3" json:"market,omitempty"`
	Symbol   string `protobuf:"bytes,5,opt,name=symbol,proto3" json:"symbol,omitempty"`
	ListMs   int64  `protobuf:"varint,6,opt,name=list_ms,json=listMs,proto3" json:"list_ms,omitempty"`
	DelistMs int64  `protobuf:"varint,7,opt,name=delist_ms,json=delistMs,proto3" json:"delist_ms,omitempty"`
	// contains filtered or unexported fields
}

func (*ExSymbolBlock) Descriptor deprecated added in v0.1.24

func (*ExSymbolBlock) Descriptor() ([]byte, []int)

Deprecated: Use ExSymbolBlock.ProtoReflect.Descriptor instead.

func (*ExSymbolBlock) GetDelistMs added in v0.1.24

func (x *ExSymbolBlock) GetDelistMs() int64

func (*ExSymbolBlock) GetExchange added in v0.1.24

func (x *ExSymbolBlock) GetExchange() string

func (*ExSymbolBlock) GetExgReal added in v0.1.24

func (x *ExSymbolBlock) GetExgReal() string

func (*ExSymbolBlock) GetId added in v0.1.24

func (x *ExSymbolBlock) GetId() int32

func (*ExSymbolBlock) GetListMs added in v0.1.24

func (x *ExSymbolBlock) GetListMs() int64

func (*ExSymbolBlock) GetMarket added in v0.1.24

func (x *ExSymbolBlock) GetMarket() string

func (*ExSymbolBlock) GetSymbol added in v0.1.24

func (x *ExSymbolBlock) GetSymbol() string

func (*ExSymbolBlock) ProtoMessage added in v0.1.24

func (*ExSymbolBlock) ProtoMessage()

func (*ExSymbolBlock) ProtoReflect added in v0.1.24

func (x *ExSymbolBlock) ProtoReflect() protoreflect.Message

func (*ExSymbolBlock) Reset added in v0.1.24

func (x *ExSymbolBlock) Reset()

func (*ExSymbolBlock) String added in v0.1.24

func (x *ExSymbolBlock) String() string

type ExportKlineJob added in v0.1.24

type ExportKlineJob struct {
	*ExSymbol
	TimeFrame string
	StartMS   int64
	StopMS    int64
}

type ExportTask added in v0.1.24

type ExportTask struct {
	// contains filtered or unexported fields
}

type FindSRangesArgs added in v0.3.1

type FindSRangesArgs struct {
	Sid       int32
	Table     string
	TimeFrame string
	Start     int64
	Stop      int64
	HasData   *bool
	Limit     int
	Offset    int
}

type IfParam added in v0.2.21

type IfParam struct {
	Cond bool
	Val  interface{}
	Tpl  string
}

type InfoKline

type InfoKline struct {
	*banexg.PairTFKline
	Adj      *AdjInfo
	IsWarmUp bool
}

type InsKline added in v0.1.5

type InsKline struct {
	ID        int64  `json:"id"`
	Sid       int32  `json:"sid"`
	Timeframe string `json:"timeframe"`
	StartMs   int64  `json:"start_ms"`
	StopMs    int64  `json:"stop_ms"`
}

type KHoleBlock added in v0.1.24

type KHoleBlock struct {
	Sid       int32   `protobuf:"varint,1,opt,name=sid,proto3" json:"sid,omitempty"`
	Timeframe string  `protobuf:"bytes,2,opt,name=timeframe,proto3" json:"timeframe,omitempty"`
	Holes     []int64 `protobuf:"varint,3,rep,packed,name=holes,proto3" json:"holes,omitempty"`
	// contains filtered or unexported fields
}

func (*KHoleBlock) Descriptor deprecated added in v0.1.24

func (*KHoleBlock) Descriptor() ([]byte, []int)

Deprecated: Use KHoleBlock.ProtoReflect.Descriptor instead.

func (*KHoleBlock) GetHoles added in v0.1.24

func (x *KHoleBlock) GetHoles() []int64

func (*KHoleBlock) GetSid added in v0.1.24

func (x *KHoleBlock) GetSid() int32

func (*KHoleBlock) GetTimeframe added in v0.1.24

func (x *KHoleBlock) GetTimeframe() string

func (*KHoleBlock) ProtoMessage added in v0.1.24

func (*KHoleBlock) ProtoMessage()

func (*KHoleBlock) ProtoReflect added in v0.1.24

func (x *KHoleBlock) ProtoReflect() protoreflect.Message

func (*KHoleBlock) Reset added in v0.1.24

func (x *KHoleBlock) Reset()

func (*KHoleBlock) String added in v0.1.24

func (x *KHoleBlock) String() string

type KlineAgg

type KlineAgg struct {
	TimeFrame string
	MSecs     int64
	Table     string
	AggFrom   string
	AggStart  string
	AggEnd    string
	AggEvery  string
	CpsBefore string
	Retention string
}

func GetKlineAggs

func GetKlineAggs() []*KlineAgg

func NewKlineAgg

func NewKlineAgg(TimeFrame, Table, AggFrom, AggStart, AggEnd, AggEvery, CpsBefore, Retention string) *KlineAgg

type KlineBlock added in v0.1.24

type KlineBlock struct {
	Start     int64     `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"`              // start timestamp in milliseconds
	End       int64     `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"`                  // end timestamp in milliseconds
	ExsId     int32     `protobuf:"varint,3,opt,name=exs_id,json=exsId,proto3" json:"exs_id,omitempty"` // exchange symbol id
	Timeframe string    `protobuf:"bytes,4,opt,name=timeframe,proto3" json:"timeframe,omitempty"`       // timeframe like "1m", "5m", etc.
	Open      []float64 `protobuf:"fixed64,5,rep,packed,name=open,proto3" json:"open,omitempty"`        // open prices
	High      []float64 `protobuf:"fixed64,6,rep,packed,name=high,proto3" json:"high,omitempty"`        // high prices
	Low       []float64 `protobuf:"fixed64,7,rep,packed,name=low,proto3" json:"low,omitempty"`          // low prices
	Close     []float64 `protobuf:"fixed64,8,rep,packed,name=close,proto3" json:"close,omitempty"`      // close prices
	Volume    []float64 `protobuf:"fixed64,9,rep,packed,name=volume,proto3" json:"volume,omitempty"`    // volumes
	Info      []float64 `protobuf:"fixed64,10,rep,packed,name=info,proto3" json:"info,omitempty"`       // additional info (used for china market)
	// contains filtered or unexported fields
}

KlineBlock represents a block of k-line data

func (*KlineBlock) Append added in v0.1.24

func (b *KlineBlock) Append(klines []*banexg.Kline, tfMSec int64, hasInfo bool) []*banexg.Kline

Append 添加连续K线到末尾,如果中间缺失,则返回缺失的后面K线,创建新Block

func (*KlineBlock) Descriptor deprecated added in v0.1.24

func (*KlineBlock) Descriptor() ([]byte, []int)

Deprecated: Use KlineBlock.ProtoReflect.Descriptor instead.

func (*KlineBlock) Dump added in v0.1.24

func (b *KlineBlock) Dump(file *os.File) *errs.Error

func (*KlineBlock) GetClose added in v0.1.24

func (x *KlineBlock) GetClose() []float64

func (*KlineBlock) GetEnd added in v0.1.24

func (x *KlineBlock) GetEnd() int64

func (*KlineBlock) GetExsId added in v0.1.24

func (x *KlineBlock) GetExsId() int32

func (*KlineBlock) GetHigh added in v0.1.24

func (x *KlineBlock) GetHigh() []float64

func (*KlineBlock) GetInfo added in v0.1.24

func (x *KlineBlock) GetInfo() []float64

func (*KlineBlock) GetLow added in v0.1.24

func (x *KlineBlock) GetLow() []float64

func (*KlineBlock) GetOpen added in v0.1.24

func (x *KlineBlock) GetOpen() []float64

func (*KlineBlock) GetStart added in v0.1.24

func (x *KlineBlock) GetStart() int64

func (*KlineBlock) GetTimeframe added in v0.1.24

func (x *KlineBlock) GetTimeframe() string

func (*KlineBlock) GetVolume added in v0.1.24

func (x *KlineBlock) GetVolume() []float64

func (*KlineBlock) ProtoMessage added in v0.1.24

func (*KlineBlock) ProtoMessage()

func (*KlineBlock) ProtoReflect added in v0.1.24

func (x *KlineBlock) ProtoReflect() protoreflect.Message

func (*KlineBlock) Reset added in v0.1.24

func (x *KlineBlock) Reset()

func (*KlineBlock) String added in v0.1.24

func (x *KlineBlock) String() string

type KlineSid

type KlineSid struct {
	banexg.Kline
	Sid int32
}

type KlineUn

type KlineUn struct {
	Sid       int32   `json:"sid"`
	StartMs   int64   `json:"start_ms"`
	StopMs    int64   `json:"stop_ms"`
	ExpireMs  int64   `json:"expire_ms"`
	Timeframe string  `json:"timeframe"`
	Open      float64 `json:"open"`
	High      float64 `json:"high"`
	Low       float64 `json:"low"`
	Close     float64 `json:"close"`
	Volume    float64 `json:"volume"`
	Quote     float64 `json:"quote"`
	BuyVolume float64 `json:"buy_volume"`
	TradeNum  int64   `json:"trade_num"`
}

type MSRange added in v0.3.1

type MSRange struct {
	Start int64
	Stop  int64
}

type PriceVol

type PriceVol struct {
	Sid   int32
	Price float64
	Vol   float64
}

type PubQueries added in v0.3.1

type PubQueries struct{}

func PubQ added in v0.3.1

func PubQ() *PubQueries

func (*PubQueries) AddAdjFactors added in v0.3.1

func (q *PubQueries) AddAdjFactors(ctx context.Context, arg []AddAdjFactorsParams) (int64, error)

func (*PubQueries) AddCalendars added in v0.3.1

func (q *PubQueries) AddCalendars(ctx context.Context, arg []AddCalendarsParams) (int64, error)

func (*PubQueries) AddInsKline added in v0.3.1

func (q *PubQueries) AddInsKline(ctx context.Context, arg AddInsKlineParams) (int64, error)

func (*PubQueries) AddSymbols added in v0.3.1

func (q *PubQueries) AddSymbols(ctx context.Context, arg []AddSymbolsParams) (int64, error)

func (*PubQueries) DelAdjFactors added in v0.3.1

func (q *PubQueries) DelAdjFactors(ctx context.Context, sid int32) error

func (*PubQueries) DelFactors added in v0.3.1

func (q *PubQueries) DelFactors(sid int32, startMS, endMS int64) *errs.Error

func (*PubQueries) DelInsKline added in v0.3.1

func (q *PubQueries) DelInsKline(ctx context.Context, id int64) error

func (*PubQueries) DelKInfo added in v0.3.1

func (q *PubQueries) DelKInfo(sid int32, timeFrame string) *errs.Error

func (*PubQueries) DelKLineUn added in v0.3.1

func (q *PubQueries) DelKLineUn(sid int32, timeFrame string) *errs.Error

func (*PubQueries) FindSRanges added in v0.3.1

func (q *PubQueries) FindSRanges(args FindSRangesArgs) ([]*SRange, int64, *errs.Error)

func (*PubQueries) GetAdjFactors added in v0.3.1

func (q *PubQueries) GetAdjFactors(ctx context.Context, sid int32) ([]*AdjFactor, error)

func (*PubQueries) GetAllInsKlines added in v0.3.1

func (q *PubQueries) GetAllInsKlines(ctx context.Context) ([]*InsKline, error)

func (*PubQueries) GetCalendars added in v0.3.1

func (q *PubQueries) GetCalendars(name string, startMS, stopMS int64) ([][2]int64, *errs.Error)

func (*PubQueries) GetInsKline added in v0.3.1

func (q *PubQueries) GetInsKline(ctx context.Context, sid int32, timeframe string) (*InsKline, error)

func (*PubQueries) GetKlineRange added in v0.3.1

func (q *PubQueries) GetKlineRange(sid int32, timeFrame string) (int64, int64)

func (*PubQueries) GetKlineRanges added in v0.3.1

func (q *PubQueries) GetKlineRanges(sidList []int32, timeFrame string) map[int32][2]int64

func (*PubQueries) ListExchanges added in v0.3.1

func (q *PubQueries) ListExchanges(ctx context.Context) ([]string, error)

func (*PubQueries) ListSRanges added in v0.3.1

func (q *PubQueries) ListSRanges(ctx context.Context, sid int32, table, timeframe string, startMs, stopMs int64) ([]*SRange, error)

func (*PubQueries) ListSRangesBySid added in v0.3.1

func (q *PubQueries) ListSRangesBySid(ctx context.Context, sid int32) ([]*SRange, error)

func (*PubQueries) ListSymbols added in v0.3.1

func (q *PubQueries) ListSymbols(ctx context.Context, exchange string) ([]*ExSymbol, error)

func (*PubQueries) LoadExgSymbols added in v0.3.1

func (q *PubQueries) LoadExgSymbols(exgName string) *errs.Error

func (*PubQueries) PurgeKlineUn added in v0.3.1

func (q *PubQueries) PurgeKlineUn() *errs.Error

func (*PubQueries) SetCalendars added in v0.3.1

func (q *PubQueries) SetCalendars(name string, items [][2]int64) *errs.Error

func (*PubQueries) SetListMS added in v0.3.1

func (q *PubQueries) SetListMS(ctx context.Context, arg SetListMSParams) error

func (*PubQueries) SetUnfinish added in v0.3.1

func (q *PubQueries) SetUnfinish(sid int32, tf string, endMS int64, bar *banexg.Kline) *errs.Error

func (*PubQueries) UpdateSRanges added in v0.3.1

func (q *PubQueries) UpdateSRanges(ctx context.Context, sid int32, table, timeframe string, startMs, stopMs int64, hasData bool) error

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func Conn

func Conn(ctx context.Context) (*Queries, *pgxpool.Conn, *errs.Error)

func New

func New(db DBTX) *Queries

func (*Queries) CalcKLineRanges

func (q *Queries) CalcKLineRanges(timeFrame string, sids map[int32]bool) (map[int32][2]int64, *errs.Error)

func (*Queries) DelKData added in v0.1.12

func (q *Queries) DelKData(exsList []*ExSymbol, tfList []string, startMS, endMS int64) *errs.Error

func (*Queries) DelKLines

func (q *Queries) DelKLines(timeFrame string) *errs.Error

DelKLines 通过重写表的方式删除不在sranges和exsymbol中的K线数据。 先计算涵盖的K线数量占比,不足50%时才执行删除,否则跳过。

func (*Queries) DownOHLCV2DB

func (q *Queries) DownOHLCV2DB(exchange banexg.BanExchange, exs *ExSymbol, timeFrame string, startMS, endMS int64,
	pBar *utils.PrgBar) (int, *errs.Error)

DownOHLCV2DB Download K-line to database. This method should be called in a transaction, otherwise there will be errors in querying and updating related data. 下载K线到数据库,应在事务中调用此方法,否则查询更新相关数据会有错误

func (*Queries) Exec

func (q *Queries) Exec(sql string, args ...interface{}) *errs.Error

func (*Queries) FixKInfoZeros added in v0.1.12

func (q *Queries) FixKInfoZeros() *errs.Error

FixKInfoZeros 修复kinfo表中start=0或stop=0的记录。通过查询实际K线数据范围来更新正确的start和stop值。

func (*Queries) GetAdjOHLCV

func (q *Queries) GetAdjOHLCV(adjs []*AdjInfo, timeFrame string, startMS, endMS int64, limit int, withUnFinish bool) ([]*banexg.Kline, *errs.Error)

GetAdjOHLCV gets K-lines and price adjustment information (the returned K-lines are not yet adjusted; call ApplyAdj to apply the adjustment). 获取K线和复权信息(返回的是尚未复权的K线,需调用ApplyAdj复权)

func (*Queries) GetKlineNum added in v0.1.24

func (q *Queries) GetKlineNum(sid int32, timeFrame string, start, end int64) int

func (*Queries) GetOHLCV

func (q *Queries) GetOHLCV(exs *ExSymbol, timeFrame string, startMS, endMS int64, limit int, withUnFinish bool) ([]*AdjInfo, []*banexg.Kline, *errs.Error)

GetOHLCV gets the K-lines of a symbol and returns the unadjusted K-lines together with the adjustment factors; the caller can call ApplyAdj to apply price adjustment. 获取品种K线,返回未复权K线和复权因子,调用方可调用ApplyAdj进行复权

func (*Queries) InsertKLines

func (q *Queries) InsertKLines(timeFrame string, sid int32, arr []*banexg.Kline) (int64, *errs.Error)

InsertKLines Only batch insert K-lines. To update associated information simultaneously, please use InsertKLinesAuto 只批量插入K线,如需同时更新关联信息,请使用InsertKLinesAuto

func (*Queries) InsertKLinesAuto

func (q *Queries) InsertKLinesAuto(timeFrame string, exs *ExSymbol, arr []*banexg.Kline, aggBig bool) (int64, *errs.Error)

InsertKLinesAuto inserts K-lines into the database and calls UpdateKRange to update the associated information. Before calling this method, you must check via GetKlineRange whether the data already exists in the database, to avoid duplicate insertions. 插入K线到数据库,同时调用UpdateKRange更新关联信息。 调用此方法前必须通过GetKlineRange自行判断数据库中是否已存在,避免重复插入

func (*Queries) NewTx

func (q *Queries) NewTx(ctx context.Context) (*Tx, *Queries, *errs.Error)

func (*Queries) QueryOHLCV

func (q *Queries) QueryOHLCV(exs *ExSymbol, timeframe string, startMs, endMs int64, limit int, withUnFinish bool) ([]*banexg.Kline, *errs.Error)

func (*Queries) QueryOHLCVBatch

func (q *Queries) QueryOHLCVBatch(exsMap map[int32]*ExSymbol, timeframe string, startMs, endMs int64, limit int, handle func(int32, []*banexg.Kline)) *errs.Error

func (*Queries) SetShow added in v0.2.23

func (q *Queries) SetShow(allow bool)

func (*Queries) UpdateKRange

func (q *Queries) UpdateKRange(exs *ExSymbol, timeFrame string, startMS, endMS int64, klines []*banexg.Kline, aggBig bool, skipHoles ...bool) *errs.Error

UpdateKRange: 1. Update the valid range of the K-lines; 2. Search for holes and update Khole; 3. Update the continuous aggregates of larger timeframes. 1. 更新K线的有效区间;2. 搜索空洞,更新Khole;3. 更新更大周期的连续聚合

func (*Queries) UpdatePendingIns added in v0.1.5

func (q *Queries) UpdatePendingIns() *errs.Error

UpdatePendingIns updates unfinished insertion tasks; it is called when the bot starts. 更新未完成的插入任务,在机器人启动时调用

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type SRange added in v0.3.1

type SRange struct {
	ID        int64  `json:"id"`
	Sid       int32  `json:"sid"`
	Table     string `json:"table"`
	Timeframe string `json:"timeframe"`
	StartMs   int64  `json:"start_ms"`
	StopMs    int64  `json:"stop_ms"`
	HasData   bool   `json:"has_data"`
}

type SetListMSParams

type SetListMSParams struct {
	ID       int32 `json:"id"`
	ListMs   int64 `json:"list_ms"`
	DelistMs int64 `json:"delist_ms"`
}

type SubQueries added in v0.2.23

type SubQueries struct {
	ShowLog bool
	// contains filtered or unexported fields
}

func (*SubQueries) Begin added in v0.2.23

func (q *SubQueries) Begin(ctx context.Context) (pgx.Tx, error)

func (*SubQueries) Commit added in v0.2.23

func (q *SubQueries) Commit(ctx context.Context) error

func (*SubQueries) Conn added in v0.2.23

func (q *SubQueries) Conn() *pgx.Conn

func (*SubQueries) CopyFrom added in v0.2.23

func (q *SubQueries) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)

func (*SubQueries) Exec added in v0.2.23

func (q *SubQueries) Exec(ctx context.Context, sql string, params ...interface{}) (pgconn.CommandTag, error)

func (*SubQueries) LargeObjects added in v0.2.23

func (q *SubQueries) LargeObjects() pgx.LargeObjects

func (*SubQueries) Prepare added in v0.2.23

func (q *SubQueries) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)

func (*SubQueries) Query added in v0.2.23

func (q *SubQueries) Query(ctx context.Context, sql string, params ...interface{}) (pgx.Rows, error)

func (*SubQueries) QueryRow added in v0.2.23

func (q *SubQueries) QueryRow(ctx context.Context, sql string, params ...interface{}) pgx.Row

func (*SubQueries) Rollback added in v0.2.23

func (q *SubQueries) Rollback(ctx context.Context) error

func (*SubQueries) SendBatch added in v0.2.23

func (q *SubQueries) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults

type TrackedDB added in v0.2.23

type TrackedDB struct {
	*sql.DB
	// contains filtered or unexported fields
}

TrackedDB wraps sql.DB to track connection hold time and detect timeouts

func BanPubConn added in v0.3.1

func BanPubConn(write bool) (*TrackedDB, *errs.Error)

func DbLite added in v0.1.12

func DbLite(src string, path string, write bool, timeoutMs int64) (*TrackedDB, *errs.Error)

func (*TrackedDB) Close added in v0.2.23

func (t *TrackedDB) Close() error

Close marks the TrackedDB as closed and removes it from tracking. Note: it does not close the underlying sql.DB, because that is a shared connection pool. 注意:不关闭底层 sql.DB,因为它是共享的连接池

func (*TrackedDB) IsClosed added in v0.2.23

func (t *TrackedDB) IsClosed() bool

IsClosed returns whether the TrackedDB has been closed

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

func (*Tx) Close

func (t *Tx) Close(ctx context.Context, commit bool) *errs.Error

type VerifyArgs added in v0.3.1

type VerifyArgs struct {
	Sids      []int32
	Tables    []string // 留空默认检查kline
	BatchSize int      // 分批读取时间戳数量,默认500000
}

func ParseVerifyArgs added in v0.3.1

func ParseVerifyArgs(args *config.CmdArgs) (*VerifyArgs, *errs.Error)

type VerifyIssue added in v0.3.1

type VerifyIssue struct {
	Type    string // gap_no_hole, duplicate, orphan
	StartMs int64
	StopMs  int64
	Count   int // 重复次数或gap中缺失bar数
}

VerifyIssue 单个具体问题

type VerifyTFResult added in v0.3.1

type VerifyTFResult struct {
	Sid       int32
	Symbol    string
	TimeFrame string
	Issues    []*VerifyIssue
}

VerifyTFResult 单个sid+timeframe的检查结果

func VerifyDataRanges added in v0.3.1

func VerifyDataRanges(args *VerifyArgs) ([]*VerifyTFResult, *errs.Error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL