distributed

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRdbClient

func NewRdbClient(config *DistributedWorkerConfig) *redis.Client

func NewRdbClusterCLient

func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient

func NewRdbConfig

func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options

NewRdbConfig redis 配置构造函数

Types

type CrawlMetricCollector

type CrawlMetricCollector struct {
	// contains filtered or unexported fields
}

CrawlMetricCollector 数据指标采集器

func NewCrawlMetricCollector

func NewCrawlMetricCollector(serverURL string, token string, bucket string, org string) *CrawlMetricCollector

NewCrawlMetricCollector 构建采集器

func (*CrawlMetricCollector) Get

func (c *CrawlMetricCollector) Get(metric string) uint64

Get 提取某一个指标的统计数据的总量

func (*CrawlMetricCollector) GetAllStats

func (c *CrawlMetricCollector) GetAllStats() map[string]uint64

GetAllStats 获取所有字段的总量数据

func (*CrawlMetricCollector) Incr

func (c *CrawlMetricCollector) Incr(metric string)

func (*CrawlMetricCollector) SetCurrentSpider

func (c *CrawlMetricCollector) SetCurrentSpider(spider tegenaria.SpiderInterface)

type DistributeDupefilterOptions

type DistributeDupefilterOptions func(w *DistributedDupefilter)

type DistributeOptions

type DistributeOptions func(w *DistributedWorkerConfig)

DistributeOptions 分布式组件的可选参数

func DistributedWithBloomN

func DistributedWithBloomN(bloomN int) DistributeOptions

DistributedWithBloomN 布隆过滤器数据规模

func DistributedWithBloomP

func DistributedWithBloomP(bloomP float64) DistributeOptions

DistributedWithBloomP 布隆过滤器容错率

func DistributedWithConnectionsSize

func DistributedWithConnectionsSize(size int) DistributeOptions

DistributedWithConnectionsSize rdb 连接池最大连接数

func DistributedWithGetBFKey

func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetBFKey 布隆过滤器的key生成函数

func DistributedWithGetLimitKey

func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetLimitKey 限速器key的生成函数

func DistributedWithGetQueueKey

func DistributedWithGetQueueKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetQueueKey 队列key生成函数

func DistributedWithLimiterRate

func DistributedWithLimiterRate(rate int) DistributeOptions

DistributedWithLimiterRate 分布式组件下限速器的限速值

func DistributedWithRdbMaxRetry

func DistributedWithRdbMaxRetry(retry int) DistributeOptions

DistributedWithRdbMaxRetry rdb失败重试次数

func DistributedWithRdbTimeout

func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions

DistributedWithRdbTimeout rdb超时时间设置

func DistributedWithSlave

func DistributedWithSlave() DistributeOptions

type DistributeQueueOptions

type DistributeQueueOptions func(w *DistributedQueue)

type DistributedComponents

type DistributedComponents struct {
	// contains filtered or unexported fields
}

func NewDefaultDistributedComponents

func NewDefaultDistributedComponents(opts ...DistributeOptions) *DistributedComponents

NewDefaultDistributedComponents 构建默认的分布式组件

func (*DistributedComponents) CheckWorkersStop

func (d *DistributedComponents) CheckWorkersStop() bool

CheckWorkersStop 检查所有节点是否都已经停止

func (*DistributedComponents) GetDupefilter

GetDupefilter 获取去重组件

func (*DistributedComponents) GetEventHooks

GetEventHooks 事件监听器

func (*DistributedComponents) GetLimiter

GetLimiter 获取限速器

func (*DistributedComponents) GetQueue

GetQueue 请求消息队列组件

func (*DistributedComponents) GetStats

GetStats 获取指标采集器

func (*DistributedComponents) SetCurrentSpider

func (d *DistributedComponents) SetCurrentSpider(spider tegenaria.SpiderInterface)

SetCurrentSpider 当前的爬虫实例

func (*DistributedComponents) SpiderBeforeStart

func (d *DistributedComponents) SpiderBeforeStart(engine *tegenaria.CrawlEngine, spider tegenaria.SpiderInterface) error

SpiderBeforeStart 启动爬虫之前检查主节点的状态；若没有在线的主节点，则从节点直接退出并抛出 panic。

type DistributedDupefilter

type DistributedDupefilter struct {
	// contains filtered or unexported fields
}

func NewDistributedDupefilter

func NewDistributedDupefilter(bloomN int, bloomP float64, rdb redis.Cmdable, bfKeyFunc GetRDBKey) *DistributedDupefilter

func (*DistributedDupefilter) Add

func (d *DistributedDupefilter) Add(fingerprint []byte) error

Add 添加指纹到布隆过滤器

func (*DistributedDupefilter) DoDupeFilter

func (d *DistributedDupefilter) DoDupeFilter(ctx *tegenaria.Context) (bool, error)

DoDupeFilter request去重

func (*DistributedDupefilter) Fingerprint

func (d *DistributedDupefilter) Fingerprint(ctx *tegenaria.Context) ([]byte, error)

Fingerprint request指纹计算

func (*DistributedDupefilter) SetCurrentSpider

func (d *DistributedDupefilter) SetCurrentSpider(spider tegenaria.SpiderInterface)

SetCurrentSpider 设置当前的spider

func (*DistributedDupefilter) TestOrAdd

func (d *DistributedDupefilter) TestOrAdd(fingerprint []byte) (bool, error)

TestOrAdd 如果指纹已经存在则返回 true，否则返回 false；指纹不存在时会将该指纹添加到缓存。

type DistributedHooks

type DistributedHooks struct {
	// contains filtered or unexported fields
}

DistributedHooks 分布式事件监听器

func NewDistributedHooks

func NewDistributedHooks(worker tegenaria.DistributedWorkerInterface) *DistributedHooks

NewDistributedHooks 构建新的分布式监听器组件对象

func (*DistributedHooks) Error

func (d *DistributedHooks) Error(params ...interface{}) error

Error 用于处理分布式模式下的ERROR事件

func (*DistributedHooks) EventsWatcher

func (d *DistributedHooks) EventsWatcher(ch chan tegenaria.EventType) error

EventsWatcher 分布式模式下的事件监听器

func (*DistributedHooks) Exit

func (d *DistributedHooks) Exit(params ...interface{}) error

Exit 用于处理分布式模式下的Exit事件

func (*DistributedHooks) Heartbeat

func (d *DistributedHooks) Heartbeat(params ...interface{}) error

Heartbeat 用于处理分布式模式下的HEARTBEAT事件

func (*DistributedHooks) Pause

func (d *DistributedHooks) Pause(params ...interface{}) error

Pause 用于处理分布式模式下的PAUSE事件

func (*DistributedHooks) SetCurrentSpider

func (d *DistributedHooks) SetCurrentSpider(spider tegenaria.SpiderInterface)

SetCurrentSpider 设置spider实例

func (*DistributedHooks) Start

func (d *DistributedHooks) Start(params ...interface{}) error

Start 用于处理分布式模式下的START事件

type DistributedQueue

type DistributedQueue struct {
	// contains filtered or unexported fields
}

func NewDistributedQueue

func NewDistributedQueue(rdb redis.Cmdable, queueKey GetRDBKey) *DistributedQueue

func (*DistributedQueue) Close

func (d *DistributedQueue) Close() error

Close 关闭缓存

func (*DistributedQueue) Dequeue

func (d *DistributedQueue) Dequeue() (interface{}, error)

Dequeue 从缓存队列中取出 ctx（出队）

func (*DistributedQueue) Enqueue

func (d *DistributedQueue) Enqueue(ctx *tegenaria.Context) error

Enqueue 将 ctx 写入缓存队列（入队）

func (*DistributedQueue) GetSize

func (d *DistributedQueue) GetSize() uint64

GetSize 获取缓存大小

func (*DistributedQueue) IsEmpty

func (d *DistributedQueue) IsEmpty() bool

IsEmpty 判断缓存是否为空

func (*DistributedQueue) SetCurrentSpider

func (d *DistributedQueue) SetCurrentSpider(spider tegenaria.SpiderInterface)

SetCurrentSpider 设置当前的spider

type DistributedWorker

type DistributedWorker struct {
	// contains filtered or unexported fields
}

DistributedWorker 分布式组件，包含两个子组件：(1) request 请求缓存队列，由各个节点上的引擎读取队列并消费，redis 队列中缓存的是经过 gob 序列化之后的二进制数据；(2) 布隆过滤器，主要用于去重。该组件同时实现了 RFPDupeFilterInterface 和 CacheInterface。

func NewDistributedWorker

func NewDistributedWorker(config *DistributedWorkerConfig) *DistributedWorker

NewDistributedWorker 构建redis单机模式下的分布式工作组件

func NewWorkerWithRdbCluster

func NewWorkerWithRdbCluster(config *WorkerConfigWithRdbCluster) *DistributedWorker

NewWorkerWithRdbCluster redis cluster模式下的分布式工作组件

func (*DistributedWorker) AddNode

func (w *DistributedWorker) AddNode() error

AddNode 新增节点

func (*DistributedWorker) CheckAllNodesStop

func (w *DistributedWorker) CheckAllNodesStop() (bool, error)

CheckAllNodesStop 检查所有的节点是否都已经停止

func (*DistributedWorker) CheckMasterLive

func (w *DistributedWorker) CheckMasterLive() (bool, error)

CheckMasterLive 检查所有master 节点是否都在线

func (*DistributedWorker) DelMaster

func (w *DistributedWorker) DelMaster() error

DelMaster 删除master节点

func (*DistributedWorker) DelNode

func (w *DistributedWorker) DelNode() error

DelNode 删除当前节点

func (*DistributedWorker) GetRDB

func (w *DistributedWorker) GetRDB() redis.Cmdable

func (*DistributedWorker) GetWorkerID

func (w *DistributedWorker) GetWorkerID() string

func (*DistributedWorker) Heartbeat

func (w *DistributedWorker) Heartbeat() error

Heartbeat 心跳包

func (*DistributedWorker) IsMaster

func (w *DistributedWorker) IsMaster() bool

func (*DistributedWorker) PauseNode

func (w *DistributedWorker) PauseNode() error

PauseNode 停止当前节点的活动

func (*DistributedWorker) SetCurrentSpider

func (w *DistributedWorker) SetCurrentSpider(spider tegenaria.SpiderInterface)

func (*DistributedWorker) SetMaster

func (w *DistributedWorker) SetMaster(flag bool)

SetMaster 设置当前的节点是否为master

type DistributedWorkerConfig

type DistributedWorkerConfig struct {

	// LimiterRate 限速大小
	LimiterRate int
	// contains filtered or unexported fields
}

DistributedWorkerConfig 分布式组件的配置参数

func NewDistributedWorkerConfig

func NewDistributedWorkerConfig(rdbConfig *RedisConfig, influxdbConfig *InfluxdbConfig, opts ...DistributeOptions) *DistributedWorkerConfig

NewDistributedWorkerConfig 新建分布式组件的配置

type GetRDBKey

type GetRDBKey func() (string, time.Duration)

GetRDBKey 获取缓存rdb key和ttl

type InfluxdbConfig

type InfluxdbConfig struct {
	// contains filtered or unexported fields
}

InfluxdbConfig influxdb配置

func NewInfluxdbConfig

func NewInfluxdbConfig(server string, token string, bucket string, org string) *InfluxdbConfig

type LeakyBucketLimiterWithRdb

type LeakyBucketLimiterWithRdb struct {
	// contains filtered or unexported fields
}

LeakyBucketLimiterWithRdb 单机redis下的漏桶限速器

func NewLeakyBucketLimiterWithRdb

func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *LeakyBucketLimiterWithRdb

NewLeakyBucketLimiterWithRdb LeakyBucketLimiterWithRdb 构造函数

func (*LeakyBucketLimiterWithRdb) CheckAndWaitLimiterPass

func (l *LeakyBucketLimiterWithRdb) CheckAndWaitLimiterPass() error

CheckAndWaitLimiterPass 检查当前并发量，如果并发量达到上限则等待

func (*LeakyBucketLimiterWithRdb) SetCurrentSpider

func (l *LeakyBucketLimiterWithRdb) SetCurrentSpider(spider tegenaria.SpiderInterface)

type RdbNodes

type RdbNodes []string

RdbNodes redis cluster 节点地址

type RedisConfig

type RedisConfig struct {
	// RedisAddr redis 地址
	RedisAddr string
	// RedisPasswd redis 密码
	RedisPasswd string
	// RedisUsername redis 用户名
	RedisUsername string
	// RedisDB redis 数据库索引 index
	RedisDB uint32
	// RdbConnectionsSize 连接池大小
	RdbConnectionsSize uint64
	// RdbTimeout redis 超时时间
	RdbTimeout time.Duration
	// RdbMaxRetry redis操作失败后的重试次数
	RdbMaxRetry int
}

RedisConfig redis配置

func NewRedisConfig

func NewRedisConfig(addr string, username string, passwd string, db uint32) *RedisConfig

type WorkerConfigWithRdbCluster

type WorkerConfigWithRdbCluster struct {
	*DistributedWorkerConfig
	RdbNodes
}

WorkerConfigWithRdbCluster redis cluster 模式下的分布式组件配置参数

func NewWorkerConfigWithRdbCluster

func NewWorkerConfigWithRdbCluster(config *DistributedWorkerConfig, nodes RdbNodes) *WorkerConfigWithRdbCluster

NewWorkerConfigWithRdbCluster redis cluster模式的分布式组件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL