block-accounting/backend/internal/infrastructure/queue/queue.go

179 lines
3.4 KiB
Go
Raw Permalink Normal View History

2024-06-10 18:37:04 +00:00
package queue
import (
"context"
2024-06-20 21:12:58 +00:00
"encoding/json"
2024-06-10 18:37:04 +00:00
"fmt"
2024-06-20 21:12:58 +00:00
"time"
"github.com/emochka2007/block-accounting/internal/pkg/ctxmeta"
2024-06-10 18:37:04 +00:00
)
type QueueDriver interface {
Put(ctx context.Context, job any) error
Pop(ctx context.Context) (any, error)
}
type Queue[T any] struct {
driver QueueDriver
}
func NewWithDriver[T any](
driver QueueDriver,
) *Queue[T] {
return &Queue[T]{
driver: driver,
}
}
func (q *Queue[T]) Put(
ctx context.Context,
job T,
) error {
return q.driver.Put(ctx, job)
}
func (q *Queue[T]) Pop(ctx context.Context) (*T, error) {
job, err := q.driver.Pop(ctx)
if err != nil {
return nil, fmt.Errorf("queue: error pop a job from the queue. %w", err)
}
if t, ok := job.(T); ok {
return &t, nil
}
return nil, fmt.Errorf("queue: error unexpected job type")
}
2024-06-20 19:39:31 +00:00
type Job struct {
ID string
IdempotencyKey string
2024-06-20 21:12:58 +00:00
Context context.Context
2024-06-20 19:39:31 +00:00
Payload any
CreatedAt int64
2024-06-10 18:37:04 +00:00
}
2024-06-20 21:12:58 +00:00
type job struct {
ID string `json:"id"`
IdempotencyKey string `json:"idempotency_key"`
Context *JobContext `json:"context"`
Type string `json:"_type"`
Payload []byte `json:"payload"`
CreatedAt int64 `json:"created_at"`
}
func (j *Job) MarshalJSON() ([]byte, error) {
payload, err := json.Marshal(j.Payload)
if err != nil {
return nil, fmt.Errorf("error marshal job payload. %w", err)
}
ja := &job{
ID: j.ID,
IdempotencyKey: j.IdempotencyKey,
Context: newOutgoingCoutext(j.Context),
Type: jobType(j.Payload),
Payload: payload,
CreatedAt: j.CreatedAt,
}
return json.Marshal(ja)
}
// TODO: fix this memory overhead
func (j *Job) UnmarshalJSON(data []byte) error {
ja := &job{}
err := json.Unmarshal(data, ja)
if err != nil {
return err
}
j.Payload, err = payloadByType(ja.Type, ja.Payload)
if err != nil {
return err
}
j.ID = ja.ID
j.IdempotencyKey = ja.IdempotencyKey
j.Context = ja.Context
j.CreatedAt = ja.CreatedAt
return nil
}
func payloadByType(t string, data []byte) (any, error) {
switch t {
case "job_deploy_multisig":
var dm JobDeployMultisig
if err := json.Unmarshal(data, &dm); err != nil {
return nil, err
}
return &dm, nil
default:
return nil, fmt.Errorf("error unknown job type")
}
}
func jobType(job any) string {
switch job.(type) {
case *JobDeployMultisig:
return "job_deploy_multisig"
default:
return ""
}
}
type JobContext struct {
Parent *JobContext `json:"_parent"`
Key any `json:"_key"`
Val any `json:"_value"`
}
func (c *JobContext) Deadline() (deadline time.Time, ok bool) {
return time.Time{}, false
}
func (c *JobContext) Done() <-chan struct{} {
return nil
}
func (c *JobContext) Err() error {
return nil
}
func (c *JobContext) Value(key any) any {
if c.Key == key {
return c.Val
}
return c.Parent.Value(key)
}
func newOutgoingCoutext(ctx context.Context) *JobContext {
var jobCtx *JobContext = new(JobContext)
lastFrame := jobCtx
if user, err := ctxmeta.User(ctx); err == nil {
lastFrame.Key = ctxmeta.UserContextKey
lastFrame.Val = user
lastFrame.Parent = new(JobContext)
lastFrame = lastFrame.Parent
}
if orgId, err := ctxmeta.OrganizationId(ctx); err == nil {
lastFrame.Key = ctxmeta.OrganizationIdContextKey
lastFrame.Val = orgId
// lastFrame.Parent = new(JobContext)
// lastFrame = lastFrame.Parent
}
return jobCtx
}