107 lines
2.7 KiB
Go
107 lines
2.7 KiB
Go
package pipeline
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
|
|
"git.optclblast.xyz/optclblast/gustav/internal/client"
|
|
)
|
|
|
|
type ExecuteOpts func() // TODO
|
|
|
|
type HttpClient = client.Client[*http.Request, *http.Response]
|
|
|
|
func ExecutePipeline(ctx context.Context, cc HttpClient, pipeline *Pipeline, opts ...ExecuteOpts) error {
|
|
if pipeline == nil || pipeline.Steps == nil {
|
|
return fmt.Errorf("pipeline is empty")
|
|
}
|
|
|
|
globalRegistry := make(map[string]any)
|
|
|
|
slog.Info("Started executing pipeline", slog.String("name", pipeline.Name))
|
|
|
|
for i, s := range pipeline.Steps {
|
|
slog.Debug("Execuping step", slog.Int("number", i), slog.String("name", s.Name))
|
|
|
|
registry := make(map[string]any)
|
|
|
|
// the idea is to save response json just as a map, and then access data simply with path.to.data pattent
|
|
prepareResult, err := prepareShelling[map[string]any](ctx, cc, s.Prepare)
|
|
if err != nil {
|
|
panic(err) // TODO handle
|
|
}
|
|
|
|
if s.Prepare.GlobalSaveInto != "" {
|
|
globalRegistry[s.Prepare.GlobalSaveInto] = *prepareResult
|
|
}
|
|
|
|
if prepareResult != nil {
|
|
registry[s.Prepare.SaveInto] = *prepareResult
|
|
}
|
|
|
|
for i := 0; i < s.Repeat; i++ {
|
|
for _, sh := range s.Shells {
|
|
|
|
bodyData := bytes.NewBuffer(sh.Body)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, sh.Method, sh.Target, bodyData)
|
|
if err != nil {
|
|
slog.Error("failed to create a new request", slog.String("error", err.Error()))
|
|
continue
|
|
}
|
|
|
|
_, err = cc.Do(ctx, req) // TODO assert expected result and the one we got
|
|
if err != nil {
|
|
slog.Error("failed to do prepare request", slog.String("error", err.Error()))
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func prepareShelling[OutT any](ctx context.Context, cc HttpClient, pre Prepare) (*OutT, error) {
|
|
const op = "prepare_shelling"
|
|
if pre.Type != "http_call" {
|
|
return nil, fmt.Errorf("[%s] unknown preshelling type", op)
|
|
}
|
|
|
|
bodyData := bytes.NewBuffer(pre.Call.Body)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, pre.Call.Method, pre.Call.Target, bodyData)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("[%s] failed to create a new request: %w", op, err)
|
|
}
|
|
|
|
resp, err := cc.Do(ctx, req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("[%s] failed to do prepare request: %w", op, err)
|
|
}
|
|
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
slog.Error("failed to close response body", slog.String("op", op), slog.String("error", err.Error()))
|
|
}
|
|
}()
|
|
|
|
respBody, err := io.ReadAll(req.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("[%s] failed to read response body: %w", err)
|
|
}
|
|
|
|
out := new(OutT)
|
|
|
|
if err := json.Unmarshal(respBody, out); err != nil {
|
|
return nil, fmt.Errorf("[%s] failed to parse response: %w", op, err)
|
|
}
|
|
|
|
return out, nil
|
|
}
|