helper

package
v0.11.2-dev Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2022 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

This section is empty.

Variables ΒΆ

View Source
var DateTimeFormats []DateTimeFormatItem
View Source
var ErrIgnoreAndContinue = errors.New("ignore and continue")
View Source
var HttpMinStatusRetryCode = http.StatusBadRequest

Functions ΒΆ

func AddMissingSlashToURL ΒΆ

func AddMissingSlashToURL(baseUrl *string)

func ConvertStringToTime ΒΆ

func ConvertStringToTime(timeString string) (t time.Time, err error)

func DecodeMapStruct ΒΆ

func DecodeMapStruct(input map[string]interface{}, result interface{}) error

DecodeMapStruct is a wrapper around mapstructure.Decode with time.Time and Iso8601Time support.

func DecodeStruct ΒΆ

func DecodeStruct(output *viper.Viper, input interface{}, data map[string]interface{}, tag string) error

DecodeStruct validates the `input` struct with `validator` and sets it into viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `input` must be a pointer.

func EncodeStruct ΒΆ

func EncodeStruct(input *viper.Viper, output interface{}, tag string) error

EncodeStruct encodes a struct from viper. `tag` represents the fields when setting config, and the fields with `tag` shall prevail. `output` must be a pointer.

func GetRawMessageArrayFromResponse ΒΆ

func GetRawMessageArrayFromResponse(res *http.Response) ([]json.RawMessage, error)

func GetRawMessageDirectFromResponse ΒΆ

func GetRawMessageDirectFromResponse(res *http.Response) ([]json.RawMessage, error)

func GetURIStringPointer ΒΆ

func GetURIStringPointer(baseUrl string, relativePath string, query url.Values) (*string, error)

func Iso8601TimeToTime ΒΆ

func Iso8601TimeToTime(iso8601Time *Iso8601Time) *time.Time

func NewDefaultTaskContext ΒΆ

func NewDefaultTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	subtasks map[string]bool,
	progress chan core.RunningProgress,
) core.TaskContext

func NewDefaultTaskLogger ΒΆ

func NewDefaultTaskLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) core.Logger

func NewStandaloneSubTaskContext ΒΆ

func NewStandaloneSubTaskContext(
	cfg *viper.Viper,
	logger core.Logger,
	db *gorm.DB,
	ctx context.Context,
	name string,
	data interface{},
) core.SubTaskContext

This returns a stand-alone core.SubTaskContext, not attached to any core.TaskContext. Use this if you need to run/debug a subtask without going through the usual workflow.

func RemoveStartingSlashFromPath ΒΆ

func RemoveStartingSlashFromPath(relativePath string) string

func UnmarshalResponse ΒΆ

func UnmarshalResponse(res *http.Response, v interface{}) error

Types ΒΆ

type ApiAsyncCallback ΒΆ

type ApiAsyncCallback func(*http.Response, error) error

type ApiAsyncClient ΒΆ

type ApiAsyncClient struct {
	*ApiClient
	// contains filtered or unexported fields
}

ApiAsyncClient is built on top of ApiClient to provide asynchronous semantics. You may submit multiple requests at once by calling `GetAsync`, and those requests will be performed in parallel with rate-limit support.

func CreateAsyncApiClient ΒΆ

func CreateAsyncApiClient(
	taskCtx core.TaskContext,
	apiClient *ApiClient,
	rateLimiter *ApiRateLimitCalculator,
) (*ApiAsyncClient, error)

func (*ApiAsyncClient) Add ΒΆ

func (apiClient *ApiAsyncClient) Add(delta int)

func (*ApiAsyncClient) DoAsync ΒΆ

func (apiClient *ApiAsyncClient) DoAsync(
	method string,
	path string,
	query url.Values,
	body interface{},
	header http.Header,
	handler ApiAsyncCallback,
	retry int,
) error

func (*ApiAsyncClient) Done ΒΆ

func (apiClient *ApiAsyncClient) Done()

func (*ApiAsyncClient) GetAsync ΒΆ

func (apiClient *ApiAsyncClient) GetAsync(
	path string,
	query url.Values,
	header http.Header,
	handler ApiAsyncCallback,
) error

GetAsync enqueues an API GET request; the request may be sent sometime in the future, in parallel with other API requests.

func (*ApiAsyncClient) GetMaxRetry ΒΆ

func (apiClient *ApiAsyncClient) GetMaxRetry() int

func (*ApiAsyncClient) GetQps ΒΆ

func (apiClient *ApiAsyncClient) GetQps() float64

func (*ApiAsyncClient) SetMaxRetry ΒΆ

func (apiClient *ApiAsyncClient) SetMaxRetry(
	maxRetry int,
)

func (*ApiAsyncClient) WaitAsync ΒΆ

func (apiClient *ApiAsyncClient) WaitAsync() error

WaitAsync blocks until all async requests are done.

type ApiClient ΒΆ

type ApiClient struct {
	// contains filtered or unexported fields
}

ApiClient is designed for simple api requests

func NewApiClient ΒΆ

func NewApiClient(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,
	proxy string,
	ctx context.Context,
) (*ApiClient, error)

func (*ApiClient) Do ΒΆ

func (apiClient *ApiClient) Do(
	method string,
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) Get ΒΆ

func (apiClient *ApiClient) Get(
	path string,
	query url.Values,
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) GetEndpoint ΒΆ

func (apiClient *ApiClient) GetEndpoint() string

func (*ApiClient) GetHeaders ΒΆ

func (apiClient *ApiClient) GetHeaders() map[string]string

func (*ApiClient) Post ΒΆ

func (apiClient *ApiClient) Post(
	path string,
	query url.Values,
	body interface{},
	headers http.Header,
) (*http.Response, error)

func (*ApiClient) SetAfterFunction ΒΆ

func (apiClient *ApiClient) SetAfterFunction(callback ApiClientAfterResponse)

func (*ApiClient) SetBeforeFunction ΒΆ

func (apiClient *ApiClient) SetBeforeFunction(callback ApiClientBeforeRequest)

func (*ApiClient) SetContext ΒΆ

func (apiClient *ApiClient) SetContext(ctx context.Context)

func (*ApiClient) SetEndpoint ΒΆ

func (apiClient *ApiClient) SetEndpoint(endpoint string)

func (*ApiClient) SetHeaders ΒΆ

func (apiClient *ApiClient) SetHeaders(headers map[string]string)

func (*ApiClient) SetLogger ΒΆ

func (apiClient *ApiClient) SetLogger(logger core.Logger)

func (*ApiClient) SetProxy ΒΆ

func (apiClient *ApiClient) SetProxy(proxyUrl string) error

func (*ApiClient) SetTimeout ΒΆ

func (ApiClient *ApiClient) SetTimeout(timeout time.Duration)

func (*ApiClient) Setup ΒΆ

func (apiClient *ApiClient) Setup(
	endpoint string,
	headers map[string]string,
	timeout time.Duration,

)

type ApiClientAfterResponse ΒΆ

type ApiClientAfterResponse func(res *http.Response) error

type ApiClientBeforeRequest ΒΆ

type ApiClientBeforeRequest func(req *http.Request) error

type ApiCollector ΒΆ

type ApiCollector struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

func NewApiCollector ΒΆ

func NewApiCollector(args ApiCollectorArgs) (*ApiCollector, error)

NewApiCollector allocates a new ApiCollector with the given args. ApiCollector helps you collect data from an API with ease: pass in an ApiAsyncClient and tell it which part of the response you want to save, and ApiCollector will collect the data from the remote server and store it into the database.

func (*ApiCollector) Execute ΒΆ

func (collector *ApiCollector) Execute() error

Execute starts the collection.

func (*ApiCollector) SetAfterResponse ΒΆ

func (collector *ApiCollector) SetAfterResponse(f ApiClientAfterResponse)

type ApiCollectorArgs ΒΆ

type ApiCollectorArgs struct {
	RawDataSubTaskArgs
	/*
		The URL may use arbitrary variables from different sources in any order, so we use GoTemplate to allow
		more flexibility for all kinds of possibilities.
		Pager contains information for a particular page, calculated by ApiCollector, and will be passed into
		GoTemplate to generate a url for that page.
		We want to do page-fetching in ApiCollector because the logic is highly similar; by doing so, we can
		avoid duplicating logic for every task, and when we have a better idea like improving performance, we
		can do it in one place
	*/
	UrlTemplate string `comment:"GoTemplate for API url"`
	// (Optional) Return query string for request, or you can plug them into UrlTemplate directly
	Query func(reqData *RequestData) (url.Values, error) `comment:"Extra query string when requesting API, like 'Since' option for jira issues collection"`
	// Some api might do pagination by http headers
	Header      func(reqData *RequestData) (http.Header, error)
	PageSize    int
	Incremental bool `comment:"Indicate this is a incremental collection, so the existing data won't get flushed"`
	ApiClient   RateLimitedApiClient
	/*
		Sometimes we need to collect data based on previously collected data, like jira changelogs, which
		require issue_id as part of the url.
		We can mimic the `stdin` design, accepting an `Input` function which produces an `Iterator`; the
		collector should iterate over all records and do data-fetching for each one, either in parallel or
		in sequential order
		UrlTemplate: "api/3/issue/{{ Input.ID }}/changelog"
	*/
	Input          Iterator
	InputRateLimit int
	/*
		For api endpoint that returns number of total pages, ApiCollector can collect pages in parallel with ease,
		or other techniques are required if this information was missing.
	*/
	GetTotalPages  func(res *http.Response, args *ApiCollectorArgs) (int, error)
	Concurrency    int
	ResponseParser func(res *http.Response) ([]json.RawMessage, error)
	AfterResponse  ApiClientAfterResponse
}

type ApiExtractor ΒΆ

type ApiExtractor struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

ApiExtractor helps you extract Raw Data from api responses into Tool Layer Data. It reads rows from the specified raw data table and feeds them into the `Extract` handler; you can return arbitrary tool layer entities in this handler. ApiExtractor will first delete old data by their RawDataOrigin information, and then perform a batch insertion for you.

func NewApiExtractor ΒΆ

func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error)

func (*ApiExtractor) Execute ΒΆ

func (extractor *ApiExtractor) Execute() error

type ApiExtractorArgs ΒΆ

type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	Params    interface{}
	Extract   RawDataExtractor
	BatchSize int
}

type ApiRateLimitCalculator ΒΆ

type ApiRateLimitCalculator struct {
	UserRateLimitPerHour   int
	GlobalRateLimitPerHour int
	MaxRetry               int
	Method                 string
	ApiPath                string
	DynamicRateLimit       func(res *http.Response) (int, time.Duration, error)
}

ApiRateLimitCalculator is a helper to calculate the api rate limit dynamically, assuming the api returns remaining/reset-time information.

func (*ApiRateLimitCalculator) Calculate ΒΆ

func (c *ApiRateLimitCalculator) Calculate(apiClient *ApiClient) (int, time.Duration, error)

type AsyncResponseHandler ΒΆ

type AsyncResponseHandler func(res *http.Response) error

type BatchSave ΒΆ

type BatchSave struct {
	// contains filtered or unexported fields
}

Inserting data in batches can increase database performance drastically. BatchSave aims to make batch-saving easier: it takes care of the database operations for the specified `slotType`, and records get saved into the database whenever the cache hits the `size` limit. Remember to call the `Close` method to save the last batch.

func NewBatchSave ΒΆ

func NewBatchSave(db *gorm.DB, slotType reflect.Type, size int) (*BatchSave, error)

func (*BatchSave) Add ΒΆ

func (c *BatchSave) Add(slot interface{}) error

func (*BatchSave) Close ΒΆ

func (c *BatchSave) Close() error

func (*BatchSave) Flush ΒΆ

func (c *BatchSave) Flush() error

type BatchSaveDivider ΒΆ

type BatchSaveDivider struct {
	// contains filtered or unexported fields
}

BatchSaveDivider holds a map of BatchSave instances and returns the `*BatchSave` for a specific record type, so the caller can do batch operations for it.

func NewBatchSaveDivider ΒΆ

func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider

NewBatchSaveDivider returns a new BatchSaveDivider instance.

func (*BatchSaveDivider) Close ΒΆ

func (d *BatchSaveDivider) Close() error

Close closes all batches so the remaining records get saved into the db as well.

func (*BatchSaveDivider) ForType ΒΆ

func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error)

ForType returns the *BatchSave for the specified type.

func (*BatchSaveDivider) OnNewBatchSave ΒΆ

func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave)

type CSTTime ΒΆ

type CSTTime time.Time

func (*CSTTime) Scan ΒΆ

func (jt *CSTTime) Scan(v interface{}) error

func (*CSTTime) UnmarshalJSON ΒΆ

func (jt *CSTTime) UnmarshalJSON(b []byte) error

func (CSTTime) Value ΒΆ

func (jt CSTTime) Value() (driver.Value, error)

type CursorIterator ΒΆ

type CursorIterator struct {
	// contains filtered or unexported fields
}

func NewCursorIterator ΒΆ

func NewCursorIterator(db *gorm.DB, cursor *sql.Rows, elemType reflect.Type) (*CursorIterator, error)

func (*CursorIterator) Close ΒΆ

func (c *CursorIterator) Close() error

func (*CursorIterator) Fetch ΒΆ

func (c *CursorIterator) Fetch() (interface{}, error)

func (*CursorIterator) HasNext ΒΆ

func (c *CursorIterator) HasNext() bool

type DataConvertHandler ΒΆ

type DataConvertHandler func(row interface{}) ([]interface{}, error)

DataConvertHandler accepts a row from the source cursor and returns a list of entities that need to be stored.

type DataConverter ΒΆ

type DataConverter struct {
	*RawDataSubTask
	// contains filtered or unexported fields
}

DataConverter helps you convert data from Tool Layer Tables to Domain Layer Tables. It reads rows from the specified Iterator and feeds them into the `Convert` handler; you can return arbitrary domain layer entities from this handler. DataConverter will first delete old data by their RawDataOrigin information, and then perform a batch save operation for you.

func NewDataConverter ΒΆ

func NewDataConverter(args DataConverterArgs) (*DataConverter, error)

func (*DataConverter) Execute ΒΆ

func (converter *DataConverter) Execute() error

type DataConverterArgs ΒΆ

type DataConverterArgs struct {
	RawDataSubTaskArgs
	// Domain layer entity Id prefix, i.e. `jira:JiraIssue:1`, `github:GithubIssue`
	InputRowType reflect.Type
	// Cursor to a set of Tool Layer Records
	Input     *sql.Rows
	Convert   DataConvertHandler
	BatchSize int
}

type DateIterator ΒΆ

type DateIterator struct {
	Days    int
	Current int
	// contains filtered or unexported fields
}

func NewDateIterator ΒΆ

func NewDateIterator(days int) (*DateIterator, error)

func (*DateIterator) Close ΒΆ

func (c *DateIterator) Close() error

func (*DateIterator) Fetch ΒΆ

func (c *DateIterator) Fetch() (interface{}, error)

func (*DateIterator) HasNext ΒΆ

func (c *DateIterator) HasNext() bool

type DatePair ΒΆ

type DatePair struct {
	PairStartTime time.Time
	PairEndTime   time.Time
}

type DateTimeFormatItem ΒΆ

type DateTimeFormatItem struct {
	Matcher *regexp.Regexp
	Format  string
}

TODO: move this to helper

type DefaultLogger ΒΆ

type DefaultLogger struct {
	// contains filtered or unexported fields
}

func NewDefaultLogger ΒΆ

func NewDefaultLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) *DefaultLogger

func (*DefaultLogger) Debug ΒΆ

func (l *DefaultLogger) Debug(format string, a ...interface{})

func (*DefaultLogger) Error ΒΆ

func (l *DefaultLogger) Error(format string, a ...interface{})

func (*DefaultLogger) Info ΒΆ

func (l *DefaultLogger) Info(format string, a ...interface{})

func (*DefaultLogger) IsLevelEnabled ΒΆ

func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool

func (*DefaultLogger) Log ΒΆ

func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{})

func (*DefaultLogger) Nested ΒΆ

func (l *DefaultLogger) Nested(name string) core.Logger

bind two writer to logger

func (*DefaultLogger) Printf ΒΆ

func (l *DefaultLogger) Printf(format string, a ...interface{})

func (*DefaultLogger) Warn ΒΆ

func (l *DefaultLogger) Warn(format string, a ...interface{})

type DefaultSubTaskContext ΒΆ

type DefaultSubTaskContext struct {
	// contains filtered or unexported fields
}

SubTaskContext default implementation

func (DefaultSubTaskContext) GetConfig ΒΆ

func (c DefaultSubTaskContext) GetConfig(name string) string

func (DefaultSubTaskContext) GetContext ΒΆ

func (c DefaultSubTaskContext) GetContext() context.Context

func (DefaultSubTaskContext) GetData ΒΆ

func (c DefaultSubTaskContext) GetData() interface{}

func (DefaultSubTaskContext) GetDb ΒΆ

func (c DefaultSubTaskContext) GetDb() *gorm.DB

func (DefaultSubTaskContext) GetLogger ΒΆ

func (c DefaultSubTaskContext) GetLogger() core.Logger

func (DefaultSubTaskContext) GetName ΒΆ

func (c DefaultSubTaskContext) GetName() string

func (*DefaultSubTaskContext) IncProgress ΒΆ

func (c *DefaultSubTaskContext) IncProgress(quantity int)

func (*DefaultSubTaskContext) SetProgress ΒΆ

func (c *DefaultSubTaskContext) SetProgress(current int, total int)

func (*DefaultSubTaskContext) TaskContext ΒΆ

func (c *DefaultSubTaskContext) TaskContext() core.TaskContext

type DefaultTaskContext ΒΆ

type DefaultTaskContext struct {
	// contains filtered or unexported fields
}

TaskContext default implementation

func (DefaultTaskContext) GetConfig ΒΆ

func (c DefaultTaskContext) GetConfig(name string) string

func (DefaultTaskContext) GetContext ΒΆ

func (c DefaultTaskContext) GetContext() context.Context

func (DefaultTaskContext) GetData ΒΆ

func (c DefaultTaskContext) GetData() interface{}

func (DefaultTaskContext) GetDb ΒΆ

func (c DefaultTaskContext) GetDb() *gorm.DB

func (DefaultTaskContext) GetLogger ΒΆ

func (c DefaultTaskContext) GetLogger() core.Logger

func (DefaultTaskContext) GetName ΒΆ

func (c DefaultTaskContext) GetName() string

func (*DefaultTaskContext) IncProgress ΒΆ

func (c *DefaultTaskContext) IncProgress(quantity int)

func (*DefaultTaskContext) SetData ΒΆ

func (c *DefaultTaskContext) SetData(data interface{})

func (*DefaultTaskContext) SetProgress ΒΆ

func (c *DefaultTaskContext) SetProgress(current int, total int)

func (*DefaultTaskContext) SubTaskContext ΒΆ

func (c *DefaultTaskContext) SubTaskContext(subtask string) (core.SubTaskContext, error)

type Iso8601Time ΒΆ

type Iso8601Time struct {
	// contains filtered or unexported fields
}

type Iso8601Time time.Time

func (Iso8601Time) MarshalJSON ΒΆ

func (jt Iso8601Time) MarshalJSON() ([]byte, error)

func (*Iso8601Time) String ΒΆ

func (jt *Iso8601Time) String() string

func (*Iso8601Time) ToNullableTime ΒΆ

func (jt *Iso8601Time) ToNullableTime() *time.Time

func (*Iso8601Time) ToTime ΒΆ

func (jt *Iso8601Time) ToTime() time.Time

func (*Iso8601Time) UnmarshalJSON ΒΆ

func (jt *Iso8601Time) UnmarshalJSON(b []byte) error

type Iterator ΒΆ

type Iterator interface {
	HasNext() bool
	Fetch() (interface{}, error)
	Close() error
}

type OnNewBatchSave ΒΆ

type OnNewBatchSave func(rowType reflect.Type) error

type Pager ΒΆ

type Pager struct {
	Page int
	Skip int
	Size int
}

type RateLimitedApiClient ΒΆ

type RateLimitedApiClient interface {
	GetAsync(path string, query url.Values, header http.Header, handler ApiAsyncCallback) error
	WaitAsync() error
	GetQps() float64
	Add(delta int)
	Done()
	SetAfterFunction(callback ApiClientAfterResponse)
}

type RawData ΒΆ

type RawData struct {
	ID        uint64 `gorm:"primaryKey"`
	Params    string `gorm:"type:varchar(255);index"`
	Data      []byte
	Url       string
	Input     datatypes.JSON
	CreatedAt time.Time
}

Table structure for raw data storage

type RawDataExtractor ΒΆ

type RawDataExtractor func(row *RawData) ([]interface{}, error)

RawDataExtractor accepts a raw json body and params, and returns a list of entities that need to be stored.

type RawDataSubTask ΒΆ

type RawDataSubTask struct {
	// contains filtered or unexported fields
}

Common features for raw data sub tasks

type RawDataSubTaskArgs ΒΆ

type RawDataSubTaskArgs struct {
	Ctx core.SubTaskContext

	//	Table is the name of the table that stores the raw data
	Table string `comment:"Raw data table name"`

	//	This struct will be JSON-encoded and stored into the database along with the raw data itself, to identify
	//	the minimal set of data to be processed; for example, we process JiraIssues by Board
	Params interface{} `comment:"To identify a set of records with same UrlTemplate, i.e. {ConnectionId, BoardId} for jira entities"`
}

type RequestData ΒΆ

type RequestData struct {
	Pager  *Pager
	Params interface{}
	Input  interface{}
}

type WorkerScheduler ΒΆ

type WorkerScheduler struct {
	// contains filtered or unexported fields
}

func NewWorkerScheduler ΒΆ

func NewWorkerScheduler(workerNum int, maxWork int, maxWorkDuration time.Duration, ctx context.Context, maxRetry int) (*WorkerScheduler, error)

NewWorkerScheduler creates a parallel scheduler that controls the maximum number of concurrent runs and the maximum number of runs per second. Warning: task execution is out of order.

func (*WorkerScheduler) Add ΒΆ

func (s *WorkerScheduler) Add(delta int)

func (*WorkerScheduler) Done ΒΆ

func (s *WorkerScheduler) Done()

func (*WorkerScheduler) Release ΒΆ

func (s *WorkerScheduler) Release()

func (*WorkerScheduler) Submit ΒΆ

func (s *WorkerScheduler) Submit(task func() error, pool ...*ants.Pool) error

func (*WorkerScheduler) WaitUntilFinish ΒΆ

func (s *WorkerScheduler) WaitUntilFinish() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL