This commit is contained in:
r8zavetr8v 2025-02-25 14:47:57 -08:00
parent bfec6255bc
commit 2f0b38a3fb
60 changed files with 3260 additions and 3228 deletions

6
.gitignore vendored
View File

@ -1,4 +1,4 @@
*.sqlite
*.db
assets/*
*.sqlite
*.db
assets/*
build/*

View File

@ -1,12 +1,12 @@
filename: "mock_{{.InterfaceName}}.go"
dir: "mocks/{{.PackagePath}}"
outpkg: "{{.PackageName}}"
with-expecter: true
packages:
git.optclblast.xyz/draincloud/draincloud-core/internal/storage:
interfaces:
Database:
AuthAuditLogStorage:
AuthStorage:
BlobStorage:
filename: "mock_{{.InterfaceName}}.go"
dir: "mocks/{{.PackagePath}}"
outpkg: "{{.PackageName}}"
with-expecter: true
packages:
git.optclblast.xyz/draincloud/draincloud-core/internal/storage:
interfaces:
Database:
AuthAuditLogStorage:
AuthStorage:
BlobStorage:
MetaStorage:

30
.vscode/launch.json vendored
View File

@ -1,16 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/main.go"
}
]
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/main.go"
}
]
}

View File

@ -1,7 +1,7 @@
# DrainCloud Core
DrainCloud Core is an all-in-one lightweight DrainCloud distribution designed to work in resource-constrained environments.
The node can work in three modes: #TBD
1. All-in-one mode, the recommended one.
2. Auth-node. Only auth api will be operational.
3. Storage-node. Only filestorage api will be operational.
# DrainCloud Core
DrainCloud Core is an all-in-one lightweight DrainCloud distribution designed to work in resource-constrained environments.
The node can work in three modes: #TBD
1. All-in-one mode, the recommended one.
2. Auth-node. Only auth api will be operational.
3. Storage-node. Only filestorage api will be operational.

View File

@ -1,12 +1,12 @@
version: 3
tasks:
run:
cmds:
- go run cmd/main.go
deploy-local:
cmds:
- sudo docker stack deploy draincloud_core -c ./compose.rw.yaml
migrate-local-status:
cmds:
version: 3
tasks:
run:
cmds:
- go run cmd/main.go
deploy-local:
cmds:
- sudo docker stack deploy draincloud_core -c ./compose.rw.yaml
migrate-local-status:
cmds:
- goose postgres "postgres://draincloud:draincloud@localhost:5432/draincloud" status -dir migrations

BIN
bin/task Normal file

Binary file not shown.

View File

@ -1,33 +1,33 @@
package main
import (
"context"
"os"
"os/signal"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/postgres"
)
// main boots a DrainCloud core node: it starts the plugin loader,
// connects to Postgres, constructs the files engine and runs the HTTP
// application until an interrupt/kill signal cancels the context.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer cancel()
	plugin.MustNewPluginLoader(ctx, 8081, plugin.NewPluginStore()).
		Run(ctx)
	// NOTE(review): credentials and host are hard-coded here; move the
	// DSN into configuration before release.
	pg := postgres.New(ctx, "postgres://draincloud:mysuperstrongpassword@127.0.0.1:5432/draincloud?sslmode=disable")
	// TODO move cron on a separate job (k8s cronjob / docker cron)
	// cleanupSessionsCron := cleanupsessions.New(pg)
	// cleanupSessionsCron.Run(ctx)
	// Files engine is constructed with nil storages for now — presumably
	// still under development; verify before wiring real uploads.
	engine := filesengine.NewFilesEngine(nil, nil)
	go app.New(ctx, pg, engine).
		Run(ctx)
	// Block until the signal context is cancelled.
	<-ctx.Done()
}
package main
import (
"context"
"os"
"os/signal"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/postgres"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
plugin.MustNewPluginLoader(ctx, 8081, plugin.NewPluginStore()).
Run(ctx)
pg := postgres.New(ctx, "postgres://draincloud:mysuperstrongpassword@127.0.0.1:5432/draincloud?sslmode=disable")
// TODO move cron on a separate job (k8s cronjob / docker cron)
// cleanupSessionsCron := cleanupsessions.New(pg)
// cleanupSessionsCron.Run(ctx)
engine := filesengine.NewFilesEngine(nil, nil)
go app.New(ctx, pg, engine).
Run(ctx)
<-ctx.Done()
}

View File

@ -1,50 +1,50 @@
services:
rw_1:
image: postgres:17
container_name: draincloud-db-rw-1
ports:
- 5432:5432
environment:
- POSTGRES_USER=draincloud
- POSTGRES_PASSWORD=mysuperstrongpassword
- POSTGRES_DB=draincloud
volumes:
- draincloud-rw-1:/var/lib/postgresql/data
networks:
- draincloud-pg
# rw_2:
# image: postgres:17
# container_name: draincloud-db-rw-2
# ports:
# - 5433:5432
# environment:
# - POSTGRES_USER=draincloud
# - POSTGRES_PASSWORD=mysuperstrongpassword
# - POSTGRES_DB=draincloud
# volumes:
# - draincloud-rw-2:/var/lib/postgresql/data
# networks:
# - draincloud-pg
# rw_3:
# image: postgres:17
# container_name: draincloud-db-rw-3
# ports:
# - 5434:5432
# environment:
# - POSTGRES_USER=draincloud
# - POSTGRES_PASSWORD=mysuperstrongpassword
# - POSTGRES_DB=draincloud
# volumes:
# - draincloud-rw-3:/var/lib/postgresql/data
# networks:
# - draincloud-pg
volumes:
draincloud-rw-1: {}
# draincloud-rw-2: {}
# draincloud-rw-3: {}
networks:
services:
rw_1:
image: postgres:17
container_name: draincloud-db-rw-1
ports:
- 5432:5432
environment:
- POSTGRES_USER=draincloud
- POSTGRES_PASSWORD=mysuperstrongpassword
- POSTGRES_DB=draincloud
volumes:
- draincloud-rw-1:/var/lib/postgresql/data
networks:
- draincloud-pg
# rw_2:
# image: postgres:17
# container_name: draincloud-db-rw-2
# ports:
# - 5433:5432
# environment:
# - POSTGRES_USER=draincloud
# - POSTGRES_PASSWORD=mysuperstrongpassword
# - POSTGRES_DB=draincloud
# volumes:
# - draincloud-rw-2:/var/lib/postgresql/data
# networks:
# - draincloud-pg
# rw_3:
# image: postgres:17
# container_name: draincloud-db-rw-3
# ports:
# - 5434:5432
# environment:
# - POSTGRES_USER=draincloud
# - POSTGRES_PASSWORD=mysuperstrongpassword
# - POSTGRES_DB=draincloud
# volumes:
# - draincloud-rw-3:/var/lib/postgresql/data
# networks:
# - draincloud-pg
volumes:
draincloud-rw-1: {}
# draincloud-rw-2: {}
# draincloud-rw-3: {}
networks:
draincloud-pg: {}

View File

@ -1,82 +1,82 @@
package app
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app/handlers"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/processor"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers/auth"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"github.com/gin-gonic/gin"
)
// DrainCloud is the top-level HTTP application: it owns the gin router,
// the database handle, the files engine and the request processor.
type DrainCloud struct {
	mux          *gin.Engine
	database     storage.Database
	filesEngine  *filesengine.FilesEngine
	ginProcessor processor.Processor[gin.HandlerFunc]
}

// New builds the DrainCloud application: it registers the built-in auth
// resolver on a fresh dispatcher and mounts the internal, auth and
// files route groups on a gin engine.
func New(
	ctx context.Context,
	database storage.Database,
	filesEngine *filesengine.FilesEngine,
) *DrainCloud {
	mux := gin.Default()
	dispatcher := resolvedispatcher.New()
	dispatcher.RegisterResolver(
		ctx,
		auth.AuthResolverV1Name,
		auth.NewAuthResolver(database),
	)
	d := &DrainCloud{
		database:     database,
		filesEngine:  filesEngine,
		ginProcessor: processor.NewGinProcessor(database, dispatcher),
	}
	// TODO. Maybe overkill
	internalGroup := mux.Group("/_internal")
	{
		regGroup := internalGroup.Group("/register")
		{
			regGroup.POST("/resolver", d.ginProcessor.Process(
				handlers.NewInternalRegisterResolverHandler(dispatcher),
			))
			// Plugin registration endpoint is a stub for now.
			regGroup.POST("/plugin", func(ctx *gin.Context) {})
		}
	}
	// Built-in auth component of DrainCloud-Core
	authGroup := mux.Group("/auth")
	{
		authGroup.POST("/register", d.ginProcessor.Process(
			handlers.NewRegisterHandler(database),
		))
		authGroup.POST("/logon", d.ginProcessor.Process(
			handlers.NewLogonHandler(database),
		))
	}
	filesGroup := mux.Group("/files")
	{
		filesGroup.POST("/upload", d.ginProcessor.Process(
			handlers.NewUploadFileHandler(filesEngine),
		))
	}
	d.mux = mux
	return d
}

// Run starts the HTTP server on gin's default address.
// NOTE(review): ctx is accepted but unused — gin's Run does not stop on
// context cancellation; confirm whether graceful shutdown is required.
func (d *DrainCloud) Run(ctx context.Context) error {
	return d.mux.Run()
}
package app
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app/handlers"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/processor"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers/auth"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"github.com/gin-gonic/gin"
)
type DrainCloud struct {
mux *gin.Engine
database storage.Database
filesEngine *filesengine.FilesEngine
ginProcessor processor.Processor[gin.HandlerFunc]
}
func New(
ctx context.Context,
database storage.Database,
filesEngine *filesengine.FilesEngine,
) *DrainCloud {
mux := gin.Default()
dispatcher := resolvedispatcher.New()
dispatcher.RegisterResolver(
ctx,
auth.AuthResolverV1Name,
auth.NewAuthResolver(database),
)
d := &DrainCloud{
database: database,
filesEngine: filesEngine,
ginProcessor: processor.NewGinProcessor(database, dispatcher),
}
// TODO. Maybe overkill
internalGroup := mux.Group("/_internal")
{
regGroup := internalGroup.Group("/register")
{
regGroup.POST("/resolver", d.ginProcessor.Process(
handlers.NewInternalRegisterResolverHandler(dispatcher),
))
regGroup.POST("/plugin", func(ctx *gin.Context) {})
}
}
// Built-in auth component of DrainCloud-Core
authGroup := mux.Group("/auth")
{
authGroup.POST("/register", d.ginProcessor.Process(
handlers.NewRegisterHandler(database),
))
authGroup.POST("/logon", d.ginProcessor.Process(
handlers.NewLogonHandler(database),
))
}
filesGroup := mux.Group("/files")
{
filesGroup.POST("/upload", d.ginProcessor.Process(
handlers.NewUploadFileHandler(filesEngine),
))
}
d.mux = mux
return d
}
func (d *DrainCloud) Run(ctx context.Context) error {
return d.mux.Run()
}

View File

@ -1,37 +1,37 @@
package handlers
import (
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
)
const (
csrfTokenCookie = "__Csrf_token"
sessionTokenCookie = "__Session_token"
)
var (
ErrorUnauthorized = errors.New("unauthorized")
)
// validateLoginAndPassword enforces the minimum credential lengths:
// logins must be at least 4 characters and passwords at least 6.
// It returns nil when both values are acceptable.
//
// Bug fix: the error messages previously claimed an 8-character
// minimum, which did not match the actual checks (4 and 6).
func validateLoginAndPassword(login, password string) error {
	if len(login) < 4 {
		return fmt.Errorf("login must be at least 4 characters long")
	}
	if len(password) < 6 {
		return fmt.Errorf("password must be at least 6 characters long")
	}
	return nil
}
func generateSessionToken(length int) (string, error) {
bytes := make([]byte, length)
if _, err := rand.Read(bytes); err != nil {
return "", fmt.Errorf("failed to generate token: %w", err)
}
return base64.URLEncoding.EncodeToString(bytes), nil
}
package handlers
import (
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
)
const (
csrfTokenCookie = "__Csrf_token"
sessionTokenCookie = "__Session_token"
)
var (
ErrorUnauthorized = errors.New("unauthorized")
)
// validateLoginAndPassword enforces the minimum credential lengths:
// logins must be at least 4 characters and passwords at least 6.
// It returns nil when both values are acceptable.
//
// Bug fix: the error messages previously claimed an 8-character
// minimum, which did not match the actual checks (4 and 6).
func validateLoginAndPassword(login, password string) error {
	if len(login) < 4 {
		return fmt.Errorf("login must be at least 4 characters long")
	}
	if len(password) < 6 {
		return fmt.Errorf("password must be at least 6 characters long")
	}
	return nil
}
// generateSessionToken returns a URL-safe base64 encoding of `length`
// cryptographically secure random bytes (the resulting string is longer
// than `length` characters because of the base64 expansion).
func generateSessionToken(length int) (string, error) {
	bytes := make([]byte, length)
	if _, err := rand.Read(bytes); err != nil {
		return "", fmt.Errorf("failed to generate token: %w", err)
	}
	return base64.URLEncoding.EncodeToString(bytes), nil
}

View File

@ -1,179 +1,179 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
// LogonHandler serves logon requests against the auth storage.
type LogonHandler struct {
	*handler.BaseHandler
	authStorage storage.AuthStorage
}

// NewLogonHandler wires a LogonHandler around the given auth storage
// and registers its process function.
func NewLogonHandler(
	authStorage storage.AuthStorage,
) *LogonHandler {
	base := handler.New().
		WithName("logonv1").
		WithRequiredResolveParams()
	h := &LogonHandler{
		BaseHandler: base,
		authStorage: authStorage,
	}
	h.WithProcessFunc(h.process)
	return h
}
// process handles a logon request. If the caller already presents a
// valid session it short-circuits with Ok; otherwise it attempts a
// credentials-based login.
//
// Fix: the "session not founh" debug message typo is corrected.
func (h *LogonHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
	logger.Debug(ctx, "[Logon] new request")
	body := new(domain.LogonRequest)
	err := json.Unmarshal(req.Body, body)
	if err != nil {
		logger.Error(ctx, "[Logon] failed to bind request", logger.Err(err))
		w.Write(ctx, map[string]string{
			"error": "bad request",
		}, handler.WithCode(http.StatusBadRequest))
		return nil
	}
	// A missing session cookie is fine (first-time login); any other
	// session lookup failure is fatal.
	session, err := h.getSession(ctx, req)
	if err != nil && !errors.Is(err, http.ErrNoCookie) {
		return err
	}
	if session != nil {
		if err := validateSession(req, session); err != nil {
			// TODO add audit log entry
			return err
		}
		logger.Debug(ctx, "[login] user is already logged in", slog.String("session_id", session.ID.String()))
		w.Write(ctx, &domain.LogonResponse{
			Ok: true,
		})
		return nil
	}
	logger.Debug(ctx, "[login] session not found. trying to authorize")
	resp, err := h.login(ctx, body, session, w)
	if err != nil {
		logger.Error(ctx, "[Logon] failed to login user", logger.Err(err))
		return err
	}
	w.Write(ctx, resp)
	return nil
}
// login authenticates the user by login/password, issues fresh session
// and CSRF tokens as cookies, and persists the new session record.
//
// Security fix: the original re-hashed the submitted password with
// bcrypt.GenerateFromPassword and compared the result to the stored
// hash via bytes.Equal. bcrypt embeds a random salt, so two hashes of
// the same password never match — and the condition was inverted on
// top of that, so the check could never work. The correct way to
// verify a password against a stored bcrypt hash is
// bcrypt.CompareHashAndPassword.
func (h *LogonHandler) login(ctx context.Context, req *domain.LogonRequest, session *auth.Session, w handler.Writer) (*domain.LogonResponse, error) {
	user, err := h.authStorage.GetUserByLogin(ctx, req.Login)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch user by login: %w", err)
	}
	if err := bcrypt.CompareHashAndPassword(user.PasswordHash, []byte(req.Password)); err != nil {
		logger.Warn(ctx, "[login] failed to login user. passwords hashes not equal")
		return nil, errs.ErrorAccessDenied
	}
	sessionCreatedAt := time.Now()
	sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7)
	sessionToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a session token: %w", err)
	}
	// NOTE(review): "_path" and "_domain" look like placeholder cookie
	// path/domain values — confirm the intended settings.
	w.SetCookie(sessionTokenCookie, sessionToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, true)
	csrfToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a csrf token: %w", err)
	}
	// CSRF cookie is intentionally not HttpOnly so client scripts can
	// echo it back.
	w.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
	sessionID, err := uuid.NewV7()
	if err != nil {
		return nil, fmt.Errorf("failed to generate session id: %w", err)
	}
	if _, err = h.authStorage.AddSession(ctx, &auth.Session{
		ID:           sessionID,
		SessionToken: sessionToken,
		CsrfToken:    csrfToken,
		UserID:       user.ID,
		CreatedAt:    sessionCreatedAt,
		ExpiredAt:    sessionExpiredAt,
	}); err != nil {
		return nil, fmt.Errorf("failed to save session: %w", err)
	}
	// TODO add audit log entry
	return &domain.LogonResponse{
		Ok: true,
	}, nil
}
// getSession extracts the session and CSRF tokens from the request
// metadata and, when both are present, loads the matching session
// record from the auth storage.
func (h *LogonHandler) getSession(ctx context.Context, req *common.Request) (*auth.Session, error) {
	sessionTok, err := common.GetValue[string](req.Metadata, sessionTokenCookie)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch session cookie from request: %w", err)
	}
	csrfTok, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
	}
	if len(csrfTok) == 0 || len(sessionTok) == 0 {
		return nil, fmt.Errorf("session token or csrf token is empty")
	}
	stored, err := h.authStorage.GetSession(ctx, sessionTok)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch session from repo: %w", err)
	}
	return stored, nil
}
// validateSession checks that the stored session matches the CSRF token
// presented by the request and has not yet expired. It returns
// ErrorAccessDenied for a nil session or a CSRF mismatch, and
// ErrorSessionExpired when the session's deadline has passed.
func validateSession(req *common.Request, session *auth.Session) error {
	if session == nil {
		return errs.ErrorAccessDenied
	}
	csrfToken, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
	if err != nil {
		return fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
	}
	// NOTE(review): a plain string comparison leaks timing information;
	// consider crypto/subtle.ConstantTimeCompare for token checks.
	if session.CsrfToken != csrfToken {
		return errs.ErrorAccessDenied
	}
	if session.ExpiredAt.Before(time.Now()) {
		return errs.ErrorSessionExpired
	}
	return nil
}
package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
type LogonHandler struct {
*handler.BaseHandler
authStorage storage.AuthStorage
}
func NewLogonHandler(
authStorage storage.AuthStorage,
) *LogonHandler {
h := &LogonHandler{
authStorage: authStorage,
BaseHandler: handler.New().
WithName("logonv1").
WithRequiredResolveParams(),
}
h.WithProcessFunc(h.process)
return h
}
// process handles a logon request. If the caller already presents a
// valid session it short-circuits with Ok; otherwise it attempts a
// credentials-based login.
//
// Fix: the "session not founh" debug message typo is corrected.
func (h *LogonHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
	logger.Debug(ctx, "[Logon] new request")
	body := new(domain.LogonRequest)
	err := json.Unmarshal(req.Body, body)
	if err != nil {
		logger.Error(ctx, "[Logon] failed to bind request", logger.Err(err))
		w.Write(ctx, map[string]string{
			"error": "bad request",
		}, handler.WithCode(http.StatusBadRequest))
		return nil
	}
	// A missing session cookie is fine (first-time login); any other
	// session lookup failure is fatal.
	session, err := h.getSession(ctx, req)
	if err != nil && !errors.Is(err, http.ErrNoCookie) {
		return err
	}
	if session != nil {
		if err := validateSession(req, session); err != nil {
			// TODO add audit log entry
			return err
		}
		logger.Debug(ctx, "[login] user is already logged in", slog.String("session_id", session.ID.String()))
		w.Write(ctx, &domain.LogonResponse{
			Ok: true,
		})
		return nil
	}
	logger.Debug(ctx, "[login] session not found. trying to authorize")
	resp, err := h.login(ctx, body, session, w)
	if err != nil {
		logger.Error(ctx, "[Logon] failed to login user", logger.Err(err))
		return err
	}
	w.Write(ctx, resp)
	return nil
}
// login authenticates the user by login/password, issues fresh session
// and CSRF tokens as cookies, and persists the new session record.
//
// Security fix: the original re-hashed the submitted password with
// bcrypt.GenerateFromPassword and compared the result to the stored
// hash via bytes.Equal. bcrypt embeds a random salt, so two hashes of
// the same password never match — and the condition was inverted on
// top of that, so the check could never work. The correct way to
// verify a password against a stored bcrypt hash is
// bcrypt.CompareHashAndPassword.
func (h *LogonHandler) login(ctx context.Context, req *domain.LogonRequest, session *auth.Session, w handler.Writer) (*domain.LogonResponse, error) {
	user, err := h.authStorage.GetUserByLogin(ctx, req.Login)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch user by login: %w", err)
	}
	if err := bcrypt.CompareHashAndPassword(user.PasswordHash, []byte(req.Password)); err != nil {
		logger.Warn(ctx, "[login] failed to login user. passwords hashes not equal")
		return nil, errs.ErrorAccessDenied
	}
	sessionCreatedAt := time.Now()
	sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7)
	sessionToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a session token: %w", err)
	}
	// NOTE(review): "_path" and "_domain" look like placeholder cookie
	// path/domain values — confirm the intended settings.
	w.SetCookie(sessionTokenCookie, sessionToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, true)
	csrfToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a csrf token: %w", err)
	}
	// CSRF cookie is intentionally not HttpOnly so client scripts can
	// echo it back.
	w.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
	sessionID, err := uuid.NewV7()
	if err != nil {
		return nil, fmt.Errorf("failed to generate session id: %w", err)
	}
	if _, err = h.authStorage.AddSession(ctx, &auth.Session{
		ID:           sessionID,
		SessionToken: sessionToken,
		CsrfToken:    csrfToken,
		UserID:       user.ID,
		CreatedAt:    sessionCreatedAt,
		ExpiredAt:    sessionExpiredAt,
	}); err != nil {
		return nil, fmt.Errorf("failed to save session: %w", err)
	}
	// TODO add audit log entry
	return &domain.LogonResponse{
		Ok: true,
	}, nil
}
func (h *LogonHandler) getSession(ctx context.Context, req *common.Request) (*auth.Session, error) {
token, err := common.GetValue[string](req.Metadata, sessionTokenCookie)
if err != nil {
return nil, fmt.Errorf("failed to fetch session cookie from request: %w", err)
}
csrfToken, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
if err != nil {
return nil, fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
}
if len(csrfToken) == 0 || len(token) == 0 {
return nil, fmt.Errorf("session token or csrf token is empty")
}
session, err := h.authStorage.GetSession(ctx, token)
if err != nil {
return nil, fmt.Errorf("failed to fetch session from repo: %w", err)
}
return session, nil
}
func validateSession(req *common.Request, session *auth.Session) error {
if session == nil {
return errs.ErrorAccessDenied
}
csrfToken, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
if err != nil {
return fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
}
if session.CsrfToken != csrfToken {
return errs.ErrorAccessDenied
}
if session.ExpiredAt.Before(time.Now()) {
return errs.ErrorSessionExpired
}
return nil
}

View File

@ -1,121 +1,121 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
// RegisterHandler serves user-registration requests against the auth
// storage.
type RegisterHandler struct {
	*handler.BaseHandler
	authStorage storage.AuthStorage
}

// NewRegisterHandler builds a RegisterHandler backed by the given auth
// storage and registers its process function.
func NewRegisterHandler(
	authStorage storage.AuthStorage,
) *RegisterHandler {
	h := &RegisterHandler{
		authStorage: authStorage,
		BaseHandler: handler.New().
			WithName("registerv1").
			WithRequiredResolveParams(),
	}
	h.WithProcessFunc(h.process)
	return h
}
// process decodes the registration request body and delegates to
// register, writing the response on success.
func (h *RegisterHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
	regReq := new(domain.RegisterRequest)
	if err := json.Unmarshal(req.Body, regReq); err != nil {
		return err
	}
	resp, err := h.register(ctx, regReq, w)
	if err != nil {
		return fmt.Errorf("failed to register user: %w", err)
	}
	w.Write(ctx, resp)
	return nil
}
// register validates the credentials, creates the user with a bcrypt
// password hash, then issues session/CSRF cookies and persists the new
// session — mirroring the logon flow.
//
// Consistency fix: the receiver was named d while the sibling method
// process uses h; Go convention is one receiver name per type.
func (h *RegisterHandler) register(
	ctx context.Context,
	req *domain.RegisterRequest,
	w handler.Writer,
) (*domain.RegisterResponse, error) {
	if err := validateLoginAndPassword(req.Login, req.Password); err != nil {
		return nil, fmt.Errorf("invalid creds: %w", err)
	}
	passwordHash, err := bcrypt.GenerateFromPassword([]byte(req.Password), 10)
	if err != nil {
		logger.Error(ctx, "[register] failed to generate password hash", logger.Err(err))
		return nil, fmt.Errorf("failed to generate password hash: %w", err)
	}
	userID, err := uuid.NewV7()
	if err != nil {
		return nil, fmt.Errorf("failed to generate user id: %w", err)
	}
	// Username defaults to the login until a separate display name is
	// supported.
	user := &auth.User{
		ID:           userID,
		Username:     req.Login,
		Login:        req.Login,
		PasswordHash: passwordHash,
	}
	err = h.authStorage.AddUser(ctx, userID, user.Login, user.Username, user.PasswordHash)
	if err != nil {
		return nil, fmt.Errorf("failed to add new user: %w", err)
	}
	sessionCreatedAt := time.Now()
	sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7)
	sessionToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a session token: %w", err)
	}
	// NOTE(review): "_path" and "_domain" look like placeholder cookie
	// path/domain values — confirm the intended settings.
	w.SetCookie(sessionTokenCookie, sessionToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, true)
	csrfToken, err := generateSessionToken(100)
	if err != nil {
		return nil, fmt.Errorf("failed to generate a csrf token: %w", err)
	}
	w.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
	sessionID, err := uuid.NewV7()
	if err != nil {
		return nil, fmt.Errorf("failed to generate session id: %w", err)
	}
	if _, err = h.authStorage.AddSession(ctx, &auth.Session{
		ID:           sessionID,
		SessionToken: sessionToken,
		CsrfToken:    csrfToken,
		UserID:       user.ID,
		CreatedAt:    sessionCreatedAt,
		ExpiredAt:    sessionExpiredAt,
	}); err != nil {
		return nil, fmt.Errorf("failed to save session: %w", err)
	}
	return &domain.RegisterResponse{
		Ok: true,
	}, nil
}
package handlers
import (
"context"
"encoding/json"
"fmt"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
)
type RegisterHandler struct {
*handler.BaseHandler
authStorage storage.AuthStorage
}
func NewRegisterHandler(
authStorage storage.AuthStorage,
) *RegisterHandler {
h := &RegisterHandler{
authStorage: authStorage,
BaseHandler: handler.New().
WithName("registerv1").
WithRequiredResolveParams(),
}
h.WithProcessFunc(h.process)
return h
}
func (h *RegisterHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
regReq := new(domain.RegisterRequest)
if err := json.Unmarshal(req.Body, regReq); err != nil {
return err
}
resp, err := h.register(ctx, regReq, w)
if err != nil {
return fmt.Errorf("failed to register user: %w", err)
}
w.Write(ctx, resp)
return nil
}
func (d *RegisterHandler) register(
ctx context.Context,
req *domain.RegisterRequest,
w handler.Writer,
) (*domain.RegisterResponse, error) {
if err := validateLoginAndPassword(req.Login, req.Password); err != nil {
return nil, fmt.Errorf("invalid creds: %w", err)
}
passwordHash, err := bcrypt.GenerateFromPassword([]byte(req.Password), 10)
if err != nil {
logger.Error(ctx, "[register] failed to generate password hash", logger.Err(err))
return nil, fmt.Errorf("failed to generate password hash: %w", err)
}
userID, err := uuid.NewV7()
if err != nil {
return nil, fmt.Errorf("failed to generate user id: %w", err)
}
user := &auth.User{
ID: userID,
Username: req.Login,
Login: req.Login,
PasswordHash: passwordHash,
}
err = d.authStorage.AddUser(ctx, userID, user.Login, user.Username, user.PasswordHash)
if err != nil {
return nil, fmt.Errorf("failed to add new user: %w", err)
}
sessionCreatedAt := time.Now()
sessionExpiredAt := sessionCreatedAt.Add(time.Hour * 24 * 7)
sessionToken, err := generateSessionToken(100)
if err != nil {
return nil, fmt.Errorf("failed to generate a session token: %w", err)
}
w.SetCookie(sessionTokenCookie, sessionToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, true)
csrfToken, err := generateSessionToken(100)
if err != nil {
return nil, fmt.Errorf("failed to generate a csrf token: %w", err)
}
w.SetCookie(csrfTokenCookie, csrfToken, int(sessionExpiredAt.Sub(sessionCreatedAt).Seconds()), "_path", "_domain", true, false)
sessionID, err := uuid.NewV7()
if err != nil {
return nil, fmt.Errorf("failed to generate session id: %w", err)
}
if _, err = d.authStorage.AddSession(ctx, &auth.Session{
ID: sessionID,
SessionToken: sessionToken,
CsrfToken: csrfToken,
UserID: user.ID,
CreatedAt: sessionCreatedAt,
ExpiredAt: sessionExpiredAt,
}); err != nil {
return nil, fmt.Errorf("failed to save session: %w", err)
}
return &domain.RegisterResponse{
Ok: true,
}, nil
}

View File

@ -1,38 +1,38 @@
package handlers
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
)
// TODO. Maybe remove
//
// InternalRegisterResolverHandler is the internal endpoint for
// registering resolvers at runtime. Its processing logic is not
// implemented yet.
type InternalRegisterResolverHandler struct {
	*handler.BaseHandler
	resolveDispatcher *resolvedispatcher.ResolveDispatcher
}

// NewInternalRegisterResolverHandler builds the handler around the
// given resolve dispatcher.
func NewInternalRegisterResolverHandler(
	resolveDispatcher *resolvedispatcher.ResolveDispatcher,
) *InternalRegisterResolverHandler {
	h := &InternalRegisterResolverHandler{
		resolveDispatcher: resolveDispatcher,
	}
	h.BaseHandler = handler.New().
		WithName("internal_registerresolver").
		WithProcessFunc(h.process)
	return h
}

// process is a stub that always errors.
// Fix: the error string previously read "uniplemented".
func (h *InternalRegisterResolverHandler) process(
	ctx context.Context,
	req *common.Request,
	w handler.Writer,
) error {
	//_, ok := h.resolveDispatcher.GetResolver()
	return fmt.Errorf("unimplemented")
}
package handlers
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
)
// TODO. Maybe remove
//
// InternalRegisterResolverHandler is the internal endpoint for
// registering resolvers at runtime. Its processing logic is not
// implemented yet.
type InternalRegisterResolverHandler struct {
	*handler.BaseHandler
	resolveDispatcher *resolvedispatcher.ResolveDispatcher
}

// NewInternalRegisterResolverHandler builds the handler around the
// given resolve dispatcher.
func NewInternalRegisterResolverHandler(
	resolveDispatcher *resolvedispatcher.ResolveDispatcher,
) *InternalRegisterResolverHandler {
	h := &InternalRegisterResolverHandler{
		resolveDispatcher: resolveDispatcher,
	}
	h.BaseHandler = handler.New().
		WithName("internal_registerresolver").
		WithProcessFunc(h.process)
	return h
}

// process is a stub that always errors.
// Fix: the error string previously read "uniplemented".
func (h *InternalRegisterResolverHandler) process(
	ctx context.Context,
	req *common.Request,
	w handler.Writer,
) error {
	//_, ok := h.resolveDispatcher.GetResolver()
	return fmt.Errorf("unimplemented")
}

View File

@ -1,95 +1,95 @@
package handlers
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers/auth"
)
const (
	// maxFileSize caps uploads at 10 GiB (10 << 30 bytes). Currently
	// only referenced from the commented-out multipart parsing below.
	maxFileSize = 10 << 30
)

// UploadFileHandler accepts file uploads and hands them to the files
// engine. The actual upload logic is still commented out.
type UploadFileHandler struct {
	*handler.BaseHandler
	filesEngine *filesengine.FilesEngine
}

// NewUploadFileHandler builds the handler; it requires the auth
// resolver to have run before processing.
func NewUploadFileHandler(
	filesEngine *filesengine.FilesEngine,
) *UploadFileHandler {
	h := &UploadFileHandler{
		filesEngine: filesEngine,
		BaseHandler: handler.New().
			WithName("uploadfilev1").
			WithRequiredResolveParams(
				auth.AuthResolverV1Name,
				// TODO with MultipartReaderResolverV1Name
				// or
				// MultipartDataResolverV1Name
			),
	}
	h.WithProcessFunc(h.process)
	return h
}

// process is currently a no-op stub; the multipart parsing and upload
// flow is kept below as commented-out reference code.
func (d *UploadFileHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
	// TODO fetch (interface{ ParseMultipartForm(size int) error }) from req.GetValue[ParseMultipartFormer](req.ResolveValues)
	// if err := req.RawReq.ParseMultipartForm(maxFileSize); err != nil {
	// logger.Error(ctx, "uploadFile handler error", logger.Err(err))
	// return err
	// }
	// if err := d.uploadFile(ctx, userID); err != nil {
	// logger.Error(ctx, "uploadFile handle", logger.Err(err))
	// writeError(ctx, err)
	// return
	// }
	return nil
}
// func (d *UploadFileHandler) uploadFile(ctx context.Context, req *common.Request) error {
// title := ctx.PostForm("file")
// logger.Info(ctx, "uploadFile", slog.Any("postForm data", spew.Sdump(title)))
// file, header, err := req.RawReq.FormFile("file")
// if err != nil {
// return err
// }
// logger.Info(ctx, "uploadFile", slog.Any("header", spew.Sdump(header)))
// data, err := io.ReadAll(file)
// if err != nil {
// return err
// }
// ext := parseExtension(header.Filename)
// id, err := d.filesEngine.SaveFile(ctx, filesengine.File{
// Name: header.Filename,
// UserID: userID,
// Data: data,
// Ext: ext,
// Size: int64(len(data)),
// Type: "", // че такое type?
// })
// if err != nil {
// return fmt.Errorf("failed to save file: %w", err)
// }
// logger.Debug(ctx, "new file id", "id", id)
// return nil
// }
// func parseExtension(filename string) string {
// parts := strings.Split(filename, ".")
// if len(parts) == 0 {
// return ""
// }
// return parts[len(parts)-1]
// }
package handlers
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
filesengine "git.optclblast.xyz/draincloud/draincloud-core/internal/files_engine"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers/auth"
)
const (
	// maxFileSize caps a multipart upload at 10 GiB (10 << 30 bytes).
	// NOTE(review): currently unused — the ParseMultipartForm call that
	// would enforce it is commented out in process below.
	maxFileSize = 10 << 30
)
// UploadFileHandler serves the v1 file-upload endpoint. It embeds
// *handler.BaseHandler for the name/resolve-params/process plumbing and
// delegates persistence to the files engine.
type UploadFileHandler struct {
	*handler.BaseHandler
	filesEngine *filesengine.FilesEngine
}
// NewUploadFileHandler wires a files engine into a freshly configured
// upload handler and registers its process function.
func NewUploadFileHandler(
	filesEngine *filesengine.FilesEngine,
) *UploadFileHandler {
	base := handler.New().
		WithName("uploadfilev1").
		WithRequiredResolveParams(
			auth.AuthResolverV1Name,
			// TODO with MultipartReaderResolverV1Name
			// or
			// MultipartDataResolverV1Name
		)
	uh := &UploadFileHandler{
		BaseHandler: base,
		filesEngine: filesEngine,
	}
	uh.WithProcessFunc(uh.process)
	return uh
}
// process handles a single upload request. The multipart parsing and the
// actual save are not implemented yet (see the commented-out sketch
// below); the handler currently accepts the request and returns nil.
func (d *UploadFileHandler) process(ctx context.Context, req *common.Request, w handler.Writer) error {
	// TODO fetch (interface{ ParseMultipartForm(size int) error }) from req.GetValue[ParseMultipartFormer](req.ResolveValues)
	// if err := req.RawReq.ParseMultipartForm(maxFileSize); err != nil {
	// logger.Error(ctx, "uploadFile handler error", logger.Err(err))
	// return err
	// }
	// if err := d.uploadFile(ctx, userID); err != nil {
	// logger.Error(ctx, "uploadFile handle", logger.Err(err))
	// writeError(ctx, err)
	// return
	// }
	return nil
}
// func (d *UploadFileHandler) uploadFile(ctx context.Context, req *common.Request) error {
// title := ctx.PostForm("file")
// logger.Info(ctx, "uploadFile", slog.Any("postForm data", spew.Sdump(title)))
// file, header, err := req.RawReq.FormFile("file")
// if err != nil {
// return err
// }
// logger.Info(ctx, "uploadFile", slog.Any("header", spew.Sdump(header)))
// data, err := io.ReadAll(file)
// if err != nil {
// return err
// }
// ext := parseExtension(header.Filename)
// id, err := d.filesEngine.SaveFile(ctx, filesengine.File{
// Name: header.Filename,
// UserID: userID,
// Data: data,
// Ext: ext,
// Size: int64(len(data)),
// Type: "", // TODO: clarify the semantics of the Type field before enabling this code
// })
// if err != nil {
// return fmt.Errorf("failed to save file: %w", err)
// }
// logger.Debug(ctx, "new file id", "id", id)
// return nil
// }
// func parseExtension(filename string) string {
// parts := strings.Split(filename, ".")
// if len(parts) == 0 {
// return ""
// }
// return parts[len(parts)-1]
// }

View File

@ -1,50 +1,50 @@
package closer
import (
"context"
"errors"
"fmt"
"sync/atomic"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
var globalCloser *Closer = &Closer{
closeFns: make([]func() error, 0),
}
type Closer struct {
_lock atomic.Bool
closeFns []func() error
}
func (c *Closer) Add(fn func() error) {
if c._lock.Load() {
return
}
c.closeFns = append(c.closeFns, fn)
}
func (c *Closer) Close() error {
if !c._lock.CompareAndSwap(false, true) {
return fmt.Errorf("already closed")
}
var commonErr error
for _, fn := range c.closeFns {
if err := fn(); err != nil {
logger.Error(context.Background(), "[closer][Close] error at close func call", logger.Err(err))
commonErr = errors.Join(commonErr, err)
}
}
return commonErr
}
func Add(fn func() error) {
globalCloser.Add(fn)
}
func Close() error {
return globalCloser.Close()
}
package closer
import (
"context"
"errors"
"fmt"
"sync/atomic"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
// globalCloser backs the package-level Add and Close helpers.
var globalCloser *Closer = &Closer{
	closeFns: make([]func() error, 0),
}
// Closer collects cleanup callbacks and invokes them exactly once.
type Closer struct {
	// _lock flips to true on the first successful Close; it gates Add.
	_lock atomic.Bool
	closeFns []func() error
}
// Add registers fn to be called on Close. It becomes a no-op once Close
// has started.
// NOTE(review): the Load check and the append are not one atomic step,
// and append itself is not goroutine-safe — concurrent Add/Add or
// Add/Close can race. Confirm Add is only called from one goroutine, or
// guard closeFns with a mutex.
func (c *Closer) Add(fn func() error) {
	if c._lock.Load() {
		return
	}
	c.closeFns = append(c.closeFns, fn)
}
// Close runs every registered callback once, in registration order (not
// LIFO), logging each failure and joining all errors into the returned
// one. Subsequent calls fail with "already closed".
func (c *Closer) Close() error {
	if !c._lock.CompareAndSwap(false, true) {
		return fmt.Errorf("already closed")
	}
	var commonErr error
	for _, fn := range c.closeFns {
		if err := fn(); err != nil {
			logger.Error(context.Background(), "[closer][Close] error at close func call", logger.Err(err))
			commonErr = errors.Join(commonErr, err)
		}
	}
	return commonErr
}
// Add registers fn on the process-wide Closer.
func Add(fn func() error) {
	globalCloser.Add(fn)
}
// Close shuts down the process-wide Closer.
func Close() error {
	return globalCloser.Close()
}

View File

@ -1,103 +1,103 @@
package common
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
)
type RequestPool struct {
sp sync.Pool
}
func (p *RequestPool) Get() *Request {
r, _ := p.sp.Get().(*Request)
return r
}
func (p *RequestPool) Put(r *Request) {
r.ID = ""
r.Metadata = &sync.Map{}
r.ResolveValues = &sync.Map{}
r.Session = nil
r.User = nil
r.Body = nil
p.sp.Put(r)
}
func NewRequestPool() *RequestPool {
return &RequestPool{
sp: sync.Pool{
New: func() any {
return &Request{
ResolveValues: &sync.Map{},
Metadata: &sync.Map{},
}
},
},
}
}
type Request struct {
ID string
Session *auth.Session
User *auth.User
// ResolveValues - data required to process request.
ResolveValues *sync.Map
// Metadata - an additional data, usually added with preprocessing.
Metadata *sync.Map
// Request body
Body []byte
}
// NewRequestFromHttp builds a new *Request struct from raw http Request. No auth data validated.
func NewRequestFromHttp(pool *RequestPool, req *http.Request) *Request {
out := pool.sp.Get().(*Request)
cookies := req.Cookies()
headers := req.Header
out.Metadata = &sync.Map{}
for _, cookie := range cookies {
out.Metadata.Store(cookie.Name, cookie.Value)
}
for hname, hval := range headers {
out.Metadata.Store(hname, hval)
}
body, err := io.ReadAll(req.Body)
if err != nil {
logger.Error(context.TODO(), "failed to read request body", logger.Err(err))
}
out.Body = body
reqID := uuid.NewString()
out.ID = reqID
return out
}
func GetValue[T any](vals *sync.Map, key string) (T, error) {
var out T
if vals == nil {
return out, fmt.Errorf("nil vals map")
}
rawVal, ok := vals.Load(key)
if !ok {
return out, fmt.Errorf("value not found in resolve values set")
}
out, ok = rawVal.(T)
if !ok {
return out, fmt.Errorf("type of a value is unexpected")
}
return out, nil
}
package common
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
)
// RequestPool recycles *Request values through a sync.Pool to cut
// per-request allocations.
type RequestPool struct {
	sp sync.Pool
}
// Get returns a pooled (or freshly allocated) *Request.
// NOTE(review): on a zero-value RequestPool (built without
// NewRequestPool) sp.New is nil, so the swallowed type assertion makes
// Get return nil — confirm all callers construct the pool via
// NewRequestPool.
func (p *RequestPool) Get() *Request {
	r, _ := p.sp.Get().(*Request)
	return r
}
// Put scrubs every field of r before returning it to the pool so no
// session, user or body data leaks into the next request that reuses it.
func (p *RequestPool) Put(r *Request) {
	r.ID = ""
	r.Metadata = &sync.Map{}
	r.ResolveValues = &sync.Map{}
	r.Session = nil
	r.User = nil
	r.Body = nil
	p.sp.Put(r)
}
// NewRequestPool constructs a RequestPool whose sync.Pool allocates
// fresh Requests with initialized Metadata and ResolveValues maps.
func NewRequestPool() *RequestPool {
	pool := &RequestPool{}
	pool.sp.New = func() any {
		return &Request{
			ResolveValues: &sync.Map{},
			Metadata:      &sync.Map{},
		}
	}
	return pool
}
// Request is the transport-agnostic request passed through resolvers and
// handlers.
type Request struct {
	// ID is a per-request UUID assigned when the request is built.
	ID string
	// Session and User carry auth state; nil until populated (presumably
	// by the auth resolver — confirm against the resolver code).
	Session *auth.Session
	User *auth.User
	// ResolveValues - data required to process request.
	ResolveValues *sync.Map
	// Metadata - an additional data, usually added with preprocessing.
	Metadata *sync.Map
	// Request body
	Body []byte
}
// NewRequestFromHttp builds a new *Request struct from raw http Request. No auth data validated.
// Cookies and headers are copied into Metadata (cookie values as string,
// header values as []string), the whole body is read, and a fresh UUID is
// assigned as the request ID.
func NewRequestFromHttp(pool *RequestPool, req *http.Request) *Request {
	// Go through the pool's accessor instead of reaching into pool.sp
	// directly: it keeps pool internals in one place and avoids the
	// unchecked .(*Request) assertion that could panic here.
	out := pool.Get()
	out.Metadata = &sync.Map{}
	for _, cookie := range req.Cookies() {
		out.Metadata.Store(cookie.Name, cookie.Value)
	}
	for hname, hval := range req.Header {
		out.Metadata.Store(hname, hval)
	}
	body, err := io.ReadAll(req.Body)
	if err != nil {
		// Best effort: the request stays usable without a body, so log
		// instead of failing the whole call.
		logger.Error(context.TODO(), "failed to read request body", logger.Err(err))
	}
	out.Body = body
	out.ID = uuid.NewString()
	return out
}
func GetValue[T any](vals *sync.Map, key string) (T, error) {
var out T
if vals == nil {
return out, fmt.Errorf("nil vals map")
}
rawVal, ok := vals.Load(key)
if !ok {
return out, fmt.Errorf("value not found in resolve values set")
}
out, ok = rawVal.(T)
if !ok {
return out, fmt.Errorf("type of a value is unexpected")
}
return out, nil
}

View File

@ -1,261 +1,261 @@
package common
import (
"reflect"
"sync"
"testing"
)
func TestGetValue_string(t *testing.T) {
t.Parallel()
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "1",
},
want: "123",
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: "",
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: "",
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[string](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetValue_struct(t *testing.T) {
t.Parallel()
type val struct {
a int
b string
c bool
}
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want val
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": val{
a: 1,
b: "2",
c: true,
},
"2": "234",
},
key: "1",
},
want: val{
a: 1,
b: "2",
c: true,
},
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: val{},
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: val{},
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: val{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[val](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetValue_structptr(t *testing.T) {
t.Parallel()
type val struct {
a int
b string
c bool
}
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want *val
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": &val{
a: 1,
b: "2",
c: true,
},
"2": "234",
},
key: "1",
},
want: &val{
a: 1,
b: "2",
c: true,
},
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: nil,
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: nil,
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[*val](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
func _mapToSyncMap(m map[string]any) *sync.Map {
out := &sync.Map{}
for k, v := range m {
out.Store(k, v)
}
return out
}
package common
import (
"reflect"
"sync"
"testing"
)
func TestGetValue_string(t *testing.T) {
t.Parallel()
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "1",
},
want: "123",
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: "",
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: "",
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[string](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetValue_struct(t *testing.T) {
t.Parallel()
type val struct {
a int
b string
c bool
}
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want val
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": val{
a: 1,
b: "2",
c: true,
},
"2": "234",
},
key: "1",
},
want: val{
a: 1,
b: "2",
c: true,
},
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: val{},
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: val{},
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: val{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[val](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
func TestGetValue_structptr(t *testing.T) {
t.Parallel()
type val struct {
a int
b string
c bool
}
type args struct {
vals map[string]any
key string
}
tests := []struct {
name string
args args
want *val
wantErr bool
}{
{
name: "ok",
args: args{
vals: map[string]any{
"1": &val{
a: 1,
b: "2",
c: true,
},
"2": "234",
},
key: "1",
},
want: &val{
a: 1,
b: "2",
c: true,
},
wantErr: false,
},
{
name: "value not presented",
args: args{
vals: map[string]any{
"1": "123",
"2": "234",
},
key: "3",
},
want: nil,
wantErr: true,
},
{
name: "nil map",
args: args{
vals: nil,
key: "1",
},
want: nil,
wantErr: true,
},
{
name: "invalid type",
args: args{
vals: map[string]any{
"1": "123",
"2": 234,
},
key: "2",
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := GetValue[*val](_mapToSyncMap(tt.args.vals), tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("GetValue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetValue() = %v, want %v", got, tt.want)
}
})
}
}
// _mapToSyncMap copies a plain map into a fresh sync.Map for the tests.
func _mapToSyncMap(m map[string]any) *sync.Map {
	sm := new(sync.Map)
	for key, value := range m {
		sm.Store(key, value)
	}
	return sm
}

View File

@ -1,70 +1,70 @@
package config
import (
"context"
"time"
)
type Provider interface {
GetValue(ctx context.Context, key Key) Value
}
type Key string
type Value interface {
Int() int
String() string
Float() float32
Duration() time.Duration
}
type DurationValue time.Duration
type FloatValue struct {
EmptyValue
Val float32
}
func (v FloatValue) Float() float32 {
return v.Val
}
type StringValue struct {
EmptyValue
Val string
}
func (v StringValue) String() string {
return v.Val
}
type IntValue struct {
EmptyValue
Val int
}
func (v IntValue) Int() int {
return v.Val
}
func (v IntValue) Float() float32 {
return float32(v.Val)
}
type EmptyValue struct{}
func (v EmptyValue) Int() int {
return 0
}
func (v EmptyValue) String() string {
return ""
}
func (v EmptyValue) Float() float32 {
return 0
}
func (v EmptyValue) Duration() time.Duration {
return 0
}
package config
import (
"context"
"time"
)
// Provider resolves configuration values by key. Implementations may be
// static (file-backed) or dynamic (real-time config).
type Provider interface {
	GetValue(ctx context.Context, key Key) Value
}
// Key names a configuration entry.
type Key string

// Value is a typed view over one configuration entry. Accessors return
// the zero value when the entry cannot be represented as that type.
type Value interface {
	Int() int
	String() string
	Float() float64
	Duration() time.Duration
}

// EmptyValue is the neutral Value: every accessor yields the zero value.
// Concrete value types embed it so they only implement the accessors
// that apply to them.
type EmptyValue struct{}

func (v EmptyValue) Int() int { return 0 }

func (v EmptyValue) String() string { return "" }

func (v EmptyValue) Float() float64 { return 0 }

func (v EmptyValue) Duration() time.Duration { return 0 }

// DurationValue represents a duration entry.
// NOTE(review): it has no methods and therefore does not satisfy Value —
// confirm whether it is still needed.
type DurationValue time.Duration

// FloatValue carries a float64 entry.
type FloatValue struct {
	EmptyValue
	Val float64
}

func (v FloatValue) Float() float64 { return v.Val }

// StringValue carries a string entry.
type StringValue struct {
	EmptyValue
	Val string
}

func (v StringValue) String() string { return v.Val }

// IntValue carries an int entry; Float widens it losslessly.
type IntValue struct {
	EmptyValue
	Val int
}

func (v IntValue) Int() int { return v.Val }

func (v IntValue) Float() float64 { return float64(v.Val) }

View File

@ -1 +1 @@
package externalprovider
package externalprovider

View File

@ -1,30 +1,30 @@
package natskv
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/nats-io/nats.go/jetstream"
)
type Provider struct {
cc jetstream.KeyValue
}
func New(
ctx context.Context,
js jetstream.JetStream,
) *Provider {
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "rtc",
Description: "Real Time Config",
Storage: jetstream.FileStorage,
Replicas: 2,
Compression: true,
})
if err != nil {
logger.Fatal(ctx, "[natskv][New] failed to initialize rtc", logger.Err(err))
}
return &Provider{cc: kv}
}
package natskv
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/nats-io/nats.go/jetstream"
)
// Provider reads real-time configuration from a NATS JetStream key/value
// bucket.
type Provider struct {
	cc jetstream.KeyValue
}
// New creates (or binds to) the file-backed "rtc" key/value bucket and
// wraps it in a Provider.
// NOTE(review): Replicas: 2 requires a clustered JetStream deployment —
// confirm against the target topology.
// NOTE(review): logger.Fatal aborts the whole process on setup failure;
// consider returning an error so callers can decide.
func New(
	ctx context.Context,
	js jetstream.JetStream,
) *Provider {
	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
		Bucket: "rtc",
		Description: "Real Time Config",
		Storage: jetstream.FileStorage,
		Replicas: 2,
		Compression: true,
	})
	if err != nil {
		logger.Fatal(ctx, "[natskv][New] failed to initialize rtc", logger.Err(err))
	}
	return &Provider{cc: kv}
}

View File

@ -1,123 +1,123 @@
package staticprovider
import (
"context"
"os"
"path/filepath"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/config"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
)
var _ config.Provider = new(staticProvider)
type StaticProvider interface {
config.Provider
}
type staticProvider struct {
m sync.RWMutex
rawValues map[string]any
}
func (p *staticProvider) GetValue(ctx context.Context, key config.Key) config.Value {
p.m.RLock()
defer p.m.RUnlock()
rawVal, ok := p.rawValues[string(key)]
if !ok {
return config.EmptyValue{}
}
switch val := rawVal.(type) {
case int:
return config.IntValue{
Val: val,
}
case string:
return config.StringValue{
Val: val,
}
case float32:
return config.FloatValue{
Val: val,
}
default:
return config.EmptyValue{}
}
}
type newStaticProviderOptions struct {
configName string
configDirPath string
configFileType string
}
func mustDefaultNewStaticProviderOptions(ctx context.Context) *newStaticProviderOptions {
ex, err := os.Executable()
if err != nil {
logger.Fatal(ctx, "failed to get executable location", logger.Err(err))
}
exPath := filepath.Dir(ex)
return &newStaticProviderOptions{
configName: "config",
configDirPath: exPath,
configFileType: "yaml",
}
}
type NewStaticProviderOption func(o *newStaticProviderOptions)
func WithConfigDir(path string) NewStaticProviderOption {
return func(o *newStaticProviderOptions) {
o.configDirPath = path
}
}
func WithConfigType(t string) NewStaticProviderOption {
return func(o *newStaticProviderOptions) {
o.configFileType = t
}
}
func WithConfigName(name string) NewStaticProviderOption {
return func(o *newStaticProviderOptions) {
o.configName = name
}
}
func NewStaticProvider(
ctx context.Context,
opts ...NewStaticProviderOption,
) (*staticProvider, error) {
o := mustDefaultNewStaticProviderOptions(ctx)
for _, opt := range opts {
opt(o)
}
// TODO check if ile exists
provider := &staticProvider{
rawValues: make(map[string]any),
}
viper.SetConfigName(o.configName)
viper.SetConfigType(o.configFileType)
viper.AddConfigPath(o.configDirPath)
viper.WatchConfig()
viper.OnConfigChange(func(_ fsnotify.Event) {
provider.m.Lock()
defer provider.m.Unlock()
provider.rawValues = viper.AllSettings()
})
provider.rawValues = viper.AllSettings()
return provider, nil
}
package staticprovider
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"git.optclblast.xyz/draincloud/draincloud-core/internal/config"
	"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
	"github.com/fsnotify/fsnotify"
	"github.com/spf13/viper"
)
// Compile-time assertion that staticProvider implements config.Provider.
var _ config.Provider = new(staticProvider)
// StaticProvider is a file-backed config.Provider.
type StaticProvider interface {
	config.Provider
}
// staticProvider serves values from an in-memory snapshot of the config
// file; a viper watcher refreshes the snapshot on file change.
type staticProvider struct {
	// m guards rawValues between readers and the watcher callback.
	m sync.RWMutex
	rawValues map[string]any
}
// GetValue returns the value stored under key, typed according to its
// dynamic type, or config.EmptyValue when the key is missing or the type
// is not mapped.
func (p *staticProvider) GetValue(ctx context.Context, key config.Key) config.Value {
	p.m.RLock()
	defer p.m.RUnlock()
	rawVal, ok := p.rawValues[string(key)]
	if !ok {
		return config.EmptyValue{}
	}
	// Only int, string and float64 are mapped; anything else (bool,
	// slices, nested maps) falls through to EmptyValue.
	// NOTE(review): nested keys ("a.b") live in nested maps inside
	// viper's AllSettings and will not match this flat lookup — confirm
	// the intended key shape.
	switch val := rawVal.(type) {
	case int:
		return config.IntValue{
			Val: val,
		}
	case string:
		return config.StringValue{
			Val: val,
		}
	case float64:
		return config.FloatValue{
			Val: val,
		}
	default:
		return config.EmptyValue{}
	}
}
// newStaticProviderOptions holds the resolved construction settings.
type newStaticProviderOptions struct {
	// configName is the file base name without extension.
	configName string
	configDirPath string
	configFileType string
}
// mustDefaultNewStaticProviderOptions returns the defaults: a file named
// "config.yaml" located next to the running executable. It aborts the
// process if the executable path cannot be resolved.
func mustDefaultNewStaticProviderOptions(ctx context.Context) *newStaticProviderOptions {
	ex, err := os.Executable()
	if err != nil {
		logger.Fatal(ctx, "failed to get executable location", logger.Err(err))
	}
	exPath := filepath.Dir(ex)
	return &newStaticProviderOptions{
		configName: "config",
		configDirPath: exPath,
		configFileType: "yaml",
	}
}
// NewStaticProviderOption customizes provider construction.
type NewStaticProviderOption func(o *newStaticProviderOptions)
// WithConfigDir overrides the directory searched for the config file.
func WithConfigDir(path string) NewStaticProviderOption {
	return func(o *newStaticProviderOptions) {
		o.configDirPath = path
	}
}
// WithConfigType overrides the config file format (e.g. "yaml", "json").
func WithConfigType(t string) NewStaticProviderOption {
	return func(o *newStaticProviderOptions) {
		o.configFileType = t
	}
}
// WithConfigName overrides the config file base name (without extension).
func WithConfigName(name string) NewStaticProviderOption {
	return func(o *newStaticProviderOptions) {
		o.configName = name
	}
}
// NewStaticProvider builds a file-backed provider. It reads the config
// file once, then watches it and atomically swaps the settings snapshot
// on every change.
func NewStaticProvider(
	ctx context.Context,
	opts ...NewStaticProviderOption,
) (*staticProvider, error) {
	o := mustDefaultNewStaticProviderOptions(ctx)
	for _, opt := range opts {
		opt(o)
	}
	provider := &staticProvider{
		rawValues: make(map[string]any),
	}
	viper.SetConfigName(o.configName)
	viper.SetConfigType(o.configFileType)
	viper.AddConfigPath(o.configDirPath)
	// Load the file before taking the first snapshot; without this the
	// provider starts with an empty settings map and only picks values
	// up after the first on-disk change. This also surfaces a missing or
	// unreadable config file as a constructor error.
	if err := viper.ReadInConfig(); err != nil {
		return nil, fmt.Errorf("reading config %q: %w", o.configName, err)
	}
	viper.WatchConfig()
	viper.OnConfigChange(func(_ fsnotify.Event) {
		provider.m.Lock()
		defer provider.m.Unlock()
		provider.rawValues = viper.AllSettings()
	})
	provider.rawValues = viper.AllSettings()
	return provider, nil
}

View File

@ -1,46 +1,46 @@
package cleanupsessions
import (
"context"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
// TODO set with config
const cronInterval = time.Minute * 10
type ExpiredSessionsRemover interface {
RemoveExpiredSessions(ctx context.Context) error
}
type CleanupSessionCron struct {
db ExpiredSessionsRemover
}
func New(db ExpiredSessionsRemover) *CleanupSessionCron {
return &CleanupSessionCron{
db: db,
}
}
func (c *CleanupSessionCron) Run(ctx context.Context) {
logger.Info(ctx, "[CleanupSessionCron] running cron")
go func() {
t := time.NewTicker(cronInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
logger.Warn(ctx, "[CleanupSessionCron] context cancelled")
return
case <-t.C:
logger.Notice(ctx, "[CleanupSessionCron] cleanup started")
t.Reset(cronInterval)
if err := c.db.RemoveExpiredSessions(ctx); err != nil {
logger.Error(ctx, "[CleanupSessionCron] failed to remove expired sessions", logger.Err(err))
}
}
}
}()
}
package cleanupsessions
import (
"context"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
// TODO set with config
// cronInterval is how often expired sessions are purged.
const cronInterval = time.Minute * 10
// ExpiredSessionsRemover is the storage dependency that deletes sessions
// whose lifetime has elapsed.
type ExpiredSessionsRemover interface {
	RemoveExpiredSessions(ctx context.Context) error
}
// CleanupSessionCron periodically removes expired sessions from storage.
type CleanupSessionCron struct {
	db ExpiredSessionsRemover
}
// New builds a CleanupSessionCron on top of the given storage.
func New(db ExpiredSessionsRemover) *CleanupSessionCron {
	return &CleanupSessionCron{
		db: db,
	}
}
// Run starts the cleanup loop in a background goroutine. The loop ticks
// every cronInterval and stops when ctx is cancelled.
func (c *CleanupSessionCron) Run(ctx context.Context) {
	logger.Info(ctx, "[CleanupSessionCron] running cron")
	go func() {
		t := time.NewTicker(cronInterval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				logger.Warn(ctx, "[CleanupSessionCron] context cancelled")
				return
			case <-t.C:
				logger.Notice(ctx, "[CleanupSessionCron] cleanup started")
				// No t.Reset here: a time.Ticker fires repeatedly on its
				// own schedule; the previous Reset call was redundant and
				// skewed each period by the tick-handling latency.
				if err := c.db.RemoveExpiredSessions(ctx); err != nil {
					logger.Error(ctx, "[CleanupSessionCron] failed to remove expired sessions", logger.Err(err))
				}
			}
		}
	}()
}

View File

@ -1,7 +1,7 @@
package cron
import "context"
type Cron interface {
Run(ctx context.Context)
}
package cron
import "context"
// Cron is a background job: Run starts it and it keeps working until the
// provided context is cancelled.
type Cron interface {
	Run(ctx context.Context)
}

View File

@ -1 +1 @@
package domain
package domain

View File

@ -0,0 +1,24 @@
package domain
import "fmt"
// StorageType enumerates the supported blob storage backends.
type StorageType int

const (
	StorageTypeFS StorageType = iota
	StorageTypeS3
)

const (
	// fslinkTemplate renders a filesystem-backed storage link.
	fslinkTemplate = "fs:///%s"
)

// GetFSConverter returns a function that expands a raw fslink into a
// full storage URI for the given backend. Only the filesystem form
// exists so far; every backend currently falls back to it.
func GetFSConverter(storageType StorageType) func(fslink string) string {
	switch storageType {
	// TODO s3 converter
	default:
		return func(link string) string {
			return fmt.Sprintf(fslinkTemplate, link)
		}
	}
}

View File

@ -1,7 +1,7 @@
package domain
type RegisterResolverRequest struct {
ResolverName string `json:"resolver_name"`
ResolverEndpoint string `json:"resolver_endpoint"`
RequiredResolveParams []string `json:"required_resolve_params"`
}
package domain
// RegisterResolverRequest is the payload an external resolver sends to
// register itself with the core.
type RegisterResolverRequest struct {
	// ResolverName is the name the resolver registers under.
	ResolverName string `json:"resolver_name"`
	// ResolverEndpoint is presumably the address the core calls the
	// resolver back at — confirm against the registration handler.
	ResolverEndpoint string `json:"resolver_endpoint"`
	RequiredResolveParams []string `json:"required_resolve_params"`
}

View File

@ -1,29 +1,29 @@
package domain
type RegisterRequest struct {
Login string `json:"login"`
Password string `json:"password"`
}
type RegisterResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
}
type LogonRequest struct {
Login string `json:"login"`
Password string `json:"password"`
}
type LogonResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
}
type LogoutRequest struct {
}
type ErrorJson struct {
Code int `json:"code"`
Message string `json:"message"`
}
package domain
// RegisterRequest is the sign-up payload.
type RegisterRequest struct {
	Login string `json:"login"`
	Password string `json:"password"`
}
// RegisterResponse reports whether sign-up succeeded.
type RegisterResponse struct {
	Ok bool `json:"ok"`
	Message string `json:"message"`
}
// LogonRequest is the sign-in payload.
type LogonRequest struct {
	Login string `json:"login"`
	Password string `json:"password"`
}
// LogonResponse reports whether sign-in succeeded.
type LogonResponse struct {
	Ok bool `json:"ok"`
	Message string `json:"message"`
}
// LogoutRequest is the sign-out payload; it is currently empty.
type LogoutRequest struct {
}
// ErrorJson is the generic error envelope returned to clients.
type ErrorJson struct {
	Code int `json:"code"`
	Message string `json:"message"`
}

View File

@ -1,9 +1,9 @@
package errs
import "errors"
var (
ErrorUnauthorized = errors.New("unauthorized")
ErrorAccessDenied = errors.New("access denied")
ErrorSessionExpired = errors.New("session expired")
)
package errs
import "errors"
// Sentinel authentication/authorization errors; match with errors.Is.
var (
	ErrorUnauthorized = errors.New("unauthorized")
	ErrorAccessDenied = errors.New("access denied")
	ErrorSessionExpired = errors.New("session expired")
)

View File

@ -1,51 +1,58 @@
package filesengine
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/files"
"github.com/google/uuid"
)
type FilesEngine struct {
blobStorage storage.BlobStorage
metaStorage storage.MetaStorage
}
func NewFilesEngine(
blobStorage storage.BlobStorage,
metaStorage storage.MetaStorage,
) *FilesEngine {
return &FilesEngine{
blobStorage: blobStorage,
metaStorage: metaStorage,
}
}
type File struct {
Name string
UserID uuid.UUID
Ext string
Type string
Size int64
Data []byte
}
// TODO save file
func (e *FilesEngine) SaveFile(
ctx context.Context,
file File,
) (uuid.UUID, error) {
fileID, err := e.metaStorage.SaveMetadata(ctx, files.FileMetadata{})
if err != nil {
return uuid.Nil, fmt.Errorf("failed to create new file metadata: %w", err)
}
if err = e.blobStorage.SaveBlob(ctx, fileID, file.Data); err != nil {
return uuid.Nil, fmt.Errorf("failed to save file data: %w", err)
}
return fileID, nil
}
package filesengine
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/files"
// "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/files"
"github.com/google/uuid"
)
// FilesEngine coordinates file persistence: metadata goes to the meta
// storage, raw bytes go to the blob storage.
type FilesEngine struct {
	blobStorage storage.BlobStorage
	metaStorage storage.MetaStorage
}
// NewFilesEngine wires the blob and metadata backends into a FilesEngine.
func NewFilesEngine(
	blobStorage storage.BlobStorage,
	metaStorage storage.MetaStorage,
) *FilesEngine {
	return &FilesEngine{
		blobStorage: blobStorage,
		metaStorage: metaStorage,
	}
}
// File is an upload ready to be persisted.
type File struct {
	Name string
	UserID int64
	// Ext is the file extension, presumably without the leading dot —
	// confirm against the extension parsing in the upload handler.
	Ext string
	// Type is not populated anywhere visible here — semantics TBD.
	Type string
	Size int64
	Data []byte
}
// TODO save file
// SaveFile persists the file's metadata first, then its payload, and
// returns the new file's ID.
// NOTE(review): Size is not written into the metadata, FSLink is left
// unset, and a failed SaveBlob leaves an orphaned metadata row behind —
// consider compensating cleanup or a transactional flow.
func (e *FilesEngine) SaveFile(
	ctx context.Context,
	file File,
) (uuid.UUID, error) {
	fileID, err := e.metaStorage.SaveMetadata(ctx, files.FileMetadata{
		Name: file.Name,
		UserID: file.UserID,
		Ext: file.Ext,
		Type: file.Type,
		// FSLink: f,
	})
	if err != nil {
		return uuid.Nil, fmt.Errorf("failed to create new file metadata: %w", err)
	}
	if err = e.blobStorage.SaveBlob(ctx, fileID, file.Data); err != nil {
		return uuid.Nil, fmt.Errorf("failed to save file data: %w", err)
	}
	return fileID, nil
}

View File

@ -1,9 +1,9 @@
package handler
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
type CallHandler func(ctx context.Context, req *common.Request) ([]byte, error)
package handler
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
// CallHandler invokes a handler for a prepared request and returns the
// serialized response body.
type CallHandler func(ctx context.Context, req *common.Request) ([]byte, error)

View File

@ -1,78 +1,78 @@
package handler
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
type WriteOptions struct {
Code int
}
type WriteOption func(opts *WriteOptions)
func WithCode(code int) WriteOption {
return func(opts *WriteOptions) {
opts.Code = code
}
}
type Writer interface {
Write(ctx context.Context, resp any, opts ...WriteOption)
SetCookie(name string, value string, maxAge int, path string, domain string, secure bool, httpOnly bool)
}
type Handler interface {
GetName() string
GetRequiredResolveParams() []string
GetProcessFn() func(ctx context.Context, req *common.Request, w Writer) error
GetPreprocessFn() func(ctx context.Context, req *common.Request, w Writer) error
}
type BaseHandler struct {
Name string
RequiredResolveParams []string
ProcessFn func(ctx context.Context, req *common.Request, w Writer) error
PreprocessFn func(ctx context.Context, req *common.Request, w Writer) error
}
func New() *BaseHandler {
return new(BaseHandler)
}
func (h *BaseHandler) WithName(name string) *BaseHandler {
h.Name = name
return h
}
func (h *BaseHandler) WithRequiredResolveParams(params ...string) *BaseHandler {
h.RequiredResolveParams = params
return h
}
func (h *BaseHandler) WithProcessFunc(fn func(ctx context.Context, req *common.Request, w Writer) error) *BaseHandler {
h.ProcessFn = fn
return h
}
func (h *BaseHandler) WithPreprocessFunc(fn func(ctx context.Context, req *common.Request, w Writer) error) *BaseHandler {
h.PreprocessFn = fn
return h
}
func (h *BaseHandler) GetName() string {
return h.Name
}
func (h *BaseHandler) GetRequiredResolveParams() []string {
return h.RequiredResolveParams
}
func (h *BaseHandler) GetProcessFn() func(ctx context.Context, req *common.Request, w Writer) error {
return h.ProcessFn
}
func (h *BaseHandler) GetPreprocessFn() func(ctx context.Context, req *common.Request, w Writer) error {
return h.PreprocessFn
}
package handler
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
// WriteOptions controls how a single response is written.
type WriteOptions struct {
	// Code is the status code to respond with.
	Code int
}

// WriteOption customizes one Write call.
type WriteOption func(opts *WriteOptions)

// WithCode sets the response status code.
func WithCode(code int) WriteOption {
	return func(o *WriteOptions) { o.Code = code }
}
// Writer abstracts the response side of a transport: handlers use it to
// emit a response payload and to set cookies.
type Writer interface {
	Write(ctx context.Context, resp any, opts ...WriteOption)
	SetCookie(name string, value string, maxAge int, path string, domain string, secure bool, httpOnly bool)
}
// Handler describes a named request handler together with the resolver
// params it requires and its (pre)processing callbacks.
type Handler interface {
	GetName() string
	GetRequiredResolveParams() []string
	GetProcessFn() func(ctx context.Context, req *common.Request, w Writer) error
	GetPreprocessFn() func(ctx context.Context, req *common.Request, w Writer) error
}
// BaseHandler is a reusable Handler implementation configured through a
// fluent builder chain: New().WithName(...).WithProcessFunc(...).
type BaseHandler struct {
	// Name is the handler's registered name.
	Name string
	// RequiredResolveParams lists the resolver outputs this handler needs.
	RequiredResolveParams []string
	// ProcessFn is the main request processor.
	ProcessFn func(ctx context.Context, req *common.Request, w Writer) error
	// PreprocessFn optionally runs before ProcessFn.
	PreprocessFn func(ctx context.Context, req *common.Request, w Writer) error
}

// New returns an empty BaseHandler ready to be configured.
func New() *BaseHandler { return &BaseHandler{} }

// WithName sets the handler name and returns the handler for chaining.
func (b *BaseHandler) WithName(name string) *BaseHandler {
	b.Name = name
	return b
}

// WithRequiredResolveParams sets the resolver params the handler depends on.
func (b *BaseHandler) WithRequiredResolveParams(params ...string) *BaseHandler {
	b.RequiredResolveParams = params
	return b
}

// WithProcessFunc sets the main processing callback.
func (b *BaseHandler) WithProcessFunc(fn func(ctx context.Context, req *common.Request, w Writer) error) *BaseHandler {
	b.ProcessFn = fn
	return b
}

// WithPreprocessFunc sets the optional preprocessing callback.
func (b *BaseHandler) WithPreprocessFunc(fn func(ctx context.Context, req *common.Request, w Writer) error) *BaseHandler {
	b.PreprocessFn = fn
	return b
}

// GetName implements Handler.
func (b *BaseHandler) GetName() string { return b.Name }

// GetRequiredResolveParams implements Handler.
func (b *BaseHandler) GetRequiredResolveParams() []string { return b.RequiredResolveParams }

// GetProcessFn implements Handler.
func (b *BaseHandler) GetProcessFn() func(ctx context.Context, req *common.Request, w Writer) error {
	return b.ProcessFn
}

// GetPreprocessFn implements Handler.
func (b *BaseHandler) GetPreprocessFn() func(ctx context.Context, req *common.Request, w Writer) error {
	return b.PreprocessFn
}

View File

@ -1,154 +1,154 @@
package logger
import (
"context"
"io"
"log/slog"
"os"
"strings"
)
// _key is an unexported context-key type, preventing collisions with keys
// defined in other packages.
type _key string

//nolint:gochecknoglobals // ...
var loggerKey _key = "_core_logger"

// LoggerOpt configures the logger assembled by NewLoggerContext.
type LoggerOpt func(p *loggerParams)

// NewLoggerContext builds a logger from opts and returns a child context
// carrying it; the package-level helpers (Info, Error, ...) pick it up.
func NewLoggerContext(ctx context.Context, opts ...LoggerOpt) context.Context {
	params := new(loggerParams)
	for _, opt := range opts {
		opt(params)
	}
	return context.WithValue(ctx, loggerKey, params.build())
}
// loggerParams accumulates the options passed to NewLoggerContext before
// the logger is built.
type loggerParams struct {
	local     bool
	addSource bool
	lvl       slog.Level
	writers   []io.Writer
}

// WithWriter adds an output destination; every writer receives each record.
func WithWriter(w io.Writer) LoggerOpt {
	return func(p *loggerParams) { p.writers = append(p.writers, w) }
}

// WithLevel sets the minimum level the logger emits.
func WithLevel(l slog.Level) LoggerOpt {
	return func(p *loggerParams) { p.lvl = l }
}

// Local switches to the human-readable colourised handler instead of JSON.
func Local() LoggerOpt {
	return func(p *loggerParams) { p.local = true }
}

// WithSource makes records carry the file:line of the logging call.
func WithSource() LoggerOpt {
	return func(p *loggerParams) { p.addSource = true }
}
// Err wraps an error into a slog attribute under the conventional
// "error" key.
func Err(err error) slog.Attr {
	return slog.String("error", err.Error())
}
// MapLevel translates a textual level name (case-insensitive) into the
// package's slog levels; unknown names fall back to LevelInfo.
func MapLevel(lvl string) slog.Level {
	levels := map[string]slog.Level{
		"debug":     LevelDebug,
		"info":      LevelInfo,
		"notice":    LevelNotice,
		"warn":      LevelWarn,
		"error":     LevelError,
		"critical":  LevelCritial,
		"alert":     LevelAlert,
		"emergency": LevelEmergency,
	}
	if l, ok := levels[strings.ToLower(lvl)]; ok {
		return l
	}
	return LevelInfo
}
// build assembles the final slog.Logger from the collected options: with
// no writers configured it defaults to stdout; Local() selects the pretty
// handler, otherwise a JSON handler is used.
func (b *loggerParams) build() *slog.Logger {
	if len(b.writers) == 0 {
		b.writers = []io.Writer{os.Stdout}
	}
	out := io.MultiWriter(b.writers...)
	if !b.local {
		return newLogger(b.lvl, out)
	}
	opts := prettyHandlerOptions{
		SlogOpts: &slog.HandlerOptions{
			Level:     b.lvl,
			AddSource: b.addSource,
		},
	}
	return slog.New(opts.newPrettyHandler(out))
}

// newLogger builds the JSON logger, rewriting the numeric custom levels
// (notice/critical/alert/emergency) back into their symbolic names.
func newLogger(lvl slog.Level, w io.Writer) *slog.Logger {
	rename := func(_ []string, a slog.Attr) slog.Attr {
		if a.Key != slog.LevelKey {
			return a
		}
		level := a.Value.Any().(slog.Level)
		var name string
		switch {
		case level < LevelInfo:
			name = "DEBUG"
		case level < LevelNotice:
			name = "INFO"
		case level < LevelWarn:
			name = "NOTICE"
		case level < LevelError:
			name = "WARNING"
		case level < LevelCritial:
			name = "ERROR"
		case level < LevelAlert:
			name = "CRITICAL"
		case level < LevelEmergency:
			name = "ALERT"
		default:
			name = "EMERGENCY"
		}
		a.Value = slog.StringValue(name)
		return a
	}
	return slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions{
		Level:       lvl,
		ReplaceAttr: rename,
	}))
}

// loggerFromCtx returns the request-scoped logger attached via
// NewLoggerContext, falling back to the package-level default.
func loggerFromCtx(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey).(*slog.Logger); ok {
		return l
	}
	return globalLogger
}
package logger
import (
"context"
"io"
"log/slog"
"os"
"strings"
)
// _key is an unexported context-key type, preventing collisions with keys
// defined in other packages.
type _key string

//nolint:gochecknoglobals // ...
var loggerKey _key = "_core_logger"

// LoggerOpt configures the logger assembled by NewLoggerContext.
type LoggerOpt func(p *loggerParams)

// NewLoggerContext builds a logger from opts and returns a child context
// carrying it; the package-level helpers (Info, Error, ...) pick it up.
func NewLoggerContext(ctx context.Context, opts ...LoggerOpt) context.Context {
	params := new(loggerParams)
	for _, opt := range opts {
		opt(params)
	}
	return context.WithValue(ctx, loggerKey, params.build())
}
// loggerParams accumulates the options passed to NewLoggerContext before
// the logger is built.
type loggerParams struct {
	local     bool
	addSource bool
	lvl       slog.Level
	writers   []io.Writer
}

// WithWriter adds an output destination; every writer receives each record.
func WithWriter(w io.Writer) LoggerOpt {
	return func(p *loggerParams) { p.writers = append(p.writers, w) }
}

// WithLevel sets the minimum level the logger emits.
func WithLevel(l slog.Level) LoggerOpt {
	return func(p *loggerParams) { p.lvl = l }
}

// Local switches to the human-readable colourised handler instead of JSON.
func Local() LoggerOpt {
	return func(p *loggerParams) { p.local = true }
}

// WithSource makes records carry the file:line of the logging call.
func WithSource() LoggerOpt {
	return func(p *loggerParams) { p.addSource = true }
}
// Err wraps an error into a slog attribute under the conventional
// "error" key.
func Err(err error) slog.Attr {
	return slog.String("error", err.Error())
}
// MapLevel translates a textual level name (case-insensitive) into the
// package's slog levels; unknown names fall back to LevelInfo.
func MapLevel(lvl string) slog.Level {
	levels := map[string]slog.Level{
		"debug":     LevelDebug,
		"info":      LevelInfo,
		"notice":    LevelNotice,
		"warn":      LevelWarn,
		"error":     LevelError,
		"critical":  LevelCritial,
		"alert":     LevelAlert,
		"emergency": LevelEmergency,
	}
	if l, ok := levels[strings.ToLower(lvl)]; ok {
		return l
	}
	return LevelInfo
}
// build assembles the final slog.Logger from the collected options: with
// no writers configured it defaults to stdout; Local() selects the pretty
// handler, otherwise a JSON handler is used.
func (b *loggerParams) build() *slog.Logger {
	if len(b.writers) == 0 {
		b.writers = []io.Writer{os.Stdout}
	}
	out := io.MultiWriter(b.writers...)
	if !b.local {
		return newLogger(b.lvl, out)
	}
	opts := prettyHandlerOptions{
		SlogOpts: &slog.HandlerOptions{
			Level:     b.lvl,
			AddSource: b.addSource,
		},
	}
	return slog.New(opts.newPrettyHandler(out))
}

// newLogger builds the JSON logger, rewriting the numeric custom levels
// (notice/critical/alert/emergency) back into their symbolic names.
func newLogger(lvl slog.Level, w io.Writer) *slog.Logger {
	rename := func(_ []string, a slog.Attr) slog.Attr {
		if a.Key != slog.LevelKey {
			return a
		}
		level := a.Value.Any().(slog.Level)
		var name string
		switch {
		case level < LevelInfo:
			name = "DEBUG"
		case level < LevelNotice:
			name = "INFO"
		case level < LevelWarn:
			name = "NOTICE"
		case level < LevelError:
			name = "WARNING"
		case level < LevelCritial:
			name = "ERROR"
		case level < LevelAlert:
			name = "CRITICAL"
		case level < LevelEmergency:
			name = "ALERT"
		default:
			name = "EMERGENCY"
		}
		a.Value = slog.StringValue(name)
		return a
	}
	return slog.New(slog.NewJSONHandler(w, &slog.HandlerOptions{
		Level:       lvl,
		ReplaceAttr: rename,
	}))
}

// loggerFromCtx returns the request-scoped logger attached via
// NewLoggerContext, falling back to the package-level default.
func loggerFromCtx(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey).(*slog.Logger); ok {
		return l
	}
	return globalLogger
}

View File

@ -1,35 +1,35 @@
package logger
import (
"context"
"log/slog"
)
// newDiscardLogger returns a logger that drops every record.
//nolint:unused //...
func newDiscardLogger() *slog.Logger {
	return slog.New(newDiscardHandler())
}

// DiscardHandler is a slog.Handler that reports itself disabled for every
// level and ignores all records, attributes and groups.
//nolint:unused //...
type DiscardHandler struct{}

// newDiscardHandler returns the no-op handler.
//nolint:unused //...
func newDiscardHandler() *DiscardHandler {
	return new(DiscardHandler)
}

// Handle drops the record.
func (h *DiscardHandler) Handle(context.Context, slog.Record) error { return nil }

// WithAttrs returns the handler unchanged; attributes are irrelevant here.
func (h *DiscardHandler) WithAttrs([]slog.Attr) slog.Handler { return h }

// WithGroup returns the handler unchanged.
func (h *DiscardHandler) WithGroup(string) slog.Handler { return h }

// Enabled always reports false so callers can skip formatting entirely.
func (h *DiscardHandler) Enabled(context.Context, slog.Level) bool { return false }
package logger
import (
"context"
"log/slog"
)
// newDiscardLogger returns a logger that drops every record.
//nolint:unused //...
func newDiscardLogger() *slog.Logger {
	return slog.New(newDiscardHandler())
}

// DiscardHandler is a slog.Handler that reports itself disabled for every
// level and ignores all records, attributes and groups.
//nolint:unused //...
type DiscardHandler struct{}

// newDiscardHandler returns the no-op handler.
//nolint:unused //...
func newDiscardHandler() *DiscardHandler {
	return new(DiscardHandler)
}

// Handle drops the record.
func (h *DiscardHandler) Handle(context.Context, slog.Record) error { return nil }

// WithAttrs returns the handler unchanged; attributes are irrelevant here.
func (h *DiscardHandler) WithAttrs([]slog.Attr) slog.Handler { return h }

// WithGroup returns the handler unchanged.
func (h *DiscardHandler) WithGroup(string) slog.Handler { return h }

// Enabled always reports false so callers can skip formatting entirely.
func (h *DiscardHandler) Enabled(context.Context, slog.Level) bool { return false }

View File

@ -1,81 +1,81 @@
package logger
import (
"context"
"log/slog"
"os"
)
//nolint:gochecknoglobals // ...
// globalLogger is the fallback used when a context carries no logger.
// NOTE(review): SetLevel swaps it without synchronisation — presumably it
// is only called during startup; confirm before calling it concurrently.
var globalLogger *slog.Logger = newLogger(LevelDebug, os.Stdout)

// SetLevel replaces the global fallback logger with one filtering at l,
// writing to stdout.
func SetLevel(l slog.Level) {
	globalLogger = newLogger(l, os.Stdout)
}

// Custom severity levels layered on top of slog's built-ins; the large
// numeric gaps let newLogger map level ranges back to symbolic names.
const (
	LevelEmergency = slog.Level(10000)
	LevelAlert     = slog.Level(1000)
	LevelCritial   = slog.Level(100) // NOTE(review): keeps the existing "Critial" typo; exported, so renaming would break callers.
	LevelError     = slog.LevelError
	LevelWarn      = slog.LevelWarn
	LevelNotice    = slog.Level(2)
	LevelInfo      = slog.LevelInfo
	LevelDebug     = slog.LevelDebug
)
// Fatal logs at emergency level, then terminates the process with exit
// code 1.
func Fatal(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelEmergency, message, attrs...)
	os.Exit(1)
}

// Emergency logs at the highest severity without exiting.
func Emergency(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelEmergency, message, attrs...)
}

// Alert logs at alert severity.
func Alert(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelAlert, message, attrs...)
}

// Critial logs at critical severity.
// NOTE(review): name keeps the existing typo for API compatibility.
func Critial(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelCritial, message, attrs...)
}

// Error logs at error severity.
func Error(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).ErrorContext(ctx, message, attrs...)
}

// Warn logs at warning severity.
func Warn(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).WarnContext(ctx, message, attrs...)
}

// Notice logs at notice severity.
func Notice(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelNotice, message, attrs...)
}

// Info logs at info severity.
func Info(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).InfoContext(ctx, message, attrs...)
}

// Debug logs at debug severity.
func Debug(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).DebugContext(ctx, message, attrs...)
}
package logger
import (
"context"
"log/slog"
"os"
)
//nolint:gochecknoglobals // ...
// globalLogger is the fallback used when a context carries no logger.
// NOTE(review): SetLevel swaps it without synchronisation — presumably it
// is only called during startup; confirm before calling it concurrently.
var globalLogger *slog.Logger = newLogger(LevelDebug, os.Stdout)

// SetLevel replaces the global fallback logger with one filtering at l,
// writing to stdout.
func SetLevel(l slog.Level) {
	globalLogger = newLogger(l, os.Stdout)
}

// Custom severity levels layered on top of slog's built-ins; the large
// numeric gaps let newLogger map level ranges back to symbolic names.
const (
	LevelEmergency = slog.Level(10000)
	LevelAlert     = slog.Level(1000)
	LevelCritial   = slog.Level(100) // NOTE(review): keeps the existing "Critial" typo; exported, so renaming would break callers.
	LevelError     = slog.LevelError
	LevelWarn      = slog.LevelWarn
	LevelNotice    = slog.Level(2)
	LevelInfo      = slog.LevelInfo
	LevelDebug     = slog.LevelDebug
)
// Fatal logs at emergency level, then terminates the process with exit
// code 1.
func Fatal(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelEmergency, message, attrs...)
	os.Exit(1)
}

// Emergency logs at the highest severity without exiting.
func Emergency(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelEmergency, message, attrs...)
}

// Alert logs at alert severity.
func Alert(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelAlert, message, attrs...)
}

// Critial logs at critical severity.
// NOTE(review): name keeps the existing typo for API compatibility.
func Critial(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelCritial, message, attrs...)
}

// Error logs at error severity.
func Error(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).ErrorContext(ctx, message, attrs...)
}

// Warn logs at warning severity.
func Warn(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).WarnContext(ctx, message, attrs...)
}

// Notice logs at notice severity.
func Notice(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).Log(ctx, LevelNotice, message, attrs...)
}

// Info logs at info severity.
func Info(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).InfoContext(ctx, message, attrs...)
}

// Debug logs at debug severity.
func Debug(ctx context.Context, message string, attrs ...any) {
	loggerFromCtx(ctx).DebugContext(ctx, message, attrs...)
}

View File

@ -1,97 +1,97 @@
package logger
import (
"context"
"encoding/json"
"io"
stdlog "log"
"log/slog"
"github.com/fatih/color"
)
// prettyHandlerOptions wraps the slog options handed to the pretty handler.
type prettyHandlerOptions struct {
	SlogOpts *slog.HandlerOptions
}

// prettyHandler renders records as colourised, human-readable lines for
// local development. It embeds a JSON handler only to inherit Enabled();
// Handle does its own formatting.
type prettyHandler struct {
	opts prettyHandlerOptions // NOTE(review): retained but never read — candidate for removal.
	slog.Handler
	l     *stdlog.Logger
	attrs []slog.Attr
}

// newPrettyHandler builds a pretty handler writing to out.
func (opts prettyHandlerOptions) newPrettyHandler(
	out io.Writer,
) *prettyHandler {
	h := &prettyHandler{
		Handler: slog.NewJSONHandler(out, opts.SlogOpts),
		l:       stdlog.New(out, "", 0),
	}
	return h
}
// Handle formats a record as "[HH:MM:SS.mmm] LEVEL: message {json fields}"
// with the level colourised by severity, and prints it via the embedded
// std logger. Attrs accumulated via WithAttrs are merged with the
// record's own attrs (record attrs take lower precedence on key clash).
func (h *prettyHandler) Handle(_ context.Context, r slog.Record) error {
	level := r.Level.String() + ":"
	switch r.Level {
	case slog.LevelDebug:
		level = color.MagentaString(level)
	case slog.LevelInfo:
		level = color.BlueString(level)
	case slog.LevelWarn:
		level = color.YellowString(level)
	case slog.LevelError:
		level = color.RedString(level)
	}
	fields := make(map[string]interface{}, r.NumAttrs())
	r.Attrs(func(a slog.Attr) bool {
		fields[a.Key] = a.Value.Any()
		return true
	})
	for _, a := range h.attrs {
		fields[a.Key] = a.Value.Any()
	}
	var b []byte
	var err error
	if len(fields) > 0 {
		b, err = json.MarshalIndent(fields, "", " ")
		if err != nil {
			return err
		}
	}
	// BUG FIX: the layout was "[15:05:05.000]" — in Go's reference time
	// "05" means seconds, so the minutes position printed seconds. The
	// minutes placeholder is "04".
	timeStr := r.Time.Format("[15:04:05.000]")
	msg := color.CyanString(r.Message)
	h.l.Println(
		timeStr,
		level,
		msg,
		color.WhiteString(string(b)),
	)
	return nil
}
// WithAttrs returns a handler whose records additionally carry attrs.
// BUG FIX: the new handler previously replaced any attrs accumulated by
// earlier WithAttrs calls; they are now preserved and appended to.
func (h *prettyHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	merged := make([]slog.Attr, 0, len(h.attrs)+len(attrs))
	merged = append(merged, h.attrs...)
	merged = append(merged, attrs...)
	return &prettyHandler{
		opts:    h.opts,
		Handler: h.Handler,
		l:       h.l,
		attrs:   merged,
	}
}

// WithGroup returns a handler scoped to the named group. Grouping is not
// rendered by Handle; the embedded handler merely tracks it.
// BUG FIX: the accumulated attrs were previously dropped here.
func (h *prettyHandler) WithGroup(name string) slog.Handler {
	return &prettyHandler{
		opts:    h.opts,
		Handler: h.Handler.WithGroup(name),
		l:       h.l,
		attrs:   h.attrs,
	}
}
package logger
import (
"context"
"encoding/json"
"io"
stdlog "log"
"log/slog"
"github.com/fatih/color"
)
// prettyHandlerOptions wraps the slog options handed to the pretty handler.
type prettyHandlerOptions struct {
	SlogOpts *slog.HandlerOptions
}

// prettyHandler renders records as colourised, human-readable lines for
// local development. It embeds a JSON handler only to inherit Enabled();
// Handle does its own formatting.
type prettyHandler struct {
	opts prettyHandlerOptions // NOTE(review): retained but never read — candidate for removal.
	slog.Handler
	l     *stdlog.Logger
	attrs []slog.Attr
}

// newPrettyHandler builds a pretty handler writing to out.
func (opts prettyHandlerOptions) newPrettyHandler(
	out io.Writer,
) *prettyHandler {
	h := &prettyHandler{
		Handler: slog.NewJSONHandler(out, opts.SlogOpts),
		l:       stdlog.New(out, "", 0),
	}
	return h
}
// Handle formats a record as "[HH:MM:SS.mmm] LEVEL: message {json fields}"
// with the level colourised by severity, and prints it via the embedded
// std logger. Attrs accumulated via WithAttrs are merged with the
// record's own attrs (record attrs take lower precedence on key clash).
func (h *prettyHandler) Handle(_ context.Context, r slog.Record) error {
	level := r.Level.String() + ":"
	switch r.Level {
	case slog.LevelDebug:
		level = color.MagentaString(level)
	case slog.LevelInfo:
		level = color.BlueString(level)
	case slog.LevelWarn:
		level = color.YellowString(level)
	case slog.LevelError:
		level = color.RedString(level)
	}
	fields := make(map[string]interface{}, r.NumAttrs())
	r.Attrs(func(a slog.Attr) bool {
		fields[a.Key] = a.Value.Any()
		return true
	})
	for _, a := range h.attrs {
		fields[a.Key] = a.Value.Any()
	}
	var b []byte
	var err error
	if len(fields) > 0 {
		b, err = json.MarshalIndent(fields, "", " ")
		if err != nil {
			return err
		}
	}
	// BUG FIX: the layout was "[15:05:05.000]" — in Go's reference time
	// "05" means seconds, so the minutes position printed seconds. The
	// minutes placeholder is "04".
	timeStr := r.Time.Format("[15:04:05.000]")
	msg := color.CyanString(r.Message)
	h.l.Println(
		timeStr,
		level,
		msg,
		color.WhiteString(string(b)),
	)
	return nil
}
// WithAttrs returns a handler whose records additionally carry attrs.
// BUG FIX: the new handler previously replaced any attrs accumulated by
// earlier WithAttrs calls; they are now preserved and appended to.
func (h *prettyHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	merged := make([]slog.Attr, 0, len(h.attrs)+len(attrs))
	merged = append(merged, h.attrs...)
	merged = append(merged, attrs...)
	return &prettyHandler{
		opts:    h.opts,
		Handler: h.Handler,
		l:       h.l,
		attrs:   merged,
	}
}

// WithGroup returns a handler scoped to the named group. Grouping is not
// rendered by Handle; the embedded handler merely tracks it.
// BUG FIX: the accumulated attrs were previously dropped here.
func (h *prettyHandler) WithGroup(name string) slog.Handler {
	return &prettyHandler{
		opts:    h.opts,
		Handler: h.Handler.WithGroup(name),
		l:       h.l,
		attrs:   h.attrs,
	}
}

View File

@ -1,42 +1,42 @@
package domain
// InitPluginRequest is the handshake payload a plugin sends on connect;
// all three fields are validated as required by PluginLoader.accept.
type InitPluginRequest struct {
	Name      string `json:"name"`
	Version   int    `json:"version"`
	Namespace string `json:"namespace"`
}

// PluginPage describes a UI page contributed by a plugin.
type PluginPage struct {
	Name      string `json:"name"`
	Version   int    `json:"version"`
	Namespace string `json:"namespace"`
	Path      string `json:"path"`
}

// PluginAction describes an invocable action a plugin exposes, including
// the resolve parameters it needs.
type PluginAction struct {
	Name                  string   `json:"name"`
	Version               int      `json:"version"`
	Namespace             string   `json:"namespace"`
	RequiredResolveParams []string `json:"required_resolve_params"`
	OptionalResolveParams []string `json:"optional_resolve_params"`
	WithActions           bool     `json:"with_actions"`
	Async                 bool     `json:"async"`
}

// PluginComponent describes a renderable component a plugin exposes; its
// field set mirrors PluginAction.
type PluginComponent struct {
	Name                  string   `json:"name"`
	Version               int      `json:"version"`
	Namespace             string   `json:"namespace"`
	RequiredResolveParams []string `json:"required_resolve_params"`
	OptionalResolveParams []string `json:"optional_resolve_params"`
	WithActions           bool     `json:"with_actions"`
	Async                 bool     `json:"async"`
}

// Ping is the liveness request sent to a plugin (see Plugin.Init).
type Ping struct {
	Payload any `json:"payload"`
}

// Pong is the expected echo of a Ping.
type Pong struct {
	Payload any `json:"payload"`
}
package domain
// InitPluginRequest is the handshake payload a plugin sends on connect;
// all three fields are validated as required by PluginLoader.accept.
type InitPluginRequest struct {
	Name      string `json:"name"`
	Version   int    `json:"version"`
	Namespace string `json:"namespace"`
}

// PluginPage describes a UI page contributed by a plugin.
type PluginPage struct {
	Name      string `json:"name"`
	Version   int    `json:"version"`
	Namespace string `json:"namespace"`
	Path      string `json:"path"`
}

// PluginAction describes an invocable action a plugin exposes, including
// the resolve parameters it needs.
type PluginAction struct {
	Name                  string   `json:"name"`
	Version               int      `json:"version"`
	Namespace             string   `json:"namespace"`
	RequiredResolveParams []string `json:"required_resolve_params"`
	OptionalResolveParams []string `json:"optional_resolve_params"`
	WithActions           bool     `json:"with_actions"`
	Async                 bool     `json:"async"`
}

// PluginComponent describes a renderable component a plugin exposes; its
// field set mirrors PluginAction.
type PluginComponent struct {
	Name                  string   `json:"name"`
	Version               int      `json:"version"`
	Namespace             string   `json:"namespace"`
	RequiredResolveParams []string `json:"required_resolve_params"`
	OptionalResolveParams []string `json:"optional_resolve_params"`
	WithActions           bool     `json:"with_actions"`
	Async                 bool     `json:"async"`
}

// Ping is the liveness request sent to a plugin (see Plugin.Init).
type Ping struct {
	Payload any `json:"payload"`
}

// Pong is the expected echo of a Ping.
type Pong struct {
	Payload any `json:"payload"`
}

View File

@ -1,114 +1,114 @@
package plugin
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net"
"strconv"
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
)
// PluginLoader accepts loopback TCP connections from external plugins and
// registers successfully initialised plugins in the store.
type PluginLoader struct {
	l     net.Listener
	store *PluginStore
}

// MustNewPluginLoader binds a loopback TCP listener on listenPort and
// terminates the process via logger.Fatal if the port cannot be bound.
func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader {
	addr := "127.0.0.1:" + strconv.Itoa(int(listenPort))
	l, err := net.Listen("tcp", addr)
	if err != nil {
		logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err))
	}
	return &PluginLoader{l: l, store: ps}
}
// Run starts the accept loop in its own goroutine; the loop stops when
// ctx is cancelled.
func (p *PluginLoader) Run(ctx context.Context) {
	go p.run(ctx)
}

// run accepts plugin connections until ctx is cancelled, handing each
// connection to accept in a new goroutine.
func (p *PluginLoader) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "[plugin_loader][loop] closing")
			if err := p.l.Close(); err != nil {
				logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
			}
			// BUG FIX: without this return the goroutine kept looping
			// after cancellation, re-closing the listener forever.
			return
		default:
			conn, err := p.l.Accept()
			if err != nil {
				// BUG FIX: log message typo "accet" corrected.
				logger.Error(ctx, "[plugin_loader][loop] failed to accept new connection", logger.Err(err))
				continue
			}
			logger.Debug(ctx, "[plugin_loader][loop] accepting connection")
			go p.accept(ctx, conn)
		}
	}
}
// accept performs the plugin handshake: it reads an InitPluginRequest,
// validates the metadata, and on success registers the plugin in the
// store. On validation failure the combined error is written back to the
// plugin and the connection is closed.
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
	// BUG FIX: the buffer was allocated with make([]byte, 0), so
	// conn.Read always returned 0 bytes and the handshake could never
	// succeed. Read into a sized buffer and decode only the bytes read.
	// TODO make read loop
	data := make([]byte, 4096)
	n, err := conn.Read(data)
	if err != nil {
		logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err))
		return
	}
	logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int("n", n))
	init := new(domain.InitPluginRequest)
	if err = json.Unmarshal(data[:n], init); err != nil {
		logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err))
		return
	}
	// Validate the metadata, accumulating every problem before replying.
	// BUG FIX: the name/version branches previously logged and reported
	// "namespace" (copy-paste).
	if init.Namespace == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty namespace")
		err = errors.Join(err, errors.New("init request must contain namespace"))
	}
	if init.Name == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty name")
		err = errors.Join(err, errors.New("init request must contain name"))
	}
	if init.Version == 0 {
		logger.Error(ctx, "[plugin_loader][accept] empty version")
		err = errors.Join(err, errors.New("init request must contain version"))
	}
	if err != nil {
		if _, werr := conn.Write([]byte(err.Error())); werr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr))
		}
		if cerr := conn.Close(); cerr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr))
		}
		return
	}
	logger.Debug(ctx,
		"[plugin_loader][accept] new plugin initialized",
		"plugin", PluginStoreKey(init.Namespace, init.Name, init.Version),
	)
	plugin := &Plugin{
		conn: conn,
		md:   *init,
	}
	closer.Add(plugin.Close)
	p.store.Add(plugin)
}
package plugin
import (
"context"
"encoding/json"
"errors"
"log/slog"
"net"
"strconv"
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
)
// PluginLoader accepts loopback TCP connections from external plugins and
// registers successfully initialised plugins in the store.
type PluginLoader struct {
	l     net.Listener
	store *PluginStore
}

// MustNewPluginLoader binds a loopback TCP listener on listenPort and
// terminates the process via logger.Fatal if the port cannot be bound.
func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader {
	addr := "127.0.0.1:" + strconv.Itoa(int(listenPort))
	l, err := net.Listen("tcp", addr)
	if err != nil {
		logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err))
	}
	return &PluginLoader{l: l, store: ps}
}
// Run starts the accept loop in its own goroutine; the loop stops when
// ctx is cancelled.
func (p *PluginLoader) Run(ctx context.Context) {
	go p.run(ctx)
}

// run accepts plugin connections until ctx is cancelled, handing each
// connection to accept in a new goroutine.
func (p *PluginLoader) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "[plugin_loader][loop] closing")
			if err := p.l.Close(); err != nil {
				logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
			}
			// BUG FIX: without this return the goroutine kept looping
			// after cancellation, re-closing the listener forever.
			return
		default:
			conn, err := p.l.Accept()
			if err != nil {
				// BUG FIX: log message typo "accet" corrected.
				logger.Error(ctx, "[plugin_loader][loop] failed to accept new connection", logger.Err(err))
				continue
			}
			logger.Debug(ctx, "[plugin_loader][loop] accepting connection")
			go p.accept(ctx, conn)
		}
	}
}
// accept performs the plugin handshake: it reads an InitPluginRequest,
// validates the metadata, and on success registers the plugin in the
// store. On validation failure the combined error is written back to the
// plugin and the connection is closed.
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
	// BUG FIX: the buffer was allocated with make([]byte, 0), so
	// conn.Read always returned 0 bytes and the handshake could never
	// succeed. Read into a sized buffer and decode only the bytes read.
	// TODO make read loop
	data := make([]byte, 4096)
	n, err := conn.Read(data)
	if err != nil {
		logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err))
		return
	}
	logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int("n", n))
	init := new(domain.InitPluginRequest)
	if err = json.Unmarshal(data[:n], init); err != nil {
		logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err))
		return
	}
	// Validate the metadata, accumulating every problem before replying.
	// BUG FIX: the name/version branches previously logged and reported
	// "namespace" (copy-paste).
	if init.Namespace == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty namespace")
		err = errors.Join(err, errors.New("init request must contain namespace"))
	}
	if init.Name == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty name")
		err = errors.Join(err, errors.New("init request must contain name"))
	}
	if init.Version == 0 {
		logger.Error(ctx, "[plugin_loader][accept] empty version")
		err = errors.Join(err, errors.New("init request must contain version"))
	}
	if err != nil {
		if _, werr := conn.Write([]byte(err.Error())); werr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr))
		}
		if cerr := conn.Close(); cerr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr))
		}
		return
	}
	logger.Debug(ctx,
		"[plugin_loader][accept] new plugin initialized",
		"plugin", PluginStoreKey(init.Namespace, init.Name, init.Version),
	)
	plugin := &Plugin{
		conn: conn,
		md:   *init,
	}
	closer.Add(plugin.Close)
	p.store.Add(plugin)
}

View File

@ -1,45 +1,45 @@
package plugin
import (
"bytes"
"encoding/json"
"fmt"
"net"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
)
// Plugin is a live connection to an external plugin process together with
// the metadata it supplied during the handshake.
type Plugin struct {
	conn net.Conn
	md   domain.InitPluginRequest
}

// Init performs a ping/pong round-trip: it sends initPayload wrapped in a
// Ping and expects the plugin to echo the exact same bytes back.
func (p *Plugin) Init(initPayload any) error {
	r := &domain.Ping{
		Payload: initPayload,
	}
	pingData, err := json.Marshal(r)
	if err != nil {
		return err
	}
	if _, err = p.conn.Write(pingData); err != nil {
		return err
	}
	// BUG FIX: the reply buffer was make([]byte, 0), so Read returned 0
	// bytes and the echo comparison could never succeed. Size it to the
	// ping and compare only the bytes actually read.
	pongData := make([]byte, len(pingData))
	n, err := p.conn.Read(pongData)
	if err != nil {
		return err
	}
	if !bytes.Equal(pongData[:n], pingData) {
		return fmt.Errorf("ping-pong payload assertion error")
	}
	return nil
}

// Close tears down the underlying connection.
func (p *Plugin) Close() error {
	return p.conn.Close()
}
package plugin
import (
"bytes"
"encoding/json"
"fmt"
"net"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
)
// Plugin is a live connection to an external plugin process together with
// the metadata it supplied during the handshake.
type Plugin struct {
	conn net.Conn
	md   domain.InitPluginRequest
}

// Init performs a ping/pong round-trip: it sends initPayload wrapped in a
// Ping and expects the plugin to echo the exact same bytes back.
func (p *Plugin) Init(initPayload any) error {
	r := &domain.Ping{
		Payload: initPayload,
	}
	pingData, err := json.Marshal(r)
	if err != nil {
		return err
	}
	if _, err = p.conn.Write(pingData); err != nil {
		return err
	}
	// BUG FIX: the reply buffer was make([]byte, 0), so Read returned 0
	// bytes and the echo comparison could never succeed. Size it to the
	// ping and compare only the bytes actually read.
	pongData := make([]byte, len(pingData))
	n, err := p.conn.Read(pongData)
	if err != nil {
		return err
	}
	if !bytes.Equal(pongData[:n], pingData) {
		return fmt.Errorf("ping-pong payload assertion error")
	}
	return nil
}

// Close tears down the underlying connection.
func (p *Plugin) Close() error {
	return p.conn.Close()
}

View File

@ -1,24 +1,24 @@
package plugin
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
)
// PluginHandler exposes plugin-backed endpoints through the generic
// handler interface; the processing logic is still a stub.
type PluginHandler struct {
	*handler.BaseHandler
	store *PluginStore
}

// GetName identifies the handler/protocol version.
func (p *PluginHandler) GetName() string {
	return "pluginv1"
}

// GetProcessFn returns the request-processing function; it currently
// always reports that the handler is unimplemented.
func (p *PluginHandler) GetProcessFn() func(ctx context.Context, req *common.Request, w handler.Writer) error {
	return func(_ context.Context, _ *common.Request, _ handler.Writer) error {
		return fmt.Errorf("unimplemented")
	}
}
package plugin
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
)
// PluginHandler exposes plugin-backed endpoints through the generic
// handler interface; the processing logic is still a stub.
type PluginHandler struct {
	*handler.BaseHandler
	store *PluginStore
}

// GetName identifies the handler/protocol version.
func (p *PluginHandler) GetName() string {
	return "pluginv1"
}

// GetProcessFn returns the request-processing function; it currently
// always reports that the handler is unimplemented.
func (p *PluginHandler) GetProcessFn() func(ctx context.Context, req *common.Request, w handler.Writer) error {
	return func(_ context.Context, _ *common.Request, _ handler.Writer) error {
		return fmt.Errorf("unimplemented")
	}
}

View File

@ -1,37 +1,37 @@
package plugin
import (
"fmt"
"sync"
)
// PluginStore is a concurrency-safe registry of live plugins keyed by
// PluginStoreKey(namespace, name, version).
type PluginStore struct {
	m       sync.RWMutex
	plugins map[string]*Plugin
}

// NewPluginStore returns an empty, ready-to-use store.
func NewPluginStore() *PluginStore {
	return &PluginStore{plugins: map[string]*Plugin{}}
}

// Add registers (or replaces) a plugin under its metadata key.
func (s *PluginStore) Add(plugin *Plugin) {
	key := PluginStoreKey(plugin.md.Namespace, plugin.md.Name, plugin.md.Version)
	s.m.Lock()
	defer s.m.Unlock()
	s.plugins[key] = plugin
}

// Get returns the plugin registered under the given key, or nil when
// absent.
func (s *PluginStore) Get(plugin string) *Plugin {
	s.m.RLock()
	defer s.m.RUnlock()
	return s.plugins[plugin] // missing keys yield the nil zero value
}
// PluginStoreKey builds the "<namespace>.<name>.<version>" registry key.
func PluginStoreKey(ns, name string, v int) string {
	return fmt.Sprintf("%s.%s.%d", ns, name, v)
}
package plugin
import (
"fmt"
"sync"
)
// PluginStore is a concurrency-safe registry of live plugins keyed by
// PluginStoreKey(namespace, name, version).
type PluginStore struct {
	m       sync.RWMutex
	plugins map[string]*Plugin
}

// NewPluginStore returns an empty, ready-to-use store.
func NewPluginStore() *PluginStore {
	return &PluginStore{plugins: map[string]*Plugin{}}
}

// Add registers (or replaces) a plugin under its metadata key.
func (s *PluginStore) Add(plugin *Plugin) {
	key := PluginStoreKey(plugin.md.Namespace, plugin.md.Name, plugin.md.Version)
	s.m.Lock()
	defer s.m.Unlock()
	s.plugins[key] = plugin
}

// Get returns the plugin registered under the given key, or nil when
// absent.
func (s *PluginStore) Get(plugin string) *Plugin {
	s.m.RLock()
	defer s.m.RUnlock()
	return s.plugins[plugin] // missing keys yield the nil zero value
}
// PluginStoreKey builds the "<namespace>.<name>.<version>" registry key.
func PluginStoreKey(ns, name string, v int) string {
	return fmt.Sprintf("%s.%s.%d", ns, name, v)
}

View File

@ -1,112 +1,112 @@
package processor
import (
"context"
"errors"
"fmt"
"net/http"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"github.com/gin-gonic/gin"
"golang.org/x/sync/errgroup"
)
// GinProcessor adapts transport-agnostic handlers to gin: it resolves the
// handler's required parameters, runs the preprocess hook, then the main
// process function, translating errors into JSON responses.
type GinProcessor struct {
	rp          *common.RequestPool
	authStorage storage.AuthStorage // NOTE(review): stored but not read in this file; presumably consumed by resolvers — confirm.
	resolveDispatcher *resolvedispatcher.ResolveDispatcher
}

// NewGinProcessor wires the processor with its auth storage and the
// dispatcher that owns the per-parameter resolvers.
func NewGinProcessor(
	authStorage storage.AuthStorage,
	resolveDispatcher *resolvedispatcher.ResolveDispatcher,
) *GinProcessor {
	return &GinProcessor{
		rp:                common.NewRequestPool(),
		authStorage:       authStorage,
		resolveDispatcher: resolveDispatcher,
	}
}
// Process wraps a Handler into a gin.HandlerFunc implementing the request
// pipeline: resolve required params, run the optional preprocess hook,
// then the handler's process function. Any stage error is rendered by
// writeError and aborts the pipeline.
func (p *GinProcessor) Process(h handler.Handler) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		req := common.NewRequestFromHttp(p.rp, ctx.Request)
		// NOTE(review): a raw string context key is collision-prone and
		// flagged by go vet; other code presumably looks it up by the
		// same literal, so changing it needs a coordinated refactor.
		reqCtx := context.WithValue(ctx.Request.Context(), "__request_id", req.ID)
		ctx.Request = ctx.Request.WithContext(reqCtx)
		// 1-2. Resolve all required (and optional) parameters.
		if err := p.resolve(ctx, h, req); err != nil {
			p.writeError(ctx, err)
			return
		}
		// 3. Preprocessing hook (middleware-style), if configured.
		if pre := h.GetPreprocessFn(); pre != nil {
			if err := pre(ctx, req, wrapGin(ctx)); err != nil {
				p.writeError(ctx, err)
				return
			}
		}
		// 4. Main handler.
		if err := h.GetProcessFn()(ctx, req, wrapGin(ctx)); err != nil {
			p.writeError(ctx, err)
			return
		}
	}
}
// resolve runs every required-parameter resolver concurrently and waits
// for all of them; the first failure cancels the remainder through the
// errgroup context.
func (p *GinProcessor) resolve(ctx *gin.Context, h handler.Handler, req *common.Request) error {
	eg, egCtx := errgroup.WithContext(ctx)
	for _, name := range h.GetRequiredResolveParams() {
		res, err := p.resolveDispatcher.GetResolver(name)
		if err != nil {
			return fmt.Errorf("failed to resolve '%s' param: no resolver provided: %w", name, err)
		}
		param := name // keep a stable copy for the closure below
		eg.Go(func() error {
			if resolveErr := res.Resolve(egCtx, req, ctx); resolveErr != nil {
				return fmt.Errorf("failed to resolve '%s' value: %w", param, resolveErr)
			}
			return nil
		})
	}
	return eg.Wait()
}
// writeError logs err and renders it as JSON. Known auth errors map to
// 403 Forbidden, everything else to a generic 500.
// BUG FIX: the access-denied and session-expired branches previously sent
// HTTP status 500 while the body claimed 403; status and body now agree.
func (p *GinProcessor) writeError(ctx *gin.Context, err error) {
	logger.Error(ctx, "error process request", logger.Err(err))
	// TODO do a custom error handling for resolvers / handlers / processors etc
	switch {
	case errors.Is(err, errs.ErrorAccessDenied), errors.Is(err, errs.ErrorSessionExpired):
		ctx.JSON(http.StatusForbidden, domain.ErrorJson{
			Code:    http.StatusForbidden,
			Message: err.Error(),
		})
	default:
		ctx.JSON(http.StatusInternalServerError, domain.ErrorJson{
			Code:    http.StatusInternalServerError,
			Message: "Internal Error",
		})
	}
}
package processor
import (
"context"
"errors"
"fmt"
"net/http"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/domain"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
resolvedispatcher "git.optclblast.xyz/draincloud/draincloud-core/internal/resolve_dispatcher"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
"github.com/gin-gonic/gin"
"golang.org/x/sync/errgroup"
)
// GinProcessor adapts transport-agnostic handlers to gin: it resolves the
// handler's required parameters, runs the preprocess hook, then the main
// process function, translating errors into JSON responses.
type GinProcessor struct {
	rp          *common.RequestPool
	authStorage storage.AuthStorage // NOTE(review): stored but not read in this file; presumably consumed by resolvers — confirm.
	resolveDispatcher *resolvedispatcher.ResolveDispatcher
}

// NewGinProcessor wires the processor with its auth storage and the
// dispatcher that owns the per-parameter resolvers.
func NewGinProcessor(
	authStorage storage.AuthStorage,
	resolveDispatcher *resolvedispatcher.ResolveDispatcher,
) *GinProcessor {
	return &GinProcessor{
		rp:                common.NewRequestPool(),
		authStorage:       authStorage,
		resolveDispatcher: resolveDispatcher,
	}
}
// Process wraps a Handler into a gin.HandlerFunc implementing the request
// pipeline: resolve required params, run the optional preprocess hook,
// then the handler's process function. Any stage error is rendered by
// writeError and aborts the pipeline.
func (p *GinProcessor) Process(h handler.Handler) gin.HandlerFunc {
	return func(ctx *gin.Context) {
		req := common.NewRequestFromHttp(p.rp, ctx.Request)
		// NOTE(review): a raw string context key is collision-prone and
		// flagged by go vet; other code presumably looks it up by the
		// same literal, so changing it needs a coordinated refactor.
		reqCtx := context.WithValue(ctx.Request.Context(), "__request_id", req.ID)
		ctx.Request = ctx.Request.WithContext(reqCtx)
		// 1-2. Resolve all required (and optional) parameters.
		if err := p.resolve(ctx, h, req); err != nil {
			p.writeError(ctx, err)
			return
		}
		// 3. Preprocessing hook (middleware-style), if configured.
		if pre := h.GetPreprocessFn(); pre != nil {
			if err := pre(ctx, req, wrapGin(ctx)); err != nil {
				p.writeError(ctx, err)
				return
			}
		}
		// 4. Main handler.
		if err := h.GetProcessFn()(ctx, req, wrapGin(ctx)); err != nil {
			p.writeError(ctx, err)
			return
		}
	}
}
// resolve runs every required-parameter resolver concurrently and waits
// for all of them; the first failure cancels the remainder through the
// errgroup context.
func (p *GinProcessor) resolve(ctx *gin.Context, h handler.Handler, req *common.Request) error {
	eg, egCtx := errgroup.WithContext(ctx)
	for _, name := range h.GetRequiredResolveParams() {
		res, err := p.resolveDispatcher.GetResolver(name)
		if err != nil {
			return fmt.Errorf("failed to resolve '%s' param: no resolver provided: %w", name, err)
		}
		param := name // keep a stable copy for the closure below
		eg.Go(func() error {
			if resolveErr := res.Resolve(egCtx, req, ctx); resolveErr != nil {
				return fmt.Errorf("failed to resolve '%s' value: %w", param, resolveErr)
			}
			return nil
		})
	}
	return eg.Wait()
}
// writeError logs err and renders it as JSON. Known auth errors map to
// 403 Forbidden, everything else to a generic 500.
// BUG FIX: the access-denied and session-expired branches previously sent
// HTTP status 500 while the body claimed 403; status and body now agree.
func (p *GinProcessor) writeError(ctx *gin.Context, err error) {
	logger.Error(ctx, "error process request", logger.Err(err))
	// TODO do a custom error handling for resolvers / handlers / processors etc
	switch {
	case errors.Is(err, errs.ErrorAccessDenied), errors.Is(err, errs.ErrorSessionExpired):
		ctx.JSON(http.StatusForbidden, domain.ErrorJson{
			Code:    http.StatusForbidden,
			Message: err.Error(),
		})
	default:
		ctx.JSON(http.StatusInternalServerError, domain.ErrorJson{
			Code:    http.StatusInternalServerError,
			Message: "Internal Error",
		})
	}
}

View File

@ -1,34 +1,34 @@
package processor
import (
"context"
"net/http"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"github.com/gin-gonic/gin"
)
// ginWriter adapts a gin.Context to the transport-agnostic
// handler.Writer interface.
type ginWriter struct {
	ctx *gin.Context
}

// wrapGin wraps a gin context for use by handlers.
func wrapGin(ctx *gin.Context) ginWriter {
	return ginWriter{ctx: ctx}
}

// Write renders resp as JSON; the status defaults to 200 OK unless
// overridden via handler.WithCode.
func (w ginWriter) Write(_ context.Context, resp any, opts ...handler.WriteOption) {
	cfg := handler.WriteOptions{Code: http.StatusOK}
	for _, opt := range opts {
		opt(&cfg)
	}
	w.ctx.JSON(cfg.Code, resp)
}

// SetCookie forwards the cookie attributes to gin's SetCookie.
func (w ginWriter) SetCookie(name string, value string, maxAge int, path string, domain string, secure bool, httpOnly bool) {
	w.ctx.SetCookie(name, value, maxAge, path, domain, secure, httpOnly)
}
package processor
import (
"context"
"net/http"
"git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
"github.com/gin-gonic/gin"
)
type ginWriter struct {
ctx *gin.Context
}
func wrapGin(ctx *gin.Context) ginWriter {
return ginWriter{
ctx: ctx,
}
}
// Write serializes resp as a JSON response, applying any WriteOptions.
// When no options are given the status code defaults to HTTP 200.
func (w ginWriter) Write(ctx context.Context, resp any, opts ...handler.WriteOption) {
	options := handler.WriteOptions{Code: http.StatusOK}
	for _, apply := range opts {
		apply(&options)
	}
	w.ctx.JSON(options.Code, resp)
}
func (w ginWriter) SetCookie(name string, value string, maxAge int, path string, domain string, secure bool, httpOnly bool) {
w.ctx.SetCookie(name, value, maxAge, path, domain, secure, httpOnly)
}

View File

@ -1,7 +1,7 @@
package processor
import "git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
type Processor[H any] interface {
Process(handler.Handler) H
}
package processor
import "git.optclblast.xyz/draincloud/draincloud-core/internal/handler"
type Processor[H any] interface {
Process(handler.Handler) H
}

View File

@ -1,38 +1,38 @@
package reqcontext
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
)
type CtxKey string
const (
UserIDCtxKey CtxKey = "_ctx_user_id"
SessionCtxKey CtxKey = "_ctx_session"
)
func WithUserID(parent context.Context, userID uuid.UUID) context.Context {
return context.WithValue(parent, UserIDCtxKey, userID)
}
func GetUserID(ctx context.Context) (uuid.UUID, error) {
if id, ok := ctx.Value(UserIDCtxKey).(uuid.UUID); ok {
return id, nil
}
return uuid.Nil, fmt.Errorf("userID not passed with context")
}
func WithSession(parent context.Context, session *auth.Session) context.Context {
return context.WithValue(parent, SessionCtxKey, session)
}
// GetSession extracts the session previously attached via WithSession.
// It returns an error when the context carries no session.
func GetSession(ctx context.Context) (*auth.Session, error) {
	// Bug fix: this previously looked up UserIDCtxKey, so a session stored
	// under SessionCtxKey by WithSession was never found.
	if ses, ok := ctx.Value(SessionCtxKey).(*auth.Session); ok {
		return ses, nil
	}
	return nil, fmt.Errorf("session not passed with context")
}
package reqcontext
import (
"context"
"fmt"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
)
type CtxKey string
const (
UserIDCtxKey CtxKey = "_ctx_user_id"
SessionCtxKey CtxKey = "_ctx_session"
)
func WithUserID(parent context.Context, userID uuid.UUID) context.Context {
return context.WithValue(parent, UserIDCtxKey, userID)
}
// GetUserID returns the user ID previously attached via WithUserID,
// or an error when the context carries none.
func GetUserID(ctx context.Context) (uuid.UUID, error) {
	id, ok := ctx.Value(UserIDCtxKey).(uuid.UUID)
	if !ok {
		return uuid.Nil, fmt.Errorf("userID not passed with context")
	}
	return id, nil
}
func WithSession(parent context.Context, session *auth.Session) context.Context {
return context.WithValue(parent, SessionCtxKey, session)
}
// GetSession extracts the session previously attached via WithSession.
// It returns an error when the context carries no session.
func GetSession(ctx context.Context) (*auth.Session, error) {
	// Bug fix: this previously looked up UserIDCtxKey, so a session stored
	// under SessionCtxKey by WithSession was never found.
	if ses, ok := ctx.Value(SessionCtxKey).(*auth.Session); ok {
		return ses, nil
	}
	return nil, fmt.Errorf("session not passed with context")
}

View File

@ -1,48 +1,48 @@
package resolvedispatcher
import (
"context"
"fmt"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers"
)
type ResolveDispatcher struct {
m sync.RWMutex
router map[string]resolvers.Resolver
}
func New() *ResolveDispatcher {
return &ResolveDispatcher{
router: map[string]resolvers.Resolver{},
}
}
func (d *ResolveDispatcher) RegisterResolver(
ctx context.Context,
resolverName string,
resolver resolvers.Resolver,
) {
d.m.Lock()
defer d.m.Unlock()
if _, ok := d.router[resolverName]; ok {
logger.Fatal(ctx, fmt.Sprintf("resolver '%s' is already registered in router", resolverName))
}
d.router[resolverName] = resolver
}
func (d *ResolveDispatcher) GetResolver(name string) (resolvers.Resolver, error) {
d.m.RLock()
defer d.m.RUnlock()
res, ok := d.router[name]
if !ok {
return nil, fmt.Errorf("resolver '%s' not found", name)
}
return res, nil
}
package resolvedispatcher
import (
"context"
"fmt"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/resolvers"
)
type ResolveDispatcher struct {
m sync.RWMutex
router map[string]resolvers.Resolver
}
func New() *ResolveDispatcher {
return &ResolveDispatcher{
router: map[string]resolvers.Resolver{},
}
}
// RegisterResolver adds resolver to the dispatcher under resolverName.
// Registering the same name twice is treated as a programming error and
// aborts the process via logger.Fatal. Safe for concurrent use.
func (d *ResolveDispatcher) RegisterResolver(
	ctx context.Context,
	resolverName string,
	resolver resolvers.Resolver,
) {
	d.m.Lock()
	defer d.m.Unlock()
	if _, ok := d.router[resolverName]; ok {
		logger.Fatal(ctx, fmt.Sprintf("resolver '%s' is already registered in router", resolverName))
	}
	d.router[resolverName] = resolver
}
// GetResolver looks up a previously registered resolver by name.
// It is safe for concurrent use with RegisterResolver.
func (d *ResolveDispatcher) GetResolver(name string) (resolvers.Resolver, error) {
	d.m.RLock()
	defer d.m.RUnlock()
	if res, ok := d.router[name]; ok {
		return res, nil
	}
	return nil, fmt.Errorf("resolver '%s' not found", name)
}

View File

@ -1,109 +1,109 @@
package auth
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
models "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
)
const (
AuthResolverV1Name = "auth.v1"
)
const (
csrfTokenCookie = "__Csrf_token"
sessionTokenCookie = "__Session_token"
)
type AuthResolver struct {
authStorage storage.AuthStorage
}
func NewAuthResolver(authStorage storage.AuthStorage) *AuthResolver {
return &AuthResolver{
authStorage: authStorage,
}
}
func (r *AuthResolver) Resolve(ctx context.Context, req *common.Request, _ any) error {
return r.authorize(ctx, req)
}
func (r *AuthResolver) GetRequiredResolveParams() []string {
return nil
}
func (p *AuthResolver) authorize(ctx context.Context, req *common.Request) error {
session, err := p.getSession(ctx, req)
if err != nil && !errors.Is(err, http.ErrNoCookie) {
return errs.ErrorUnauthorized
}
if session == nil {
return errs.ErrorUnauthorized
}
if err := validateSession(ctx, req, session); err != nil {
// TODO add audit log entry
return errs.ErrorUnauthorized
}
user, err := p.authStorage.GetUserByID(ctx, session.UserID)
if err != nil {
return fmt.Errorf("failed to fetch user by id: %w", err)
}
logger.Debug(ctx, "[authorize] user authorized", slog.String("session_id", session.ID.String()))
req.User = user
req.Session = session
return nil
}
func (d *AuthResolver) getSession(ctx context.Context, req *common.Request) (*models.Session, error) {
token, err := common.GetValue[string](req.Metadata, sessionTokenCookie)
if err != nil {
return nil, fmt.Errorf("failed to fetch session cookie from request: %w", err)
}
if len(token) == 0 {
return nil, fmt.Errorf("session token or csrf token is empty")
}
session, err := d.authStorage.GetSession(ctx, token)
if err != nil {
return nil, fmt.Errorf("failed to fetch session from repo: %w", err)
}
return session, nil
}
func validateSession(_ context.Context, req *common.Request, session *models.Session) error {
if session == nil {
return errs.ErrorAccessDenied
}
csrfToken, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
if err != nil {
return fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
}
if session.CsrfToken != csrfToken {
return errs.ErrorAccessDenied
}
if session.ExpiredAt.Before(time.Now()) {
return errs.ErrorSessionExpired
}
return nil
}
package auth
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"git.optclblast.xyz/draincloud/draincloud-core/internal/errs"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage"
models "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
)
const (
AuthResolverV1Name = "auth.v1"
)
const (
csrfTokenCookie = "__Csrf_token"
sessionTokenCookie = "__Session_token"
)
type AuthResolver struct {
authStorage storage.AuthStorage
}
func NewAuthResolver(authStorage storage.AuthStorage) *AuthResolver {
return &AuthResolver{
authStorage: authStorage,
}
}
func (r *AuthResolver) Resolve(ctx context.Context, req *common.Request, _ any) error {
return r.authorize(ctx, req)
}
func (r *AuthResolver) GetRequiredResolveParams() []string {
return nil
}
// authorize resolves the caller's session from the request metadata,
// validates it, loads the owning user and attaches both to req.
// Every failure mode before the user lookup collapses into
// errs.ErrorUnauthorized so callers cannot distinguish them.
func (p *AuthResolver) authorize(ctx context.Context, req *common.Request) error {
	session, err := p.getSession(ctx, req)
	if err != nil && !errors.Is(err, http.ErrNoCookie) {
		return errs.ErrorUnauthorized
	}
	// A missing cookie (http.ErrNoCookie) falls through to here with a
	// nil session and is rejected as unauthorized as well.
	if session == nil {
		return errs.ErrorUnauthorized
	}
	if err := validateSession(ctx, req, session); err != nil {
		// TODO add audit log entry
		return errs.ErrorUnauthorized
	}
	user, err := p.authStorage.GetUserByID(ctx, session.UserID)
	if err != nil {
		return fmt.Errorf("failed to fetch user by id: %w", err)
	}
	logger.Debug(ctx, "[authorize] user authorized", slog.String("session_id", session.ID.String()))
	req.User = user
	req.Session = session
	return nil
}
func (d *AuthResolver) getSession(ctx context.Context, req *common.Request) (*models.Session, error) {
token, err := common.GetValue[string](req.Metadata, sessionTokenCookie)
if err != nil {
return nil, fmt.Errorf("failed to fetch session cookie from request: %w", err)
}
if len(token) == 0 {
return nil, fmt.Errorf("session token or csrf token is empty")
}
session, err := d.authStorage.GetSession(ctx, token)
if err != nil {
return nil, fmt.Errorf("failed to fetch session from repo: %w", err)
}
return session, nil
}
// validateSession checks that the request's CSRF token matches the token
// stored on the session and that the session has not expired.
func validateSession(_ context.Context, req *common.Request, session *models.Session) error {
	if session == nil {
		return errs.ErrorAccessDenied
	}
	csrfToken, err := common.GetValue[string](req.Metadata, csrfTokenCookie)
	if err != nil {
		return fmt.Errorf("failed to fetch csrf cookie from request: %w", err)
	}
	// NOTE(review): non-constant-time comparison of a secret token;
	// consider crypto/subtle.ConstantTimeCompare to avoid timing leaks.
	if session.CsrfToken != csrfToken {
		return errs.ErrorAccessDenied
	}
	if session.ExpiredAt.Before(time.Now()) {
		return errs.ErrorSessionExpired
	}
	return nil
}

View File

@ -1,33 +1,33 @@
package pluginname
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"github.com/gin-gonic/gin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
PluginNameResolverName = "plugin_name"
)
type PluginNameResolver struct{}
func (p *PluginNameResolver) Resolve(ctx context.Context, req *common.Request, rawReq any) error {
ginCtx, ok := rawReq.(*gin.Context)
if !ok {
return status.Errorf(codes.Internal, "invalid request type")
}
pluginName := ginCtx.Param("plugin_name")
if pluginName == "" {
return status.Error(codes.InvalidArgument, "plugin name is empty")
}
req.ResolveValues.Store(PluginNameResolverName, pluginName)
return nil
}
func (p *PluginNameResolver) GetRequiredResolveParams() []string {
return nil
}
package pluginname
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
"github.com/gin-gonic/gin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
PluginNameResolverName = "plugin_name"
)
type PluginNameResolver struct{}
// Resolve extracts the "plugin_name" path parameter from the underlying
// gin context and stores it in req.ResolveValues under
// PluginNameResolverName.
func (p *PluginNameResolver) Resolve(ctx context.Context, req *common.Request, rawReq any) error {
	ginCtx, ok := rawReq.(*gin.Context)
	if !ok {
		return status.Errorf(codes.Internal, "invalid request type")
	}
	name := ginCtx.Param("plugin_name")
	if len(name) == 0 {
		return status.Error(codes.InvalidArgument, "plugin name is empty")
	}
	req.ResolveValues.Store(PluginNameResolverName, name)
	return nil
}
func (p *PluginNameResolver) GetRequiredResolveParams() []string {
return nil
}

View File

@ -1,12 +1,12 @@
package resolvers
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
type Resolver interface {
Resolve(ctx context.Context, req *common.Request, reqReq any) error
GetRequiredResolveParams() []string
}
package resolvers
import (
"context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/common"
)
type Resolver interface {
Resolve(ctx context.Context, req *common.Request, reqReq any) error
GetRequiredResolveParams() []string
}

View File

@ -1,6 +1,6 @@
package seal
// TODO
type SealResolver struct {
wardenClient any
}
package seal
// TODO
type SealResolver struct {
wardenClient any
}

View File

@ -1,19 +1,19 @@
package audit
import (
"context"
"github.com/jackc/pgx/v5"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/audit"
)
type Repository struct {
db *pgx.Conn
}
func (r *Repository) AddEntry(ctx context.Context, entry audit.AuditLogEntry) error {
logger.Warn(ctx, "[Repository][AddEntry] not implemented yet!")
return nil
}
package audit
import (
"context"
"github.com/jackc/pgx/v5"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/audit"
)
type Repository struct {
db *pgx.Conn
}
func (r *Repository) AddEntry(ctx context.Context, entry audit.AuditLogEntry) error {
logger.Warn(ctx, "[Repository][AddEntry] not implemented yet!")
return nil
}

View File

@ -1,86 +1,86 @@
package fs
import (
"context"
"fmt"
"os"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
type Storage struct {
lm *sync.Map
dir string
// If file is not belongs to current node FS - redirect to corresponding node
// cluster DrainCloudCluster
}
func NewFSStorage(dir string) *Storage {
return &Storage{
lm: &sync.Map{},
dir: dir,
}
}
// GetFile opens the blob file for id and returns the open handle.
// The caller owns the returned *os.File and must Close it.
func (s *Storage) GetFile(ctx context.Context, id int64) (*os.File, error) {
	tx := lockFile(s.lm, id)
	defer unlockFile(s.lm, id, tx)
	file, err := os.Open(getFilePath(s.dir, id))
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	// Bug fix: the handle was previously closed via defer before returning,
	// so callers always received an already-closed file.
	return file, nil
}
// SaveBlob writes data to the blob file for id, creating the file when it
// does not exist and truncating any previous content.
func (s *Storage) SaveBlob(ctx context.Context, id int64, data []byte) error {
	tx := lockFile(s.lm, id)
	defer unlockFile(s.lm, id, tx)
	// Bug fix: os.Open opens read-only, so every Write below failed.
	// Open for writing, creating/truncating the file instead.
	file, err := os.OpenFile(getFilePath(s.dir, id), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer func() {
		if err = file.Close(); err != nil {
			logger.Error(ctx, "[saveFile] close error", logger.Err(err))
		}
	}()
	if _, err = file.Write(data); err != nil {
		return fmt.Errorf("failed to write data to file: %w", err)
	}
	return nil
}
func (s *Storage) DeleteFile(ctx context.Context, id int64) error {
tx := lockFile(s.lm, id)
defer unlockFile(s.lm, id, tx)
return nil
}
func getFilePath(dir string, id int64) string {
return fmt.Sprintf("%s/%v", dir, id)
}
func lockFile(lm *sync.Map, id int64) sync.Locker {
_m := &sync.Mutex{}
many, _ := lm.LoadOrStore(id, _m)
_m, _ = many.(*sync.Mutex)
_m.Lock()
return _m
}
func unlockFile(lm *sync.Map, id int64, tx sync.Locker) {
tx.Unlock()
lm.Delete(id)
}
package fs
import (
"context"
"fmt"
"os"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
)
type Storage struct {
lm *sync.Map
dir string
// If file is not belongs to current node FS - redirect to corresponding node
// cluster DrainCloudCluster
}
func NewFSStorage(dir string) *Storage {
return &Storage{
lm: &sync.Map{},
dir: dir,
}
}
// GetFile opens the blob file for id and returns the open handle.
// The caller owns the returned *os.File and must Close it.
func (s *Storage) GetFile(ctx context.Context, id int64) (*os.File, error) {
	tx := lockFile(s.lm, id)
	defer unlockFile(s.lm, id, tx)
	file, err := os.Open(getFilePath(s.dir, id))
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	// Bug fix: the handle was previously closed via defer before returning,
	// so callers always received an already-closed file.
	return file, nil
}
// SaveBlob writes data to the blob file for id, creating the file when it
// does not exist and truncating any previous content.
func (s *Storage) SaveBlob(ctx context.Context, id int64, data []byte) error {
	tx := lockFile(s.lm, id)
	defer unlockFile(s.lm, id, tx)
	// Bug fix: os.Open opens read-only, so every Write below failed.
	// Open for writing, creating/truncating the file instead.
	file, err := os.OpenFile(getFilePath(s.dir, id), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer func() {
		if err = file.Close(); err != nil {
			logger.Error(ctx, "[saveFile] close error", logger.Err(err))
		}
	}()
	if _, err = file.Write(data); err != nil {
		return fmt.Errorf("failed to write data to file: %w", err)
	}
	return nil
}
// DeleteFile removes the blob file for id from disk. A file that is
// already gone is treated as success so deletes are idempotent.
func (s *Storage) DeleteFile(ctx context.Context, id int64) error {
	tx := lockFile(s.lm, id)
	defer unlockFile(s.lm, id, tx)
	// Bug fix: the previous implementation acquired the lock and returned
	// without ever removing the file.
	if err := os.Remove(getFilePath(s.dir, id)); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to delete file: %w", err)
	}
	return nil
}
func getFilePath(dir string, id int64) string {
return fmt.Sprintf("%s/%v", dir, id)
}
// lockFile acquires a per-id mutex, creating it on first use.
// sync.Map.LoadOrStore guarantees concurrent callers for the same id
// share a single mutex instance.
func lockFile(lm *sync.Map, id int64) sync.Locker {
	_m := &sync.Mutex{}
	many, _ := lm.LoadOrStore(id, _m)
	_m, _ = many.(*sync.Mutex)
	_m.Lock()
	return _m
}
// unlockFile releases the per-id mutex and drops it from the map.
// NOTE(review): deleting the entry while another goroutine is still
// blocked on the same mutex lets a later caller LoadOrStore a fresh
// mutex and enter the critical section concurrently — confirm whether
// this race is acceptable for this storage.
func unlockFile(lm *sync.Map, id int64, tx sync.Locker) {
	tx.Unlock()
	lm.Delete(id)
}

View File

@ -1,39 +1,40 @@
package storage
import (
"context"
"os"
auditmodels "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/audit"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/files"
"github.com/google/uuid"
)
type Database interface {
AuthStorage
}
type AuthStorage interface {
AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error
GetUserByLogin(ctx context.Context, login string) (*auth.User, error)
GetUserByID(ctx context.Context, id uuid.UUID) (*auth.User, error)
AddSession(ctx context.Context, ses *auth.Session) (uuid.UUID, error)
GetSession(ctx context.Context, sessionToken string) (*auth.Session, error)
RemoveSession(ctx context.Context, id uuid.UUID) error
}
type AuthAuditLogStorage interface {
AddEntry(ctx context.Context, entry auditmodels.AuditLogEntry) error
}
type MetaStorage interface {
SaveMetadata(ctx context.Context, meta files.FileMetadata) (uuid.UUID, error)
}
type BlobStorage interface {
GetFile(ctx context.Context, id uuid.UUID) (*os.File, error)
SaveBlob(ctx context.Context, id uuid.UUID, data []byte) error
DeleteFile(ctx context.Context, id uuid.UUID) error
}
package storage
import (
"context"
"os"
auditmodels "git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/audit"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/files"
"github.com/google/uuid"
)
type Database interface {
AuthStorage
}
type AuthStorage interface {
AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error
GetUserByLogin(ctx context.Context, login string) (*auth.User, error)
GetUserByID(ctx context.Context, id uuid.UUID) (*auth.User, error)
AddSession(ctx context.Context, ses *auth.Session) (uuid.UUID, error)
GetSession(ctx context.Context, sessionToken string) (*auth.Session, error)
RemoveSession(ctx context.Context, id uuid.UUID) error
}
type AuthAuditLogStorage interface {
AddEntry(ctx context.Context, entry auditmodels.AuditLogEntry) error
}
type MetaStorage interface {
SaveMetadata(ctx context.Context, meta files.FileMetadata) (uuid.UUID, error)
}
type BlobStorage interface {
GetFile(ctx context.Context, id uuid.UUID) (*os.File, error)
SaveBlob(ctx context.Context, id uuid.UUID, data []byte) error
DeleteFile(ctx context.Context, id uuid.UUID) error
GetFSLink(ctx context.Context, fileID uuid.UUID) (string, error)
}

View File

@ -1,49 +1,49 @@
package audit
import "time"
type EventType int
const (
EventUnspecified EventType = iota
EventSuccessfullLogin
EventFailedLogin
EventSuccessfullRegister
EventFailedRegister
EventSuccessfullAuth
EventFailedAuth
EventUserUpdated
)
type Severity int
const (
SeverityAlert = 0
SeverityWarning = 10
SeverityInfo = 100
SeverityNotice = 200
)
type Actor struct {
ActorSysName string
RemoteIP string
ID int64
}
const (
ActorDrainCloudCore = "_actor_draincloud_core"
ActorUser = "user"
)
type AuditLogEntry struct {
EventType EventType
// Who caused changes
Actor Actor
Severity Severity
SessionID int64
CreatedAt time.Time
// What changed
Object string
// How it was changed
Action string
}
package audit
import "time"
type EventType int
const (
EventUnspecified EventType = iota
EventSuccessfullLogin
EventFailedLogin
EventSuccessfullRegister
EventFailedRegister
EventSuccessfullAuth
EventFailedAuth
EventUserUpdated
)
type Severity int
const (
SeverityAlert = 0
SeverityWarning = 10
SeverityInfo = 100
SeverityNotice = 200
)
type Actor struct {
ActorSysName string
RemoteIP string
ID int64
}
const (
ActorDrainCloudCore = "_actor_draincloud_core"
ActorUser = "user"
)
type AuditLogEntry struct {
EventType EventType
// Who caused changes
Actor Actor
Severity Severity
SessionID int64
CreatedAt time.Time
// What changed
Object string
// How it was changed
Action string
}

View File

@ -1,25 +1,25 @@
package auth
import (
"time"
"github.com/google/uuid"
)
type Session struct {
ID uuid.UUID
SessionToken string
CsrfToken string
UserID uuid.UUID
CreatedAt time.Time
ExpiredAt time.Time
}
type User struct {
ID uuid.UUID
Username string
Login string
PasswordHash []byte
CreatedAt time.Time
UpdatedAt time.Time
}
package auth
import (
"time"
"github.com/google/uuid"
)
type Session struct {
ID uuid.UUID
SessionToken string
CsrfToken string
UserID uuid.UUID
CreatedAt time.Time
ExpiredAt time.Time
}
type User struct {
ID uuid.UUID
Username string
Login string
PasswordHash []byte
CreatedAt time.Time
UpdatedAt time.Time
}

View File

@ -1,13 +1,13 @@
package files
import "github.com/google/uuid"
type FileMetadata struct {
Id uuid.UUID
Name string
UserID int64
Ext string
Type string
FSLink string
Size int64
}
package files
import "github.com/google/uuid"
type FileMetadata struct {
Id uuid.UUID
Name string
UserID int64
Ext string
Type string
FSLink string
Size int64
}

View File

@ -1,147 +1,147 @@
package postgres
import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
type Database struct {
db *pgx.Conn
cluster *ShardCluster
}
func New(ctx context.Context, dsn string) *Database {
db, err := pgx.Connect(ctx, dsn)
if err != nil {
logger.Fatal(ctx, "failed to connect to postgres", logger.Err(err))
}
closer.Add(func() error {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
return db.Close(ctx)
})
return &Database{db: db}
}
type dbtx interface {
Exec(ctx context.Context, stmt string, args ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}
func (d *Database) AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error {
return addUser(ctx, d.db, id, login, username, passwordHash)
}
func (d *Database) GetUserByID(ctx context.Context, id uuid.UUID) (*auth.User, error) {
return getUserByID(ctx, d.db, id)
}
func (d *Database) GetUserByLogin(ctx context.Context, login string) (*auth.User, error) {
return getUserByLogin(ctx, d.db, login)
}
func (d *Database) AddSession(ctx context.Context, ses *auth.Session) (uuid.UUID, error) {
return addSession(ctx, d.db, ses)
}
func (d *Database) GetSession(ctx context.Context, sessionToken string) (*auth.Session, error) {
const stmt = `SELECT
s.id, s.session_token, s.csrf_token, s.user_id, s.created_at, s.expired_at
FROM sessions as s
WHERE s.session_token = $1;`
row := d.db.QueryRow(ctx, stmt, sessionToken)
var (
id uuid.UUID
sesToken, csrfToken string
userID uuid.UUID
createdAt sql.NullTime
expiredAt sql.NullTime
)
if err := row.Scan(&id, &sesToken, &csrfToken, &userID, &createdAt, &expiredAt); err != nil {
return nil, err
}
return &auth.Session{
ID: id,
SessionToken: sesToken,
CsrfToken: csrfToken,
UserID: userID,
CreatedAt: createdAt.Time,
ExpiredAt: expiredAt.Time,
}, nil
}
func (d *Database) RemoveSession(ctx context.Context, id uuid.UUID) error {
const stmt = `DELETE FROM sessions WHERE id = $1;`
_, err := d.db.Exec(ctx, stmt, id)
return err
}
func (d *Database) RemoveExpiredSessions(ctx context.Context) error {
const stmt = `DELETE FROM sessions WHERE expired_at < $1;`
res, err := d.db.Exec(ctx, stmt, time.Now())
logger.Notice(ctx, "[Database][RemoveExpiredSessions] sessions cleanup", slog.Int64("removed", res.RowsAffected()))
return err
}
func addUser(ctx context.Context, conn dbtx, id uuid.UUID, login string, username string, passwordHash []byte) error {
const stmt = `INSERT INTO users (id,login,username,password)
VALUES ($1,$2,$3,$4);`
_, err := conn.Exec(ctx, stmt, id, login, username, passwordHash)
if err != nil {
return fmt.Errorf("failed to insert user data into users table: %w", err)
}
return nil
}
func getUserByID(ctx context.Context, conn dbtx, id uuid.UUID) (*auth.User, error) {
const stmt = `SELECT * FROM users WHERE id = $1 LIMIT 1`
u := new(auth.User)
row := conn.QueryRow(ctx, stmt, id)
if err := row.Scan(&u.ID, &u.Login, &u.Username, &u.PasswordHash, &u.CreatedAt, &u.UpdatedAt); err != nil {
return nil, fmt.Errorf("failed to fetch user by id: %w", err)
}
return u, nil
}
func getUserByLogin(ctx context.Context, conn dbtx, login string) (*auth.User, error) {
const stmt = `SELECT * FROM users WHERE login = $1 LIMIT 1`
u := new(auth.User)
row := conn.QueryRow(ctx, stmt, login)
if err := row.Scan(&u.ID, &u.Login, &u.Username, &u.PasswordHash, &u.CreatedAt, &u.UpdatedAt); err != nil {
return nil, fmt.Errorf("failed to fetch user by login: %w", err)
}
return u, nil
}
func addSession(ctx context.Context, conn dbtx, session *auth.Session) (uuid.UUID, error) {
const stmt = `INSERT INTO sessions (id,session_token, csrf_token, user_id,
created_at, expired_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;`
var id uuid.UUID
row := conn.QueryRow(ctx, stmt, session.ID, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt)
if err := row.Scan(&id); err != nil {
return uuid.Nil, fmt.Errorf("failed to insert new session: %w", err)
}
return id, nil
}
package postgres
import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"git.optclblast.xyz/draincloud/draincloud-core/internal/storage/models/auth"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
type Database struct {
db *pgx.Conn
cluster *ShardCluster
}
func New(ctx context.Context, dsn string) *Database {
db, err := pgx.Connect(ctx, dsn)
if err != nil {
logger.Fatal(ctx, "failed to connect to postgres", logger.Err(err))
}
closer.Add(func() error {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
return db.Close(ctx)
})
return &Database{db: db}
}
type dbtx interface {
Exec(ctx context.Context, stmt string, args ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
}
func (d *Database) AddUser(ctx context.Context, id uuid.UUID, login string, username string, passwordHash []byte) error {
return addUser(ctx, d.db, id, login, username, passwordHash)
}
func (d *Database) GetUserByID(ctx context.Context, id uuid.UUID) (*auth.User, error) {
return getUserByID(ctx, d.db, id)
}
func (d *Database) GetUserByLogin(ctx context.Context, login string) (*auth.User, error) {
return getUserByLogin(ctx, d.db, login)
}
func (d *Database) AddSession(ctx context.Context, ses *auth.Session) (uuid.UUID, error) {
return addSession(ctx, d.db, ses)
}
// GetSession loads a session row by its opaque session token.
// Nullable timestamp columns are scanned through sql.NullTime; a NULL
// value becomes the zero time.Time on the returned model.
func (d *Database) GetSession(ctx context.Context, sessionToken string) (*auth.Session, error) {
	const stmt = `SELECT
	s.id, s.session_token, s.csrf_token, s.user_id, s.created_at, s.expired_at
	FROM sessions as s
	WHERE s.session_token = $1;`
	row := d.db.QueryRow(ctx, stmt, sessionToken)
	var (
		id                  uuid.UUID
		sesToken, csrfToken string
		userID              uuid.UUID
		createdAt           sql.NullTime
		expiredAt           sql.NullTime
	)
	// A missing row surfaces as the driver's no-rows error, returned as-is.
	if err := row.Scan(&id, &sesToken, &csrfToken, &userID, &createdAt, &expiredAt); err != nil {
		return nil, err
	}
	return &auth.Session{
		ID:           id,
		SessionToken: sesToken,
		CsrfToken:    csrfToken,
		UserID:       userID,
		CreatedAt:    createdAt.Time,
		ExpiredAt:    expiredAt.Time,
	}, nil
}
func (d *Database) RemoveSession(ctx context.Context, id uuid.UUID) error {
const stmt = `DELETE FROM sessions WHERE id = $1;`
_, err := d.db.Exec(ctx, stmt, id)
return err
}
// RemoveExpiredSessions deletes every session whose expired_at is in the
// past and logs how many rows were removed.
func (d *Database) RemoveExpiredSessions(ctx context.Context) error {
	const stmt = `DELETE FROM sessions WHERE expired_at < $1;`
	res, err := d.db.Exec(ctx, stmt, time.Now())
	// Bug fix: the row count was previously logged even when Exec failed,
	// reporting a misleading "removed 0" on error paths.
	if err != nil {
		return fmt.Errorf("failed to remove expired sessions: %w", err)
	}
	logger.Notice(ctx, "[Database][RemoveExpiredSessions] sessions cleanup", slog.Int64("removed", res.RowsAffected()))
	return nil
}
func addUser(ctx context.Context, conn dbtx, id uuid.UUID, login string, username string, passwordHash []byte) error {
const stmt = `INSERT INTO users (id,login,username,password)
VALUES ($1,$2,$3,$4);`
_, err := conn.Exec(ctx, stmt, id, login, username, passwordHash)
if err != nil {
return fmt.Errorf("failed to insert user data into users table: %w", err)
}
return nil
}
// getUserByID fetches a single user row by primary key.
// Columns are listed explicitly so the positional Scan does not silently
// break when the users table gains or reorders columns (the previous
// `SELECT *` depended on physical column order).
// NOTE(review): timestamp column names inferred from the scan targets and
// the sessions schema — confirm against the users table DDL.
func getUserByID(ctx context.Context, conn dbtx, id uuid.UUID) (*auth.User, error) {
	const stmt = `SELECT id, login, username, password, created_at, updated_at
	FROM users WHERE id = $1 LIMIT 1`
	u := new(auth.User)
	row := conn.QueryRow(ctx, stmt, id)
	if err := row.Scan(&u.ID, &u.Login, &u.Username, &u.PasswordHash, &u.CreatedAt, &u.UpdatedAt); err != nil {
		return nil, fmt.Errorf("failed to fetch user by id: %w", err)
	}
	return u, nil
}
// getUserByLogin fetches a single user row by its unique login.
// Columns are listed explicitly so the positional Scan does not silently
// break when the users table gains or reorders columns (the previous
// `SELECT *` depended on physical column order).
// NOTE(review): timestamp column names inferred from the scan targets and
// the sessions schema — confirm against the users table DDL.
func getUserByLogin(ctx context.Context, conn dbtx, login string) (*auth.User, error) {
	const stmt = `SELECT id, login, username, password, created_at, updated_at
	FROM users WHERE login = $1 LIMIT 1`
	u := new(auth.User)
	row := conn.QueryRow(ctx, stmt, login)
	if err := row.Scan(&u.ID, &u.Login, &u.Username, &u.PasswordHash, &u.CreatedAt, &u.UpdatedAt); err != nil {
		return nil, fmt.Errorf("failed to fetch user by login: %w", err)
	}
	return u, nil
}
func addSession(ctx context.Context, conn dbtx, session *auth.Session) (uuid.UUID, error) {
const stmt = `INSERT INTO sessions (id,session_token, csrf_token, user_id,
created_at, expired_at) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;`
var id uuid.UUID
row := conn.QueryRow(ctx, stmt, session.ID, session.SessionToken, session.CsrfToken, session.UserID, session.CreatedAt, session.ExpiredAt)
if err := row.Scan(&id); err != nil {
return uuid.Nil, fmt.Errorf("failed to insert new session: %w", err)
}
return id, nil
}

View File

@ -1,41 +1,41 @@
package postgres
import (
"context"
"hash/crc32"
"log/slog"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
type ShardMap = map[uint32]*pgx.ConnConfig
type ShardCluster struct {
m sync.Mutex
shards []*pgx.Conn
}
func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster {
shards := make([]*pgx.Conn, len(shardMap))
for n, cfg := range shardMap {
conn, err := pgx.ConnectConfig(ctx, cfg)
if err != nil {
logger.Fatal(ctx, "failed to connect to shard", slog.Uint64("num", uint64(n)), logger.Err(err))
}
shards[n] = conn
}
return &ShardCluster{shards: shards}
}
func (c *ShardCluster) PickShard(n uint32) *pgx.Conn {
c.m.Lock()
defer c.m.Unlock()
return c.shards[n]
}
func UUIDShardFn(id uuid.UUID, numShards uint32) uint32 {
return crc32.ChecksumIEEE(id[:]) % numShards
}
package postgres
import (
"context"
"hash/crc32"
"log/slog"
"sync"
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
type ShardMap = map[uint32]*pgx.ConnConfig
type ShardCluster struct {
m sync.Mutex
shards []*pgx.Conn
}
// NewShardCluster connects to every shard in shardMap and returns the
// cluster. Shard numbers must be dense (0..len(shardMap)-1); a sparse or
// out-of-range shard number previously caused an index-out-of-range panic
// and is now reported as a fatal configuration error.
func NewShardCluster(ctx context.Context, shardMap ShardMap) *ShardCluster {
	shards := make([]*pgx.Conn, len(shardMap))
	for n, cfg := range shardMap {
		if int64(n) >= int64(len(shards)) {
			logger.Fatal(ctx, "shard number out of range; shard map must be dense", slog.Uint64("num", uint64(n)))
		}
		conn, err := pgx.ConnectConfig(ctx, cfg)
		if err != nil {
			logger.Fatal(ctx, "failed to connect to shard", slog.Uint64("num", uint64(n)), logger.Err(err))
		}
		shards[n] = conn
	}
	return &ShardCluster{shards: shards}
}
func (c *ShardCluster) PickShard(n uint32) *pgx.Conn {
c.m.Lock()
defer c.m.Unlock()
return c.shards[n]
}
// UUIDShardFn maps a UUID onto one of numShards buckets using the
// CRC-32 (IEEE) checksum of the UUID's raw bytes.
func UUIDShardFn(id uuid.UUID, numShards uint32) uint32 {
	sum := crc32.ChecksumIEEE(id[:])
	return sum % numShards
}

View File

@ -1,66 +1,66 @@
package storage
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/jmoiron/sqlx"
)
// txKey is the unexported context-key type used to stash a transaction.
type txKey struct{}

// ctxKey is the single key under which a *sqlx.Tx is stored in a context.
var ctxKey txKey = txKey{}

// DBTX is the query surface shared by *sqlx.DB and *sqlx.Tx, letting data
// access code run transparently inside or outside a transaction.
type DBTX interface {
	sqlx.Ext
	sqlx.ExtContext
}
// Transaction runs fn inside a database transaction. If ctx already carries
// a transaction (a nested call), fn executes in that outer transaction and
// commit/rollback are left to the outermost caller. Otherwise a new
// REPEATABLE READ transaction is begun, committed when fn returns nil, and
// rolled back otherwise (a rollback failure is joined onto the error).
func Transaction(ctx context.Context, db *sqlx.DB, fn func(context.Context) error) (err error) {
	tx := txFromContext(ctx)
	if tx == nil {
		tx, err = db.BeginTxx(ctx, &sql.TxOptions{
			Isolation: sql.LevelRepeatableRead,
		})
		if err != nil {
			return fmt.Errorf("failed to begin tx: %w", err)
		}
		// The deferred closure reads the named return err: on success it
		// becomes the Commit result; any remaining error triggers Rollback,
		// whose own failure is joined in rather than dropped.
		defer func() {
			if err == nil {
				err = tx.Commit()
			}
			if err != nil {
				if rbErr := tx.Rollback(); rbErr != nil {
					err = errors.Join(err, rbErr)
				}
			}
		}()
		ctx = txContext(ctx, tx)
	}
	return fn(ctx)
}
// Conn returns the transaction bound to ctx when one is present, falling
// back to the supplied database handle otherwise.
func Conn(ctx context.Context, db DBTX) DBTX {
	tx := txFromContext(ctx)
	if tx == nil {
		return db
	}
	return tx
}
// txFromContext extracts the *sqlx.Tx stored under ctxKey, or nil when the
// context carries no transaction.
func txFromContext(ctx context.Context) *sqlx.Tx {
	tx, ok := ctx.Value(ctxKey).(*sqlx.Tx)
	if !ok {
		return nil
	}
	return tx
}
// txContext returns a child context carrying tx under ctxKey so that nested
// Transaction calls and Conn can reuse the open transaction.
func txContext(parent context.Context, tx *sqlx.Tx) context.Context {
	// was: context.WithValue(parent, tx, ctxKey) — key and value were
	// swapped, so txFromContext (which looks up ctxKey) never found the
	// transaction and nested calls silently opened new transactions.
	return context.WithValue(parent, ctxKey, tx)
}
package storage
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/jmoiron/sqlx"
)
// txKey is the unexported context-key type used to stash a transaction.
type txKey struct{}

// ctxKey is the single key under which a *sqlx.Tx is stored in a context.
var ctxKey txKey = txKey{}

// DBTX is the query surface shared by *sqlx.DB and *sqlx.Tx, letting data
// access code run transparently inside or outside a transaction.
type DBTX interface {
	sqlx.Ext
	sqlx.ExtContext
}
// Transaction runs fn inside a database transaction. If ctx already carries
// a transaction (a nested call), fn executes in that outer transaction and
// commit/rollback are left to the outermost caller. Otherwise a new
// REPEATABLE READ transaction is begun, committed when fn returns nil, and
// rolled back otherwise (a rollback failure is joined onto the error).
func Transaction(ctx context.Context, db *sqlx.DB, fn func(context.Context) error) (err error) {
	tx := txFromContext(ctx)
	if tx == nil {
		tx, err = db.BeginTxx(ctx, &sql.TxOptions{
			Isolation: sql.LevelRepeatableRead,
		})
		if err != nil {
			return fmt.Errorf("failed to begin tx: %w", err)
		}
		// The deferred closure reads the named return err: on success it
		// becomes the Commit result; any remaining error triggers Rollback,
		// whose own failure is joined in rather than dropped.
		defer func() {
			if err == nil {
				err = tx.Commit()
			}
			if err != nil {
				if rbErr := tx.Rollback(); rbErr != nil {
					err = errors.Join(err, rbErr)
				}
			}
		}()
		ctx = txContext(ctx, tx)
	}
	return fn(ctx)
}
// Conn returns the transaction bound to ctx when one is present, falling
// back to the supplied database handle otherwise.
func Conn(ctx context.Context, db DBTX) DBTX {
	tx := txFromContext(ctx)
	if tx == nil {
		return db
	}
	return tx
}
// txFromContext extracts the *sqlx.Tx stored under ctxKey, or nil when the
// context carries no transaction.
func txFromContext(ctx context.Context) *sqlx.Tx {
	tx, ok := ctx.Value(ctxKey).(*sqlx.Tx)
	if !ok {
		return nil
	}
	return tx
}
// txContext returns a child context carrying tx under ctxKey so that nested
// Transaction calls and Conn can reuse the open transaction.
func txContext(parent context.Context, tx *sqlx.Tx) context.Context {
	// was: context.WithValue(parent, tx, ctxKey) — key and value were
	// swapped, so txFromContext (which looks up ctxKey) never found the
	// transaction and nested calls silently opened new transactions.
	return context.WithValue(parent, ctxKey, tx)
}

View File

@ -1,88 +1,88 @@
// TODO wtf?
package pool
import (
"net"
"sync"
"sync/atomic"
)
var (
defaultMaxConns = 20
defaultStrategy = &RoundrobinStrategy{
lastSelected: initialRoundrobinAtomic(),
}
)
func initialRoundrobinAtomic() atomic.Int64 {
a := atomic.Int64{}
a.Store(-1)
return a
}
type ConnSelectionStrategy interface {
Select() int
}
type RoundrobinStrategy struct {
lastSelected atomic.Int64
}
func (r *RoundrobinStrategy) Select() int {
return int(r.lastSelected.Add(1))
}
type ConnPool struct {
m sync.RWMutex
strategy ConnSelectionStrategy
conns []net.Conn
}
type newConnPoolOpts struct {
strategy ConnSelectionStrategy
maxConns int
}
func newNewConnPoolOpts() newConnPoolOpts {
return newConnPoolOpts{
strategy: defaultStrategy,
maxConns: defaultMaxConns,
}
}
type NewConnPoolOpt func(p *newConnPoolOpts)
func WithStrategy(s ConnSelectionStrategy) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.strategy = s
}
}
func WithMaxConns(mc int) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.maxConns = mc
}
}
func NewConnPool(opts ...NewConnPoolOpt) *ConnPool {
o := newNewConnPoolOpts()
for _, opt := range opts {
opt(&o)
}
return &ConnPool{
conns: make([]net.Conn, 0),
strategy: o.strategy,
}
}
func (p *ConnPool) SelectConn() net.Conn {
p.m.RLock()
defer p.m.RUnlock()
return p.conns[p.strategy.Select()]
}
func (p *ConnPool) AddConn(conn net.Conn) {
p.m.Lock()
defer p.m.Unlock()
p.conns = append(p.conns, conn)
}
// TODO wtf?
package pool
import (
"net"
"sync"
"sync/atomic"
)
var (
defaultMaxConns = 20
defaultStrategy = &RoundrobinStrategy{
lastSelected: initialRoundrobinAtomic(),
}
)
func initialRoundrobinAtomic() atomic.Int64 {
a := atomic.Int64{}
a.Store(-1)
return a
}
type ConnSelectionStrategy interface {
Select() int
}
type RoundrobinStrategy struct {
lastSelected atomic.Int64
}
func (r *RoundrobinStrategy) Select() int {
return int(r.lastSelected.Add(1))
}
type ConnPool struct {
m sync.RWMutex
strategy ConnSelectionStrategy
conns []net.Conn
}
type newConnPoolOpts struct {
strategy ConnSelectionStrategy
maxConns int
}
func newNewConnPoolOpts() newConnPoolOpts {
return newConnPoolOpts{
strategy: defaultStrategy,
maxConns: defaultMaxConns,
}
}
type NewConnPoolOpt func(p *newConnPoolOpts)
func WithStrategy(s ConnSelectionStrategy) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.strategy = s
}
}
func WithMaxConns(mc int) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.maxConns = mc
}
}
func NewConnPool(opts ...NewConnPoolOpt) *ConnPool {
o := newNewConnPoolOpts()
for _, opt := range opts {
opt(&o)
}
return &ConnPool{
conns: make([]net.Conn, 0),
strategy: o.strategy,
}
}
func (p *ConnPool) SelectConn() net.Conn {
p.m.RLock()
defer p.m.RUnlock()
return p.conns[p.strategy.Select()]
}
func (p *ConnPool) AddConn(conn net.Conn) {
p.m.Lock()
defer p.m.Unlock()
p.conns = append(p.conns, conn)
}

View File

@ -1,71 +1,71 @@
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query';

-- Users as auth data
create table if not exists users (
    id uuid primary key,
    username text default null,
    login text not null unique,
    password bytea not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default current_timestamp
);
create index if not exists idx_users_login on users (login);
create index if not exists idx_users_username on users (username);

-- Sessions and auth data
create table if not exists sessions (
    id uuid primary key,
    session_token varchar(200) not null unique,
    csrf_token varchar(200) not null unique,
    user_id uuid references users(id),
    created_at timestamp default current_timestamp,
    expired_at timestamp not null
);
create index if not exists idx_sessions_session_token_csrf_token on sessions (session_token, csrf_token);

-- Files
create table if not exists files_metadata (
    id uuid primary key,
    name text not null,
    fslink text not null,
    size bigint not null,
    ext text not null,
    owner_id uuid not null,
    parent_dir uuid not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null,
    deleted_at timestamptz default null
);
create index if not exists idx_fm_owner_id on files_metadata(owner_id);
create index if not exists idx_fm_owner_id_parent_dir on files_metadata(owner_id, parent_dir);

create table if not exists directories (
    id uuid primary key,
    name text not null,
    owner_id uuid not null,
    parent_dir uuid not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null,
    deleted_at timestamptz default null
);
create index if not exists idx_directories_owner_id_parent_dir on directories(owner_id, parent_dir);

create table if not exists directory_users_access (
    id uuid primary key,
    dir_id uuid not null,
    user_id uuid not null,
    -- NOTE(review): "assess_flag" looks like a typo for "access_flag";
    -- kept as-is because application code may already reference it.
    assess_flag integer,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null
);
-- was: create index idx_dua_owner_id_parent_dir on directories(owner_id, parent_dir);
-- a copy-paste duplicate of the directories index. The dua index belongs
-- on directory_users_access.
create index if not exists idx_dua_dir_id_user_id on directory_users_access(dir_id, user_id);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
drop table if exists directory_users_access;
drop table if exists directories;
drop table if exists files_metadata;
drop table if exists sessions;
drop table if exists users;
-- +goose StatementEnd
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query';

-- Users as auth data
create table if not exists users (
    id uuid primary key,
    username text default null,
    login text not null unique,
    password bytea not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default current_timestamp
);
create index if not exists idx_users_login on users (login);
create index if not exists idx_users_username on users (username);

-- Sessions and auth data
create table if not exists sessions (
    id uuid primary key,
    session_token varchar(200) not null unique,
    csrf_token varchar(200) not null unique,
    user_id uuid references users(id),
    created_at timestamp default current_timestamp,
    expired_at timestamp not null
);
create index if not exists idx_sessions_session_token_csrf_token on sessions (session_token, csrf_token);

-- Files
create table if not exists files_metadata (
    id uuid primary key,
    name text not null,
    fslink text not null,
    size bigint not null,
    ext text not null,
    owner_id uuid not null,
    parent_dir uuid not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null,
    deleted_at timestamptz default null
);
create index if not exists idx_fm_owner_id on files_metadata(owner_id);
create index if not exists idx_fm_owner_id_parent_dir on files_metadata(owner_id, parent_dir);

create table if not exists directories (
    id uuid primary key,
    name text not null,
    owner_id uuid not null,
    parent_dir uuid not null,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null,
    deleted_at timestamptz default null
);
create index if not exists idx_directories_owner_id_parent_dir on directories(owner_id, parent_dir);

create table if not exists directory_users_access (
    id uuid primary key,
    dir_id uuid not null,
    user_id uuid not null,
    -- NOTE(review): "assess_flag" looks like a typo for "access_flag";
    -- kept as-is because application code may already reference it.
    assess_flag integer,
    created_at timestamptz default current_timestamp,
    updated_at timestamptz default null
);
-- was: create index idx_dua_owner_id_parent_dir on directories(owner_id, parent_dir);
-- a copy-paste duplicate of the directories index. The dua index belongs
-- on directory_users_access.
create index if not exists idx_dua_dir_id_user_id on directory_users_access(dir_id, user_id);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
drop table if exists directory_users_access;
drop table if exists directories;
drop table if exists files_metadata;
drop table if exists sessions;
drop table if exists users;
-- +goose StatementEnd