All checks were successful
Build and Release / Create Release (push) Has been skipped
Build and Release / Unit Tests (push) Successful in 6m49s
Build and Release / Integration Tests (PostgreSQL) (push) Successful in 7m6s
Build and Release / Lint (push) Successful in 7m15s
Build and Release / Build Binaries (amd64, windows, windows-latest) (push) Has been skipped
Build and Release / Build Binaries (amd64, darwin, macos) (push) Has been skipped
Build and Release / Build Binaries (amd64, linux, linux-latest) (push) Has been skipped
Build and Release / Build Binaries (arm64, darwin, macos) (push) Has been skipped
Build and Release / Build Binary (linux/arm64) (push) Has been skipped
Implement critical production readiness features for AI integration: per-request provider config, admin dashboard, workflow inspection, and plugin framework foundation. Per-Request Provider Config: - Add ProviderConfig struct to all AI request types - Update queue to resolve provider/model/API key from cascade (repo > org > system) - Pass resolved config to AI sidecar on every request - Fixes multi-tenant issue where all orgs shared sidecar's hardcoded config Admin AI Dashboard: - Add /admin/ai page with sidecar health status - Display global operation stats (total, 24h, success/fail/escalated counts) - Show operations by tier, top 5 repos, token usage - Recent operations table with repo, operation, status, duration - Add GetGlobalOperationStats model method Workflow Inspection: - Add InspectWorkflow client method and types - Implement workflow-inspect queue handler - Add notifier trigger on workflow file push - Analyzes YAML for syntax errors, security issues, best practices - Returns structured issues with line numbers and suggested fixes Plugin Framework (Phase 5 Foundation): - Add external plugin config loading from app.ini - Define ExternalPlugin interface and manager - Add plugin.proto contract (Initialize, Shutdown, HealthCheck, OnEvent, HandleHTTP) - Implement health monitoring with auto-restart for managed plugins - Add event routing to subscribed plugins - HTTP proxy support for plugin-served routes This completes Tasks 1-4 from the production readiness plan and establishes the foundation for managed plugin lifecycle.
383 lines
9.7 KiB
Go
383 lines
9.7 KiB
Go
// Copyright 2026 MarketAlly. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package plugins
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"maps"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitcaddy.com/server/v3/modules/graceful"
|
|
"code.gitcaddy.com/server/v3/modules/json"
|
|
"code.gitcaddy.com/server/v3/modules/log"
|
|
pluginv1 "code.gitcaddy.com/server/v3/modules/plugins/pluginv1"
|
|
)
|
|
|
|
// PluginStatus is the lifecycle state the manager records for an external
// plugin.
type PluginStatus string

// Lifecycle states recorded for an external plugin.
const (
	// PluginStatusStarting is the state a plugin is registered with, before
	// it has been initialized.
	PluginStatusStarting = PluginStatus("starting")
	// PluginStatusOnline marks a plugin that initialized successfully.
	PluginStatusOnline = PluginStatus("online")
	// PluginStatusOffline marks a plugin that has been stopped.
	PluginStatusOffline = PluginStatus("offline")
	// PluginStatusError marks a plugin that failed to launch or initialize.
	PluginStatusError = PluginStatus("error")
)
|
|
|
|
// ManagedPlugin tracks the state of an external plugin
|
|
type ManagedPlugin struct {
|
|
config *ExternalPluginConfig
|
|
process *os.Process
|
|
status PluginStatus
|
|
lastSeen time.Time
|
|
manifest *pluginv1.PluginManifest
|
|
failCount int
|
|
httpClient *http.Client
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// ExternalPluginManager manages external plugins (both managed and external mode)
|
|
type ExternalPluginManager struct {
|
|
mu sync.RWMutex
|
|
plugins map[string]*ManagedPlugin
|
|
config *Config
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
var globalExternalManager *ExternalPluginManager
|
|
|
|
// GetExternalManager returns the global external plugin manager
|
|
func GetExternalManager() *ExternalPluginManager {
|
|
return globalExternalManager
|
|
}
|
|
|
|
// NewExternalPluginManager creates a new external plugin manager
|
|
func NewExternalPluginManager(config *Config) *ExternalPluginManager {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
m := &ExternalPluginManager{
|
|
plugins: make(map[string]*ManagedPlugin),
|
|
config: config,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
globalExternalManager = m
|
|
return m
|
|
}
|
|
|
|
// StartAll launches managed plugins and connects to external ones
|
|
func (m *ExternalPluginManager) StartAll() error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
for name, cfg := range m.config.ExternalPlugins {
|
|
if !cfg.Enabled {
|
|
log.Info("External plugin %s is disabled, skipping", name)
|
|
continue
|
|
}
|
|
|
|
mp := &ManagedPlugin{
|
|
config: cfg,
|
|
status: PluginStatusStarting,
|
|
httpClient: &http.Client{
|
|
Timeout: cfg.HealthTimeout,
|
|
},
|
|
}
|
|
m.plugins[name] = mp
|
|
|
|
if cfg.IsManaged() {
|
|
if err := m.startManagedPlugin(mp); err != nil {
|
|
log.Error("Failed to start managed plugin %s: %v", name, err)
|
|
mp.status = PluginStatusError
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Try to initialize the plugin
|
|
if err := m.initializePlugin(mp); err != nil {
|
|
log.Error("Failed to initialize external plugin %s: %v", name, err)
|
|
mp.status = PluginStatusError
|
|
continue
|
|
}
|
|
|
|
mp.status = PluginStatusOnline
|
|
mp.lastSeen = time.Now()
|
|
log.Info("External plugin %s is online (managed=%v)", name, cfg.IsManaged())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StopAll gracefully shuts down all external plugins
|
|
func (m *ExternalPluginManager) StopAll() {
|
|
m.cancel()
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
for name, mp := range m.plugins {
|
|
log.Info("Shutting down external plugin: %s", name)
|
|
|
|
// Send shutdown request
|
|
m.shutdownPlugin(mp)
|
|
|
|
// Kill managed process
|
|
if mp.process != nil {
|
|
if err := mp.process.Signal(os.Interrupt); err != nil {
|
|
log.Warn("Failed to send interrupt to plugin %s, killing: %v", name, err)
|
|
_ = mp.process.Kill()
|
|
}
|
|
}
|
|
|
|
mp.status = PluginStatusOffline
|
|
}
|
|
}
|
|
|
|
// GetPlugin returns an external plugin by name
|
|
func (m *ExternalPluginManager) GetPlugin(name string) *ManagedPlugin {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
return m.plugins[name]
|
|
}
|
|
|
|
// AllPlugins returns all external plugins
|
|
func (m *ExternalPluginManager) AllPlugins() map[string]*ManagedPlugin {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
result := make(map[string]*ManagedPlugin, len(m.plugins))
|
|
maps.Copy(result, m.plugins)
|
|
return result
|
|
}
|
|
|
|
// OnEvent dispatches an event to all interested plugins (fire-and-forget with timeout)
|
|
func (m *ExternalPluginManager) OnEvent(event *pluginv1.PluginEvent) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for name, mp := range m.plugins {
|
|
mp.mu.RLock()
|
|
if mp.status != PluginStatusOnline || mp.manifest == nil {
|
|
mp.mu.RUnlock()
|
|
continue
|
|
}
|
|
|
|
// Check if this plugin is subscribed to this event
|
|
subscribed := false
|
|
for _, e := range mp.manifest.SubscribedEvents {
|
|
if e == event.EventType || e == "*" {
|
|
subscribed = true
|
|
break
|
|
}
|
|
}
|
|
mp.mu.RUnlock()
|
|
|
|
if !subscribed {
|
|
continue
|
|
}
|
|
|
|
// Dispatch in background with timeout
|
|
go func(pluginName string, p *ManagedPlugin) {
|
|
ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
if err := m.callOnEvent(ctx, p, event); err != nil {
|
|
log.Error("Failed to dispatch event %s to plugin %s: %v", event.EventType, pluginName, err)
|
|
}
|
|
}(name, mp)
|
|
}
|
|
}
|
|
|
|
// HandleHTTP proxies an HTTP request to a plugin that declares the matching route
|
|
func (m *ExternalPluginManager) HandleHTTP(method, path string, headers map[string]string, body []byte) (*pluginv1.HTTPResponse, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for name, mp := range m.plugins {
|
|
mp.mu.RLock()
|
|
if mp.status != PluginStatusOnline || mp.manifest == nil {
|
|
mp.mu.RUnlock()
|
|
continue
|
|
}
|
|
|
|
for _, route := range mp.manifest.Routes {
|
|
if route.Method == method && matchRoute(route.Path, path) {
|
|
mp.mu.RUnlock()
|
|
|
|
ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := m.callHandleHTTP(ctx, mp, &pluginv1.HTTPRequest{
|
|
Method: method,
|
|
Path: path,
|
|
Headers: headers,
|
|
Body: body,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("plugin %s HandleHTTP failed: %w", name, err)
|
|
}
|
|
return resp, nil
|
|
}
|
|
}
|
|
mp.mu.RUnlock()
|
|
}
|
|
|
|
return nil, fmt.Errorf("no plugin handles %s %s", method, path)
|
|
}
|
|
|
|
// Status returns the status of a plugin
|
|
func (mp *ManagedPlugin) Status() PluginStatus {
|
|
mp.mu.RLock()
|
|
defer mp.mu.RUnlock()
|
|
return mp.status
|
|
}
|
|
|
|
// Manifest returns the plugin's manifest
|
|
func (mp *ManagedPlugin) Manifest() *pluginv1.PluginManifest {
|
|
mp.mu.RLock()
|
|
defer mp.mu.RUnlock()
|
|
return mp.manifest
|
|
}
|
|
|
|
// --- Internal methods ---
|
|
|
|
func (m *ExternalPluginManager) startManagedPlugin(mp *ManagedPlugin) error {
|
|
args := strings.Fields(mp.config.Args)
|
|
cmd := exec.Command(mp.config.Binary, args...)
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return fmt.Errorf("failed to start binary %s: %w", mp.config.Binary, err)
|
|
}
|
|
|
|
mp.process = cmd.Process
|
|
|
|
// Register with graceful manager for proper shutdown
|
|
graceful.GetManager().RunAtShutdown(m.ctx, func() {
|
|
if mp.process != nil {
|
|
_ = mp.process.Signal(os.Interrupt)
|
|
}
|
|
})
|
|
|
|
// Wait a bit for the process to start
|
|
time.Sleep(2 * time.Second)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *ExternalPluginManager) initializePlugin(mp *ManagedPlugin) error {
|
|
req := &pluginv1.InitializeRequest{
|
|
ServerVersion: "3.0.0",
|
|
Config: map[string]string{},
|
|
}
|
|
|
|
resp := &pluginv1.InitializeResponse{}
|
|
if err := m.callRPC(mp, "initialize", req, resp); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !resp.Success {
|
|
return fmt.Errorf("plugin initialization failed: %s", resp.Error)
|
|
}
|
|
|
|
mp.mu.Lock()
|
|
mp.manifest = resp.Manifest
|
|
mp.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *ExternalPluginManager) shutdownPlugin(mp *ManagedPlugin) {
|
|
req := &pluginv1.ShutdownRequest{Reason: "server shutdown"}
|
|
resp := &pluginv1.ShutdownResponse{}
|
|
if err := m.callRPC(mp, "shutdown", req, resp); err != nil {
|
|
log.Warn("Plugin shutdown call failed: %v", err)
|
|
}
|
|
}
|
|
|
|
func (m *ExternalPluginManager) callOnEvent(ctx context.Context, mp *ManagedPlugin, event *pluginv1.PluginEvent) error {
|
|
resp := &pluginv1.EventResponse{}
|
|
if err := m.callRPCWithContext(ctx, mp, "on-event", event, resp); err != nil {
|
|
return err
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("plugin event error: %s", resp.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *ExternalPluginManager) callHandleHTTP(ctx context.Context, mp *ManagedPlugin, req *pluginv1.HTTPRequest) (*pluginv1.HTTPResponse, error) {
|
|
resp := &pluginv1.HTTPResponse{}
|
|
if err := m.callRPCWithContext(ctx, mp, "handle-http", req, resp); err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// callRPC makes a JSON-over-HTTP call to the plugin (simplified RPC)
|
|
func (m *ExternalPluginManager) callRPC(mp *ManagedPlugin, method string, req, resp any) error {
|
|
return m.callRPCWithContext(m.ctx, mp, method, req, resp)
|
|
}
|
|
|
|
func (m *ExternalPluginManager) callRPCWithContext(ctx context.Context, mp *ManagedPlugin, method string, reqBody, respBody any) error {
|
|
address := mp.config.Address
|
|
if address == "" {
|
|
return errors.New("plugin has no address configured")
|
|
}
|
|
|
|
// Ensure address has scheme
|
|
if !strings.HasPrefix(address, "http://") && !strings.HasPrefix(address, "https://") {
|
|
address = "http://" + address
|
|
}
|
|
|
|
url := address + "/plugin/v1/" + method
|
|
|
|
body, err := json.Marshal(reqBody)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal request: %w", err)
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
|
|
httpResp, err := mp.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return fmt.Errorf("RPC call to %s failed: %w", method, err)
|
|
}
|
|
defer httpResp.Body.Close()
|
|
|
|
respData, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read response: %w", err)
|
|
}
|
|
|
|
if httpResp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("RPC %s returned status %d: %s", method, httpResp.StatusCode, string(respData))
|
|
}
|
|
|
|
if err := json.Unmarshal(respData, respBody); err != nil {
|
|
return fmt.Errorf("failed to unmarshal response: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// matchRoute reports whether path falls under the route pattern.
//
// Matching is by path prefix, but only on a segment boundary: the pattern
// matches when path equals it, when the pattern itself ends in "/", or when
// the character following the prefix in path is "/". So "/api" matches "/api"
// and "/api/users" but not "/apix". An empty pattern matches nothing, so a
// route declared without a path cannot capture every request.
func matchRoute(pattern, path string) bool {
	if pattern == "" || !strings.HasPrefix(path, pattern) {
		return false
	}
	if len(path) == len(pattern) || strings.HasSuffix(pattern, "/") {
		return true
	}
	// path is strictly longer than pattern here, so the index is in range.
	return path[len(pattern)] == '/'
}
|