From 5ce5cce07688cfa4a28cbd6940af072c890f42e1 Mon Sep 17 00:00:00 2001 From: optclblast Date: Mon, 16 Dec 2024 00:08:37 -0500 Subject: [PATCH] init sharding cluster stuff for fun --- Taskfile.yaml | 6 ++-- cmd/main.go | 7 ++--- compose.rw.yaml | 17 ++++++++--- go.mod | 2 +- go.sum | 4 +-- internal/app/auth.go | 2 +- internal/app/login.go | 9 +++++- internal/app/register.go | 16 +++++++++-- internal/app/upload_file.go | 3 +- internal/reqcontext/auth.go | 11 ++++---- internal/storage/interface.go | 2 +- internal/storage/postgres/database.go | 28 +++++++++---------- ...{connection_pool.go => sharded_cluster.go} | 14 +++++++--- 13 files changed, 79 insertions(+), 42 deletions(-) rename internal/storage/postgres/{connection_pool.go => sharded_cluster.go} (65%) diff --git a/Taskfile.yaml b/Taskfile.yaml index df2ec47..72c5080 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -1,9 +1,11 @@ version: 3 tasks: - prepare-env: + run: + cmds: + - go run cmd/main.go + deploy-local: cmds: - - sudo docker swarm init - sudo docker stack deploy draincloud_core -c ./compose.rw.yaml migrate-local-status: cmds: diff --git a/cmd/main.go b/cmd/main.go index 893a8a7..f3c3d97 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -6,7 +6,6 @@ import ( "os/signal" "git.optclblast.xyz/draincloud/draincloud-core/internal/app" - cleanupsessions "git.optclblast.xyz/draincloud/draincloud-core/internal/cron/cleanup_sessions" filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine" "git.optclblast.xyz/draincloud/draincloud-core/internal/plugin" "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/postgres" @@ -19,11 +18,11 @@ func main() { plugin.MustNewPluginLoader(ctx, 8081, plugin.NewPluginStore()). Run(ctx) - pg := postgres.New(ctx, "postgres://draincloud:draincloud@localhost:5432/draincloud?sslmode=disable") + pg := postgres.New(ctx, "postgres://draincloud:draincloud-rw-password-123@192.168.1.68:5433/draincloud?sslmode=disable") // TODO move cron on a separate job (k8s cronjob / docker cron) - cleanupSessionsCron := cleanupsessions.New(pg) - cleanupSessionsCron.Run(ctx) + // cleanupSessionsCron := cleanupsessions.New(pg) + // cleanupSessionsCron.Run(ctx) engine := filesengine.NewFilesEngine(nil, nil) diff --git a/compose.rw.yaml b/compose.rw.yaml index 5571d68..f88c9a8 100644 --- a/compose.rw.yaml +++ b/compose.rw.yaml @@ -6,10 +6,12 @@ services: - 5432:5432 environment: - POSTGRES_USER=draincloud - - POSTGRES_PASSWORD=draincloud + - POSTGRES_PASSWORD=mysuperstrongpassword - POSTGRES_DB=draincloud volumes: - draincloud-rw-1:/var/lib/postgresql/data + networks: + - draincloud-pg rw_2: image: postgres:17 @@ -18,10 +20,12 @@ services: - 5433:5432 environment: - POSTGRES_USER=draincloud - - POSTGRES_PASSWORD=draincloud + - POSTGRES_PASSWORD=mysuperstrongpassword - POSTGRES_DB=draincloud volumes: - draincloud-rw-2:/var/lib/postgresql/data + networks: + - draincloud-pg rw_3: image: postgres:17 @@ -30,12 +34,17 @@ services: - 5434:5432 environment: - POSTGRES_USER=draincloud - - POSTGRES_PASSWORD=draincloud + - POSTGRES_PASSWORD=mysuperstrongpassword - POSTGRES_DB=draincloud volumes: - draincloud-rw-3:/var/lib/postgresql/data + networks: + - draincloud-pg volumes: draincloud-rw-1: {} draincloud-rw-2: {} - draincloud-rw-3: {} \ No newline at end of file + draincloud-rw-3: {} + +networks: + draincloud-pg: {} \ No newline at end of file diff --git a/go.mod b/go.mod index c3ce09d..fd4c72d 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/fatih/color v1.17.0 github.com/fsnotify/fsnotify v1.7.0 github.com/gin-gonic/gin v1.10.0 - github.com/google/uuid v1.4.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/jmoiron/sqlx v1.4.0 github.com/nats-io/nats.go v1.37.0 diff --git a/go.sum b/go.sum index 6a37bc1..ddd07a2 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,8 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/internal/app/auth.go b/internal/app/auth.go index 371c0e7..808e7ed 100644 --- a/internal/app/auth.go +++ b/internal/app/auth.go @@ -37,7 +37,7 @@ func (d *DrainCloud) authorize(ctx *gin.Context) (*models.Session, error) { return nil, ErrorUnauthorized } - logger.Debug(ctx, "[authorize] user authorized", slog.Int64("session_id", session.ID)) + logger.Debug(ctx, "[authorize] user authorized", slog.String("session_id", session.ID.String())) return session, nil } diff --git a/internal/app/login.go b/internal/app/login.go index 59bddf2..133614f 100644 --- a/internal/app/login.go +++ b/internal/app/login.go @@ -12,6 +12,7 @@ import ( "git.optclblast.xyz/draincloud/draincloud-core/internal/logger" "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models" "github.com/gin-gonic/gin" + "github.com/google/uuid" "golang.org/x/crypto/bcrypt" ) @@ -57,7 +58,7 @@ func (d *DrainCloud) login(ctx *gin.Context, req *domain.LoginRequest) (*domain. return nil, err } - logger.Debug(ctx, "[login] user is already logged in", slog.Int64("session_id", session.ID)) + logger.Debug(ctx, "[login] user is already logged in", slog.String("session_id", session.ID.String())) return &domain.LoginResponse{ Ok: true, }, nil @@ -95,7 +96,13 @@ func (d *DrainCloud) login(ctx *gin.Context, req *domain.LoginRequest) (*domain. } ctx.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false) + sessionID, err := uuid.NewV7() + if err != nil { + return nil, fmt.Errorf("failed to generate session id: %w", err) + } + if _, err = d.database.AddSession(ctx, &models.Session{ + ID: sessionID, SessionToken: sessionToken, CsrfToken: csrfToken, UserID: user.ID, diff --git a/internal/app/register.go b/internal/app/register.go index b0303c3..f5eb28d 100644 --- a/internal/app/register.go +++ b/internal/app/register.go @@ -9,6 +9,7 @@ import ( "git.optclblast.xyz/draincloud/draincloud-core/internal/logger" "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models" "github.com/gin-gonic/gin" + "github.com/google/uuid" "golang.org/x/crypto/bcrypt" ) @@ -50,17 +51,22 @@ func (d *DrainCloud) register(ctx *gin.Context, req *domain.RegisterRequest) (*d return nil, fmt.Errorf("failed to generate password hash: %w", err) } + userID, err := uuid.NewV7() + if err != nil { + return nil, fmt.Errorf("failed to generate user id: %w", err) + } + user := &models.User{ + ID: userID, Username: req.Login, Login: req.Login, PasswordHash: passwordHash, } - userID, err := d.database.AddUser(ctx, user.Login, user.Username, user.PasswordHash) + err = d.database.AddUser(ctx, userID, user.Login, user.Username, user.PasswordHash) if err != nil { return nil, fmt.Errorf("failed to add new user: %w", err) } - user.ID = userID sessionCreatedAt := time.Now() sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7) @@ -77,7 +83,13 @@ func (d *DrainCloud) register(ctx *gin.Context, req *domain.RegisterRequest) (*d } ctx.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false) + sessionID, err := uuid.NewV7() + if err != nil { + return nil, fmt.Errorf("failed to generate session id: %w", err) + } + if _, err = d.database.AddSession(ctx, &models.Session{ + ID: sessionID, SessionToken: sessionToken, CsrfToken: csrfToken, UserID: user.ID, diff --git a/internal/app/upload_file.go b/internal/app/upload_file.go index 0863114..8ca83b8 100644 --- a/internal/app/upload_file.go +++ b/internal/app/upload_file.go @@ -11,6 +11,7 @@ import ( "git.optclblast.xyz/draincloud/draincloud-core/internal/reqcontext" "github.com/davecgh/go-spew/spew" "github.com/gin-gonic/gin" + "github.com/google/uuid" ) const ( @@ -37,7 +38,7 @@ func (d *DrainCloud) UploadFile(ctx *gin.Context) { } } -func (d *DrainCloud) uploadFile(ctx *gin.Context, userID int64) error { +func (d *DrainCloud) uploadFile(ctx *gin.Context, userID uuid.UUID) error { title := ctx.PostForm("file") logger.Info(ctx, "uploadFile", slog.Any("postForm data", spew.Sdump(title))) diff --git a/internal/reqcontext/auth.go b/internal/reqcontext/auth.go index 59c6060..883aad6 100644 --- a/internal/reqcontext/auth.go +++ b/internal/reqcontext/auth.go @@ -5,6 +5,7 @@ import ( "fmt" "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models" + "github.com/google/uuid" ) type CtxKey string @@ -14,15 +15,15 @@ const ( SessionCtxKey CtxKey = "_ctx_session" ) -func WithUserID(parent context.Context, userID int64) context.Context { +func WithUserID(parent context.Context, userID uuid.UUID) context.Context { return context.WithValue(parent, UserIDCtxKey, userID) } -func GetUserID(ctx context.Context) (int64, error) { - if id, ok := ctx.Value(UserIDCtxKey).(int64); ok { +func GetUserID(ctx context.Context) (uuid.UUID, error) { + if id, ok := ctx.Value(UserIDCtxKey).(uuid.UUID); ok { return id, nil } - return -1, fmt.Errorf("userID not passed with context") + return uuid.Nil, fmt.Errorf("userID not passed with context") } func WithSession(parent context.Context, session *models.Session) context.Context { @@ -34,4 +35,4 @@ func GetSession(ctx context.Context) (*models.Session, error) { return ses, nil } return nil, fmt.Errorf("session not passed with context") -} \ No newline at end of file +} diff --git a/internal/storage/interface.go b/internal/storage/interface.go index 27d9759..fcb372e 100644 --- a/internal/storage/interface.go +++ b/internal/storage/interface.go @@ -15,7 +15,7 @@ type Database interface { } type AuthStorage interface { - AddUser(ctx context.Context, login string, username string, passwordHash []byte) (uuid.UUID, error) + AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error GetUserByLogin(ctx context.Context, login string) (*models.User, error) GetUserByID(ctx context.Context, id uuid.UUID) (*models.User, error) diff --git a/internal/storage/postgres/database.go b/internal/storage/postgres/database.go index 700e2ad..de44d28 100644 --- a/internal/storage/postgres/database.go +++ b/internal/storage/postgres/database.go @@ -16,7 +16,8 @@ import ( ) type Database struct { - db *pgx.Conn + db *pgx.Conn + cluster *ShardCluster } func New(ctx context.Context, dsn string) *Database { @@ -40,8 +41,8 @@ type dbtx interface { Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) } -func (d *Database) AddUser(ctx context.Context, login string, username string, passwordHash []byte) (uuid.UUID, error) { - return addUser(ctx, d.db, login, username, passwordHash) +func (d *Database) AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error { + return addUser(ctx, d.db, id, login, username, passwordHash) } func (d *Database) GetUserByID(ctx context.Context, id uuid.UUID) (*models.User, error) { @@ -99,17 +100,16 @@ func (d *Database) RemoveExpiredSessions(ctx context.Context) error { return err } -func addUser(ctx context.Context, conn dbtx, login string, username string, passwordHash []byte) (uuid.UUID, error) { - const stmt = `INSERT INTO users (login,username,password) - VALUES ($1,$2,$3) RETURNING id` +func addUser(ctx context.Context, conn dbtx, id uuid.UUID, login string, username string, passwordHash []byte) error { + const stmt = `INSERT INTO users (id,login,username,password) + VALUES ($1,$2,$3,$4);` - row := conn.QueryRow(ctx, stmt, login, username, passwordHash) - var id uuid.UUID - if err := row.Scan(&id); err != nil { - return uuid.Nil, fmt.Errorf("failed to insert user data into users table: %w", err) + _, err := conn.Exec(ctx, stmt, id, login, username, passwordHash) + if err != nil { + return fmt.Errorf("failed to insert user data into users table: %w", err) } - return id, nil + return nil } func getUserByID(ctx context.Context, conn dbtx, id uuid.UUID) (*models.User, error) { @@ -135,10 +135,10 @@ func getUserByLogin(ctx context.Context, conn dbtx, login string) (*models.User, } func addSession(ctx context.Context, conn dbtx, session *models.Session) (uuid.UUID, error) { - const stmt = `INSERT INTO sessions (session_token, csrf_token, user_id, - created_at, expired_at) VALUES ($1, $2, $3, $4, $5) RETURNING id;` + const stmt = `INSERT INTO sessions (id,session_token, csrf_token, user_id, + created_at, expired_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;` var id uuid.UUID - row := conn.QueryRow(ctx, stmt, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt) + row := conn.QueryRow(ctx, stmt, session.ID, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt) if err := row.Scan(&id); err != nil { return uuid.Nil, fmt.Errorf("failed to insert new session: %w", err) } diff --git a/internal/storage/postgres/connection_pool.go b/internal/storage/postgres/sharded_cluster.go similarity index 65% rename from internal/storage/postgres/connection_pool.go rename to internal/storage/postgres/sharded_cluster.go index b3f037f..88f00f8 100644 --- a/internal/storage/postgres/connection_pool.go +++ b/internal/storage/postgres/sharded_cluster.go @@ -2,22 +2,24 @@ package postgres import ( "context" + "hash/crc32" "log/slog" "sync" "git.optclblast.xyz/draincloud/draincloud-core/internal/logger" + "github.com/google/uuid" "github.com/jackc/pgx/v5" ) -type ShardMap = map[uint16]*pgx.ConnConfig +type ShardMap = map[uint32]*pgx.ConnConfig type ShardCluster struct { m sync.Mutex - shards map[uint16]*pgx.Conn + shards []*pgx.Conn } func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster { - shards := make(map[uint16]*pgx.Conn, len(shardMap)) + shards := make([]*pgx.Conn, len(shardMap)) for n, cfg := range shardMap { conn, err := pgx.ConnectConfig(ctx, cfg) if err != nil { @@ -28,8 +30,12 @@ func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster { return &ShardCluster{shards: shards} } -func (c *ShardCluster) SelectShard(n uint16) *pgx.Conn { +func (c *ShardCluster) PickShard(n uint32) *pgx.Conn { c.m.Lock() defer c.m.Unlock() return c.shards[n] } + +func UUIDShardFn(id uuid.UUID, numShards uint32) uint32 { + return crc32.ChecksumIEEE(id[:]) % numShards +}