tmp with plugins 2
This commit is contained in:
parent
12e059bcc5
commit
45a0d70d28
@ -2,14 +2,32 @@ package plugin
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.optclblast.xyz/draincloud/draincloud-core/internal/closer"
|
||||||
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
|
"git.optclblast.xyz/draincloud/draincloud-core/internal/logger"
|
||||||
|
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PluginLoader struct {
|
type PluginLoader struct {
|
||||||
l net.Listener
|
l net.Listener
|
||||||
|
store *PluginStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func MustNewPluginLoader(ctx context.Context, listenPort uint16, ps *PluginStore) *PluginLoader {
|
||||||
|
l, err := net.Listen("tcp", "127.0.0.1:"+strconv.FormatInt(int64(listenPort), 10))
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal(ctx, "[MustNewPluginLoader] error build listener", logger.Err(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PluginLoader{
|
||||||
|
l: l,
|
||||||
|
store: ps,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PluginLoader) Run(ctx context.Context) {
|
func (p *PluginLoader) Run(ctx context.Context) {
|
||||||
@ -25,22 +43,71 @@ func (p *PluginLoader) run(ctx context.Context) {
|
|||||||
logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
|
logger.Error(ctx, "[plugin_loader][loop] failed to close listener", logger.Err(err))
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
plugin, err := p.accept()
|
|
||||||
if err != nil {
|
|
||||||
logger.Error(ctx, "[plugin_loader][loop] accept error", logger.Err(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PluginLoader) accept() (*Plugin, error) {
|
|
||||||
conn, err := p.l.Accept()
|
conn, err := p.l.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to accept new connection: %w", err)
|
logger.Error(ctx, "[plugin_loader][loop] failed to accet new connection", logger.Err(err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Plugin{
|
logger.Debug(ctx, "[plugin_loader][loop] accepting connection")
|
||||||
conn: conn,
|
|
||||||
}, nil
|
go p.accept(ctx, conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
|
||||||
|
data := make([]byte, 0)
|
||||||
|
n, err := conn.Read(data)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Debug(ctx, "[plugin_loader][accept] bytes read", slog.Int("n", n))
|
||||||
|
|
||||||
|
init := new(domain.InitPluginRequest)
|
||||||
|
|
||||||
|
if err = json.Unmarshal(data, init); err != nil {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] unmarshal request error", logger.Err(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if init.Namespace == "" {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
|
||||||
|
err = errors.Join(err, errors.New("init request must contain namespace"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if init.Name == "" {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
|
||||||
|
err = errors.Join(err, errors.New("init request must contain namespace"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if init.Version == 0 {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] empty namespace")
|
||||||
|
err = errors.Join(err, errors.New("init request must contain namespace"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if _, werr := conn.Write([]byte(err.Error())); werr != nil {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] failed to write init error", logger.Err(werr))
|
||||||
|
}
|
||||||
|
if cerr := conn.Close(); cerr != nil {
|
||||||
|
logger.Error(ctx, "[plugin_loader][accept] failed to close conn", logger.Err(cerr))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Debug(ctx,
|
||||||
|
"[plugin_loader][accept] new plugin initialized",
|
||||||
|
"plugin", PluginStoreKey(init.Namespace, init.Name, init.Version),
|
||||||
|
)
|
||||||
|
|
||||||
|
plugin := &Plugin{
|
||||||
|
conn: conn,
|
||||||
|
md: *init,
|
||||||
|
}
|
||||||
|
|
||||||
|
closer.Add(plugin.Close)
|
||||||
|
|
||||||
|
p.store.Add(plugin)
|
||||||
}
|
}
|
||||||
|
@ -10,3 +10,7 @@ type Plugin struct {
|
|||||||
conn net.Conn
|
conn net.Conn
|
||||||
md domain.InitPluginRequest
|
md domain.InitPluginRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Plugin) Close() error {
|
||||||
|
return p.conn.Close()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user