package plugin

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"net"
	"strconv"

	"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
	"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
	"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
)

// PluginLoader accepts TCP connections on a loopback listener, reads an init
// request from each one and registers the resulting plugin in the store.
type PluginLoader struct {
	l     net.Listener
	store *PluginStore
}

// MustNewPluginLoader builds a PluginLoader listening on 127.0.0.1:listenPort.
// A listener error is reported through logger.Fatal.
func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader {
	addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(listenPort)))

	l, err := net.Listen("tcp", addr)
	if err != nil {
		logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err))
	}

	return &PluginLoader{
		l:     l,
		store: ps,
	}
}

// Run starts the accept loop in a background goroutine. The loop stops once
// ctx is cancelled.
func (p *PluginLoader) Run(ctx context.Context) {
	go p.run(ctx)
}

// run accepts connections until ctx is cancelled or the listener is closed.
// Every accepted connection is handled in its own goroutine.
func (p *PluginLoader) run(ctx context.Context) {
	// Accept blocks, so cancellation cannot be observed from the accept loop
	// itself. A watcher closes the listener when ctx is done, which makes the
	// pending Accept return an error and lets the loop exit.
	loopDone := make(chan struct{})
	defer close(loopDone)

	go func() {
		select {
		case <-ctx.Done():
			logger.Info(ctx, "[plugin_loader][loop] closing")
			if err := p.l.Close(); err != nil {
				logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
			}
		case <-loopDone:
		}
	}()

	for {
		conn, err := p.l.Accept()
		if err != nil {
			// A closed listener never recovers; stop instead of spinning.
			if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
				return
			}
			logger.Error(ctx, "[plugin_loader][loop] failed to accet new connection", logger.Err(err))
			continue
		}

		logger.Debug(ctx, "[plugin_loader][loop] accepting connection")
		go p.accept(ctx, conn)
	}
}

// accept reads a single JSON-encoded init request from conn, validates it and,
// on success, wraps the connection into a Plugin that is registered with the
// closer and the store. On any failure the connection is closed; validation
// failures are additionally written back to the peer before closing.
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
	closeConn := func() {
		if cerr := conn.Close(); cerr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr))
		}
	}

	// The decoder keeps reading until one complete JSON value has arrived, so
	// a request split across several TCP segments is still handled.
	// NOTE(review): the decoder may buffer bytes past the init request; this
	// assumes the peer sends nothing else until the handshake is done — confirm.
	dec := json.NewDecoder(conn)

	req := new(domain.InitPluginRequest)
	if err := dec.Decode(req); err != nil {
		logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err))
		closeConn()
		return
	}

	logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int64("n", dec.InputOffset()))

	if err := validateInitPluginRequest(ctx, req); err != nil {
		if _, werr := conn.Write([]byte(err.Error())); werr != nil {
			logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr))
		}
		closeConn()
		return
	}

	logger.Debug(ctx,
		"[plugin_loader][accept] new plugin initialized",
		"plugin", PluginStoreKey(req.Namespace, req.Name, req.Version),
	)

	plugin := &Plugin{
		conn: conn,
		md:   *req,
	}

	closer.Add(plugin.Close)
	p.store.Add(plugin)
}

// validateInitPluginRequest checks that the namespace, name and version fields
// of req are set. Every missing field is logged and all problems are returned
// joined into one error; nil means the request is valid.
func validateInitPluginRequest(ctx context.Context, req *domain.InitPluginRequest) error {
	var problems []error

	if req.Namespace == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty namespace")
		problems = append(problems, errors.New("init request must contain namespace"))
	}

	if req.Name == "" {
		logger.Error(ctx, "[plugin_loader][accept] empty name")
		problems = append(problems, errors.New("init request must contain name"))
	}

	if req.Version == 0 {
		logger.Error(ctx, "[plugin_loader][accept] empty version")
		problems = append(problems, errors.New("init request must contain version"))
	}

	// errors.Join returns nil when there is nothing to join.
	return errors.Join(problems...)
}