mirror of
https://github.com/emo2007/block-accounting.git
synced 2024-09-20 09:06:27 +00:00
179 lines
3.4 KiB
Go
179 lines
3.4 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/emochka2007/block-accounting/internal/pkg/ctxmeta"
|
|
)
|
|
|
|
type QueueDriver interface {
|
|
Put(ctx context.Context, job any) error
|
|
Pop(ctx context.Context) (any, error)
|
|
}
|
|
|
|
type Queue[T any] struct {
|
|
driver QueueDriver
|
|
}
|
|
|
|
func NewWithDriver[T any](
|
|
driver QueueDriver,
|
|
) *Queue[T] {
|
|
return &Queue[T]{
|
|
driver: driver,
|
|
}
|
|
}
|
|
|
|
func (q *Queue[T]) Put(
|
|
ctx context.Context,
|
|
job T,
|
|
) error {
|
|
return q.driver.Put(ctx, job)
|
|
}
|
|
|
|
func (q *Queue[T]) Pop(ctx context.Context) (*T, error) {
|
|
job, err := q.driver.Pop(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("queue: error pop a job from the queue. %w", err)
|
|
}
|
|
|
|
if t, ok := job.(T); ok {
|
|
return &t, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("queue: error unexpected job type")
|
|
}
|
|
|
|
type Job struct {
|
|
ID string
|
|
IdempotencyKey string
|
|
Context context.Context
|
|
Payload any
|
|
CreatedAt int64
|
|
}
|
|
|
|
type job struct {
|
|
ID string `json:"id"`
|
|
IdempotencyKey string `json:"idempotency_key"`
|
|
Context *JobContext `json:"context"`
|
|
Type string `json:"_type"`
|
|
Payload []byte `json:"payload"`
|
|
CreatedAt int64 `json:"created_at"`
|
|
}
|
|
|
|
func (j *Job) MarshalJSON() ([]byte, error) {
|
|
payload, err := json.Marshal(j.Payload)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error marshal job payload. %w", err)
|
|
}
|
|
|
|
ja := &job{
|
|
ID: j.ID,
|
|
IdempotencyKey: j.IdempotencyKey,
|
|
Context: newOutgoingCoutext(j.Context),
|
|
Type: jobType(j.Payload),
|
|
Payload: payload,
|
|
CreatedAt: j.CreatedAt,
|
|
}
|
|
|
|
return json.Marshal(ja)
|
|
}
|
|
|
|
// TODO: fix this memory overhead
|
|
func (j *Job) UnmarshalJSON(data []byte) error {
|
|
ja := &job{}
|
|
|
|
err := json.Unmarshal(data, ja)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
j.Payload, err = payloadByType(ja.Type, ja.Payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
j.ID = ja.ID
|
|
j.IdempotencyKey = ja.IdempotencyKey
|
|
j.Context = ja.Context
|
|
j.CreatedAt = ja.CreatedAt
|
|
|
|
return nil
|
|
}
|
|
|
|
func payloadByType(t string, data []byte) (any, error) {
|
|
switch t {
|
|
case "job_deploy_multisig":
|
|
var dm JobDeployMultisig
|
|
|
|
if err := json.Unmarshal(data, &dm); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &dm, nil
|
|
default:
|
|
return nil, fmt.Errorf("error unknown job type")
|
|
}
|
|
}
|
|
|
|
func jobType(job any) string {
|
|
switch job.(type) {
|
|
case *JobDeployMultisig:
|
|
return "job_deploy_multisig"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
type JobContext struct {
|
|
Parent *JobContext `json:"_parent"`
|
|
Key any `json:"_key"`
|
|
Val any `json:"_value"`
|
|
}
|
|
|
|
func (c *JobContext) Deadline() (deadline time.Time, ok bool) {
|
|
return time.Time{}, false
|
|
}
|
|
|
|
func (c *JobContext) Done() <-chan struct{} {
|
|
return nil
|
|
}
|
|
|
|
func (c *JobContext) Err() error {
|
|
return nil
|
|
}
|
|
|
|
func (c *JobContext) Value(key any) any {
|
|
if c.Key == key {
|
|
return c.Val
|
|
}
|
|
|
|
return c.Parent.Value(key)
|
|
}
|
|
|
|
func newOutgoingCoutext(ctx context.Context) *JobContext {
|
|
var jobCtx *JobContext = new(JobContext)
|
|
|
|
lastFrame := jobCtx
|
|
|
|
if user, err := ctxmeta.User(ctx); err == nil {
|
|
lastFrame.Key = ctxmeta.UserContextKey
|
|
lastFrame.Val = user
|
|
lastFrame.Parent = new(JobContext)
|
|
|
|
lastFrame = lastFrame.Parent
|
|
}
|
|
|
|
if orgId, err := ctxmeta.OrganizationId(ctx); err == nil {
|
|
lastFrame.Key = ctxmeta.OrganizationIdContextKey
|
|
lastFrame.Val = orgId
|
|
// lastFrame.Parent = new(JobContext)
|
|
|
|
// lastFrame = lastFrame.Parent
|
|
}
|
|
|
|
return jobCtx
|
|
}
|