init sharding cluster stuff for fun

This commit is contained in:
r8zavetr8v 2024-12-16 00:08:37 -05:00
parent 5bdb7fb0b0
commit 5ce5cce076
13 changed files with 79 additions and 42 deletions

View File

@ -1,9 +1,11 @@
version: 3
tasks:
prepare-env:
run:
cmds:
- go run cmd/main.go
deploy-local:
cmds:
- sudo docker swarm init
- sudo docker stack deploy draincloud_core -c ./compose.rw.yaml
migrate-local-status:
cmds:

View File

@ -6,7 +6,6 @@ import (
"os/signal"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app"
cleanupsessions "git.optclblast.xyz/draincloud/draincloud-core/internal/cron/cleanup_sessions"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/postgres"
@ -19,11 +18,11 @@ func main() {
plugin.MustNewPluginLoader(ctx, 8081, plugin.NewPluginStore()).
Run(ctx)
pg := postgres.New(ctx, "postgres://draincloud:draincloud@localhost:5432/draincloud?sslmode=disable")
pg := postgres.New(ctx, "postgres://draincloud:draincloud-rw-password-123@192.168.1.68:5433/draincloud?sslmode=disable")
// TODO move cron on a separate job (k8s cronjob / docker cron)
cleanupSessionsCron := cleanupsessions.New(pg)
cleanupSessionsCron.Run(ctx)
// cleanupSessionsCron := cleanupsessions.New(pg)
// cleanupSessionsCron.Run(ctx)
engine := filesengine.NewFilesEngine(nil, nil)

View File

@ -6,10 +6,12 @@ services:
- 5432:5432
environment:
- POSTGRES_USER=draincloud
- POSTGRES_PASSWORD=draincloud
- POSTGRES_PASSWORD=mysuperstrongpassword
- POSTGRES_DB=draincloud
volumes:
- draincloud-rw-1:/var/lib/postgresql/data
networks:
- draincloud-pg
rw_2:
image: postgres:17
@ -18,10 +20,12 @@ services:
- 5433:5432
environment:
- POSTGRES_USER=draincloud
- POSTGRES_PASSWORD=draincloud
- POSTGRES_PASSWORD=mysuperstrongpassword
- POSTGRES_DB=draincloud
volumes:
- draincloud-rw-2:/var/lib/postgresql/data
networks:
- draincloud-pg
rw_3:
image: postgres:17
@ -30,12 +34,17 @@ services:
- 5434:5432
environment:
- POSTGRES_USER=draincloud
- POSTGRES_PASSWORD=draincloud
- POSTGRES_PASSWORD=mysuperstrongpassword
- POSTGRES_DB=draincloud
volumes:
- draincloud-rw-3:/var/lib/postgresql/data
networks:
- draincloud-pg
volumes:
draincloud-rw-1: {}
draincloud-rw-2: {}
draincloud-rw-3: {}
draincloud-rw-3: {}
networks:
draincloud-pg: {}

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/fatih/color v1.17.0
github.com/fsnotify/fsnotify v1.7.0
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.1
github.com/jmoiron/sqlx v1.4.0
github.com/nats-io/nats.go v1.37.0

4
go.sum
View File

@ -39,8 +39,8 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=

View File

@ -37,7 +37,7 @@ func (d *DrainCloud) authorize(ctx *gin.Context) (*models.Session, error) {
return nil, ErrorUnauthorized
}
logger.Debug(ctx, "[authorize] user authorized", slog.Int64("session_id", session.ID))
logger.Debug(ctx, "[authorize] user authorized", slog.String("session_id", session.ID.String()))
return session, nil
}

View File

@ -12,6 +12,7 @@ import (
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
@ -57,7 +58,7 @@ func (d *DrainCloud) login(ctx *gin.Context, req *domain.LoginRequest) (*domain.
return nil, err
}
logger.Debug(ctx, "[login] user is already logged in", slog.Int64("session_id", session.ID))
logger.Debug(ctx, "[login] user is already logged in", slog.String("session_id", session.ID.String()))
return &domain.LoginResponse{
Ok: true,
}, nil
@ -95,7 +96,13 @@ func (d *DrainCloud) login(ctx *gin.Context, req *domain.LoginRequest) (*domain.
}
ctx.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
sessionID, err := uuid.NewV7()
if err != nil {
return nil, fmt.Errorf("failed to generate session id: %w", err)
}
if _, err = d.database.AddSession(ctx, &models.Session{
ID: sessionID,
SessionToken: sessionToken,
CsrfToken: csrfToken,
UserID: user.ID,

View File

@ -9,6 +9,7 @@ import (
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
@ -50,17 +51,22 @@ func (d *DrainCloud) register(ctx *gin.Context, req *domain.RegisterRequest) (*d
return nil, fmt.Errorf("failed to generate password hash: %w", err)
}
userID, err := uuid.NewV7()
if err != nil {
return nil, fmt.Errorf("failed to generate user id: %w", err)
}
user := &models.User{
ID: userID,
Username: req.Login,
Login: req.Login,
PasswordHash: passwordHash,
}
userID, err := d.database.AddUser(ctx, user.Login, user.Username, user.PasswordHash)
err = d.database.AddUser(ctx, userID, user.Login, user.Username, user.PasswordHash)
if err != nil {
return nil, fmt.Errorf("failed to add new user: %w", err)
}
user.ID = userID
sessionCreatedAt := time.Now()
sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7)
@ -77,7 +83,13 @@ func (d *DrainCloud) register(ctx *gin.Context, req *domain.RegisterRequest) (*d
}
ctx.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
sessionID, err := uuid.NewV7()
if err != nil {
return nil, fmt.Errorf("failed to generate session id: %w", err)
}
if _, err = d.database.AddSession(ctx, &models.Session{
ID: sessionID,
SessionToken: sessionToken,
CsrfToken: csrfToken,
UserID: user.ID,

View File

@ -11,6 +11,7 @@ import (
"git.optclblast.xyz/draincloud/draincloud-core/internal/reqcontext"
"github.com/davecgh/go-spew/spew"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
const (
@ -37,7 +38,7 @@ func (d *DrainCloud) UploadFile(ctx *gin.Context) {
}
}
func (d *DrainCloud) uploadFile(ctx *gin.Context, userID int64) error {
func (d *DrainCloud) uploadFile(ctx *gin.Context, userID uuid.UUID) error {
title := ctx.PostForm("file")
logger.Info(ctx, "uploadFile", slog.Any("postForm data", spew.Sdump(title)))

View File

@ -5,6 +5,7 @@ import (
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models"
"github.com/google/uuid"
)
type CtxKey string
@ -14,15 +15,15 @@ const (
SessionCtxKey CtxKey = "_ctx_session"
)
func WithUserID(parent context.Context, userID int64) context.Context {
func WithUserID(parent context.Context, userID uuid.UUID) context.Context {
return context.WithValue(parent, UserIDCtxKey, userID)
}
func GetUserID(ctx context.Context) (int64, error) {
if id, ok := ctx.Value(UserIDCtxKey).(int64); ok {
func GetUserID(ctx context.Context) (uuid.UUID, error) {
if id, ok := ctx.Value(UserIDCtxKey).(uuid.UUID); ok {
return id, nil
}
return -1, fmt.Errorf("userID not passed with context")
return uuid.Nil, fmt.Errorf("userID not passed with context")
}
func WithSession(parent context.Context, session *models.Session) context.Context {
@ -34,4 +35,4 @@ func GetSession(ctx context.Context) (*models.Session, error) {
return ses, nil
}
return nil, fmt.Errorf("session not passed with context")
}
}

View File

@ -15,7 +15,7 @@ type Database interface {
}
type AuthStorage interface {
AddUser(ctx context.Context, login string, username string, passwordHash []byte) (uuid.UUID, error)
AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error
GetUserByLogin(ctx context.Context, login string) (*models.User, error)
GetUserByID(ctx context.Context, id uuid.UUID) (*models.User, error)

View File

@ -16,7 +16,8 @@ import (
)
type Database struct {
db *pgx.Conn
db *pgx.Conn
cluster *ShardCluster
}
func New(ctx context.Context, dsn string) *Database {
@ -40,8 +41,8 @@ type dbtx interface {
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}
func (d *Database) AddUser(ctx context.Context, login string, username string, passwordHash []byte) (uuid.UUID, error) {
return addUser(ctx, d.db, login, username, passwordHash)
func (d *Database) AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error {
return addUser(ctx, d.db, id, login, username, passwordHash)
}
func (d *Database) GetUserByID(ctx context.Context, id uuid.UUID) (*models.User, error) {
@ -99,17 +100,16 @@ func (d *Database) RemoveExpiredSessions(ctx context.Context) error {
return err
}
func addUser(ctx context.Context, conn dbtx, login string, username string, passwordHash []byte) (uuid.UUID, error) {
const stmt = `INSERT INTO users (login,username,password)
VALUES ($1,$2,$3) RETURNING id`
func addUser(ctx context.Context, conn dbtx, id uuid.UUID, login string, username string, passwordHash []byte) error {
const stmt = `INSERT INTO users (id,login,username,password)
VALUES ($1,$2,$3,$4);`
row := conn.QueryRow(ctx, stmt, login, username, passwordHash)
var id uuid.UUID
if err := row.Scan(&id); err != nil {
return uuid.Nil, fmt.Errorf("failed to insert user data into users table: %w", err)
_, err := conn.Exec(ctx, stmt, id, login, username, passwordHash)
if err != nil {
return fmt.Errorf("failed to insert user data into users table: %w", err)
}
return id, nil
return nil
}
func getUserByID(ctx context.Context, conn dbtx, id uuid.UUID) (*models.User, error) {
@ -135,10 +135,10 @@ func getUserByLogin(ctx context.Context, conn dbtx, login string) (*models.User,
}
func addSession(ctx context.Context, conn dbtx, session *models.Session) (uuid.UUID, error) {
const stmt = `INSERT INTO sessions (session_token, csrf_token, user_id,
created_at, expired_at) VALUES ($1, $2, $3, $4, $5) RETURNING id;`
const stmt = `INSERT INTO sessions (id,session_token, csrf_token, user_id,
created_at, expired_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;`
var id uuid.UUID
row := conn.QueryRow(ctx, stmt, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt)
row := conn.QueryRow(ctx, stmt, session.ID, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt)
if err := row.Scan(&id); err != nil {
return uuid.Nil, fmt.Errorf("failed to insert new session: %w", err)
}

View File

@ -2,22 +2,24 @@ package postgres
import (
"context"
"hash/crc32"
"log/slog"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
type ShardMap = map[uint16]*pgx.ConnConfig
type ShardMap = map[uint32]*pgx.ConnConfig
type ShardCluster struct {
m sync.Mutex
shards map[uint16]*pgx.Conn
shards []*pgx.Conn
}
func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster {
shards := make(map[uint16]*pgx.Conn, len(shardMap))
shards := make([]*pgx.Conn, len(shardMap))
for n, cfg := range shardMap {
conn, err := pgx.ConnectConfig(ctx, cfg)
if err != nil {
@ -28,8 +30,12 @@ func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster {
return &ShardCluster{shards: shards}
}
func (c *ShardCluster) SelectShard(n uint16) *pgx.Conn {
func (c *ShardCluster) PickShard(n uint32) *pgx.Conn {
c.m.Lock()
defer c.m.Unlock()
return c.shards[n]
}
func UUIDShardFn(id uuid.UUID, numShards uint32) uint32 {
return crc32.ChecksumIEEE(id[:]) % numShards
}