draincloud-core/internal/plugin/loader.go

115 lines
2.8 KiB
Go
Raw Permalink Normal View History

2024-10-17 20:20:42 +00:00
package plugin
import (
"context"
2024-10-17 22:24:05 +00:00
"encoding/json"
"errors"
"log/slog"
2024-10-17 20:20:42 +00:00
"net"
2024-10-17 22:24:05 +00:00
"strconv"
2024-10-17 20:20:42 +00:00
2024-10-17 22:24:05 +00:00
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
2024-10-17 20:20:42 +00:00
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
2024-10-17 22:24:05 +00:00
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
2024-10-17 20:20:42 +00:00
)
type PluginLoader struct {
2024-10-17 22:24:05 +00:00
l net.Listener
store *PluginStore
}
func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader {
l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(int64(listenPort), 10))
if err != nil {
logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err))
}
return &PluginLoader{
l: l,
store: ps,
}
2024-10-17 20:20:42 +00:00
}
func (p *PluginLoader) Run(ctx context.Context) {
go p.run(ctx)
}
func (p *PluginLoader) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
logger.Info(ctx, "[plugin_loader][loop] closing")
if err := p.l.Close(); err != nil {
logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
}
default:
2024-10-17 22:24:05 +00:00
conn, err := p.l.Accept()
2024-10-17 20:20:42 +00:00
if err != nil {
2024-10-17 22:24:05 +00:00
logger.Error(ctx, "[plugin_loader][loop] failed to accet new connection", logger.Err(err))
2024-10-17 20:20:42 +00:00
continue
}
2024-10-17 22:24:05 +00:00
logger.Debug(ctx, "[plugin_loader][loop] accepting connection")
go p.accept(ctx, conn)
2024-10-17 20:20:42 +00:00
}
}
}
2024-10-17 22:24:05 +00:00
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
data := make([]byte, 0)
2024-10-19 23:05:34 +00:00
// TODO make read loop
2024-10-17 22:24:05 +00:00
n, err := conn.Read(data)
if err != nil {
logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err))
return
}
logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int("n", n))
init := new(domain.InitPluginRequest)
if err = json.Unmarshal(data, init); err != nil {
logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err))
return
}
if init.Namespace == "" {
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
err = errors.Join(err, errors.New("init request must contain namespace"))
}
if init.Name == "" {
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
err = errors.Join(err, errors.New("init request must contain namespace"))
}
if init.Version == 0 {
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
err = errors.Join(err, errors.New("init request must contain namespace"))
}
2024-10-17 20:20:42 +00:00
if err != nil {
2024-10-17 22:24:05 +00:00
if _, werr := conn.Write([]byte(err.Error())); werr != nil {
logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr))
}
if cerr := conn.Close(); cerr != nil {
logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr))
}
return
2024-10-17 20:20:42 +00:00
}
2024-10-17 22:24:05 +00:00
logger.Debug(ctx,
"[plugin_loader][accept] new plugin initialized",
"plugin", PluginStoreKey(init.Namespace, init.Name, init.Version),
)
plugin := &Plugin{
2024-10-17 20:20:42 +00:00
conn: conn,
2024-10-17 22:24:05 +00:00
md: *init,
}
closer.Add(plugin.Close)
p.store.Add(plugin)
2024-10-17 20:20:42 +00:00
}