diff --git a/backend/.gitignore b/backend/.gitignore index 25e1a8d..fb186e4 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -1,2 +1,3 @@ build/blockd -*__debug_* \ No newline at end of file +*__debug_* +rabbitmq \ No newline at end of file diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 430a647..2c58ede 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -41,6 +41,10 @@ func main() { Name: "chain-api-url", Value: "http://localhost:3000", }, + &cli.IntFlag{ + Name: "num-internal-workers", + Value: 1, + }, // rest &cli.StringFlag{ @@ -73,16 +77,6 @@ func main() { &cli.BoolFlag{ Name: "db-enable-tls", }, - - &cli.StringFlag{ - Name: "cache-host", - }, - &cli.StringFlag{ - Name: "cache-user", - }, - &cli.StringFlag{ - Name: "cache-secret", - }, }, Action: func(c *cli.Context) error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -106,10 +100,6 @@ func main() { Database: c.String("db-database"), User: c.String("db-user"), Secret: c.String("db-secret"), - - CacheHost: c.String("cache-host"), - CacheUser: c.String("cache-user"), - CacheSecret: c.String("cache-secret"), }, ChainAPI: config.ChainAPIConfig{ Host: c.String("chain-api-url"), diff --git a/backend/docker-compose.yaml b/backend/docker-compose.yaml index 1b47e70..a62b058 100644 --- a/backend/docker-compose.yaml +++ b/backend/docker-compose.yaml @@ -62,64 +62,68 @@ services: start_period: 5s profiles: [blockd, database, noback] - blockd-cache: - container_name: blockd-cache - image: redis:7.2.4 + blockd-rmq: + image: rabbitmq:3.13.3-management + ports: + - 15672:15672 + - 5672:5672 + hostname: blockd-rmq restart: always - networks: - - blockd-net - ports: - - 6379:6379 - profiles: [blockd, database, noback] - - prometheus: - image: prom/prometheus - container_name: prometheus - command: - - '--config.file=/etc/prometheus/prometheus.yml' - ports: - - 9091:9090 - restart: unless-stopped - networks: - - blockd-net - volumes: - - ./prometheus:/etc/prometheus - - prometheus_data:/prometheus - profiles: [metrics] - - grafana: - image: grafana/grafana - container_name: grafana - ports: - - 3112:3000 - restart: unless-stopped - networks: - - blockd-net environment: - - GF_SECURITY_ADMIN_USER=admin - - GF_SECURITY_ADMIN_PASSWORD=grafana + - RABBITMQ_DEFAULT_USER=blockd + - RABBITMQ_DEFAULT_PASS=blockd + - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 2147483648 volumes: - - ./grafana:/etc/grafana/provisioning/datasources - profiles: [metrics] + - ./rabbitmq:/var/lib/rabbitmq - syslog: - image: linuxserver/syslog-ng:3.36.1 - container_name: syslog-ng - environment: - - PUID=0 - - PGID=0 - - TZ=UTC - volumes: - - /srv/syslog/config:/config - - /srv/syslog/logs:/var/log - ports: - - 514:5514/udp - - 601:6601/tcp - - 6514:6514/tcp - restart: unless-stopped - networks: - - syslog - logging: - driver: "json-file" - profiles: [metrics] + # prometheus: + # image: prom/prometheus + # container_name: prometheus + # command: + # - '--config.file=/etc/prometheus/prometheus.yml' + # ports: + # - 9091:9090 + # restart: unless-stopped + # networks: + # - blockd-net + # volumes: + # - ./prometheus:/etc/prometheus + # - prometheus_data:/prometheus + # profiles: [metrics] + + # grafana: + # image: grafana/grafana + # container_name: grafana + # ports: + # - 3112:3000 + # restart: unless-stopped + # networks: + # - blockd-net + # environment: + # - GF_SECURITY_ADMIN_USER=admin + # - GF_SECURITY_ADMIN_PASSWORD=grafana + # volumes: + # - ./grafana:/etc/grafana/provisioning/datasources + # profiles: [metrics] + + # syslog: + # image: linuxserver/syslog-ng:3.36.1 + # container_name: syslog-ng + # environment: + # - PUID=0 + # - PGID=0 + # - TZ=UTC + # volumes: + # - /srv/syslog/config:/config + # - /srv/syslog/logs:/var/log + # ports: + # - 514:5514/udp + # - 601:6601/tcp + # - 6514:6514/tcp + # restart: unless-stopped + # networks: + # - syslog + # logging: + # driver: "json-file" + # profiles: [metrics] diff --git a/backend/internal/infrastructure/queue/beanstalk/queue.go b/backend/internal/infrastructure/queue/beanstalk/queue.go deleted file mode 100644 index d034d65..0000000 --- a/backend/internal/infrastructure/queue/beanstalk/queue.go +++ /dev/null @@ -1 +0,0 @@ -package beanstalk diff --git a/backend/internal/infrastructure/queue/queue.go b/backend/internal/infrastructure/queue/queue.go index a94e626..ace4b1c 100644 --- a/backend/internal/infrastructure/queue/queue.go +++ b/backend/internal/infrastructure/queue/queue.go @@ -3,9 +3,6 @@ package queue import ( "context" "fmt" - "time" - - "github.com/google/uuid" ) type QueueDriver interface { @@ -45,8 +42,9 @@ func (q *Queue[T]) Pop(ctx context.Context) (*T, error) { return nil, fmt.Errorf("queue: error unexpected job type") } -type Job[T any] struct { - ID uuid.UUID - Payload *T - CreatedAt time.Time +type Job struct { + ID string + IdempotencyKey string + Payload any + CreatedAt int64 } diff --git a/backend/internal/infrastructure/queue/rmq/queue.go b/backend/internal/infrastructure/queue/rmq/queue.go index c1f91ec..5061d73 100644 --- a/backend/internal/infrastructure/queue/rmq/queue.go +++ b/backend/internal/infrastructure/queue/rmq/queue.go @@ -33,3 +33,7 @@ func NewWithConnection( cc: cc, } } + +func (r *RMQClient) Channel() (*amqp.Channel, error) { + return r.cc.Channel() +} diff --git a/backend/internal/infrastructure/queue/system/queue.go b/backend/internal/infrastructure/queue/system/queue.go deleted file mode 100644 index 28ec579..0000000 --- a/backend/internal/infrastructure/queue/system/queue.go +++ /dev/null @@ -1,73 +0,0 @@ -package system - -import ( - "context" - "fmt" - "sync" - "sync/atomic" -) - -type SystemQueue struct { - m sync.Mutex - _init_size int - buf []*any - _buf_p atomic.Int64 - _read_p atomic.Int64 -} - -func NewSystemQueue(size int) *SystemQueue { - if size < 50 { - size = 50 - } - - return &SystemQueue{ - _init_size: size, - buf: make([]*any, size), - } -} - -func (s *SystemQueue) Put(_ context.Context, job any) error { - s.m.Lock() - defer s.m.Unlock() - - p := s._buf_p.Load() - - // Resize buf if needed - if int64(len(s.buf)) == p { - s.resize() - } - - s.buf[p] = &job - - s._buf_p.Add(1) - - return nil -} - -func (s *SystemQueue) Pop(_ context.Context) (any, error) { - s.m.Lock() - defer s.m.Unlock() - - p := s._buf_p.Load() - if int64(len(s.buf)) < p { - s.resize() - - return nil, fmt.Errorf("system-queue: error _p index out of buffer range") - } - - jobp := s.buf[p] - - if jobp == nil { - return nil, fmt.Errorf("system-queue: error nil job") - } - - job := *jobp - s._buf_p.Add(-1) - s.buf[p-1] = nil - - return job, nil -} - -func (s *SystemQueue) resize() { - s.buf = append(s.buf, make([]*any, s._init_size/2)...) -} diff --git a/backend/internal/infrastructure/workers/workers.go b/backend/internal/infrastructure/workers/workers.go index 7224789..29df215 100644 --- a/backend/internal/infrastructure/workers/workers.go +++ b/backend/internal/infrastructure/workers/workers.go @@ -1,3 +1,74 @@ package workers -// todo move all workers stuff here +import ( + "encoding/json" + "log/slog" + + "github.com/emochka2007/block-accounting/internal/infrastructure/queue" + "github.com/emochka2007/block-accounting/internal/pkg/logger" + amqp "github.com/rabbitmq/amqp091-go" +) + +type Worker struct { + id string + log *slog.Logger + + rmqc *amqp.Connection + + queueName string +} + +func (w *Worker) Run() { + w.log = w.log.With(slog.String("worker-id", w.id), slog.String("worker-queue", w.queueName)) + + defer func() { + if p := recover(); p != nil { + w.log.Error( + "worker paniced!", + slog.String("worker id", w.id), + slog.Any("panic", p), + ) + } else { + w.log.Info("worker stoped. bye bye 0w0", slog.String("worker id", w.id)) + } + }() + + channel, err := w.rmqc.Channel() + if err != nil { + w.log.Error("error create rmq channel", logger.Err(err)) + return + } + + delivery, err := channel.Consume( + w.queueName, + w.id, + true, + false, + false, + false, + nil, + ) + if err != nil { + w.log.Error("error consume from rmq channel", logger.Err(err)) + return + } + + w.handleJobs(delivery) +} + +func (w *Worker) handleJobs(ch <-chan amqp.Delivery) { + for msg := range ch { + w.log.Debug("job received", slog.Any("job", msg.MessageId)) + + var job queue.Job + + if err := json.Unmarshal(msg.Body, &job); err != nil { + w.log.Error("error parse message body. %w", err) + continue + } + + // TODO check job.IdempotentKey for duplicate + + // TODO dispatch job + } +} diff --git a/backend/internal/pkg/config/config.go b/backend/internal/pkg/config/config.go index c72e35a..ec9bcc1 100644 --- a/backend/internal/pkg/config/config.go +++ b/backend/internal/pkg/config/config.go @@ -1,5 +1,7 @@ package config +import "os" + type Config struct { Common CommonConfig Rest RestConfig @@ -38,3 +40,56 @@ type DBConfig struct { type ChainAPIConfig struct { Host string } + +type QueuesConfig struct { + ChainJobsQueue Queue +} + +type Queue struct { + Driver string + Name string + Host string + User string + Secret string +} + +func (c *Config) ReadFromEnv() { + if c.Common.LogLevel == "" { + c.Common.LogLevel = os.Getenv("BLOCKD_LOG_LEVEL") + } + + if !c.Common.LogLocal { + if os.Getenv("BLOCKD_LOG_LOCAL") == "true" { + c.Common.LogLocal = true + } + } + + if c.Common.LogFile == "" { + c.Common.LogFile = os.Getenv("BLOCKD_LOG_FILE") + } + + if !c.Common.LogAddSource { + if os.Getenv("BLOCKD_LOG_ADD_SOURCE") == "true" { + c.Common.LogAddSource = true + } + } + + // os.Getenv("BLOCKD_JWT_SECRET") + // os.Getenv("BLOCKD_CHAIN_API_URL") + // os.Getenv("BLOCKD_NUM_INTERNAL_WORKERS") + + // os.Getenv("BLOCKD_REST_ADDRESS") + // os.Getenv("BLOCKD_REST_ENABLE_TLS") + // os.Getenv("BLOCKD_REST_CERT_PATH") + // os.Getenv("BLOCKD_REST_KEY_PATH") + + // os.Getenv("BLOCKD_DB_HOST") + // os.Getenv("BLOCKD_DB_DATABASE") + // os.Getenv("BLOCKD_DB_USER") + // os.Getenv("BLOCKD_DB_SECRET") + // os.Getenv("BLOCKD_DB_ENABLE_TLS") + + // os.Getenv("BLOCKD_CACHE_HOST") + // os.Getenv("BLOCKD_CACHE_USER") + // os.Getenv("BLOCKD_CACHE_SECRET") +}