Documentation
¶
Index ¶
- func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
- func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
- func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
- type CrawlMetricCollector
- type DistributeDupefilterOptions
- type DistributeOptions
- func DistributedWithBloomN(bloomN int) DistributeOptions
- func DistributedWithBloomP(bloomP float64) DistributeOptions
- func DistributedWithConnectionsSize(size int) DistributeOptions
- func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetQueueKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithLimiterRate(rate int) DistributeOptions
- func DistributedWithRdbMaxRetry(retry int) DistributeOptions
- func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
- func DistributedWithSlave() DistributeOptions
- type DistributeQueueOptions
- type DistributedComponents
- func (d *DistributedComponents) CheckWorkersStop() bool
- func (d *DistributedComponents) GetDupefilter() tegenaria.RFPDupeFilterInterface
- func (d *DistributedComponents) GetEventHooks() tegenaria.EventHooksInterface
- func (d *DistributedComponents) GetLimiter() tegenaria.LimitInterface
- func (d *DistributedComponents) GetQueue() tegenaria.CacheInterface
- func (d *DistributedComponents) GetStats() tegenaria.StatisticInterface
- func (d *DistributedComponents) SetCurrentSpider(spider tegenaria.SpiderInterface)
- func (d *DistributedComponents) SpiderBeforeStart(engine *tegenaria.CrawlEngine, spider tegenaria.SpiderInterface) error
- type DistributedDupefilter
- func (d *DistributedDupefilter) Add(fingerprint []byte) error
- func (d *DistributedDupefilter) DoDupeFilter(ctx *tegenaria.Context) (bool, error)
- func (d *DistributedDupefilter) Fingerprint(ctx *tegenaria.Context) ([]byte, error)
- func (d *DistributedDupefilter) SetCurrentSpider(spider tegenaria.SpiderInterface)
- func (d *DistributedDupefilter) TestOrAdd(fingerprint []byte) (bool, error)
- type DistributedHooks
- func (d *DistributedHooks) Error(params ...interface{}) error
- func (d *DistributedHooks) EventsWatcher(ch chan tegenaria.EventType) error
- func (d *DistributedHooks) Exit(params ...interface{}) error
- func (d *DistributedHooks) Heartbeat(params ...interface{}) error
- func (d *DistributedHooks) Pause(params ...interface{}) error
- func (d *DistributedHooks) SetCurrentSpider(spider tegenaria.SpiderInterface)
- func (d *DistributedHooks) Start(params ...interface{}) error
- type DistributedQueue
- func (d *DistributedQueue) Close() error
- func (d *DistributedQueue) Dequeue() (interface{}, error)
- func (d *DistributedQueue) Enqueue(ctx *tegenaria.Context) error
- func (d *DistributedQueue) GetSize() uint64
- func (d *DistributedQueue) IsEmpty() bool
- func (d *DistributedQueue) SetCurrentSpider(spider tegenaria.SpiderInterface)
- type DistributedWorker
- func (w *DistributedWorker) AddNode() error
- func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
- func (w *DistributedWorker) CheckMasterLive() (bool, error)
- func (w *DistributedWorker) DelMaster() error
- func (w *DistributedWorker) DelNode() error
- func (w *DistributedWorker) GetRDB() redis.Cmdable
- func (w *DistributedWorker) GetWorkerID() string
- func (w *DistributedWorker) Heartbeat() error
- func (w *DistributedWorker) IsMaster() bool
- func (w *DistributedWorker) PauseNode() error
- func (w *DistributedWorker) SetCurrentSpider(spider tegenaria.SpiderInterface)
- func (w *DistributedWorker) SetMaster(flag bool)
- type DistributedWorkerConfig
- type GetRDBKey
- type InfluxdbConfig
- type LeakyBucketLimiterWithRdb
- type RdbNodes
- type RedisConfig
- type WorkerConfigWithRdbCluster
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRdbClient ¶
func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
func NewRdbClusterCLient ¶
func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
func NewRdbConfig ¶
func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
NewRdbConfig redis 配置构造函数
Types ¶
type CrawlMetricCollector ¶
type CrawlMetricCollector struct {
// contains filtered or unexported fields
}
CrawlMetricCollector 数据指标采集器
func NewCrawlMetricCollector ¶
func NewCrawlMetricCollector(serverURL string, token string, bucket string, org string) *CrawlMetricCollector
NewCrawlMetricCollector 构建采集器
func (*CrawlMetricCollector) Get ¶
func (c *CrawlMetricCollector) Get(metric string) uint64
Get 提取某一个指标的统计数据的总量
func (*CrawlMetricCollector) GetAllStats ¶
func (c *CrawlMetricCollector) GetAllStats() map[string]uint64
GetAllStats 获取所有字段的总量数据
func (*CrawlMetricCollector) Incr ¶
func (c *CrawlMetricCollector) Incr(metric string)
func (*CrawlMetricCollector) SetCurrentSpider ¶
func (c *CrawlMetricCollector) SetCurrentSpider(spider tegenaria.SpiderInterface)
type DistributeDupefilterOptions ¶
type DistributeDupefilterOptions func(w *DistributedDupefilter)
type DistributeOptions ¶
type DistributeOptions func(w *DistributedWorkerConfig)
DistributeOptions 分布式组件的可选参数
func DistributedWithBloomN ¶
func DistributedWithBloomN(bloomN int) DistributeOptions
DistributedWithBloomN 布隆过滤器数据规模
func DistributedWithBloomP ¶
func DistributedWithBloomP(bloomP float64) DistributeOptions
DistributedWithBloomP 布隆过滤器容错率
func DistributedWithConnectionsSize ¶
func DistributedWithConnectionsSize(size int) DistributeOptions
DistributedWithConnectionsSize rdb 连接池最大连接数
func DistributedWithGetBFKey ¶
func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetBFKey 布隆过滤器的key生成函数
func DistributedWithGetLimitKey ¶
func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetLimitKey 限速器key的生成函数
func DistributedWithGetQueueKey ¶
func DistributedWithGetQueueKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetQueueKey 队列key生成函数
func DistributedWithLimiterRate ¶
func DistributedWithLimiterRate(rate int) DistributeOptions
DistributedWithLimiterRate 分布式组件下限速器的限速值
func DistributedWithRdbMaxRetry ¶
func DistributedWithRdbMaxRetry(retry int) DistributeOptions
DistributedWithRdbMaxRetry rdb失败重试次数
func DistributedWithRdbTimeout ¶
func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
DistributedWithRdbTimeout rdb超时时间设置
func DistributedWithSlave ¶
func DistributedWithSlave() DistributeOptions
type DistributeQueueOptions ¶
type DistributeQueueOptions func(w *DistributedQueue)
type DistributedComponents ¶
type DistributedComponents struct {
// contains filtered or unexported fields
}
func NewDefaultDistributedComponents ¶
func NewDefaultDistributedComponents(opts ...DistributeOptions) *DistributedComponents
NewDefaultDistributedComponents 构建默认的分布式组件
func NewDistributedComponents ¶
func NewDistributedComponents(config *DistributedWorkerConfig, worker tegenaria.DistributedWorkerInterface, rdb redis.Cmdable) *DistributedComponents
func (*DistributedComponents) CheckWorkersStop ¶
func (d *DistributedComponents) CheckWorkersStop() bool
CheckWorkersStop 检查所有节点是否都已经停止
func (*DistributedComponents) GetDupefilter ¶
func (d *DistributedComponents) GetDupefilter() tegenaria.RFPDupeFilterInterface
GetDupefilter 获取去重组件
func (*DistributedComponents) GetEventHooks ¶
func (d *DistributedComponents) GetEventHooks() tegenaria.EventHooksInterface
GetEventHooks 获取事件监听器
func (*DistributedComponents) GetLimiter ¶
func (d *DistributedComponents) GetLimiter() tegenaria.LimitInterface
GetLimiter 获取限速器
func (*DistributedComponents) GetQueue ¶
func (d *DistributedComponents) GetQueue() tegenaria.CacheInterface
GetQueue 获取请求消息队列组件
func (*DistributedComponents) GetStats ¶
func (d *DistributedComponents) GetStats() tegenaria.StatisticInterface
GetStats 获取指标采集器
func (*DistributedComponents) SetCurrentSpider ¶
func (d *DistributedComponents) SetCurrentSpider(spider tegenaria.SpiderInterface)
SetCurrentSpider 设置当前的爬虫实例
func (*DistributedComponents) SpiderBeforeStart ¶
func (d *DistributedComponents) SpiderBeforeStart(engine *tegenaria.CrawlEngine, spider tegenaria.SpiderInterface) error
SpiderBeforeStart 启动爬虫之前检查主节点的状态；若没有在线的主节点，则从节点直接退出并抛出panic
type DistributedDupefilter ¶
type DistributedDupefilter struct {
// contains filtered or unexported fields
}
func (*DistributedDupefilter) Add ¶
func (d *DistributedDupefilter) Add(fingerprint []byte) error
Add 添加指纹到布隆过滤器
func (*DistributedDupefilter) DoDupeFilter ¶
func (d *DistributedDupefilter) DoDupeFilter(ctx *tegenaria.Context) (bool, error)
DoDupeFilter request去重
func (*DistributedDupefilter) Fingerprint ¶
func (d *DistributedDupefilter) Fingerprint(ctx *tegenaria.Context) ([]byte, error)
Fingerprint request指纹计算
func (*DistributedDupefilter) SetCurrentSpider ¶
func (d *DistributedDupefilter) SetCurrentSpider(spider tegenaria.SpiderInterface)
SetCurrentSpider 设置当前的spider
type DistributedHooks ¶
type DistributedHooks struct {
// contains filtered or unexported fields
}
DistributedHooks 分布式事件监听器
func NewDistributedHooks ¶
func NewDistributedHooks(worker tegenaria.DistributedWorkerInterface) *DistributedHooks
NewDistributedHooks 构建新的分布式监听器组件对象
func (*DistributedHooks) Error ¶
func (d *DistributedHooks) Error(params ...interface{}) error
Error 用于处理分布式模式下的ERROR事件
func (*DistributedHooks) EventsWatcher ¶
func (d *DistributedHooks) EventsWatcher(ch chan tegenaria.EventType) error
EventsWatcher 分布式模式下的事件监听器
func (*DistributedHooks) Exit ¶
func (d *DistributedHooks) Exit(params ...interface{}) error
Exit 用于处理分布式模式下的Exit事件
func (*DistributedHooks) Heartbeat ¶
func (d *DistributedHooks) Heartbeat(params ...interface{}) error
Heartbeat 用于处理分布式模式下的HEARTBEAT事件
func (*DistributedHooks) Pause ¶
func (d *DistributedHooks) Pause(params ...interface{}) error
Pause 用于处理分布式模式下的PAUSE事件
func (*DistributedHooks) SetCurrentSpider ¶
func (d *DistributedHooks) SetCurrentSpider(spider tegenaria.SpiderInterface)
SetCurrentSpider 设置spider实例
func (*DistributedHooks) Start ¶
func (d *DistributedHooks) Start(params ...interface{}) error
Start 用于处理分布式模式下的START事件
type DistributedQueue ¶
type DistributedQueue struct {
// contains filtered or unexported fields
}
func NewDistributedQueue ¶
func NewDistributedQueue(rdb redis.Cmdable, queueKey GetRDBKey) *DistributedQueue
func (*DistributedQueue) Dequeue ¶
func (d *DistributedQueue) Dequeue() (interface{}, error)
Dequeue 将ctx从缓存中出队列
func (*DistributedQueue) Enqueue ¶
func (d *DistributedQueue) Enqueue(ctx *tegenaria.Context) error
Enqueue 将ctx写入缓存（入队列）
func (*DistributedQueue) SetCurrentSpider ¶
func (d *DistributedQueue) SetCurrentSpider(spider tegenaria.SpiderInterface)
SetCurrentSpider 设置当前的spider
type DistributedWorker ¶
type DistributedWorker struct {
// contains filtered or unexported fields
}
DistributedWorker 分布式组件，包含两个组件：(1) request请求缓存队列，由各个节点上的引擎读取并消费，redis 队列缓存的是经过gob序列化之后的二进制数据；(2) 布隆过滤器，主要用于去重。该组件同时实现了RFPDupeFilterInterface和CacheInterface。
func NewDistributedWorker ¶
func NewDistributedWorker(config *DistributedWorkerConfig) *DistributedWorker
NewDistributedWorker 构建redis单机模式下的分布式工作组件
func NewWorkerWithRdbCluster ¶
func NewWorkerWithRdbCluster(config *WorkerConfigWithRdbCluster) *DistributedWorker
NewWorkerWithRdbCluster redis cluster模式下的分布式工作组件
func (*DistributedWorker) CheckAllNodesStop ¶
func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
CheckAllNodesStop 检查所有的节点是否都已经停止
func (*DistributedWorker) CheckMasterLive ¶
func (w *DistributedWorker) CheckMasterLive() (bool, error)
CheckMasterLive 检查所有master 节点是否都在线
func (*DistributedWorker) DelMaster ¶
func (w *DistributedWorker) DelMaster() error
DelMaster 删除master节点
func (*DistributedWorker) GetRDB ¶
func (w *DistributedWorker) GetRDB() redis.Cmdable
func (*DistributedWorker) GetWorkerID ¶
func (w *DistributedWorker) GetWorkerID() string
func (*DistributedWorker) IsMaster ¶
func (w *DistributedWorker) IsMaster() bool
func (*DistributedWorker) PauseNode ¶
func (w *DistributedWorker) PauseNode() error
PauseNode 停止当前节点的活动
func (*DistributedWorker) SetCurrentSpider ¶
func (w *DistributedWorker) SetCurrentSpider(spider tegenaria.SpiderInterface)
func (*DistributedWorker) SetMaster ¶
func (w *DistributedWorker) SetMaster(flag bool)
SetMaster 设置当前的节点是否为master
type DistributedWorkerConfig ¶
type DistributedWorkerConfig struct {
// LimiterRate 限速大小
LimiterRate int
// contains filtered or unexported fields
}
DistributedWorkerConfig 分布式组件的配置参数
func NewDistributedWorkerConfig ¶
func NewDistributedWorkerConfig(rdbConfig *RedisConfig, influxdbConfig *InfluxdbConfig, opts ...DistributeOptions) *DistributedWorkerConfig
NewDistributedWorkerConfig 新建分布式组件的配置
type InfluxdbConfig ¶
type InfluxdbConfig struct {
// contains filtered or unexported fields
}
InfluxdbConfig influxdb配置
func NewInfluxdbConfig ¶
func NewInfluxdbConfig(server string, token string, bucket string, org string) *InfluxdbConfig
type LeakyBucketLimiterWithRdb ¶
type LeakyBucketLimiterWithRdb struct {
// contains filtered or unexported fields
}
LeakyBucketLimiterWithRdb 单机redis下的漏桶限速器
func NewLeakyBucketLimiterWithRdb ¶
func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *LeakyBucketLimiterWithRdb
NewLeakyBucketLimiterWithRdb LeakyBucketLimiterWithRdb 构造函数
func (*LeakyBucketLimiterWithRdb) CheckAndWaitLimiterPass ¶
func (l *LeakyBucketLimiterWithRdb) CheckAndWaitLimiterPass() error
CheckAndWaitLimiterPass 检查当前并发量，如果并发量达到上限则等待
func (*LeakyBucketLimiterWithRdb) SetCurrentSpider ¶
func (l *LeakyBucketLimiterWithRdb) SetCurrentSpider(spider tegenaria.SpiderInterface)
type RedisConfig ¶
type RedisConfig struct {
// RedisAddr redis 地址
RedisAddr string
// RedisPasswd redis 密码
RedisPasswd string
// RedisUsername redis 用户名
RedisUsername string
// RedisDB redis 数据库索引 index
RedisDB uint32
// RdbConnectionsSize 连接池大小
RdbConnectionsSize uint64
// RdbTimeout redis 超时时间
RdbTimeout time.Duration
// RdbMaxRetry redis操作失败后的重试次数
RdbMaxRetry int
}
RedisConfig redis配置
func NewRedisConfig ¶
func NewRedisConfig(addr string, username string, passwd string, db uint32) *RedisConfig
type WorkerConfigWithRdbCluster ¶
type WorkerConfigWithRdbCluster struct {
*DistributedWorkerConfig
RdbNodes
}
WorkerConfigWithRdbCluster redis cluster 模式下的分布式组件配置参数
func NewWorkerConfigWithRdbCluster ¶
func NewWorkerConfigWithRdbCluster(config *DistributedWorkerConfig, nodes RdbNodes) *WorkerConfigWithRdbCluster
NewWorkerConfigWithRdbCluster 构建redis cluster模式下的分布式组件配置参数