From c0d7b0bfe13f49917950e4c9ca8eb79c5b792448 Mon Sep 17 00:00:00 2001 From: optclblast Date: Fri, 21 Jun 2024 00:12:58 +0300 Subject: [PATCH] marshaling added (optimisation needed) --- backend/internal/infrastructure/queue/jobs.go | 6 + .../infrastructure/queue/jobs_test.go | 53 ++++++++ .../internal/infrastructure/queue/queue.go | 128 ++++++++++++++++++ .../infrastructure/workers/workers.go | 5 + 4 files changed, 192 insertions(+) create mode 100644 backend/internal/infrastructure/queue/jobs.go create mode 100644 backend/internal/infrastructure/queue/jobs_test.go diff --git a/backend/internal/infrastructure/queue/jobs.go b/backend/internal/infrastructure/queue/jobs.go new file mode 100644 index 0000000..2c6bd7f --- /dev/null +++ b/backend/internal/infrastructure/queue/jobs.go @@ -0,0 +1,6 @@ +package queue + +type JobDeployMultisig struct { + OwnersPubKeys []string `json:"pub_keys"` + Confirmations int `json:"confirmations"` +} diff --git a/backend/internal/infrastructure/queue/jobs_test.go b/backend/internal/infrastructure/queue/jobs_test.go new file mode 100644 index 0000000..5515f19 --- /dev/null +++ b/backend/internal/infrastructure/queue/jobs_test.go @@ -0,0 +1,53 @@ +package queue + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/emochka2007/block-accounting/internal/pkg/ctxmeta" + "github.com/emochka2007/block-accounting/internal/pkg/models" + "github.com/google/uuid" +) + +func TestJobMarshal(t *testing.T) { + ctx := ctxmeta.UserContext(context.Background(), &models.User{ + ID: uuid.New(), + Name: "kjdsfhkjfg", + Credentails: &models.UserCredentials{ + Email: "jkdfhgls", + }, + PK: []byte("1234567890qwertyuiop"), + Bip39Seed: []byte("poiuytrewq0987654321"), + Mnemonic: "mnemonic mnemonic mnemonicccc", + Activated: true, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + ctx = ctxmeta.OrganizationIdContext(ctx, uuid.New()) + + job := &Job{ + ID: "123", + IdempotencyKey: "123", + Context: ctx, + Payload: &JobDeployMultisig{OwnersPubKeys: []string{"sdfdf", "sdfsd"}, Confirmations: 2}, + CreatedAt: time.Now().UnixMilli(), + } + + data, err := json.Marshal(job) + if err != nil { + t.Fatalf("err: %s", err.Error()) + } + + t.Log(string(data)) + + var job2 *Job = new(Job) + + if err := json.Unmarshal(data, job2); err != nil { + t.Fatalf("err: %s", err.Error()) + } + + t.Logf("%+v", job2) +} diff --git a/backend/internal/infrastructure/queue/queue.go b/backend/internal/infrastructure/queue/queue.go index ace4b1c..8ef8069 100644 --- a/backend/internal/infrastructure/queue/queue.go +++ b/backend/internal/infrastructure/queue/queue.go @@ -2,7 +2,11 @@ package queue import ( "context" + "encoding/json" "fmt" + "time" + + "github.com/emochka2007/block-accounting/internal/pkg/ctxmeta" ) type QueueDriver interface { @@ -45,6 +49,130 @@ func (q *Queue[T]) Pop(ctx context.Context) (*T, error) { type Job struct { ID string IdempotencyKey string + Context context.Context Payload any CreatedAt int64 } + +type job struct { + ID string `json:"id"` + IdempotencyKey string `json:"idempotency_key"` + Context *JobContext `json:"context"` + Type string `json:"_type"` + Payload []byte `json:"payload"` + CreatedAt int64 `json:"created_at"` +} + +func (j *Job) MarshalJSON() ([]byte, error) { + payload, err := json.Marshal(j.Payload) + if err != nil { + return nil, fmt.Errorf("error marshal job payload. %w", err) + } + + ja := &job{ + ID: j.ID, + IdempotencyKey: j.IdempotencyKey, + Context: newOutgoingCoutext(j.Context), + Type: jobType(j.Payload), + Payload: payload, + CreatedAt: j.CreatedAt, + } + + return json.Marshal(ja) +} + +// TODO: fix this memory overhead +func (j *Job) UnmarshalJSON(data []byte) error { + ja := &job{} + + err := json.Unmarshal(data, ja) + if err != nil { + return err + } + + j.Payload, err = payloadByType(ja.Type, ja.Payload) + if err != nil { + return err + } + + j.ID = ja.ID + j.IdempotencyKey = ja.IdempotencyKey + j.Context = ja.Context + j.CreatedAt = ja.CreatedAt + + return nil +} + +func payloadByType(t string, data []byte) (any, error) { + switch t { + case "job_deploy_multisig": + var dm JobDeployMultisig + + if err := json.Unmarshal(data, &dm); err != nil { + return nil, err + } + + return &dm, nil + default: + return nil, fmt.Errorf("error unknown job type") + } +} + +func jobType(job any) string { + switch job.(type) { + case *JobDeployMultisig: + return "job_deploy_multisig" + default: + return "" + } +} + +type JobContext struct { + Parent *JobContext `json:"_parent"` + Key any `json:"_key"` + Val any `json:"_value"` +} + +func (c *JobContext) Deadline() (deadline time.Time, ok bool) { + return time.Time{}, false +} + +func (c *JobContext) Done() <-chan struct{} { + return nil +} + +func (c *JobContext) Err() error { + return nil +} + +func (c *JobContext) Value(key any) any { + if c.Key == key { + return c.Val + } + + return c.Parent.Value(key) +} + +func newOutgoingCoutext(ctx context.Context) *JobContext { + var jobCtx *JobContext = new(JobContext) + + lastFrame := jobCtx + + if user, err := ctxmeta.User(ctx); err == nil { + lastFrame.Key = ctxmeta.UserContextKey + lastFrame.Val = user + lastFrame.Parent = new(JobContext) + + lastFrame = lastFrame.Parent + } + + if orgId, err := ctxmeta.OrganizationId(ctx); err == nil { + lastFrame.Key = ctxmeta.OrganizationIdContextKey + lastFrame.Val = orgId + // lastFrame.Parent = new(JobContext) + + // lastFrame = lastFrame.Parent + } + + return jobCtx +} diff --git a/backend/internal/infrastructure/workers/workers.go b/backend/internal/infrastructure/workers/workers.go index 29df215..ceaa755 100644 --- a/backend/internal/infrastructure/workers/workers.go +++ b/backend/internal/infrastructure/workers/workers.go @@ -1,6 +1,7 @@ package workers import ( + "context" "encoding/json" "log/slog" @@ -72,3 +73,7 @@ func (w *Worker) handleJobs(ch <-chan amqp.Delivery) { // TODO dispatch job } } + +func (w *Worker) handleDeployMultisig( + ctx context.Context, +)