Documentation
¶
Index ¶
- Variables
- type Cluster
- func (cl *Cluster) Close() error
- func (cl *Cluster) DB(number int64) (int, *pg.DB)
- func (cl *Cluster) DBs() []*pg.DB
- func (cl *Cluster) ForEachDB(fn func(db *pg.DB) error) error
- func (cl *Cluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error
- func (cl *Cluster) ForEachShard(fn func(shard *pg.DB) error) error
- func (cl *Cluster) IDGen() *IDGen
- func (cl *Cluster) Shard(number int64) *pg.DB
- func (cl *Cluster) Shards(db *pg.DB) []*pg.DB
- func (cl *Cluster) SplitShard(id int64) *pg.DB
- func (cl *Cluster) SubCluster(number int64, size int) *SubCluster
- type IDGen
- type ShardIDGen
- type SubCluster
- type UUID
- func (u UUID) AppendValue(b []byte, quote int) ([]byte, error)
- func (u *UUID) IsZero() bool
- func (u UUID) MarshalBinary() ([]byte, error)
- func (u UUID) MarshalJSON() ([]byte, error)
- func (u UUID) MarshalText() ([]byte, error)
- func (u *UUID) Scan(b interface{}) error
- func (u *UUID) ShardID() int64
- func (u *UUID) Split() (shardID int64, tm time.Time)
- func (u UUID) String() string
- func (u *UUID) Time() time.Time
- func (u *UUID) UnmarshalBinary(b []byte) error
- func (u *UUID) UnmarshalJSON(b []byte) error
- func (u *UUID) UnmarshalText(b []byte) error
- func (u UUID) Value() (driver.Value, error)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
DefaultIDGen = NewIDGen(41, 11, 12, _epoch)
)
Functions ¶
This section is empty.
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster maps many (up to 2048) logical database shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.
Example ¶
package main
import (
"fmt"
"github.com/go-pg/sharding/v7"
"github.com/go-pg/pg/v9"
)
// Users are sharded by AccountId, i.e. users with same account id are
// placed on same shard.
type User struct {
tableName string `pg:"?SHARD.users"`
Id int64
AccountId int64
Name string
Emails []string
}
func (u User) String() string {
return u.Name
}
// CreateUser picks shard by account id and creates user in the shard.
func CreateUser(cluster *sharding.Cluster, user *User) error {
return cluster.Shard(user.AccountId).Insert(user)
}
// GetUser splits shard from user id and fetches user from the shard.
func GetUser(cluster *sharding.Cluster, id int64) (*User, error) {
var user User
err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select()
return &user, err
}
// GetUsers picks shard by account id and fetches users from the shard.
func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) {
var users []User
err := cluster.Shard(accountId).Model(&users).Where("account_id = ?", accountId).Select()
return users, err
}
// createShard creates database schema for a given shard.
func createShard(shard *pg.DB) error {
queries := []string{
`DROP SCHEMA IF EXISTS ?SHARD CASCADE`,
`CREATE SCHEMA ?SHARD`,
sqlFuncs,
`CREATE TABLE ?SHARD.users (id bigint DEFAULT ?SHARD.next_id(), account_id int, name text, emails jsonb)`,
}
for _, q := range queries {
_, err := shard.Exec(q)
if err != nil {
return err
}
}
return nil
}
func main() {
db := pg.Connect(&pg.Options{
User: "postgres",
})
dbs := []*pg.DB{db} // list of physical PostgreSQL servers
nshards := 2 // 2 logical shards
// Create cluster with 1 physical server and 2 logical shards.
cluster := sharding.NewCluster(dbs, nshards)
// Create database schema for our logical shards.
for i := 0; i < nshards; i++ {
if err := createShard(cluster.Shard(int64(i))); err != nil {
panic(err)
}
}
// user1 will be created in shard1 because AccountId % nshards = shard1.
user1 := &User{
Name: "user1",
AccountId: 1,
Emails: []string{"user1@domain"},
}
err := CreateUser(cluster, user1)
if err != nil {
panic(err)
}
// user2 will be created in shard1 too AccountId is the same.
user2 := &User{
Name: "user2",
AccountId: 1,
Emails: []string{"user2@domain"},
}
err = CreateUser(cluster, user2)
if err != nil {
panic(err)
}
// user3 will be created in shard0 because AccountId % nshards = shard0.
user3 := &User{
Name: "user3",
AccountId: 2,
Emails: []string{"user3@domain"},
}
err = CreateUser(cluster, user3)
if err != nil {
panic(err)
}
user, err := GetUser(cluster, user1.Id)
if err != nil {
panic(err)
}
users, err := GetUsers(cluster, 1)
if err != nil {
panic(err)
}
fmt.Println(user)
fmt.Println(users[0], users[1])
}
const sqlFuncs = `
CREATE OR REPLACE FUNCTION public.make_id(tm timestamptz, seq_id bigint, shard_id int)
RETURNS bigint AS $$
DECLARE
max_shard_id CONSTANT bigint := 2048;
max_seq_id CONSTANT bigint := 4096;
id bigint;
BEGIN
shard_id := shard_id % max_shard_id;
seq_id := seq_id % max_seq_id;
id := (floor(extract(epoch FROM tm) * 1000)::bigint - ?EPOCH) << 23;
id := id | (shard_id << 12);
id := id | seq_id;
RETURN id;
END;
$$
LANGUAGE plpgsql IMMUTABLE;
CREATE FUNCTION ?SHARD.make_id(tm timestamptz, seq_id bigint)
RETURNS bigint AS $$
BEGIN
RETURN public.make_id(tm, seq_id, ?SHARD_ID);
END;
$$
LANGUAGE plpgsql IMMUTABLE;
CREATE FUNCTION ?SHARD.next_id()
RETURNS bigint AS $$
BEGIN
RETURN ?SHARD.make_id(clock_timestamp(), nextval('?SHARD.id_seq'));
END;
$$
LANGUAGE plpgsql;
CREATE SEQUENCE ?SHARD.id_seq;
`
Output: user1 user1 user2
func NewClusterWithGen ¶
NewClusterWithGen returns new PostgreSQL cluster consisting of physical dbs and running nshards logical shards.
func (*Cluster) ForEachNShards ¶
ForEachNShards concurrently calls the fn on each N shards in the cluster.
func (*Cluster) ForEachShard ¶
ForEachShard concurrently calls the fn on each shard in the cluster. It is the same as ForEachNShards(1, fn).
func (*Cluster) Shards ¶
Shards returns list of shards running in the db. If db is nil all shards are returned.
func (*Cluster) SplitShard ¶
SplitShard uses SplitID to extract shard id from the id and then returns corresponding Shard in the cluster.
func (*Cluster) SubCluster ¶
func (cl *Cluster) SubCluster(number int64, size int) *SubCluster
SubCluster returns a subset of the cluster of the given size.
type IDGen ¶ added in v7.1.0
type IDGen struct {
// contains filtered or unexported fields
}
func (*IDGen) MakeID ¶ added in v7.1.0
MakeId returns an id for the time. Note that you can only generate 4096 unique numbers per millisecond.
type ShardIDGen ¶ added in v7.1.0
type ShardIDGen struct {
// contains filtered or unexported fields
}
IDGen generates sortable unique int64 numbers that consist of: - 41 bits for time in milliseconds. - 11 bits for shard id. - 12 bits for auto-incrementing sequence.
As a result we can generate 4096 ids per millisecond for each of 2048 shards. Minimum supported time is 1975-02-28, maximum is 2044-12-31.
func NewShardIDGen ¶ added in v7.1.0
func NewShardIDGen(shard int64, gen *IDGen) *ShardIDGen
NewShardIDGen returns id generator for the shard.
func (*ShardIDGen) MaxID ¶ added in v7.1.0
func (g *ShardIDGen) MaxID(tm time.Time) int64
MaxId returns max id for the time.
func (*ShardIDGen) MinID ¶ added in v7.1.2
func (g *ShardIDGen) MinID(tm time.Time) int64
MinId returns min id for the time.
type SubCluster ¶
type SubCluster struct {
// contains filtered or unexported fields
}
SubCluster is a subset of the cluster.
func (*SubCluster) ForEachNShards ¶
ForEachNShards concurrently calls the fn on each N shards in the subcluster.
func (*SubCluster) ForEachShard ¶
func (cl *SubCluster) ForEachShard(fn func(shard *pg.DB) error) error
ForEachShard concurrently calls the fn on each shard in the subcluster. It is the same as ForEachNShards(1, fn).
func (*SubCluster) Shard ¶
func (cl *SubCluster) Shard(number int64) *pg.DB
Shard maps the number to the corresponding shard in the subscluster.
func (*SubCluster) SplitShard ¶
func (cl *SubCluster) SplitShard(id int64) *pg.DB
SplitShard uses SplitID to extract shard id from the id and then returns corresponding Shard in the subcluster.
type UUID ¶
type UUID [uuidLen]byte