From aeaddbaedf57c3c3ff15985bb2fbd88393ae1efe Mon Sep 17 00:00:00 2001 From: drainpilled Date: Sat, 19 Oct 2024 19:05:34 -0400 Subject: [PATCH] ping pong + init --- cmd/main.go | 5 ++ internal/plugin/domain/common.go | 31 ++++++++++++ internal/plugin/loader.go | 1 + internal/plugin/plugin.go | 29 +++++++++++ internal/plugin/store.go | 6 +++ internal/transport/pool/pool.go | 87 ++++++++++++++++++++++++++++++++ 6 files changed, 159 insertions(+) create mode 100644 internal/transport/pool/pool.go diff --git a/cmd/main.go b/cmd/main.go index 868bfa3..cc17ef1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,8 +4,13 @@ import ( "context" "git.optclblast.xyz/draincloud/draincloud-core/internal/app" + "git.optclblast.xyz/draincloud/draincloud-core/internal/plugin" ) func main() { + ctx := context.Background() + ps := plugin.NewPluginStore() + pl := plugin.MustNewPluginLoader(ctx, 8081, ps) + pl.Run(ctx) app.New().Run(context.TODO()) } diff --git a/internal/plugin/domain/common.go b/internal/plugin/domain/common.go index 492b871..c6f583f 100644 --- a/internal/plugin/domain/common.go +++ b/internal/plugin/domain/common.go @@ -1,6 +1,19 @@ package domain type InitPluginRequest struct { + Name string `json:"name"` + Version int `json:"version"` + Namespace string `json:"namespace"` +} + +type PluginPage struct { + Name string `json:"name"` + Version int `json:"version"` + Namespace string `json:"namespace"` + Path string `json:"path"` +} + +type PluginAction struct { Name string `json:"name"` Version int `json:"version"` Namespace string `json:"namespace"` @@ -9,3 +22,21 @@ type InitPluginRequest struct { WithActions bool `json:"with_actions"` Async bool `json:"async"` } + +type PluginComponent struct { + Name string `json:"name"` + Version int `json:"version"` + Namespace string `json:"namespace"` + RequiredResolveParams []string `json:"required_resolve_params"` + OptionalResolveParams []string `json:"optional_resolve_params"` + WithActions bool `json:"with_actions"` + Async bool `json:"async"` +} + +type Ping struct { + Payload any `json:"payload"` +} + +type Pong struct { + Payload any `json:"payload"` +} diff --git a/internal/plugin/loader.go b/internal/plugin/loader.go index b3c0735..6e650a1 100644 --- a/internal/plugin/loader.go +++ b/internal/plugin/loader.go @@ -58,6 +58,7 @@ func (p *PluginLoader) run(ctx context.Context) { func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) { data := make([]byte, 0) + // TODO make read loop n, err := conn.Read(data) if err != nil { logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err)) diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go index c818a31..24b5494 100644 --- a/internal/plugin/plugin.go +++ b/internal/plugin/plugin.go @@ -1,6 +1,9 @@ package plugin import ( + "bytes" + "encoding/json" + "fmt" "net" "git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain" @@ -11,6 +14,32 @@ type Plugin struct { md domain.InitPluginRequest } +func (p *Plugin) Init(initPayload any) error { + r := &domain.Ping{ + Payload: initPayload, + } + + pingData, err := json.Marshal(r) + if err != nil { + return err + } + + if _, err = p.conn.Write(pingData); err != nil { + return err + } + + pongData := make([]byte, 0) + if _, err := p.conn.Read(pongData); err != nil { + return err + } + + if !bytes.Equal(pongData, pingData) { + return fmt.Errorf("ping-pong payload assertion error") + } + + return nil +} + func (p *Plugin) Close() error { return p.conn.Close() } diff --git a/internal/plugin/store.go b/internal/plugin/store.go index ab8ae2e..a32c394 100644 --- a/internal/plugin/store.go +++ b/internal/plugin/store.go @@ -10,6 +10,12 @@ type PluginStore struct { plugins map[string]*Plugin } +func NewPluginStore() *PluginStore { + return &PluginStore{ + plugins: make(map[string]*Plugin), + } +} + func (s *PluginStore) Add(plugin *Plugin) { s.m.Lock() defer s.m.Unlock() diff --git a/internal/transport/pool/pool.go b/internal/transport/pool/pool.go new file mode 100644 index 0000000..aec589f --- /dev/null +++ b/internal/transport/pool/pool.go @@ -0,0 +1,87 @@ +package pool + +import ( + "net" + "sync" + "sync/atomic" +) + +var ( + defaultMaxConns = 20 + defaultStrategy = &RoundrobinStrategy{ + lastSelected: initialRoundrobinAtomic(), + } +) + +func initialRoundrobinAtomic() atomic.Int64 { + a := atomic.Int64{} + a.Store(-1) + return a +} + +type ConnSelectionStrategy interface { + Select() int +} + +type RoundrobinStrategy struct { + lastSelected atomic.Int64 +} + +func (r *RoundrobinStrategy) Select() int { + return int(r.lastSelected.Add(1)) +} + +type ConnPool struct { + m sync.RWMutex + strategy ConnSelectionStrategy + conns []net.Conn +} + +type newConnPoolOpts struct { + strategy ConnSelectionStrategy + maxConns int +} + +func newNewConnPoolOpts() newConnPoolOpts { + return newConnPoolOpts{ + strategy: defaultStrategy, + maxConns: defaultMaxConns, + } +} + +type NewConnPoolOpt func(p *newConnPoolOpts) + +func WithStrategy(s ConnSelectionStrategy) NewConnPoolOpt { + return func(p *newConnPoolOpts) { + p.strategy = s + } +} + +func WithMaxConns(mc int) NewConnPoolOpt { + return func(p *newConnPoolOpts) { + p.maxConns = mc + } +} + +func NewConnPool(opts ...NewConnPoolOpt) *ConnPool { + o := newNewConnPoolOpts() + for _, opt := range opts { + opt(&o) + } + return &ConnPool{ + conns: make([]net.Conn, 0), + strategy: o.strategy, + } +} + +func (p *ConnPool) SelectConn() net.Conn { + p.m.RLock() + defer p.m.RUnlock() + return p.conns[p.strategy.Select()] +} + +func (p *ConnPool) AddConn(conn net.Conn) { + p.m.Lock() + defer p.m.Unlock() + p.conns = append(p.conns, conn) +}