gustav/internal/pipeline/executor.go
2024-09-08 01:40:03 +03:00

107 lines
2.7 KiB
Go

package pipeline
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"git.optclblast.xyz/optclblast/gustav/internal/client"
)
// ExecuteOpts is a placeholder for the optional arguments accepted by
// ExecutePipeline. ExecutePipeline does not consult these options yet.
type ExecuteOpts func() // TODO
// HttpClient is the client.Client instantiation used by this package: its Do
// method is called with an *http.Request and yields an *http.Response.
type HttpClient = client.Client[*http.Request, *http.Response]
// ExecutePipeline runs every step of pipeline in order using cc.
//
// For each step it first performs the step's prepare call and stores the
// decoded JSON response in the registries, then sends every shell request of
// the step s.Repeat times.
//
// It returns an error when pipeline is nil or has no steps, when a prepare
// call fails, or when ctx is cancelled between repeats. Failures of individual
// shell requests are logged and skipped so that one bad request does not abort
// the whole run. opts is currently unused.
func ExecutePipeline(ctx context.Context, cc HttpClient, pipeline *Pipeline, opts ...ExecuteOpts) error {
	if pipeline == nil || pipeline.Steps == nil {
		return fmt.Errorf("pipeline is empty")
	}

	// Values saved by prepare calls that must outlive a single step.
	globalRegistry := make(map[string]any)

	slog.Info("Started executing pipeline", slog.String("name", pipeline.Name))

	for i, s := range pipeline.Steps {
		slog.Debug("Executing step", slog.Int("number", i), slog.String("name", s.Name))

		// Step-local values saved by this step's prepare call.
		registry := make(map[string]any)

		// The idea is to save the response JSON just as a map, and then access
		// data simply with a path.to.data pattern.
		prepareResult, err := prepareShelling[map[string]any](ctx, cc, s.Prepare)
		if err != nil {
			// Report the failure to the caller instead of crashing the process.
			return fmt.Errorf("step %d (%q): %w", i, s.Name, err)
		}

		// Guard every dereference of the result, not only the local save.
		if prepareResult != nil {
			if s.Prepare.GlobalSaveInto != "" {
				globalRegistry[s.Prepare.GlobalSaveInto] = *prepareResult
			}
			registry[s.Prepare.SaveInto] = *prepareResult
		}

		for rep := 0; rep < s.Repeat; rep++ {
			// Stop early on cancellation rather than logging one failure per
			// remaining request.
			if err := ctx.Err(); err != nil {
				return err
			}

			for _, sh := range s.Shells {
				req, err := http.NewRequestWithContext(ctx, sh.Method, sh.Target, bytes.NewReader(sh.Body))
				if err != nil {
					slog.Error("failed to create a new request", slog.String("error", err.Error()))
					continue
				}

				resp, err := cc.Do(ctx, req) // TODO assert expected result and the one we got
				if err != nil {
					slog.Error("failed to do shell request", slog.String("error", err.Error()))
					continue
				}
				if resp == nil || resp.Body == nil {
					continue
				}

				// Drain and close the body so the underlying connection can be
				// reused. This is done inline because a defer inside the loop
				// would only run when the function returns.
				if _, err := io.Copy(io.Discard, resp.Body); err != nil {
					slog.Error("failed to drain response body", slog.String("error", err.Error()))
				}
				if err := resp.Body.Close(); err != nil {
					slog.Error("failed to close response body", slog.String("error", err.Error()))
				}
			}
		}
	}

	return nil
}
// prepareShelling performs the preparation call described by pre and decodes
// the JSON response body into a newly allocated OutT.
//
// Only the "http_call" preparation type is handled; any other type yields an
// error. The request is built from pre.Call (method, target and body) and sent
// through cc. An error is also returned when the request cannot be created or
// sent, when the response carries no body, when the body cannot be read, or
// when it is not valid JSON for OutT.
func prepareShelling[OutT any](ctx context.Context, cc HttpClient, pre Prepare) (*OutT, error) {
	const op = "prepare_shelling"

	if pre.Type != "http_call" {
		return nil, fmt.Errorf("[%s] unknown preshelling type", op)
	}

	req, err := http.NewRequestWithContext(ctx, pre.Call.Method, pre.Call.Target, bytes.NewReader(pre.Call.Body))
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to create a new request: %w", op, err)
	}

	resp, err := cc.Do(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to do prepare request: %w", op, err)
	}
	if resp == nil || resp.Body == nil {
		return nil, fmt.Errorf("[%s] empty response", op)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			slog.Error("failed to close response body", slog.String("op", op), slog.String("error", err.Error()))
		}
	}()

	// Read the response body, not the request body that was just sent.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("[%s] failed to read response body: %w", op, err)
	}

	out := new(OutT)
	if err := json.Unmarshal(respBody, out); err != nil {
		return nil, fmt.Errorf("[%s] failed to parse response: %w", op, err)
	}

	return out, nil
}