task

package
v0.0.0-...-2e8b363 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

internal/task/fetch_goods_details_headless.go

internal/task/process_goods_details_headless.go

internal/task/remedy_video_details_headless.go

Index

Constants

View Source
// Task identifiers. Every value has the form "<action>:<subject>", where the
// action is one of fetch, process, or remedy.
const (
	// Video rank.
	FetchVideoRank   = "fetch:video_rank"
	ProcessVideoRank = "process:video_rank"

	// Video trend.
	FetchVideoTrend   = "fetch:video_trend"
	ProcessVideoTrend = "process:video_trend"

	// Video summary.
	FetchVideoSummary   = "fetch:video_summary"
	ProcessVideoSummary = "process:video_summary"

	// Video details (headless).
	FetchVideoDetailsHeadless   = "fetch:video_details_headless"
	ProcessVideoDetailsHeadless = "process:video_details_headless"
	RemedyVideoDetailsHeadless  = "remedy:video_details_headless"

	// Goods details (headless).
	FetchGoodsDetailsHeadless   = "fetch:goods_details_headless"
	ProcessGoodsDetailsHeadless = "process:goods_details_headless"
)

Variables

View Source
// SchedulerProviderSet is a wire provider set built from
// ProvideSchedulerWithTasks, exposed for dependency injection.
var SchedulerProviderSet = wire.NewSet(ProvideSchedulerWithTasks)

SchedulerProviderSet 暴露 ProvideSchedulerWithTasks 用于依赖注入。

Functions

func NewLoadCookiesAction

func NewLoadCookiesAction(cookiePath string, logger *log.Helper) chromedp.Action

NewLoadCookiesAction 接收一个 cookie 文件路径(cookiePath),并返回一个 chromedp.Action。其实现沿用了旧代码中已验证可行的逻辑。

func ProvideSchedulerWithTasks

func ProvideSchedulerWithTasks(
	fm *fetcher.FetcherManager,
	httpUC *fetcher.HttpUsecase,
	headlessUC *fetcher.HeadlessUsecase,
	etlUC *etl.ETLUsecase,
	logger log.Logger,
) *scheduler.Scheduler

ProvideSchedulerWithTasks 创建一个 Scheduler,注册所有任务定义及其策略,并返回尚未运行的调度器实例。

func WrapContextForDebug

func WrapContextForDebug(ctx context.Context, logger *log.Helper, name string) context.Context

WrapContextForDebug 包装一个现有的 context,用于追踪其取消事件。 当被包装的 context 的 Done() 通道关闭时,它会向日志中打印一条带有堆栈跟踪的警告。

Types

type FetchGoodsDetailsHeadlessTask

type FetchGoodsDetailsHeadlessTask struct {
	// contains filtered or unexported fields
}

func NewFetchGoodsDetailsHeadlessTask

func NewFetchGoodsDetailsHeadlessTask(
	logger log.Logger,
	huc *fetcher.HeadlessUsecase,
) *FetchGoodsDetailsHeadlessTask

func (*FetchGoodsDetailsHeadlessTask) Run

func (t *FetchGoodsDetailsHeadlessTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

Run 遵循 TaskCallable 签名

type FetchVideoDetailsHeadlessTask

type FetchVideoDetailsHeadlessTask struct {
	// contains filtered or unexported fields
}

FetchVideoDetailsHeadlessTask 重新定义的任务结构体,只包含必要字段

func NewFetchVideoDetailsHeadlessTask

func NewFetchVideoDetailsHeadlessTask(
	logger log.Logger,
	huc *fetcher.HeadlessUsecase,
) *FetchVideoDetailsHeadlessTask

NewFetchVideoDetailsHeadlessTask 重写构造函数,初始化所有字段

func (*FetchVideoDetailsHeadlessTask) Run

func (t *FetchVideoDetailsHeadlessTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

Run 方法遵循最终的、简化的 TaskCallable 签名。 它从 params 中解析出具体指令并执行。

type FetchVideoRankTask

type FetchVideoRankTask struct {
	// contains filtered or unexported fields
}

func NewFetchVideoRankTask

func NewFetchVideoRankTask(logger log.Logger, httpUC *fetcher.HttpUsecase) *FetchVideoRankTask

func (*FetchVideoRankTask) Run

func (t *FetchVideoRankTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

TODO: 当前任务不能在执行完一轮后就停下来。理论上,应该可以在每次启动任务时通过一些参数控制采集多少,并在完成后让整个任务停下来。下面的日志显示,一轮任务完成后调度器只是进入长休眠,而这里本应停下来:

INFO ts=2025-07-30T11:42:36+08:00 caller=task/fetch_video_rank.go:70 module=task.fetch_video_rank msg=Successfully completed video rank fetch task on datasource 'feigua_http_1'
INFO ts=2025-07-30T11:42:36+08:00 caller=scheduler/scheduler.go:174 module=scheduler msg=在数据源 [feigua_http_1] 上成功完成一轮任务 'fetch:video_rank'。
INFO ts=2025-07-30T11:42:36+08:00 caller=scheduler/scheduler.go:194 module=scheduler msg=战役间休整:根据调度策略,长休眠 2m39s...

type FetchVideoSummaryTask

type FetchVideoSummaryTask struct {
	// contains filtered or unexported fields
}

func NewFetchVideoSummaryTask

func NewFetchVideoSummaryTask(fetcherUC *fetcher.HttpUsecase, videoRepo data.VideoRepo, logger log.Logger) *FetchVideoSummaryTask

func (*FetchVideoSummaryTask) Name

func (t *FetchVideoSummaryTask) Name() string

func (*FetchVideoSummaryTask) Run

func (t *FetchVideoSummaryTask) Run(ctx context.Context, args ...string) error

type FetchVideoTrendTask

type FetchVideoTrendTask struct {
	// contains filtered or unexported fields
}

FetchVideoTrendTask 负责每日拉取视频趋势数据的任务

func NewFetchVideoTrendTask

func NewFetchVideoTrendTask(fetcherUC *fetcher.HttpUsecase, videoRepo data.VideoRepo, logger log.Logger) *FetchVideoTrendTask

NewFetchVideoTrendTask 构造任务实例

func (*FetchVideoTrendTask) Name

func (t *FetchVideoTrendTask) Name() string

func (*FetchVideoTrendTask) Run

func (t *FetchVideoTrendTask) Run(ctx context.Context, args ...string) error

Run 任务的执行入口

type ProcessGoodsDetailsHeadlessTask

type ProcessGoodsDetailsHeadlessTask struct {
	// contains filtered or unexported fields
}

ProcessGoodsDetailsHeadlessTask 负责处理所有由无头浏览器采集的商品详情数据

func NewProcessGoodsDetailsHeadlessTask

func NewProcessGoodsDetailsHeadlessTask(etl *etl.ETLUsecase, logger log.Logger) *ProcessGoodsDetailsHeadlessTask

func (*ProcessGoodsDetailsHeadlessTask) Run

func (t *ProcessGoodsDetailsHeadlessTask) Run(ctx context.Context, _ string, _ ...interface{}) error

type ProcessVideoDetailHeadlessTask

type ProcessVideoDetailHeadlessTask struct {
	// contains filtered or unexported fields
}

ProcessVideoDetailHeadlessTask 负责处理所有由无头浏览器采集的详情数据

func NewProcessVideoDetailHeadlessTask

func NewProcessVideoDetailHeadlessTask(etl *etl.ETLUsecase, logger log.Logger) *ProcessVideoDetailHeadlessTask

NewProcessVideoDetailHeadlessTask 创建一个 ProcessVideoDetailHeadlessTask 实例。

func (*ProcessVideoDetailHeadlessTask) Run

func (t *ProcessVideoDetailHeadlessTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

type ProcessVideoRankTask

type ProcessVideoRankTask struct {
	// contains filtered or unexported fields
}

func NewProcessVideoRankTask

func NewProcessVideoRankTask(etl *etl.ETLUsecase) *ProcessVideoRankTask

func (*ProcessVideoRankTask) Run

func (t *ProcessVideoRankTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

type ProcessVideoSummaryTask

type ProcessVideoSummaryTask struct {
	// contains filtered or unexported fields
}

func NewProcessVideoSummaryTask

func NewProcessVideoSummaryTask(etl *etl.ETLUsecase) *ProcessVideoSummaryTask

func (*ProcessVideoSummaryTask) Run

func (t *ProcessVideoSummaryTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

type ProcessVideoTrendTask

type ProcessVideoTrendTask struct {
	// contains filtered or unexported fields
}

func NewProcessVideoTrendTask

func NewProcessVideoTrendTask(etl *etl.ETLUsecase) *ProcessVideoTrendTask

func (*ProcessVideoTrendTask) Name

func (t *ProcessVideoTrendTask) Name() string

func (*ProcessVideoTrendTask) Run

func (t *ProcessVideoTrendTask) Run(ctx context.Context, args ...string) error

type RemedyVideoDetailsHeadlessTask

type RemedyVideoDetailsHeadlessTask struct {
	// contains filtered or unexported fields
}

RemedyVideoDetailsHeadlessTask 负责修复部分采集失败的视频详情

func NewRemedyVideoDetailsHeadlessTask

func NewRemedyVideoDetailsHeadlessTask(
	logger log.Logger,
	huc *fetcher.HeadlessUsecase,
) *RemedyVideoDetailsHeadlessTask

NewRemedyVideoDetailsHeadlessTask 创建一个新的修复任务实例

func (*RemedyVideoDetailsHeadlessTask) Run

func (t *RemedyVideoDetailsHeadlessTask) Run(ctx context.Context, dataSourceName string, params ...interface{}) error

Run 执行任务的核心逻辑

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL