ping pong + init

This commit is contained in:
drainpilled 2024-10-19 19:05:34 -04:00
parent 45a0d70d28
commit aeaddbaedf
6 changed files with 159 additions and 0 deletions

View File

@ -4,8 +4,13 @@ import (
"context" "context"
"git.optclblast.xyz/draincloud/draincloud-core/internal/app" "git.optclblast.xyz/draincloud/draincloud-core/internal/app"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin"
) )
func main() { func main() {
ctx := context.Background()
ps := plugin.NewPluginStore()
pl := plugin.MustNewPluginLoader(ctx, 8081, ps)
pl.Run(ctx)
app.New().Run(context.TODO()) app.New().Run(context.TODO())
} }

View File

@ -1,6 +1,19 @@
package domain package domain
type InitPluginRequest struct { type InitPluginRequest struct {
Name string `json:"name"`
Version int `json:"version"`
Namespace string `json:"namespace"`
}
type PluginPage struct {
Name string `json:"name"`
Version int `json:"version"`
Namespace string `json:"namespace"`
Path string `json:"path"`
}
type PluginAction struct {
Name string `json:"name"` Name string `json:"name"`
Version int `json:"version"` Version int `json:"version"`
Namespace string `json:"namespace"` Namespace string `json:"namespace"`
@ -9,3 +22,21 @@ type InitPluginRequest struct {
WithActions bool `json:"with_actions"` WithActions bool `json:"with_actions"`
Async bool `json:"async"` Async bool `json:"async"`
} }
type PluginComponent struct {
Name string `json:"name"`
Version int `json:"version"`
Namespace string `json:"namespace"`
RequiredResolveParams []string `json:"required_resolve_params"`
OptionalResolveParams []string `json:"optional_resolve_params"`
WithActions bool `json:"with_actions"`
Async bool `json:"async"`
}
type Ping struct {
Payload any `json:"payload"`
}
type Pong struct {
Payload any `json:"payload"`
}

View File

@ -58,6 +58,7 @@ func (p *PluginLoader) run(ctx context.Context) {
func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) { func (p *PluginLoader) accept(ctx context.Context, conn net.Conn) {
data := make([]byte, 0) data := make([]byte, 0)
// TODO make read loop
n, err := conn.Read(data) n, err := conn.Read(data)
if err != nil { if err != nil {
logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err)) logger.Error(ctx, "[plugin_loader][accept] read error", logger.Err(err))

View File

@ -1,6 +1,9 @@
package plugin package plugin
import ( import (
"bytes"
"encoding/json"
"fmt"
"net" "net"
"git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain" "git.optclblast.xyz/draincloud/draincloud-core/internal/plugin/domain"
@ -11,6 +14,32 @@ type Plugin struct {
md domain.InitPluginRequest md domain.InitPluginRequest
} }
func (p *Plugin) Init(initPayload any) error {
r := &domain.Ping{
Payload: initPayload,
}
pingData, err := json.Marshal(r)
if err != nil {
return err
}
if _, err = p.conn.Write(pingData); err != nil {
return err
}
pongData := make([]byte, 0)
if _, err := p.conn.Read(pongData); err != nil {
return err
}
if !bytes.Equal(pongData, pingData) {
return fmt.Errorf("ping-pong payload assertion error")
}
return nil
}
func (p *Plugin) Close() error { func (p *Plugin) Close() error {
return p.conn.Close() return p.conn.Close()
} }

View File

@ -10,6 +10,12 @@ type PluginStore struct {
plugins map[string]*Plugin plugins map[string]*Plugin
} }
func NewPluginStore() *PluginStore {
return &PluginStore{
plugins: make(map[string]*Plugin),
}
}
func (s *PluginStore) Add(plugin *Plugin) { func (s *PluginStore) Add(plugin *Plugin) {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()

View File

@ -0,0 +1,87 @@
package pool
import (
"net"
"sync"
"sync/atomic"
)
var (
defaultMaxConns = 20
defaultStrategy = &RoundrobinStrategy{
lastSelected: initialRoundrobinAtomic(),
}
)
func initialRoundrobinAtomic() atomic.Int64 {
a := atomic.Int64{}
a.Store(-1)
return a
}
type ConnSelectionStrategy interface {
Select() int
}
type RoundrobinStrategy struct {
lastSelected atomic.Int64
}
func (r *RoundrobinStrategy) Select() int {
return int(r.lastSelected.Add(1))
}
type ConnPool struct {
m sync.RWMutex
strategy ConnSelectionStrategy
conns []net.Conn
}
type newConnPoolOpts struct {
strategy ConnSelectionStrategy
maxConns int
}
func newNewConnPoolOpts() newConnPoolOpts {
return newConnPoolOpts{
strategy: defaultStrategy,
maxConns: defaultMaxConns,
}
}
type NewConnPoolOpt func(p *newConnPoolOpts)
func WithStrategy(s ConnSelectionStrategy) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.strategy = s
}
}
func WithMaxConns(mc int) NewConnPoolOpt {
return func(p *newConnPoolOpts) {
p.maxConns = mc
}
}
func NewConnPool(opts ...NewConnPoolOpt) *ConnPool {
o := newNewConnPoolOpts()
for _, opt := range opts {
opt(&o)
}
return &ConnPool{
conns: make([]net.Conn, 0),
strategy: o.strategy,
}
}
func (p *ConnPool) SelectConn() net.Conn {
p.m.RLock()
defer p.m.RUnlock()
return p.conns[p.strategy.Select()]
}
func (p *ConnPool) AddConn(conn net.Conn) {
p.m.Lock()
defer p.m.Unlock()
p.conns = append(p.conns, conn)
}