// Package pipeline runs pipelines step by step: each step performs a prepare
// HTTP call and then repeatedly sends the step's shell requests.
package pipeline

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"

	"git.optclblast.xyz/optclblast/gustav/internal/client"
)

// ExecuteOpts is a placeholder for ExecutePipeline options.
// TODO: options are accepted by ExecutePipeline but not interpreted yet.
type ExecuteOpts func()

// HttpClient is the client used to send *http.Request values and receive
// *http.Response values.
type HttpClient = client.Client[*http.Request, *http.Response]

// ExecutePipeline runs every step of pipeline in order.
//
// For each step it performs the prepare call, stores the decoded JSON result
// in the per-step registry (and in the global registry when
// Prepare.GlobalSaveInto is set), and then sends every shell request of the
// step Repeat times.
//
// It returns an error when the pipeline is nil or has no steps, when a prepare
// call fails, or when ctx is cancelled between shell requests. Failures of
// individual shell requests are logged and skipped; they do not abort the run.
// opts is currently unused.
func ExecutePipeline(ctx context.Context, cc HttpClient, pipeline *Pipeline, opts ...ExecuteOpts) error {
	if pipeline == nil || len(pipeline.Steps) == 0 {
		return fmt.Errorf("pipeline is empty")
	}

	globalRegistry := make(map[string]any)

	slog.Info("Started executing pipeline", slog.String("name", pipeline.Name))

	for i, s := range pipeline.Steps {
		slog.Debug("Execuping step", slog.Int("number", i), slog.String("name", s.Name))

		// The idea is to keep the response JSON as a plain map so that data
		// can later be addressed with a path.to.data pattern.
		registry := make(map[string]any)

		prepareResult, err := prepareShelling[map[string]any](ctx, cc, s.Prepare)
		if err != nil {
			return fmt.Errorf("step %d (%q): %w", i, s.Name, err)
		}

		// Dereference only behind the nil check, for both registries.
		if prepareResult != nil {
			if s.Prepare.GlobalSaveInto != "" {
				globalRegistry[s.Prepare.GlobalSaveInto] = *prepareResult
			}
			registry[s.Prepare.SaveInto] = *prepareResult
		}

		for round := 0; round < s.Repeat; round++ {
			for _, sh := range s.Shells {
				// Stop early on cancellation instead of logging one failure
				// per remaining request.
				if err := ctx.Err(); err != nil {
					return fmt.Errorf("step %d (%q): %w", i, s.Name, err)
				}

				req, err := http.NewRequestWithContext(ctx, sh.Method, sh.Target, bytes.NewBuffer(sh.Body))
				if err != nil {
					slog.Error("failed to create a new request", slog.String("error", err.Error()))
					continue
				}

				resp, err := cc.Do(ctx, req)
				// TODO assert expected result and the one we got
				if err != nil {
					slog.Error("failed to do shell request", slog.String("error", err.Error()))
					continue
				}

				// The response is not inspected yet, but its body still has
				// to be released.
				if resp != nil && resp.Body != nil {
					drainAndClose("execute_pipeline", resp.Body)
				}
			}
		}
	}

	return nil
}

// prepareShelling performs the prepare call described by pre and decodes the
// JSON response body into a freshly allocated OutT.
//
// Only the "http_call" prepare type is supported; any other type yields an
// error. Errors from building the request, sending it, reading the response
// body, and decoding JSON are wrapped with the operation name.
func prepareShelling[OutT any](ctx context.Context, cc HttpClient, pre Prepare) (*OutT, error) {
	const op = "prepare_shelling"

	if pre.Type != "http_call" {
		return nil, fmt.Errorf("[%s] unknown preshelling type", op)
	}

	req, err := http.NewRequestWithContext(ctx, pre.Call.Method, pre.Call.Target, bytes.NewBuffer(pre.Call.Body))
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to create a new request: %w", op, err)
	}

	resp, err := cc.Do(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to do prepare request: %w", op, err)
	}
	defer drainAndClose(op, resp.Body)

	// Read the response body, not the request body.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to read response body: %w", op, err)
	}

	out := new(OutT)
	if err := json.Unmarshal(respBody, out); err != nil {
		return nil, fmt.Errorf("[%s] failed to parse response: %w", op, err)
	}

	return out, nil
}

// drainAndClose discards whatever is left in body and closes it. A close
// failure is logged under op rather than returned, because callers have
// nothing useful to do with it.
func drainAndClose(op string, body io.ReadCloser) {
	// The drain error is ignored on purpose: draining is best-effort (it lets
	// an underlying HTTP transport reuse the connection) and Close follows
	// regardless.
	_, _ = io.Copy(io.Discard, body)

	if err := body.Close(); err != nil {
		slog.Error("failed to close response body", slog.String("op", op), slog.String("error", err.Error()))
	}
}