diff --git a/backend/go.mod b/backend/go.mod index 1d85572..04c5b94 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -54,6 +54,7 @@ require ( github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/supranational/blst v0.3.11 // indirect diff --git a/backend/go.sum b/backend/go.sum index 347794d..87150da 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -342,6 +342,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= diff --git a/backend/internal/factory/interactors.go b/backend/internal/factory/interactors.go index f4dcc07..aea1208 100644 --- a/backend/internal/factory/interactors.go +++ b/backend/internal/factory/interactors.go @@ -58,6 +58,7 @@ func provideChainInteractor( log *slog.Logger, config config.Config, txRepository txRepo.Repository, + orgInteractor organizations.OrganizationsInteractor, ) chain.ChainInteractor { - return chain.NewChainInteractor(log, config, txRepository) + return chain.NewChainInteractor(log, config, txRepository, orgInteractor) } diff --git a/backend/internal/factory/wire_gen.go b/backend/internal/factory/wire_gen.go index 06c16f9..a43a53a 100644 --- a/backend/internal/factory/wire_gen.go +++ b/backend/internal/factory/wire_gen.go @@ -7,9 +7,9 @@ package factory import ( + "github.com/emochka2007/block-accounting/internal/infrastructure/repository" "github.com/emochka2007/block-accounting/internal/pkg/config" "github.com/emochka2007/block-accounting/internal/service" - "github.com/emochka2007/block-accounting/internal/infrastructure/repository" ) // Injectors from wire.go: @@ -23,14 +23,14 @@ func ProvideService(c config.Config) (service.Service, func(), error) { usersRepository := provideUsersRepository(db) organizationsRepository := provideOrganizationsRepository(db, usersRepository) transactionsRepository := provideTxRepository(db, organizationsRepository) - chainInteractor := provideChainInteractor(logger, c, transactionsRepository) + client, cleanup2 := provideRedisConnection(c) + cache := provideRedisCache(client, logger) + organizationsInteractor := provideOrganizationsInteractor(logger, organizationsRepository, cache) + chainInteractor := provideChainInteractor(logger, c, transactionsRepository, organizationsInteractor) usersInteractor := provideUsersInteractor(logger, usersRepository, chainInteractor) authRepository := provideAuthRepository(db) jwtInteractor := provideJWTInteractor(c, usersInteractor, authRepository) authPresenter := provideAuthPresenter(jwtInteractor) - client, cleanup2 := provideRedisConnection(c) - cache := provideRedisCache(client, logger) - organizationsInteractor := provideOrganizationsInteractor(logger, organizationsRepository, cache) authController := provideAuthController(logger, usersInteractor, authPresenter, jwtInteractor, authRepository, organizationsInteractor) organizationsPresenter := provideOrganizationsPresenter() organizationsController := provideOrganizationsController(logger, organizationsInteractor, organizationsPresenter) diff --git a/backend/internal/infrastructure/queue/beanstalk/queue.go b/backend/internal/infrastructure/queue/beanstalk/queue.go new file mode 100644 index 0000000..d034d65 --- /dev/null +++ b/backend/internal/infrastructure/queue/beanstalk/queue.go @@ -0,0 +1 @@ +package beanstalk diff --git a/backend/internal/infrastructure/queue/queue.go b/backend/internal/infrastructure/queue/queue.go new file mode 100644 index 0000000..a94e626 --- /dev/null +++ b/backend/internal/infrastructure/queue/queue.go @@ -0,0 +1,52 @@ +package queue + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" +) + +type QueueDriver interface { + Put(ctx context.Context, job any) error + Pop(ctx context.Context) (any, error) +} + +type Queue[T any] struct { + driver QueueDriver +} + +func NewWithDriver[T any]( + driver QueueDriver, +) *Queue[T] { + return &Queue[T]{ + driver: driver, + } +} + +func (q *Queue[T]) Put( + ctx context.Context, + job T, +) error { + return q.driver.Put(ctx, job) +} + +func (q *Queue[T]) Pop(ctx context.Context) (*T, error) { + job, err := q.driver.Pop(ctx) + if err != nil { + return nil, fmt.Errorf("queue: error pop a job from the queue. %w", err) + } + + if t, ok := job.(T); ok { + return &t, nil + } + + return nil, fmt.Errorf("queue: error unexpected job type") +} + +type Job[T any] struct { + ID uuid.UUID + Payload *T + CreatedAt time.Time +} diff --git a/backend/internal/infrastructure/queue/rmq/queue.go b/backend/internal/infrastructure/queue/rmq/queue.go new file mode 100644 index 0000000..c1f91ec --- /dev/null +++ b/backend/internal/infrastructure/queue/rmq/queue.go @@ -0,0 +1,35 @@ +package rmq + +import ( + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type RMQClient struct { + cc *amqp.Connection +} + +// NewClient creates a new RabbitMQ client. Will panic if there are an error while dealing +func NewClient( + address string, + user string, + password string, +) *RMQClient { + cc, err := amqp.Dial("amqp://" + user + ":" + password + "@localhost:5672/") + if err != nil { + log.Fatal("error connect to rabbitmq server", err) + } + + return &RMQClient{ + cc: cc, + } +} + +func NewWithConnection( + cc *amqp.Connection, +) *RMQClient { + return &RMQClient{ + cc: cc, + } +} diff --git a/backend/internal/infrastructure/queue/system/queue.go b/backend/internal/infrastructure/queue/system/queue.go new file mode 100644 index 0000000..28ec579 --- /dev/null +++ b/backend/internal/infrastructure/queue/system/queue.go @@ -0,0 +1,73 @@ +package system + +import ( + "context" + "fmt" + "sync" + "sync/atomic" +) + +type SystemQueue struct { + m sync.Mutex + _init_size int + buf []*any + _buf_p atomic.Int64 + _read_p atomic.Int64 +} + +func NewSystemQueue(size int) *SystemQueue { + if size < 50 { + size = 50 + } + + return &SystemQueue{ + _init_size: size, + buf: make([]*any, size), + } +} + +func (s *SystemQueue) Put(_ context.Context, job any) error { + s.m.Lock() + defer s.m.Unlock() + + p := s._buf_p.Load() + + // Resize buf if needed + if int64(len(s.buf)) == p { + s.resize() + } + + s.buf[p] = &job + + s._buf_p.Add(1) + + return nil +} + +func (s *SystemQueue) Pop(_ context.Context) (any, error) { + s.m.Lock() + defer s.m.Unlock() + + p := s._buf_p.Load() + if int64(len(s.buf)) < p { + s.resize() + + return nil, fmt.Errorf("system-queue: error _p index out of buffer range") + } + + jobp := s.buf[p] + + if jobp == nil { + return nil, fmt.Errorf("system-queue: error nil job") + } + + job := *jobp + s._buf_p.Add(-1) + s.buf[p-1] = nil + + return job, nil +} + +func (s *SystemQueue) resize() { + s.buf = append(s.buf, make([]*any, s._init_size/2)...) +} diff --git a/backend/internal/infrastructure/workers/workers.go b/backend/internal/infrastructure/workers/workers.go index e69de29..7224789 100644 --- a/backend/internal/infrastructure/workers/workers.go +++ b/backend/internal/infrastructure/workers/workers.go @@ -0,0 +1,3 @@ +package workers + +// todo move all workers stuff here