Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterSpecialAggregator(cmdName string, fn func([]interface{}, []error) (interface{}, error))
- type AggLogicalAndAggregator
- func (a *AggLogicalAndAggregator) Add(result interface{}, err error) error
- func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AggLogicalAndAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AggLogicalAndAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AggLogicalAndAggregator) Result() (interface{}, error)
- type AggLogicalOrAggregator
- func (a *AggLogicalOrAggregator) Add(result interface{}, err error) error
- func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AggLogicalOrAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AggLogicalOrAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AggLogicalOrAggregator) Result() (interface{}, error)
- type AggMaxAggregator
- func (a *AggMaxAggregator) Add(result interface{}, err error) error
- func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AggMaxAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AggMaxAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AggMaxAggregator) Result() (interface{}, error)
- type AggMinAggregator
- func (a *AggMinAggregator) Add(result interface{}, err error) error
- func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AggMinAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AggMinAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AggMinAggregator) Result() (interface{}, error)
- type AggSumAggregator
- func (a *AggSumAggregator) Add(result interface{}, err error) error
- func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AggSumAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AggSumAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AggSumAggregator) Result() (interface{}, error)
- type AggregatorResErr
- type AllSucceededAggregator
- func (a *AllSucceededAggregator) Add(result interface{}, err error) error
- func (a *AllSucceededAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *AllSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *AllSucceededAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *AllSucceededAggregator) Result() (interface{}, error)
- type CommandPolicy
- type DefaultKeyedAggregator
- func (a *DefaultKeyedAggregator) Add(result interface{}, err error) error
- func (a *DefaultKeyedAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *DefaultKeyedAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *DefaultKeyedAggregator) BatchAddWithKeyOrder(results map[string]AggregatorResErr, keyOrder []string) error
- func (a *DefaultKeyedAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *DefaultKeyedAggregator) Result() (interface{}, error)
- func (a *DefaultKeyedAggregator) SetKeyOrder(keyOrder []string)
- type DefaultKeylessAggregator
- func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error
- func (a *DefaultKeylessAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *DefaultKeylessAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *DefaultKeylessAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *DefaultKeylessAggregator) Result() (interface{}, error)
- type OneSucceededAggregator
- func (a *OneSucceededAggregator) Add(result interface{}, err error) error
- func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *OneSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *OneSucceededAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *OneSucceededAggregator) Result() (interface{}, error)
- type RandomPicker
- type RequestPolicy
- type ResponseAggregator
- type ResponsePolicy
- type RoundRobinPicker
- type ShardPicker
- type SpecialAggregator
- func (a *SpecialAggregator) Add(result interface{}, err error) error
- func (a *SpecialAggregator) AddWithKey(key string, result interface{}, err error) error
- func (a *SpecialAggregator) BatchAdd(results map[string]AggregatorResErr) error
- func (a *SpecialAggregator) BatchSlice(results []AggregatorResErr) error
- func (a *SpecialAggregator) Result() (interface{}, error)
- type StaticShardPicker
Constants ¶
const (
ReadOnlyCMD string = "readonly"
)
Variables ¶
var (
ErrMaxAggregation = errors.New("redis: no valid results to aggregate for max operation")
ErrMinAggregation = errors.New("redis: no valid results to aggregate for min operation")
ErrAndAggregation = errors.New("redis: no valid results to aggregate for logical AND operation")
ErrOrAggregation = errors.New("redis: no valid results to aggregate for logical OR operation")
)
var SpecialAggregatorRegistry = make(map[string]func([]interface{}, []error) (interface{}, error))
SpecialAggregatorRegistry holds custom aggregation functions for specific commands.
Functions ¶
func RegisterSpecialAggregator ¶
func RegisterSpecialAggregator(cmdName string, fn func([]interface{}, []error) (interface{}, error))
RegisterSpecialAggregator registers a custom aggregation function for a command.
Types ¶
type AggLogicalAndAggregator ¶
type AggLogicalAndAggregator struct {
// contains filtered or unexported fields
}
AggLogicalAndAggregator performs logical AND on boolean values.
func (*AggLogicalAndAggregator) Add ¶
func (a *AggLogicalAndAggregator) Add(result interface{}, err error) error
func (*AggLogicalAndAggregator) AddWithKey ¶
func (a *AggLogicalAndAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AggLogicalAndAggregator) BatchAdd ¶
func (a *AggLogicalAndAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AggLogicalAndAggregator) BatchSlice ¶
func (a *AggLogicalAndAggregator) BatchSlice(results []AggregatorResErr) error
func (*AggLogicalAndAggregator) Result ¶
func (a *AggLogicalAndAggregator) Result() (interface{}, error)
type AggLogicalOrAggregator ¶
type AggLogicalOrAggregator struct {
// contains filtered or unexported fields
}
AggLogicalOrAggregator performs logical OR on boolean values.
func (*AggLogicalOrAggregator) Add ¶
func (a *AggLogicalOrAggregator) Add(result interface{}, err error) error
func (*AggLogicalOrAggregator) AddWithKey ¶
func (a *AggLogicalOrAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AggLogicalOrAggregator) BatchAdd ¶
func (a *AggLogicalOrAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AggLogicalOrAggregator) BatchSlice ¶
func (a *AggLogicalOrAggregator) BatchSlice(results []AggregatorResErr) error
func (*AggLogicalOrAggregator) Result ¶
func (a *AggLogicalOrAggregator) Result() (interface{}, error)
type AggMaxAggregator ¶
type AggMaxAggregator struct {
// contains filtered or unexported fields
}
AggMaxAggregator returns the maximum numeric value from all shards.
func (*AggMaxAggregator) Add ¶
func (a *AggMaxAggregator) Add(result interface{}, err error) error
func (*AggMaxAggregator) AddWithKey ¶
func (a *AggMaxAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AggMaxAggregator) BatchAdd ¶
func (a *AggMaxAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AggMaxAggregator) BatchSlice ¶
func (a *AggMaxAggregator) BatchSlice(results []AggregatorResErr) error
func (*AggMaxAggregator) Result ¶
func (a *AggMaxAggregator) Result() (interface{}, error)
type AggMinAggregator ¶
type AggMinAggregator struct {
// contains filtered or unexported fields
}
AggMinAggregator returns the minimum numeric value from all shards.
func (*AggMinAggregator) Add ¶
func (a *AggMinAggregator) Add(result interface{}, err error) error
func (*AggMinAggregator) AddWithKey ¶
func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AggMinAggregator) BatchAdd ¶
func (a *AggMinAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AggMinAggregator) BatchSlice ¶
func (a *AggMinAggregator) BatchSlice(results []AggregatorResErr) error
func (*AggMinAggregator) Result ¶
func (a *AggMinAggregator) Result() (interface{}, error)
type AggSumAggregator ¶
type AggSumAggregator struct {
// contains filtered or unexported fields
}
AggSumAggregator sums numeric replies from all shards.
func (*AggSumAggregator) Add ¶
func (a *AggSumAggregator) Add(result interface{}, err error) error
func (*AggSumAggregator) AddWithKey ¶
func (a *AggSumAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AggSumAggregator) BatchAdd ¶
func (a *AggSumAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AggSumAggregator) BatchSlice ¶
func (a *AggSumAggregator) BatchSlice(results []AggregatorResErr) error
func (*AggSumAggregator) Result ¶
func (a *AggSumAggregator) Result() (interface{}, error)
type AggregatorResErr ¶
type AggregatorResErr struct {
Result interface{}
Err error
}
type AllSucceededAggregator ¶
type AllSucceededAggregator struct {
// contains filtered or unexported fields
}
AllSucceededAggregator returns one non-error reply if every shard succeeded; otherwise it propagates the first error.
func (*AllSucceededAggregator) Add ¶
func (a *AllSucceededAggregator) Add(result interface{}, err error) error
func (*AllSucceededAggregator) AddWithKey ¶
func (a *AllSucceededAggregator) AddWithKey(key string, result interface{}, err error) error
func (*AllSucceededAggregator) BatchAdd ¶
func (a *AllSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*AllSucceededAggregator) BatchSlice ¶
func (a *AllSucceededAggregator) BatchSlice(results []AggregatorResErr) error
func (*AllSucceededAggregator) Result ¶
func (a *AllSucceededAggregator) Result() (interface{}, error)
type CommandPolicy ¶
type CommandPolicy struct {
Request RequestPolicy
Response ResponsePolicy
// Tips that are not request_policy or response_policy,
// e.g. nondeterministic_output, nondeterministic_output_order.
Tips map[string]string
}
func (*CommandPolicy) CanBeUsedInPipeline ¶
func (p *CommandPolicy) CanBeUsedInPipeline() bool
func (*CommandPolicy) IsReadOnly ¶
func (p *CommandPolicy) IsReadOnly() bool
type DefaultKeyedAggregator ¶
type DefaultKeyedAggregator struct {
// contains filtered or unexported fields
}
DefaultKeyedAggregator reassembles replies in the exact key order of the original request.
func NewDefaultKeyedAggregator ¶
func NewDefaultKeyedAggregator(keyOrder []string) *DefaultKeyedAggregator
func (*DefaultKeyedAggregator) Add ¶
func (a *DefaultKeyedAggregator) Add(result interface{}, err error) error
func (*DefaultKeyedAggregator) AddWithKey ¶
func (a *DefaultKeyedAggregator) AddWithKey(key string, result interface{}, err error) error
func (*DefaultKeyedAggregator) BatchAdd ¶
func (a *DefaultKeyedAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*DefaultKeyedAggregator) BatchAddWithKeyOrder ¶
func (a *DefaultKeyedAggregator) BatchAddWithKeyOrder(results map[string]AggregatorResErr, keyOrder []string) error
func (*DefaultKeyedAggregator) BatchSlice ¶
func (a *DefaultKeyedAggregator) BatchSlice(results []AggregatorResErr) error
func (*DefaultKeyedAggregator) Result ¶
func (a *DefaultKeyedAggregator) Result() (interface{}, error)
func (*DefaultKeyedAggregator) SetKeyOrder ¶
func (a *DefaultKeyedAggregator) SetKeyOrder(keyOrder []string)
type DefaultKeylessAggregator ¶
type DefaultKeylessAggregator struct {
// contains filtered or unexported fields
}
DefaultKeylessAggregator collects all results in an array; the order of the results does not matter.
func (*DefaultKeylessAggregator) Add ¶
func (a *DefaultKeylessAggregator) Add(result interface{}, err error) error
func (*DefaultKeylessAggregator) AddWithKey ¶
func (a *DefaultKeylessAggregator) AddWithKey(key string, result interface{}, err error) error
func (*DefaultKeylessAggregator) BatchAdd ¶
func (a *DefaultKeylessAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*DefaultKeylessAggregator) BatchSlice ¶
func (a *DefaultKeylessAggregator) BatchSlice(results []AggregatorResErr) error
func (*DefaultKeylessAggregator) Result ¶
func (a *DefaultKeylessAggregator) Result() (interface{}, error)
type OneSucceededAggregator ¶
type OneSucceededAggregator struct {
// contains filtered or unexported fields
}
OneSucceededAggregator returns the first non-error reply; if all shards errored, it returns any one of those errors.
func (*OneSucceededAggregator) Add ¶
func (a *OneSucceededAggregator) Add(result interface{}, err error) error
func (*OneSucceededAggregator) AddWithKey ¶
func (a *OneSucceededAggregator) AddWithKey(key string, result interface{}, err error) error
func (*OneSucceededAggregator) BatchAdd ¶
func (a *OneSucceededAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*OneSucceededAggregator) BatchSlice ¶
func (a *OneSucceededAggregator) BatchSlice(results []AggregatorResErr) error
func (*OneSucceededAggregator) Result ¶
func (a *OneSucceededAggregator) Result() (interface{}, error)
type RandomPicker ¶
type RandomPicker struct{}
func (RandomPicker) Next ¶
func (RandomPicker) Next(total int) int
type RequestPolicy ¶
type RequestPolicy uint8
const (
ReqDefault RequestPolicy = iota
ReqAllNodes
ReqAllShards
ReqMultiShard
ReqSpecial
)
func ParseRequestPolicy ¶
func ParseRequestPolicy(raw string) (RequestPolicy, error)
func (RequestPolicy) String ¶
func (p RequestPolicy) String() string
type ResponseAggregator ¶
type ResponseAggregator interface {
// Add processes a single shard response.
Add(result interface{}, err error) error
// AddWithKey processes a single shard response for a specific key (used by keyed aggregators).
AddWithKey(key string, result interface{}, err error) error
BatchAdd(map[string]AggregatorResErr) error
BatchSlice([]AggregatorResErr) error
// Result returns the final aggregated result and any error.
Result() (interface{}, error)
}
ResponseAggregator defines the interface for aggregating responses from multiple shards.
func NewDefaultAggregator ¶
func NewDefaultAggregator(isKeyed bool) ResponseAggregator
func NewResponseAggregator ¶
func NewResponseAggregator(policy ResponsePolicy, cmdName string) ResponseAggregator
NewResponseAggregator creates an aggregator based on the response policy.
type ResponsePolicy ¶
type ResponsePolicy uint8
const (
RespDefaultKeyless ResponsePolicy = iota
RespDefaultHashSlot
RespAllSucceeded
RespOneSucceeded
RespAggSum
RespAggMin
RespAggMax
RespAggLogicalAnd
RespAggLogicalOr
RespSpecial
)
func ParseResponsePolicy ¶
func ParseResponsePolicy(raw string) (ResponsePolicy, error)
func (ResponsePolicy) String ¶
func (p ResponsePolicy) String() string
type RoundRobinPicker ¶
type RoundRobinPicker struct {
// contains filtered or unexported fields
}
func (*RoundRobinPicker) Next ¶
func (p *RoundRobinPicker) Next(total int) int
type ShardPicker ¶
ShardPicker chooses “one arbitrary shard” when the request_policy is ReqDefault and the command has no keys.
type SpecialAggregator ¶
type SpecialAggregator struct {
// contains filtered or unexported fields
}
SpecialAggregator provides a registry for command-specific aggregation logic.
func NewSpecialAggregator ¶
func NewSpecialAggregator(cmdName string) *SpecialAggregator
NewSpecialAggregator creates a special aggregator with command-specific logic if available.
func (*SpecialAggregator) Add ¶
func (a *SpecialAggregator) Add(result interface{}, err error) error
func (*SpecialAggregator) AddWithKey ¶
func (a *SpecialAggregator) AddWithKey(key string, result interface{}, err error) error
func (*SpecialAggregator) BatchAdd ¶
func (a *SpecialAggregator) BatchAdd(results map[string]AggregatorResErr) error
func (*SpecialAggregator) BatchSlice ¶
func (a *SpecialAggregator) BatchSlice(results []AggregatorResErr) error
func (*SpecialAggregator) Result ¶
func (a *SpecialAggregator) Result() (interface{}, error)
type StaticShardPicker ¶
type StaticShardPicker struct {
// contains filtered or unexported fields
}
StaticShardPicker always returns the same shard index.
func NewStaticShardPicker ¶
func NewStaticShardPicker(index int) *StaticShardPicker
func (*StaticShardPicker) Next ¶
func (p *StaticShardPicker) Next(total int) int