Data Distribution Algorithm
Background
The main requirement for a distribution algorithm is that profile series of the same tenant service should be
co-located, as spatial locality is crucial for compaction and query performance. The distribution algorithm should
not aim to place all profiles of a specific tenant service in a dedicated shard; instead, it should distribute them
among the optimal number of shards.
Distributors must be aware of availability zones and should only route profiles to segment writers in the home AZ.
Depending on the environment, crossing AZ network boundaries usually incurs penalties: cloud providers may charge
for cross-AZ traffic, or in on-premises deployments, AZs may represent different data centers with high-latency
connections.
The number of shards and segment writers is not constant and may change over time. The distribution algorithm should
aim to minimize data re-balancing when such changes occur. Nevertheless, we do not perform actual data re-balancing:
data written to a shard remains there until it is compacted or deleted. The only reason for minimizing re-balancing
is to optimize data locality; this concerns both data in transit, as segment writers are sensitive to the variance of
the datasets, and data at rest, as this is crucial for compaction efficiency and query performance in the end.
Overview
- Profiles are distributed among segment-writers based on the profile labels.
- Profile labels must include the service_name label, which denotes the dataset the profile belongs to.
- Each profile belongs to a tenant.
The choice of a placement for a profile involves a three-step process:
- Finding m suitable locations from the total of N options using the request tenant_id.
- Finding n suitable locations from the total of m options using the service_name label.
- Finding the exact location s from the total of n options.
Where:
- N is the total number of shards in the deployment.
- m – the tenant shard limit – is configured explicitly.
- n – the dataset shard limit – is selected dynamically, based on the observed ingestion rate and patterns.
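The three steps can be sketched as plain index arithmetic. This is a minimal illustration rather than the actual implementation: the `Placement` struct and the `locate` function are hypothetical names, the subring offsets are taken as inputs instead of being derived from tenant_id and service_name, and wrapping a subring around the end of its parent is an assumption of the sketch.

```cpp
#include <cstdint>

// Hypothetical description of the subrings selected for one profile.
struct Placement {
    uint32_t total_shards;    // N: shards in the deployment
    uint32_t tenant_offset;   // start of the tenant subring on the ring
    uint32_t tenant_size;     // m: tenant shard limit
    uint32_t dataset_offset;  // start of the dataset subring inside the tenant subring
    uint32_t dataset_size;    // n: dataset shard limit
};

// Returns the ring position for a profile series fingerprint.
// Assumes all sizes are non-zero and n <= m <= N.
uint32_t locate(const Placement& p, uint64_t fingerprint) {
    // Step 3: index inside the dataset subring.
    uint32_t in_dataset = static_cast<uint32_t>(fingerprint % p.dataset_size);
    // Step 2: translate to an index inside the tenant subring (wrapping is assumed).
    uint32_t in_tenant = (p.dataset_offset + in_dataset) % p.tenant_size;
    // Step 1: translate to a position on the full ring (wrapping is assumed).
    return (p.tenant_offset + in_tenant) % p.total_shards;
}
```

With N = 12, a tenant subring of size 8 at offset 3, and a dataset subring of size 4 at offset 1, fingerprints land on ring positions 4 through 7.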
The number of shards in the deployment is determined by the number of nodes in the deployment:
- We seek to minimize the number of shards to optimize the cost of the solution: as we flush segments per shard,
the number of shards directly affects the number of write operations to the object storage.
- Experimentally, we found that a conservative processing rate is approximately 8 MB/s per core, depending on the
processor and the network bandwidth (thus, 128 cores should be generally enough to handle 1 GB/s). This unit is
recommended as a quantifier of the deployment size and the shard size.
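The sizing arithmetic can be written down directly. The sketch below assumes that 1 GB/s means 1024 MB/s, which is what makes the 128-core figure exact; `cores_for_rate` and the constant name are hypothetical.

```cpp
#include <cstdint>

// Conservative per-core processing rate quoted above, in MB/s.
constexpr uint64_t kMBPerSecondPerCore = 8;

// Returns the number of cores needed to sustain the given ingestion rate,
// rounded up to a whole core.
constexpr uint64_t cores_for_rate(uint64_t rate_mb_per_s) {
    return (rate_mb_per_s + kMBPerSecondPerCore - 1) / kMBPerSecondPerCore;
}
```

For example, `cores_for_rate(1024)` evaluates to 128, while a decimal gigabyte per second (1000 MB/s) needs 125 cores.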
Due to the nature of continuous profiling, it is usually beneficial to keep the same profile series on the same shard,
as this allows for more optimal utilization of the TSDB index (the inverted index used for searching by labels).
However, data is often distributed across profile series unevenly; using a series label hash as the distribution key
at any of the steps above may lead to significant data skews. To mitigate this, we propose to employ adaptive load
balancing: use fingerprint mod n as the distribution key at step 3 by default, and switch to random(n) when a
skew is observed.
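A minimal sketch of the two strategies at step 3 follows. The `Strategy` enum and `pick_index` function are hypothetical names, and how a skew is detected is left out; the sketch only shows the difference between the deterministic and the random choice.

```cpp
#include <cstdint>
#include <random>

// Hypothetical strategy flag; the text above only names the two behaviours.
enum class Strategy { FingerprintMod, Random };

// Picks an index in [0, n) within the dataset subring. n must be greater than zero.
uint32_t pick_index(Strategy s, uint64_t fingerprint, uint32_t n, std::mt19937_64& rng) {
    if (s == Strategy::FingerprintMod) {
        // Default: the same series always lands on the same index.
        return static_cast<uint32_t>(fingerprint % n);
    }
    // Skew observed: spread writes uniformly, giving up series locality.
    std::uniform_int_distribution<uint32_t> dist(0, n - 1);
    return dist(rng);
}
```

The trade-off is visible in the code: the default branch keeps a series on one shard, which favours the index, while the random branch evens out the load at the cost of scattering a series across the n shards.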
In case of a failure, the next suitable segment writer is selected (from n options available to the tenant service,
increasing the number if needed). The shard identifier is specified explicitly in the request to the segment writer to
maintain data locality in case of transient failures and rollouts.
The proposed approach assumes that two requests with the same distribution key may end up in different shards.
This should be a rare occurrence, but such placement is expected.
Implementation
The existing ring implementation is used for discovery: the underlying memberlist library maintains the list of
segment-writer service instances:
graph RL
Lifecycler-.->Memberlist
Memberlist-.->Ring
subgraph SegmentWriter["segment-writer"]
Lifecycler["lifecycler"]
end
subgraph Memberlist["memberlist"]
end
subgraph Distributor["distributor"]
Ring["ring"]
end
Instead of using the ring for the actual placement, the distributor builds its own view of the ring, which is then used
to determine the placement of the keys (profiles). The main reason for this is that the existing ring implementation is
not well suited for the proposed algorithm, as it does not provide a way to map a key to a specific shard.
In accordance with the algorithm, for each key (profile), we need to identify a subset of shards allowed for the tenant
and a subset of shards allowed for the dataset. Jump consistent hash is used to pick the subring position:
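    #include <cstdint>

    // Maps a 64-bit key to a bucket in [0, num_buckets).
    int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) {
        int64_t b = -1, j = 0;
        while (j < num_buckets) {
            b = j;
            // Advance the linear congruential generator seeded by the key.
            key = key * 2862933555777941757ULL + 1;
            // Jump forward to the next candidate bucket.
            j = (b + 1) * (double(1LL << 31) / double((key >> 33) + 1));
        }
        return b;
    }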
The function ensures balance, which essentially states that objects are evenly distributed among buckets, and
monotonicity, which says that when the number of buckets is increased, objects move only from old buckets to new
buckets, thus doing no unnecessary rearrangement.
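The monotonicity property can be checked empirically. The sketch below restates the hash so that it is self-contained; the `MoveStats` struct and the `compare` helper are hypothetical names introduced for the check, and sequential integers stand in for real keys.

```cpp
#include <cstdint>

// Jump consistent hash, restated to keep the example self-contained.
int32_t jump_hash(uint64_t key, int32_t num_buckets) {
    int64_t b = -1, j = 0;
    while (j < num_buckets) {
        b = j;
        key = key * 2862933555777941757ULL + 1;
        j = static_cast<int64_t>((b + 1) * (double(1LL << 31) / double((key >> 33) + 1)));
    }
    return static_cast<int32_t>(b);
}

// Outcome of growing the bucket count for a set of keys.
struct MoveStats {
    int moved;         // keys whose bucket changed
    bool only_to_new;  // true if every moved key landed in a newly added bucket
};

// Compares bucket assignments of keys 0..num_keys-1 before and after growing
// the number of buckets from `from` to `to` (to > from).
MoveStats compare(int32_t from, int32_t to, int num_keys) {
    MoveStats st{0, true};
    for (int k = 0; k < num_keys; ++k) {
        int32_t before = jump_hash(static_cast<uint64_t>(k), from);
        int32_t after = jump_hash(static_cast<uint64_t>(k), to);
        if (before != after) {
            ++st.moved;
            if (after < from) st.only_to_new = false;
        }
    }
    return st;
}
```

Growing from 8 to 9 buckets should move only a fraction of the keys, and every moved key should end up in bucket 8, never in one of the original eight.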
The diagram below illustrates how a specific key (profile) can be mapped to a specific shard and node:
- The first subring (tenant shards) starts at offset 3 and its size is 8 (configured explicitly).
- The second subring (dataset shards) starts at offset 1 within the parent subring (tenant) and includes 4 shards (determined dynamically).
block-beta
columns 15
nodes["nodes"]:2
space
node_a["node A"]:4
node_b["node B"]:4
node_c["node C"]:4
shards["ring"]:2
space
shard_0["0"]
shard_1["1"]
shard_2["2"]
shard_3["3"]
shard_4["4"]
shard_5["5"]
shard_6["6"]
shard_7["7"]
shard_8["8"]
shard_9["9"]
shard_10["10"]
shard_11["11"]
tenant["tenant"]:2
space:4
ts_3["3"]
ts_4["4"]
ts_5["5"]
ts_6["6"]
ts_7["7"]
ts_8["8"]
ts_9["9"]
space:2
dataset["dataset"]:2
space:5
ds_4["4"]
ds_5["5"]
ds_6["6"]
ds_7["7"]
space:4
Such placement creates hot spots: in this specific example, all the dataset shards end up on the same node, which may
lead to uneven load distribution and poses problems in case of node failures. For example, if node B fails, all the
requests that target it would be routed to node A (or C), which may lead to a cascading failure.
To mitigate this, shards are mapped to instances through a separate mapping table. The mapping table is updated every
time the number of nodes changes, but it preserves the existing mapping as much as possible.
block-beta
columns 15
shards["ring"]:2
space
shard_0["0"]
shard_1["1"]
shard_2["2"]
shard_3["3"]
shard_4["4"]
shard_5["5"]
shard_6["6"]
shard_7["7"]
shard_8["8"]
shard_9["9"]
shard_10["10"]
shard_11["11"]
tenant["tenant"]:2
space:4
ts_3["3"]
ts_4["4"]
ts_5["5"]
ts_6["6"]
ts_7["7"]
ts_8["8"]
ts_9["9"]
space:2
dataset["dataset"]:2
space:5
ds_4["4"]
ds_5["5"]
ds_6["6"]
ds_7["7"]
space:4
space:15
mapping["mapping"]:2
space
map_4["4"]
map_11["11"]
map_5["5"]
map_2["2"]
map_3["3"]
map_0["0"]
map_7["7"]
map_9["9"]
map_8["8"]
map_10["10"]
map_1["1"]
map_6["6"]
space:15
m_shards["shards"]:2
space
m_shard_0["0"]
m_shard_1["1"]
m_shard_2["2"]
m_shard_3["3"]
m_shard_4["4"]
m_shard_5["5"]
m_shard_6["6"]
m_shard_7["7"]
m_shard_8["8"]
m_shard_9["9"]
m_shard_10["10"]
m_shard_11["11"]
nodes["nodes"]:2
space
node_a["node A"]:4
node_b["node B"]:4
node_c["node C"]:4
style m_shard_3 fill:#969,stroke:#333,stroke-width:4px
style m_shard_0 fill:#969,stroke:#333,stroke-width:4px
style m_shard_7 fill:#969,stroke:#333,stroke-width:4px
style m_shard_9 fill:#969,stroke:#333,stroke-width:4px
ds_4 --> map_3
map_3 --> m_shard_3
ds_5 --> map_0
map_0 --> m_shard_0
ds_6 --> map_7
map_7 --> m_shard_7
ds_7 --> map_9
map_9 --> m_shard_9
In the current implementation, the mapping is a simple permutation generated with a predefined random seed using the
Fisher-Yates shuffle: when N shards are added or removed, at most N shards are moved to a different node. Ideally, the
random permutation should spread the shards uniformly across the nodes.
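A seeded Fisher-Yates permutation can be sketched as follows. The function name `build_mapping`, the choice of `std::mt19937_64`, and the seed value are assumptions of this example; it also does not show how the existing mapping is preserved when the number of shards changes.

```cpp
#include <cstdint>
#include <numeric>
#include <random>
#include <utility>
#include <vector>

// Builds a permutation of [0, n) with the Fisher-Yates shuffle.
// A fixed seed makes repeated runs produce the same table.
std::vector<uint32_t> build_mapping(uint32_t n, uint64_t seed) {
    std::vector<uint32_t> mapping(n);
    std::iota(mapping.begin(), mapping.end(), 0u);
    std::mt19937_64 rng(seed);
    for (uint32_t i = n; i > 1; --i) {
        // Pick j in [0, i) and swap it into the last unshuffled slot.
        // The modulo introduces a negligible bias for small n.
        uint32_t j = static_cast<uint32_t>(rng() % i);
        std::swap(mapping[i - 1], mapping[j]);
    }
    return mapping;
}
```

Calling `build_mapping(12, seed)` twice with the same seed yields identical tables, each containing every shard index from 0 to 11 exactly once.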
Now, if node B fails, its shards are distributed among the remaining nodes, which ensures that the load is distributed
evenly even in case of a failure. In our example:
- Suppose we selected shard 6.
- It's routed to node B via the mapping table.
- An attempt to store a profile in shard 6 fails because of the node failure.
- We pick the next location (7), which is then mapped to node C.
- Until node B is back, we will route writes for shard 6 to node C.
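The walk-through above can be reproduced with the mapping table from the diagram. In this sketch the `Route` struct, the `route` function, and the health callback are hypothetical; nodes A, B, and C are numbered 0, 1, and 2, each owning four consecutive entries of the bottom row, and wrapping around the dataset subring when searching for a healthy node is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Result of a routing attempt; `ok` is false when no candidate node is healthy.
struct Route {
    bool ok;
    uint32_t via_position;  // ring position whose mapping was used
    uint32_t node;          // node that should receive the write
};

// Walks the dataset subring starting at index `start` and returns the first
// position whose node is healthy. mapping[position] gives the mapped slot, and
// slots are assigned to nodes in equal contiguous blocks.
Route route(const std::vector<uint32_t>& positions, std::size_t start,
            const std::vector<uint32_t>& mapping, uint32_t slots_per_node,
            const std::function<bool(uint32_t)>& healthy) {
    for (std::size_t i = 0; i < positions.size(); ++i) {
        uint32_t pos = positions[(start + i) % positions.size()];
        uint32_t node = mapping[pos] / slots_per_node;
        if (healthy(node)) return {true, pos, node};
    }
    return {false, 0, 0};
}
```

With the dataset subring {4, 5, 6, 7} and the table {4, 11, 5, 2, 3, 0, 7, 9, 8, 10, 1, 6}, starting at position 6 yields node B while all nodes are healthy, and node C (via position 7) once node B is marked as failed. The request would still carry the originally selected shard identifier, as described earlier.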
The use of contiguous shard ranges in subrings minimizes the number of datasets affected by a topology change:
the ones that overlap the parent ring boundaries are affected the most, and it is expected that a dataset may change its
mapping entirely. However, such impact is preferable to the alternative, where a larger number of datasets is affected
in a more subtle way.
Placement management
Placement is managed by the Placement Manager, which resides in the metastore. The Placement Manager is a singleton and
runs only on the Raft leader node.
The Placement Manager keeps track of dataset statistics based on the metadata records received from the segment-writer
service instances. Currently, the only metric that affects placement is the dataset size after it is written in the wire
format.
The Placement Manager builds placement rules at regular intervals, which are then used by the distributor to determine
the placement for each received profile. Since actual data re-balancing is not performed, the placement rules are not
synchronized across the distributor instances.
graph LR
Distributor==>SegmentWriter
PlacementAgent-.-PlacementRules
SegmentWriter-->|metadata|PlacementManager
SegmentWriter==>|data|Segments
PlacementManager-.->PlacementRules
subgraph Distributor["distributor"]
PlacementAgent
end
subgraph Metastore["metastore"]
PlacementManager
end
subgraph ObjectStore["object store"]
PlacementRules(placement rules)
Segments(segments)
end
subgraph SegmentWriter["segment-writer"]
end
Placement rules are defined in the protobuf format.
As of now, placement rules do not include the exact shards and mappings to nodes. Instead, they specify how many shards
are allocated for a specific dataset and tenant, and what load balancing strategy should be used: fingerprint mod or
round robin. In the future, placement management might be extended to include direct shard-to-node mappings, thus
implementing directory-based sharding.
A number of basic heuristics determine the minimal sufficient number of shards for a dataset, with minimal control
options. Specifically, due to a substantial lag in the feedback loop (up to tens of seconds), shard allocation
is pessimistic and may lead to over-allocation of shards if a persistent burst trend is observed. Conversely, when the
observed data rate decreases, the number of shards is not reduced immediately. For testing purposes, the behavior of the
autoscaling has been simulated on some of the archetypes (and later validated on real data).
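The text does not specify the heuristics themselves, so the following is a purely illustrative sketch of the asymmetry it describes: grow immediately, shrink only after the rate has stayed low for a while. The `ShardAllocator` type, its parameters, and the per-shard capacity unit are all hypothetical.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical allocator state for one dataset.
struct ShardAllocator {
    uint64_t unit_per_shard;    // assumed ingestion rate one shard can absorb
    uint32_t scale_down_delay;  // consecutive low samples required before shrinking
    uint32_t shards = 1;
    uint32_t low_samples = 0;

    ShardAllocator(uint64_t unit, uint32_t delay)
        : unit_per_shard(unit), scale_down_delay(delay) {}

    // Feeds one observed rate sample and returns the updated shard count.
    uint32_t observe(uint64_t rate) {
        uint64_t needed = (rate + unit_per_shard - 1) / unit_per_shard;
        uint32_t wanted = static_cast<uint32_t>(std::max<uint64_t>(1, needed));
        if (wanted >= shards) {
            // Grow at once: the feedback loop is slow, so err on the high side.
            shards = wanted;
            low_samples = 0;
        } else if (++low_samples >= scale_down_delay) {
            // Shrink only after the rate has stayed low for several samples.
            shards = wanted;
            low_samples = 0;
        }
        return shards;
    }
};
```

With a unit of 8 and a delay of 3, a burst sample of 20 raises the count to 3 immediately, and the count returns to 1 only on the third consecutive low sample.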

