draincloud-core/internal/storage/postgres/sharded_cluster.go

42 lines
903 B
Go
Raw Normal View History

2024-12-14 07:42:59 +00:00
package postgres
import (
"context"
2024-12-16 05:08:37 +00:00
"hash/crc32"
2024-12-14 07:42:59 +00:00
"log/slog"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
2024-12-16 05:08:37 +00:00
"github.com/google/uuid"
2024-12-14 07:42:59 +00:00
"github.com/jackc/pgx/v5"
)
2024-12-16 05:08:37 +00:00
type ShardMap = map[uint32]*pgx.ConnConfig
2024-12-14 07:42:59 +00:00
type ShardCluster struct {
m sync.Mutex
2024-12-16 05:08:37 +00:00
shards []*pgx.Conn
2024-12-14 07:42:59 +00:00
}
func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster {
2024-12-16 05:08:37 +00:00
shards := make([]*pgx.Conn, len(shardMap))
2024-12-14 07:42:59 +00:00
for n, cfg := range shardMap {
conn, err := pgx.ConnectConfig(ctx, cfg)
if err != nil {
logger.Fatal(ctx, "failed to connect to shard", slog.Uint64("num", uint64(n)), logger.Err(err))
}
shards[n] = conn
}
return &ShardCluster{shards: shards}
}
2024-12-16 05:08:37 +00:00
func (c *ShardCluster) PickShard(n uint32) *pgx.Conn {
2024-12-14 07:42:59 +00:00
c.m.Lock()
defer c.m.Unlock()
return c.shards[n]
}
2024-12-16 05:08:37 +00:00
func UUIDShardFn(id uuid.UUID, numShards uint32) uint32 {
return crc32.ChecksumIEEE(id[:]) % numShards
}