diff --git a/internal/plugin/loader.go b/internal/plugin/loader.go index b161a6a..b3c0735 100644 --- a/internal/plugin/loader.go +++ b/internal/plugin/loader.go @@ -2,14 +2,32 @@ package plugin import ( "context" - "fmt" + "encoding/json" + "errors" + "log/slog" "net" + "strconv" + "git.optclblast.xyz/draincloud/draincloud-core/internal/closer" "git.optclblast.xyz/draincloud/draincloud-core/internal/logger" + "git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain" ) type PluginLoader struct { - l net.Listener + l net.Listener + store *PluginStore +} + +func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader { + l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(int64(listenPort), 10)) + if err != nil { + logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err)) + } + + return &PluginLoader{ + l: l, + store: ps, + } } func (p *PluginLoader) Run(ctx context.Context) { @@ -25,22 +43,71 @@ func (p *PluginLoader) run(ctx context.Context) { logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err)) } default: - plugin, err := p.accept() + conn, err := p.l.Accept() if err != nil { - logger.Error(ctx, "[plugin_loader][loop] accept error", logger.Err(err)) + logger.Error(ctx, "[plugin_loader][loop] failed to accet new connection", logger.Err(err)) continue } + + logger.Debug(ctx, "[plugin_loader][loop] accepting connection") + + go p.accept(ctx, conn) } } } -func (p *PluginLoader) accept() (*Plugin, error) { - conn, err := p.l.Accept() +func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) { + data := make([]byte, 0) + n, err := conn.Read(data) if err != nil { - return nil, fmt.Errorf("failed to accept new connection: %w", err) + logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err)) + return + } + logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int("n", n)) + + init := new(domain.InitPluginRequest) + + if err = json.Unmarshal(data, init); err != nil { + logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err)) + return } - return &Plugin{ + if init.Namespace == "" { + logger.Error(ctx, "[plugin_loader][accept] empty namespace") + err = errors.Join(err, errors.New("init request must contain namespace")) + } + + if init.Name == "" { + logger.Error(ctx, "[plugin_loader][accept] empty namespace") + err = errors.Join(err, errors.New("init request must contain namespace")) + } + + if init.Version == 0 { + logger.Error(ctx, "[plugin_loader][accept] empty namespace") + err = errors.Join(err, errors.New("init request must contain namespace")) + } + + if err != nil { + if _, werr := conn.Write([]byte(err.Error())); werr != nil { + logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr)) + } + if cerr := conn.Close(); cerr != nil { + logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr)) + } + return + } + + logger.Debug(ctx, + "[plugin_loader][accept] new plugin initialized", + "plugin", PluginStoreKey(init.Namespace, init.Name, init.Version), + ) + + plugin := &Plugin{ conn: conn, - }, nil + md: *init, + } + + closer.Add(plugin.Close) + + p.store.Add(plugin) } diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go index 0ef5bc4..c818a31 100644 --- a/internal/plugin/plugin.go +++ b/internal/plugin/plugin.go @@ -10,3 +10,7 @@ type Plugin struct { conn net.Conn md domain.InitPluginRequest } + +func (p *Plugin) Close() error { + return p.conn.Close() +}