// Copyright 2026 MarketAlly. All rights reserved. // SPDX-License-Identifier: MIT package plugins import ( "context" "crypto/tls" "fmt" "maps" "net" "net/http" "os" "os/exec" "strings" "sync" "time" "connectrpc.com/connect" "golang.org/x/net/http2" "code.gitcaddy.com/server/v3/modules/graceful" "code.gitcaddy.com/server/v3/modules/log" pluginv1 "code.gitcaddy.com/server/v3/modules/plugins/pluginv1" "code.gitcaddy.com/server/v3/modules/plugins/pluginv1/pluginv1connect" ) // PluginStatus represents the status of an external plugin type PluginStatus string const ( PluginStatusStarting PluginStatus = "starting" PluginStatusOnline PluginStatus = "online" PluginStatusOffline PluginStatus = "offline" PluginStatusError PluginStatus = "error" // ProtocolVersion is the current plugin protocol version. // Increment this when new RPCs are added to PluginService. // The server uses this to avoid calling RPCs that older plugins don't implement. ProtocolVersion int32 = 1 ) // ManagedPlugin tracks the state of an external plugin type ManagedPlugin struct { config *ExternalPluginConfig process *os.Process status PluginStatus lastSeen time.Time manifest *pluginv1.PluginManifest protocolVersion int32 // protocol version reported by the plugin (0 = pre-versioning, treated as 1) failCount int client pluginv1connect.PluginServiceClient mu sync.RWMutex } // ExternalPluginManager manages external plugins (both managed and external mode) type ExternalPluginManager struct { mu sync.RWMutex plugins map[string]*ManagedPlugin config *Config ctx context.Context cancel context.CancelFunc } var globalExternalManager *ExternalPluginManager // GetExternalManager returns the global external plugin manager func GetExternalManager() *ExternalPluginManager { return globalExternalManager } // NewExternalPluginManager creates a new external plugin manager func NewExternalPluginManager(config *Config) *ExternalPluginManager { ctx, cancel := context.WithCancel(context.Background()) m := &ExternalPluginManager{ plugins: make(map[string]*ManagedPlugin), config: config, ctx: ctx, cancel: cancel, } globalExternalManager = m return m } // StartAll launches managed plugins and connects to external ones func (m *ExternalPluginManager) StartAll() error { m.mu.Lock() defer m.mu.Unlock() for name, cfg := range m.config.ExternalPlugins { if !cfg.Enabled { log.Info("External plugin %s is disabled, skipping", name) continue } address := cfg.Address if address == "" { log.Error("External plugin %s has no address configured", name) continue } if !strings.HasPrefix(address, "http://") && !strings.HasPrefix(address, "https://") { address = "http://" + address } mp := &ManagedPlugin{ config: cfg, status: PluginStatusStarting, client: pluginv1connect.NewPluginServiceClient( newH2CClient(cfg.HealthTimeout), address, connect.WithGRPC(), ), } m.plugins[name] = mp if cfg.IsManaged() { if err := m.startManagedPlugin(mp); err != nil { log.Error("Failed to start managed plugin %s: %v", name, err) mp.status = PluginStatusError continue } } // Try to initialize the plugin if err := m.initializePlugin(mp); err != nil { log.Error("Failed to initialize external plugin %s: %v", name, err) mp.status = PluginStatusError continue } mp.status = PluginStatusOnline mp.lastSeen = time.Now() log.Info("External plugin %s is online (managed=%v)", name, cfg.IsManaged()) } return nil } // StopAll gracefully shuts down all external plugins func (m *ExternalPluginManager) StopAll() { m.cancel() m.mu.Lock() defer m.mu.Unlock() for name, mp := range m.plugins { log.Info("Shutting down external plugin: %s", name) // Send shutdown request via Connect RPC m.shutdownPlugin(mp) // Kill managed process if mp.process != nil { if err := mp.process.Signal(os.Interrupt); err != nil { log.Warn("Failed to send interrupt to plugin %s, killing: %v", name, err) _ = mp.process.Kill() } } mp.status = PluginStatusOffline } } // GetPlugin returns an external plugin by name func (m *ExternalPluginManager) GetPlugin(name string) *ManagedPlugin { m.mu.RLock() defer m.mu.RUnlock() return m.plugins[name] } // AllPlugins returns all external plugins func (m *ExternalPluginManager) AllPlugins() map[string]*ManagedPlugin { m.mu.RLock() defer m.mu.RUnlock() result := make(map[string]*ManagedPlugin, len(m.plugins)) maps.Copy(result, m.plugins) return result } // OnEvent dispatches an event to all interested plugins (fire-and-forget with timeout) func (m *ExternalPluginManager) OnEvent(event *pluginv1.PluginEvent) { m.mu.RLock() defer m.mu.RUnlock() for name, mp := range m.plugins { mp.mu.RLock() if mp.status != PluginStatusOnline || mp.manifest == nil { mp.mu.RUnlock() continue } // Check if this plugin is subscribed to this event subscribed := false for _, e := range mp.manifest.SubscribedEvents { if e == event.EventType || e == "*" { subscribed = true break } } mp.mu.RUnlock() if !subscribed { continue } // Dispatch in background with timeout go func(pluginName string, p *ManagedPlugin) { ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second) defer cancel() resp, err := p.client.OnEvent(ctx, connect.NewRequest(event)) if err != nil { log.Error("Failed to dispatch event %s to plugin %s: %v", event.EventType, pluginName, err) return } if resp.Msg.Error != "" { log.Error("Plugin %s returned error for event %s: %s", pluginName, event.EventType, resp.Msg.Error) } }(name, mp) } } // HandleHTTP proxies an HTTP request to a plugin that declares the matching route func (m *ExternalPluginManager) HandleHTTP(method, path string, headers map[string]string, body []byte) (*pluginv1.HTTPResponse, error) { m.mu.RLock() defer m.mu.RUnlock() for name, mp := range m.plugins { mp.mu.RLock() if mp.status != PluginStatusOnline || mp.manifest == nil { mp.mu.RUnlock() continue } for _, route := range mp.manifest.Routes { if route.Method == method && strings.HasPrefix(path, route.Path) { mp.mu.RUnlock() ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second) defer cancel() resp, err := mp.client.HandleHTTP(ctx, connect.NewRequest(&pluginv1.HTTPRequest{ Method: method, Path: path, Headers: headers, Body: body, })) if err != nil { return nil, fmt.Errorf("plugin %s HandleHTTP failed: %w", name, err) } return resp.Msg, nil } } mp.mu.RUnlock() } return nil, fmt.Errorf("no plugin handles %s %s", method, path) } // Status returns the status of a plugin func (mp *ManagedPlugin) Status() PluginStatus { mp.mu.RLock() defer mp.mu.RUnlock() return mp.status } // Manifest returns the plugin's manifest func (mp *ManagedPlugin) Manifest() *pluginv1.PluginManifest { mp.mu.RLock() defer mp.mu.RUnlock() return mp.manifest } // SupportsProtocol returns true if the plugin supports the given protocol version. // Use this before calling RPCs added after protocol version 1. func (mp *ManagedPlugin) SupportsProtocol(version int32) bool { mp.mu.RLock() defer mp.mu.RUnlock() return mp.protocolVersion >= version } // --- Internal methods --- func (m *ExternalPluginManager) startManagedPlugin(mp *ManagedPlugin) error { args := strings.Fields(mp.config.Args) cmd := exec.Command(mp.config.Binary, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { return fmt.Errorf("failed to start binary %s: %w", mp.config.Binary, err) } mp.process = cmd.Process // Register with graceful manager for proper shutdown graceful.GetManager().RunAtShutdown(m.ctx, func() { if mp.process != nil { _ = mp.process.Signal(os.Interrupt) } }) // Wait a bit for the process to start time.Sleep(2 * time.Second) return nil } func (m *ExternalPluginManager) initializePlugin(mp *ManagedPlugin) error { resp, err := mp.client.Initialize(m.ctx, connect.NewRequest(&pluginv1.InitializeRequest{ ServerVersion: "3.0.0", Config: map[string]string{}, ProtocolVersion: ProtocolVersion, })) if err != nil { return fmt.Errorf("plugin Initialize RPC failed: %w", err) } if !resp.Msg.Success { return fmt.Errorf("plugin initialization failed: %s", resp.Msg.Error) } mp.mu.Lock() mp.manifest = resp.Msg.Manifest mp.protocolVersion = resp.Msg.ProtocolVersion if mp.protocolVersion == 0 { mp.protocolVersion = 1 // pre-versioning plugins are treated as v1 } mp.mu.Unlock() log.Info("Plugin reports protocol version %d", mp.protocolVersion) return nil } func (m *ExternalPluginManager) shutdownPlugin(mp *ManagedPlugin) { _, err := mp.client.Shutdown(m.ctx, connect.NewRequest(&pluginv1.ShutdownRequest{ Reason: "server shutdown", })) if err != nil { log.Warn("Plugin shutdown call failed: %v", err) } } // newH2CClient creates an HTTP client that supports cleartext HTTP/2 (h2c) // for communicating with gRPC services without TLS. func newH2CClient(timeout time.Duration) *http.Client { return &http.Client{ Timeout: timeout, Transport: &http2.Transport{ AllowHTTP: true, DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { var d net.Dialer return d.DialContext(ctx, network, addr) }, }, } }