This commit is contained in:
r8zavetr8v 2024-06-10 21:37:04 +03:00
parent 842b8c7b6f
commit 24bc9fd7f2
9 changed files with 174 additions and 6 deletions

View File

@ -54,6 +54,7 @@ require (
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rabbitmq/amqp091-go v1.10.0
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/supranational/blst v0.3.11 // indirect

View File

@ -342,6 +342,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=

View File

@ -58,6 +58,7 @@ func provideChainInteractor(
log *slog.Logger,
config config.Config,
txRepository txRepo.Repository,
orgInteractor organizations.OrganizationsInteractor,
) chain.ChainInteractor {
return chain.NewChainInteractor(log, config, txRepository)
return chain.NewChainInteractor(log, config, txRepository, orgInteractor)
}

View File

@ -7,9 +7,9 @@
package factory
import (
"github.com/emochka2007/block-accounting/internal/infrastructure/repository"
"github.com/emochka2007/block-accounting/internal/pkg/config"
"github.com/emochka2007/block-accounting/internal/service"
"github.com/emochka2007/block-accounting/internal/infrastructure/repository"
)
// Injectors from wire.go:
@ -23,14 +23,14 @@ func ProvideService(c config.Config) (service.Service, func(), error) {
usersRepository := provideUsersRepository(db)
organizationsRepository := provideOrganizationsRepository(db, usersRepository)
transactionsRepository := provideTxRepository(db, organizationsRepository)
chainInteractor := provideChainInteractor(logger, c, transactionsRepository)
client, cleanup2 := provideRedisConnection(c)
cache := provideRedisCache(client, logger)
organizationsInteractor := provideOrganizationsInteractor(logger, organizationsRepository, cache)
chainInteractor := provideChainInteractor(logger, c, transactionsRepository, organizationsInteractor)
usersInteractor := provideUsersInteractor(logger, usersRepository, chainInteractor)
authRepository := provideAuthRepository(db)
jwtInteractor := provideJWTInteractor(c, usersInteractor, authRepository)
authPresenter := provideAuthPresenter(jwtInteractor)
client, cleanup2 := provideRedisConnection(c)
cache := provideRedisCache(client, logger)
organizationsInteractor := provideOrganizationsInteractor(logger, organizationsRepository, cache)
authController := provideAuthController(logger, usersInteractor, authPresenter, jwtInteractor, authRepository, organizationsInteractor)
organizationsPresenter := provideOrganizationsPresenter()
organizationsController := provideOrganizationsController(logger, organizationsInteractor, organizationsPresenter)

View File

@ -0,0 +1 @@
package beanstalk

View File

@ -0,0 +1,52 @@
package queue
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
)
type QueueDriver interface {
Put(ctx context.Context, job any) error
Pop(ctx context.Context) (any, error)
}
type Queue[T any] struct {
driver QueueDriver
}
func NewWithDriver[T any](
driver QueueDriver,
) *Queue[T] {
return &Queue[T]{
driver: driver,
}
}
func (q *Queue[T]) Put(
ctx context.Context,
job T,
) error {
return q.driver.Put(ctx, job)
}
func (q *Queue[T]) Pop(ctx context.Context) (*T, error) {
job, err := q.driver.Pop(ctx)
if err != nil {
return nil, fmt.Errorf("queue: error pop a job from the queue. %w", err)
}
if t, ok := job.(T); ok {
return &t, nil
}
return nil, fmt.Errorf("queue: error unexpected job type")
}
type Job[T any] struct {
ID uuid.UUID
Payload *T
CreatedAt time.Time
}

View File

@ -0,0 +1,35 @@
package rmq
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
type RMQClient struct {
cc *amqp.Connection
}
// NewClient creates a new RabbitMQ client. Will panic if there are an error while dealing
func NewClient(
address string,
user string,
password string,
) *RMQClient {
cc, err := amqp.Dial("amqp://" + user + ":" + password + "@localhost:5672/")
if err != nil {
log.Fatal("error connect to rabbitmq server", err)
}
return &RMQClient{
cc: cc,
}
}
func NewWithConnection(
cc *amqp.Connection,
) *RMQClient {
return &RMQClient{
cc: cc,
}
}

View File

@ -0,0 +1,73 @@
package system
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
type SystemQueue struct {
m sync.Mutex
_init_size int
buf []*any
_buf_p atomic.Int64
_read_p atomic.Int64
}
func NewSystemQueue(size int) *SystemQueue {
if size < 50 {
size = 50
}
return &SystemQueue{
_init_size: size,
buf: make([]*any, size),
}
}
func (s *SystemQueue) Put(_ context.Context, job any) error {
s.m.Lock()
defer s.m.Unlock()
p := s._buf_p.Load()
// Resize buf if needed
if int64(len(s.buf)) == p {
s.resize()
}
s.buf[p] = &job
s._buf_p.Add(1)
return nil
}
func (s *SystemQueue) Pop(_ context.Context) (any, error) {
s.m.Lock()
defer s.m.Unlock()
p := s._buf_p.Load()
if int64(len(s.buf)) < p {
s.resize()
return nil, fmt.Errorf("system-queue: error _p index out of buffer range")
}
jobp := s.buf[p]
if jobp == nil {
return nil, fmt.Errorf("system-queue: error nil job")
}
job := *jobp
s._buf_p.Add(-1)
s.buf[p-1] = nil
return job, nil
}
func (s *SystemQueue) resize() {
s.buf = append(s.buf, make([]*any, s._init_size/2)...)
}

View File

@ -0,0 +1,3 @@
package workers
// todo move all workers stuff here