updater

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2025 License: MIT Imports: 17 Imported by: 0

README

Updater

execute function or embed sql script.

How dose it work

standalone
  1. fetch updater execute record
  2. compare and execute: skip failure or cannot retry
  3. create or update record
cluster

every node:

  1. try get distribute locker
  2. fetch updater execute record
  3. compare and execute: skip failure or cannot retry
  4. create or update record
  5. release lock

Usage

package main

import (
	"context"
	"embed"

	"github.com/junqirao/gocomponents/dkv"
	"github.com/junqirao/gocomponents/updater"
	"github.com/gogf/gf/v2/frame/g"
)

//go:embed somewhere
var EmbeddedFS embed.FS

func main() {
	ctx := context.Background()
	db, err := dkv.NewDB(ctx)
	if err != nil {
		return
	}

	// define functions
	// generate sql script function by read embed.FS sql/*.sql
	// create function named as sql_{script_name}
	fis := updater.SQLFuncFromEmbedFS(ctx, g.DB(), EmbeddedFS)
	// name  : unique name with type
	// fn    : function
	// must  : if must and error caused by this function updater will terminate
	// retry : function can retry
	// typ   : specific type, default 0=raw
	fis = append(fis, updater.NewFunc("my_update_func", func(ctx context.Context) (err error) {
		// do something
		return nil
	}, true, true))

	// use distribute kv database as record store database
	adaptor := updater.NewKVDatabaseAdaptor(db)
	// or use mysql as record store database
	// adaptor :=updater.NewMysqlAdaptor(ctx, g.DB(), g.Log())

	// execute functions
	// standalone, no distribute lock
	// err = updater.Update2Latest(ctx, updater.NewKVDatabaseAdaptor(db), fis...)
	// cluster with distribute lock
	err = updater.ConcurrencyUpdate2Latest(ctx, adaptor, db, fis...)
	if err != nil {
		return
	}
}

Documentation

Index

Constants

View Source
const (
	FuncTypeRaw = 0
	FuncTypeSql = 1
)
View Source
const (
	ExecStatusFailed  = 0
	ExecStatusSuccess = 1
)

Variables

View Source
var (
	ErrExecuteTimeout = errors.New("execute timeout")
)

Functions

func ConcurrencyUpdate2Latest

func ConcurrencyUpdate2Latest(ctx context.Context, adaptor RecordAdaptor, mu sync.Locker, functions ...*FuncInfo) (err error)

ConcurrencyUpdate2Latest use distributed lock to update latest. see kvdb.NewMutex, if you use sync.Locker locally, it may cause consistency problems in distributed environment.

func Update2Latest

func Update2Latest(ctx context.Context, adaptor RecordAdaptor, functions ...*FuncInfo) (err error)

Types

type FN

type FN = func(ctx context.Context) (err error)

FN ...

type FuncConfig

type FuncConfig struct {
	// contains filtered or unexported fields
}

FuncConfig function config

func NewFuncConfig

func NewFuncConfig() *FuncConfig

func (*FuncConfig) Must

func (c *FuncConfig) Must(b ...bool) *FuncConfig

func (*FuncConfig) Retry

func (c *FuncConfig) Retry(b ...bool) *FuncConfig

func (*FuncConfig) Timeout

func (c *FuncConfig) Timeout(d time.Duration) *FuncConfig

type FuncInfo

type FuncInfo struct {
	FuncConfig
	Name string
	Type int
	FN   FN
}

FuncInfo ...

func NewFunc

func NewFunc(name string, fn FN, cfg *FuncConfig, typ ...int) *FuncInfo

NewFunc constructor

func SQLFuncFromEmbedFS

func SQLFuncFromEmbedFS(ctx context.Context, db gdb.DB, fs embed.FS, inline ...bool) (fis []*FuncInfo)

SQLFuncFromEmbedFS read sql/*.sql from embed.FS, convert *.sql to FuncInfo named of sql_*

func (FuncInfo) Exec

func (i FuncInfo) Exec(ctx context.Context) (cost int64, err error)

type KVDatabaseAdaptor

type KVDatabaseAdaptor struct {
	// contains filtered or unexported fields
}

func (KVDatabaseAdaptor) Load

func (k KVDatabaseAdaptor) Load(ctx context.Context, params *RecordQueryParams) (res *RecordQueryResult, err error)

func (KVDatabaseAdaptor) Store

func (k KVDatabaseAdaptor) Store(ctx context.Context, record *Record) (err error)

type MysqlAdaptor

type MysqlAdaptor struct {
	// contains filtered or unexported fields
}

func NewMysqlAdaptor

func NewMysqlAdaptor(ctx context.Context, db gdb.DB, logger *glog.Logger, tableName ...string) (m *MysqlAdaptor, err error)

func (*MysqlAdaptor) Load

func (m *MysqlAdaptor) Load(ctx context.Context, params *RecordQueryParams) (res *RecordQueryResult, err error)

func (*MysqlAdaptor) Store

func (m *MysqlAdaptor) Store(ctx context.Context, record *Record) (err error)

type Record

type Record struct {
	Name      string `json:"name"`       // unique name+type
	Type      int    `json:"type"`       // 0: function, 1: sql
	Status    int    `json:"status"`     // 0: failed, 1: success
	Cost      int64  `json:"cost"`       // cost time in ms
	CreatedAt string `json:"created_at"` // created_at
}

type RecordAdaptor

type RecordAdaptor interface {
	Store(ctx context.Context, record *Record) (err error)
	Load(ctx context.Context, params *RecordQueryParams) (res *RecordQueryResult, err error)
}

func NewKVDatabaseAdaptor

func NewKVDatabaseAdaptor(db ...kvdb.Database) (a RecordAdaptor)

type RecordQueryParams

type RecordQueryParams struct {
	Name *string
	Type *int
}

type RecordQueryResult

type RecordQueryResult struct {
	Records []*Record `json:"records"`
	Total   int       `json:"total"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL