marshaling added (optimisation needed)

This commit is contained in:
r8zavetr8v 2024-06-21 00:12:58 +03:00
parent f466fcbe4d
commit c0d7b0bfe1
4 changed files with 192 additions and 0 deletions

View File

@ -0,0 +1,6 @@
package queue
type JobDeployMultisig struct {
OwnersPubKeys []string `json:"pub_keys"`
Confirmations int `json:"confirmations"`
}

View File

@ -0,0 +1,53 @@
package queue
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/emochka2007/block-accounting/internal/pkg/ctxmeta"
"github.com/emochka2007/block-accounting/internal/pkg/models"
"github.com/google/uuid"
)
func TestJobMarshal(t *testing.T) {
ctx := ctxmeta.UserContext(context.Background(), &models.User{
ID: uuid.New(),
Name: "kjdsfhkjfg",
Credentails: &models.UserCredentials{
Email: "jkdfhgls",
},
PK: []byte("1234567890qwertyuiop"),
Bip39Seed: []byte("poiuytrewq0987654321"),
Mnemonic: "mnemonic mnemonic mnemonicccc",
Activated: true,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
})
ctx = ctxmeta.OrganizationIdContext(ctx, uuid.New())
job := &Job{
ID: "123",
IdempotencyKey: "123",
Context: ctx,
Payload: &JobDeployMultisig{OwnersPubKeys: []string{"sdfdf", "sdfsd"}, Confirmations: 2},
CreatedAt: time.Now().UnixMilli(),
}
data, err := json.Marshal(job)
if err != nil {
t.Fatalf("err: %s", err.Error())
}
t.Log(string(data))
var job2 *Job = new(Job)
if err := json.Unmarshal(data, job2); err != nil {
t.Fatalf("err: %s", err.Error())
}
t.Logf("%+v", job2)
}

View File

@ -2,7 +2,11 @@ package queue
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/emochka2007/block-accounting/internal/pkg/ctxmeta"
)
type QueueDriver interface {
@ -45,6 +49,130 @@ func (q *Queue[T]) Pop(ctx context.Context) (*T, error) {
type Job struct {
ID string
IdempotencyKey string
Context context.Context
Payload any
CreatedAt int64
}
type job struct {
ID string `json:"id"`
IdempotencyKey string `json:"idempotency_key"`
Context *JobContext `json:"context"`
Type string `json:"_type"`
Payload []byte `json:"payload"`
CreatedAt int64 `json:"created_at"`
}
func (j *Job) MarshalJSON() ([]byte, error) {
payload, err := json.Marshal(j.Payload)
if err != nil {
return nil, fmt.Errorf("error marshal job payload. %w", err)
}
ja := &job{
ID: j.ID,
IdempotencyKey: j.IdempotencyKey,
Context: newOutgoingCoutext(j.Context),
Type: jobType(j.Payload),
Payload: payload,
CreatedAt: j.CreatedAt,
}
return json.Marshal(ja)
}
// TODO: fix this memory overhead
func (j *Job) UnmarshalJSON(data []byte) error {
ja := &job{}
err := json.Unmarshal(data, ja)
if err != nil {
return err
}
j.Payload, err = payloadByType(ja.Type, ja.Payload)
if err != nil {
return err
}
j.ID = ja.ID
j.IdempotencyKey = ja.IdempotencyKey
j.Context = ja.Context
j.CreatedAt = ja.CreatedAt
return nil
}
func payloadByType(t string, data []byte) (any, error) {
switch t {
case "job_deploy_multisig":
var dm JobDeployMultisig
if err := json.Unmarshal(data, &dm); err != nil {
return nil, err
}
return &dm, nil
default:
return nil, fmt.Errorf("error unknown job type")
}
}
func jobType(job any) string {
switch job.(type) {
case *JobDeployMultisig:
return "job_deploy_multisig"
default:
return ""
}
}
type JobContext struct {
Parent *JobContext `json:"_parent"`
Key any `json:"_key"`
Val any `json:"_value"`
}
func (c *JobContext) Deadline() (deadline time.Time, ok bool) {
return time.Time{}, false
}
func (c *JobContext) Done() <-chan struct{} {
return nil
}
func (c *JobContext) Err() error {
return nil
}
func (c *JobContext) Value(key any) any {
if c.Key == key {
return c.Val
}
return c.Parent.Value(key)
}
func newOutgoingCoutext(ctx context.Context) *JobContext {
var jobCtx *JobContext = new(JobContext)
lastFrame := jobCtx
if user, err := ctxmeta.User(ctx); err == nil {
lastFrame.Key = ctxmeta.UserContextKey
lastFrame.Val = user
lastFrame.Parent = new(JobContext)
lastFrame = lastFrame.Parent
}
if orgId, err := ctxmeta.OrganizationId(ctx); err == nil {
lastFrame.Key = ctxmeta.OrganizationIdContextKey
lastFrame.Val = orgId
// lastFrame.Parent = new(JobContext)
// lastFrame = lastFrame.Parent
}
return jobCtx
}

View File

@ -1,6 +1,7 @@
package workers
import (
"context"
"encoding/json"
"log/slog"
@ -72,3 +73,7 @@ func (w *Worker) handleJobs(ch <-chan amqp.Delivery) {
// TODO dispatch job
}
}
func (w *Worker) handleDeployMultisig(
ctx context.Context,
)