r8zavetr8v 2024-06-20 22:39:31 +03:00
parent 512fc397da
commit f466fcbe4d
9 changed files with 202 additions and 153 deletions

backend/.gitignore
View File

@@ -1,2 +1,3 @@
build/blockd
*__debug_*
rabbitmq

View File

@@ -41,6 +41,10 @@ func main() {
Name: "chain-api-url",
Value: "http://localhost:3000",
},
&cli.IntFlag{
Name: "num-internal-workers",
Value: 1,
},
// rest
&cli.StringFlag{
@@ -73,16 +77,6 @@ func main() {
&cli.BoolFlag{
Name: "db-enable-tls",
},
&cli.StringFlag{
Name: "cache-host",
},
&cli.StringFlag{
Name: "cache-user",
},
&cli.StringFlag{
Name: "cache-secret",
},
},
Action: func(c *cli.Context) error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
@@ -106,10 +100,6 @@ func main() {
Database: c.String("db-database"),
User: c.String("db-user"),
Secret: c.String("db-secret"),
CacheHost: c.String("cache-host"),
CacheUser: c.String("cache-user"),
CacheSecret: c.String("cache-secret"),
},
ChainAPI: config.ChainAPIConfig{
Host: c.String("chain-api-url"),

View File

@@ -62,64 +62,68 @@ services:
start_period: 5s
profiles: [blockd, database, noback]
blockd-cache:
container_name: blockd-cache
image: redis:7.2.4
blockd-rmq:
image: rabbitmq:3.13.3-management
ports:
- 15672:15672
- 5672:5672
hostname: blockd-rmq
restart: always
networks:
- blockd-net
ports:
- 6379:6379
profiles: [blockd, database, noback]
prometheus:
image: prom/prometheus
container_name: prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
ports:
- 9091:9090
restart: unless-stopped
networks:
- blockd-net
volumes:
- ./prometheus:/etc/prometheus
- prometheus_data:/prometheus
profiles: [metrics]
grafana:
image: grafana/grafana
container_name: grafana
ports:
- 3112:3000
restart: unless-stopped
networks:
- blockd-net
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=grafana
- RABBITMQ_DEFAULT_USER=blockd
- RABBITMQ_DEFAULT_PASS=blockd
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 2147483648
volumes:
- ./grafana:/etc/grafana/provisioning/datasources
profiles: [metrics]
- ./rabbitmq:/var/lib/rabbitmq
syslog:
image: linuxserver/syslog-ng:3.36.1
container_name: syslog-ng
environment:
- PUID=0
- PGID=0
- TZ=UTC
volumes:
- /srv/syslog/config:/config
- /srv/syslog/logs:/var/log
ports:
- 514:5514/udp
- 601:6601/tcp
- 6514:6514/tcp
restart: unless-stopped
networks:
- syslog
logging:
driver: "json-file"
profiles: [metrics]
# prometheus:
# image: prom/prometheus
# container_name: prometheus
# command:
# - '--config.file=/etc/prometheus/prometheus.yml'
# ports:
# - 9091:9090
# restart: unless-stopped
# networks:
# - blockd-net
# volumes:
# - ./prometheus:/etc/prometheus
# - prometheus_data:/prometheus
# profiles: [metrics]
# grafana:
# image: grafana/grafana
# container_name: grafana
# ports:
# - 3112:3000
# restart: unless-stopped
# networks:
# - blockd-net
# environment:
# - GF_SECURITY_ADMIN_USER=admin
# - GF_SECURITY_ADMIN_PASSWORD=grafana
# volumes:
# - ./grafana:/etc/grafana/provisioning/datasources
# profiles: [metrics]
# syslog:
# image: linuxserver/syslog-ng:3.36.1
# container_name: syslog-ng
# environment:
# - PUID=0
# - PGID=0
# - TZ=UTC
# volumes:
# - /srv/syslog/config:/config
# - /srv/syslog/logs:/var/log
# ports:
# - 514:5514/udp
# - 601:6601/tcp
# - 6514:6514/tcp
# restart: unless-stopped
# networks:
# - syslog
# logging:
# driver: "json-file"
# profiles: [metrics]
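As a quick reference, a minimal Go sketch for connecting to the blockd-rmq service added above, using github.com/rabbitmq/amqp091-go (the library imported by the workers package later in this diff); the amqp://blockd:blockd@localhost:5672/ URL assumes local development with the compose defaults:

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// User/password and port come from the compose service above
	// (RABBITMQ_DEFAULT_USER/PASS=blockd, port 5672); localhost is an assumption.
	conn, err := amqp.Dial("amqp://blockd:blockd@localhost:5672/")
	if err != nil {
		log.Fatalf("dial rabbitmq: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("open channel: %v", err)
	}
	defer ch.Close()

	log.Println("connected to blockd-rmq")
}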

View File

@@ -1 +0,0 @@
package beanstalk

View File

@@ -3,9 +3,6 @@ package queue
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
)
type QueueDriver interface {
@@ -45,8 +42,9 @@ func (q *Queue[T]) Pop(ctx context.Context) (*T, error) {
return nil, fmt.Errorf("queue: error unexpected job type")
}
type Job[T any] struct {
ID uuid.UUID
Payload *T
CreatedAt time.Time
type Job struct {
ID string
IdempotencyKey string
Payload any
CreatedAt int64
}
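The Job type is no longer generic: the ID becomes a plain string, an IdempotencyKey is added for consumer-side deduplication, and CreatedAt becomes an int64. Below is a minimal sketch of building and serializing such a job; using google/uuid for the IDs and Unix seconds for CreatedAt are assumptions, since this commit does not show a producer:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
)

// Job mirrors the struct introduced above, copied here so the sketch is self-contained.
type Job struct {
	ID             string
	IdempotencyKey string
	Payload        any
	CreatedAt      int64
}

func main() {
	job := Job{
		ID:             uuid.NewString(), // assumption: IDs stay uuid-based, stored as strings
		IdempotencyKey: uuid.NewString(),
		Payload:        map[string]any{"tx": "0xabc"}, // illustrative payload
		CreatedAt:      time.Now().Unix(),             // assumption: Unix seconds
	}

	body, err := json.Marshal(job)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // body is what a producer would publish to RabbitMQ
}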

View File

@@ -33,3 +33,7 @@ func NewWithConnection(
cc: cc,
}
}
func (r *RMQClient) Channel() (*amqp.Channel, error) {
return r.cc.Channel()
}
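The new Channel() accessor exposes channels on the underlying connection to callers. A hypothetical producer-side helper that uses it to declare a queue and publish a JSON body is sketched below; QueueDeclare and PublishWithContext are standard amqp091-go calls, while the interface and queue wiring are assumptions, not part of this commit:

package producer

import (
	"context"
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

// channelOpener matches the new RMQClient.Channel() method (and *amqp.Connection),
// so the same helper works with either.
type channelOpener interface {
	Channel() (*amqp.Channel, error)
}

// publishJob is a hypothetical helper, not part of this commit.
func publishJob(ctx context.Context, cc channelOpener, queueName string, body []byte) error {
	ch, err := cc.Channel()
	if err != nil {
		return fmt.Errorf("open channel: %w", err)
	}
	defer ch.Close()

	// Durable declare so producer and consumer agree on the queue's existence.
	if _, err := ch.QueueDeclare(queueName, true, false, false, false, nil); err != nil {
		return fmt.Errorf("declare queue: %w", err)
	}

	return ch.PublishWithContext(ctx, "", queueName, false, false, amqp.Publishing{
		ContentType: "application/json",
		Body:        body,
	})
}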

View File

@@ -1,73 +0,0 @@
package system
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
type SystemQueue struct {
m sync.Mutex
_init_size int
buf []*any
_buf_p atomic.Int64
_read_p atomic.Int64
}
func NewSystemQueue(size int) *SystemQueue {
if size < 50 {
size = 50
}
return &SystemQueue{
_init_size: size,
buf: make([]*any, size),
}
}
func (s *SystemQueue) Put(_ context.Context, job any) error {
s.m.Lock()
defer s.m.Unlock()
p := s._buf_p.Load()
// Resize buf if needed
if int64(len(s.buf)) == p {
s.resize()
}
s.buf[p] = &job
s._buf_p.Add(1)
return nil
}
func (s *SystemQueue) Pop(_ context.Context) (any, error) {
s.m.Lock()
defer s.m.Unlock()
p := s._buf_p.Load()
if int64(len(s.buf)) < p {
s.resize()
return nil, fmt.Errorf("system-queue: error _p index out of buffer range")
}
jobp := s.buf[p]
if jobp == nil {
return nil, fmt.Errorf("system-queue: error nil job")
}
job := *jobp
s._buf_p.Add(-1)
s.buf[p-1] = nil
return job, nil
}
func (s *SystemQueue) resize() {
s.buf = append(s.buf, make([]*any, s._init_size/2)...)
}

View File

@@ -1,3 +1,74 @@
package workers
// TODO move all workers stuff here
import (
"encoding/json"
"log/slog"
"github.com/emochka2007/block-accounting/internal/infrastructure/queue"
"github.com/emochka2007/block-accounting/internal/pkg/logger"
amqp "github.com/rabbitmq/amqp091-go"
)
type Worker struct {
id string
log *slog.Logger
rmqc *amqp.Connection
queueName string
}
func (w *Worker) Run() {
w.log = w.log.With(slog.String("worker-id", w.id), slog.String("worker-queue", w.queueName))
defer func() {
if p := recover(); p != nil {
w.log.Error(
"worker paniced!",
slog.String("worker id", w.id),
slog.Any("panic", p),
)
} else {
w.log.Info("worker stoped. bye bye 0w0", slog.String("worker id", w.id))
}
}()
channel, err := w.rmqc.Channel()
if err != nil {
w.log.Error("error create rmq channel", logger.Err(err))
return
}
delivery, err := channel.Consume(
w.queueName,
w.id,
true,
false,
false,
false,
nil,
)
if err != nil {
w.log.Error("error consume from rmq channel", logger.Err(err))
return
}
w.handleJobs(delivery)
}
func (w *Worker) handleJobs(ch <-chan amqp.Delivery) {
for msg := range ch {
w.log.Debug("job received", slog.Any("job", msg.MessageId))
var job queue.Job
if err := json.Unmarshal(msg.Body, &job); err != nil {
w.log.Error("error parse message body. %w", err)
continue
}
// TODO check job.IdempotencyKey for duplicates
// TODO dispatch job
}
}
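The Worker fields are unexported and no constructor appears in this hunk, so the pool wiring below is a hypothetical sketch inside the same workers package: it constructs Workers directly from the fields shown above and runs one goroutine per worker, with n expected to come from the new num-internal-workers flag:

package workers

import (
	"fmt"
	"log/slog"
	"sync"

	amqp "github.com/rabbitmq/amqp091-go"
)

// RunPool is hypothetical, not part of this commit.
func RunPool(log *slog.Logger, rmqc *amqp.Connection, queueName string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		w := &Worker{
			id:        fmt.Sprintf("worker-%d", i),
			log:       log,
			rmqc:      rmqc,
			queueName: queueName,
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.Run()
		}()
	}
	wg.Wait()
}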

View File

@@ -1,5 +1,7 @@
package config
import "os"
type Config struct {
Common CommonConfig
Rest RestConfig
@@ -38,3 +40,56 @@ type DBConfig struct {
type ChainAPIConfig struct {
Host string
}
type QueuesConfig struct {
ChainJobsQueue Queue
}
type Queue struct {
Driver string
Name string
Host string
User string
Secret string
}
func (c *Config) ReadFromEnv() {
if c.Common.LogLevel == "" {
c.Common.LogLevel = os.Getenv("BLOCKD_LOG_LEVEL")
}
if !c.Common.LogLocal {
if os.Getenv("BLOCKD_LOG_LOCAL") == "true" {
c.Common.LogLocal = true
}
}
if c.Common.LogFile == "" {
c.Common.LogFile = os.Getenv("BLOCKD_LOG_FILE")
}
if !c.Common.LogAddSource {
if os.Getenv("BLOCKD_LOG_ADD_SOURCE") == "true" {
c.Common.LogAddSource = true
}
}
// os.Getenv("BLOCKD_JWT_SECRET")
// os.Getenv("BLOCKD_CHAIN_API_URL")
// os.Getenv("BLOCKD_NUM_INTERNAL_WORKERS")
// os.Getenv("BLOCKD_REST_ADDRESS")
// os.Getenv("BLOCKD_REST_ENABLE_TLS")
// os.Getenv("BLOCKD_REST_CERT_PATH")
// os.Getenv("BLOCKD_REST_KEY_PATH")
// os.Getenv("BLOCKD_DB_HOST")
// os.Getenv("BLOCKD_DB_DATABASE")
// os.Getenv("BLOCKD_DB_USER")
// os.Getenv("BLOCKD_DB_SECRET")
// os.Getenv("BLOCKD_DB_ENABLE_TLS")
// os.Getenv("BLOCKD_CACHE_HOST")
// os.Getenv("BLOCKD_CACHE_USER")
// os.Getenv("BLOCKD_CACHE_SECRET")
}
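A possible continuation of ReadFromEnv for the new QueuesConfig is sketched below; it assumes Config gains a Queues QueuesConfig field and that the environment variables are named BLOCKD_QUEUE_*, neither of which is defined by this commit:

// Hypothetical: Config is assumed to have a field `Queues QueuesConfig`.
func (c *Config) readQueuesFromEnv() {
	q := &c.Queues.ChainJobsQueue
	if q.Driver == "" {
		q.Driver = os.Getenv("BLOCKD_QUEUE_DRIVER") // e.g. "rabbitmq"
	}
	if q.Name == "" {
		q.Name = os.Getenv("BLOCKD_QUEUE_NAME")
	}
	if q.Host == "" {
		q.Host = os.Getenv("BLOCKD_QUEUE_HOST")
	}
	if q.User == "" {
		q.User = os.Getenv("BLOCKD_QUEUE_USER")
	}
	if q.Secret == "" {
		q.Secret = os.Getenv("BLOCKD_QUEUE_SECRET")
	}
}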