2
0
Files
gitcaddy-server/services/ai/queue.go
logikonline 14338d8fd4
All checks were successful
Build and Release / Create Release (push) Has been skipped
Build and Release / Unit Tests (push) Successful in 7m11s
Build and Release / Integration Tests (PostgreSQL) (push) Successful in 7m21s
Build and Release / Lint (push) Successful in 7m32s
Build and Release / Build Binaries (amd64, linux, linux-latest) (push) Has been skipped
Build and Release / Build Binaries (amd64, windows, windows-latest) (push) Has been skipped
Build and Release / Build Binaries (amd64, darwin, macos) (push) Has been skipped
Build and Release / Build Binaries (arm64, darwin, macos) (push) Has been skipped
Build and Release / Build Binary (linux/arm64) (push) Has been skipped
refactor(ai): consolidate ai operation types and reduce duplication
Refactor AI service layer to reduce code duplication and improve consistency.

Changes:
- Rename AIOperationRequest to OperationRequest for consistency
- Extract shared logic for issue-targeted operations (respond, triage) into triggerIssueAIOp helper
- Standardize field alignment in struct definitions
- Remove redundant error handling patterns

This reduces the API operations file by ~40 lines while maintaining identical functionality.
2026-02-12 00:55:52 -05:00

248 lines
7.8 KiB
Go

// Copyright 2026 MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT
package ai
import (
"context"
"errors"
"fmt"
"time"
ai_model "code.gitcaddy.com/server/v3/models/ai"
issues_model "code.gitcaddy.com/server/v3/models/issues"
repo_model "code.gitcaddy.com/server/v3/models/repo"
"code.gitcaddy.com/server/v3/models/unit"
user_model "code.gitcaddy.com/server/v3/models/user"
"code.gitcaddy.com/server/v3/modules/ai"
"code.gitcaddy.com/server/v3/modules/log"
"code.gitcaddy.com/server/v3/modules/setting"
issue_service "code.gitcaddy.com/server/v3/services/issue"
)
// OperationRequest is the payload placed on the AI operation queue. It names
// the repository, the operation to run, and the event and target that caused
// the operation to be scheduled.
type OperationRequest struct {
	// RepoID is the ID of the repository the operation runs against.
	RepoID int64 `json:"repo_id"`
	// Operation selects the operation: "code-review", "issue-response",
	// "issue-triage", "workflow-inspect" or "agent-fix".
	Operation string `json:"operation"`
	// Tier is the operation tier, 1 or 2.
	Tier int `json:"tier"`
	// TriggerEvent is the event that caused the request, e.g. "issue.created".
	TriggerEvent string `json:"trigger_event"`
	// TriggerUserID is the ID of the user who triggered the event.
	TriggerUserID int64 `json:"trigger_user_id"`
	// TargetID is the ID of the issue or pull request being operated on.
	TargetID int64 `json:"target_id"`
	// TargetType is the kind of target: "issue" or "pull".
	TargetType string `json:"target_type"`
}
// EnqueueOperation adds an AI operation to the processing queue.
//
// It returns an error when the queue has not been initialized or when req is
// nil; otherwise it returns the result of pushing req onto the queue.
func EnqueueOperation(req *OperationRequest) error {
	if aiOperationQueue == nil {
		return errors.New("AI operation queue not initialized")
	}
	// The queue worker dereferences each request it receives, so a nil
	// request is rejected here instead of being allowed to crash the worker.
	if req == nil {
		return errors.New("AI operation request is nil")
	}
	return aiOperationQueue.Push(req)
}
// handleAIOperation is the worker function for the AI operation queue. Every
// request is processed in turn with a background context; a failed request is
// logged and does not stop the remaining ones. It always returns nil.
func handleAIOperation(items ...*OperationRequest) []*OperationRequest {
	for i := range items {
		op := items[i]
		err := processOperation(context.Background(), op)
		if err == nil {
			continue
		}
		log.Error("AI operation failed [repo:%d op:%s target:%d]: %v", op.RepoID, op.Operation, op.TargetID, err)
	}
	return nil
}
// processOperation runs a single queued AI operation.
//
// It loads the repository and its owner, applies the hourly operation limit
// (the global setting, replaced by the owning organization's positive limit
// when one can be loaded), inserts an operation log entry, dispatches to the
// handler matching req.Operation, and finally stores status, error message and
// duration on the log entry.
//
// A request rejected by the rate limit is logged as a warning and yields nil.
// Failures before dispatch are returned wrapped; after dispatch the handler's
// error (or nil) is returned. When the handler fails and the AI configuration
// has EscalateToStaff set, escalateToStaff is invoked.
func processOperation(ctx context.Context, req *OperationRequest) error {
	repo, err := repo_model.GetRepositoryByID(ctx, req.RepoID)
	if err != nil {
		return fmt.Errorf("failed to load repo %d: %w", req.RepoID, err)
	}
	if err = repo.LoadOwner(ctx); err != nil {
		return fmt.Errorf("failed to load repo owner: %w", err)
	}

	// Rate limit: compare the recent operation count with the effective limit.
	recentOps, err := ai_model.CountRecentOperations(ctx, req.RepoID)
	if err != nil {
		return fmt.Errorf("failed to count recent operations: %w", err)
	}
	hourlyLimit := setting.AI.MaxOperationsPerHour
	if repo.Owner.IsOrganization() {
		// A failed or empty lookup silently keeps the global limit.
		orgSettings, settingsErr := ai_model.GetOrgAISettings(ctx, repo.OwnerID)
		if settingsErr == nil && orgSettings != nil && orgSettings.MaxOpsPerHour > 0 {
			hourlyLimit = orgSettings.MaxOpsPerHour
		}
	}
	if recentOps >= int64(hourlyLimit) {
		log.Warn("AI rate limit exceeded for repo %d (%d/%d ops/hour)", req.RepoID, recentOps, hourlyLimit)
		return nil
	}

	// orgID stays zero for repositories that are not owned by an organization.
	var orgID int64
	if repo.Owner.IsOrganization() {
		orgID = repo.OwnerID
	}

	// Fall back to an empty configuration when the AI unit cannot be loaded.
	aiCfg := &repo_model.AIConfig{}
	if aiUnit, unitErr := repo.GetUnit(ctx, unit.TypeAI); unitErr == nil {
		aiCfg = aiUnit.AIConfig()
	}

	// Resolve provider and model, then record the operation as pending.
	provider := ai_model.ResolveProvider(ctx, orgID, aiCfg.PreferredProvider)
	modelName := ai_model.ResolveModel(ctx, orgID, aiCfg.PreferredModel)
	opLog := &ai_model.OperationLog{
		RepoID:        req.RepoID,
		Operation:     req.Operation,
		Tier:          req.Tier,
		TriggerEvent:  req.TriggerEvent,
		TriggerUserID: req.TriggerUserID,
		TargetID:      req.TargetID,
		TargetType:    req.TargetType,
		Status:        ai_model.OperationStatusPending,
		Provider:      provider,
		Model:         modelName,
	}
	if err = ai_model.InsertOperationLog(ctx, opLog); err != nil {
		return fmt.Errorf("failed to insert operation log: %w", err)
	}

	// Dispatch table keyed by operation name.
	type opHandler func(context.Context, *repo_model.Repository, *repo_model.AIConfig, *ai_model.OperationLog) error
	handlers := map[string]opHandler{
		"issue-response": handleIssueResponse,
		"issue-triage":   handleIssueTriage,
		"code-review":    handleCodeReview,
		"agent-fix":      handleAgentFix,
	}

	startedAt := time.Now()
	var opErr error
	if handler, known := handlers[req.Operation]; known {
		opErr = handler(ctx, repo, aiCfg, opLog)
	} else {
		opErr = fmt.Errorf("unknown operation: %s", req.Operation)
	}
	opLog.DurationMs = time.Since(startedAt).Milliseconds()

	if opErr == nil {
		opLog.Status = ai_model.OperationStatusSuccess
	} else {
		opLog.Status = ai_model.OperationStatusFailed
		opLog.ErrorMessage = opErr.Error()
		log.Error("AI operation %s failed for repo %d: %v", req.Operation, req.RepoID, opErr)
		// Escalate on failure if configured
		if aiCfg.EscalateToStaff {
			escalateToStaff(ctx, repo, aiCfg, opLog)
		}
	}

	// A failure to persist the outcome is logged only; the handler's result
	// is what the caller sees.
	if updateErr := ai_model.UpdateOperationLog(ctx, opLog); updateErr != nil {
		log.Error("Failed to update operation log: %v", updateErr)
	}
	return opErr
}
// handleIssueResponse generates an AI reply for the issue identified by
// opLog.TargetID and posts it as a comment from the AI bot user.
//
// The issue must belong to repo; a target from a different repository is
// rejected before any content is sent to the AI client. On success the token
// counts reported by the client and the ID of the created comment are stored
// on opLog.
func handleIssueResponse(ctx context.Context, repo *repo_model.Repository, aiCfg *repo_model.AIConfig, opLog *ai_model.OperationLog) error {
	issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID)
	if err != nil {
		return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err)
	}
	// issue.Repo is overwritten below, so an inconsistent request would
	// otherwise make the bot read and comment on another repository's issue.
	if issue.RepoID != repo.ID {
		return fmt.Errorf("issue %d does not belong to repo %d", issue.ID, repo.ID)
	}
	issue.Repo = repo

	client := ai.GetClient()
	resp, err := client.GenerateIssueResponse(ctx, &ai.GenerateIssueResponseRequest{
		RepoID:             repo.ID,
		IssueID:            issue.ID,
		Title:              issue.Title,
		Body:               issue.Content,
		CustomInstructions: aiCfg.IssueInstructions,
	})
	if err != nil {
		return fmt.Errorf("AI GenerateIssueResponse failed: %w", err)
	}
	opLog.InputTokens = resp.InputTokens
	opLog.OutputTokens = resp.OutputTokens

	// Post the response as a comment from the AI bot user
	botUser := user_model.NewAIUser()
	comment, err := issue_service.CreateIssueComment(ctx, botUser, repo, issue, resp.Response, nil)
	if err != nil {
		return fmt.Errorf("failed to create comment: %w", err)
	}
	opLog.ResultCommentID = comment.ID
	return nil
}
// handleIssueTriage runs AI triage on the issue identified by opLog.TargetID
// and applies the suggested labels as the AI bot user.
//
// The issue must belong to repo; a target from a different repository is
// rejected. Label application is best effort: a label that cannot be loaded
// or added is logged and skipped, and does not fail the operation.
func handleIssueTriage(ctx context.Context, repo *repo_model.Repository, _ *repo_model.AIConfig, opLog *ai_model.OperationLog) error {
	issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID)
	if err != nil {
		return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err)
	}
	// issue.Repo is overwritten below, so an inconsistent request would
	// otherwise let the bot triage and label another repository's issue.
	if issue.RepoID != repo.ID {
		return fmt.Errorf("issue %d does not belong to repo %d", issue.ID, repo.ID)
	}
	issue.Repo = repo

	triageResp, err := TriageIssue(ctx, issue)
	if err != nil {
		return fmt.Errorf("AI TriageIssue failed: %w", err)
	}

	// Apply suggested labels
	botUser := user_model.NewAIUser()
	for _, labelName := range triageResp.SuggestedLabels {
		label, err := issues_model.GetLabelInRepoByName(ctx, repo.ID, labelName)
		if err != nil {
			// Include the underlying error: the lookup can fail for reasons
			// other than the label being absent.
			log.Warn("AI suggested label %q could not be loaded in repo %d: %v", labelName, repo.ID, err)
			continue
		}
		if err := issue_service.AddLabel(ctx, issue, botUser, label); err != nil {
			log.Error("Failed to add label %q to issue %d: %v", labelName, issue.Index, err)
		}
	}
	return nil
}
// handleCodeReview runs an AI review of the pull request attached to the
// issue identified by opLog.TargetID and posts the review summary as a
// comment from the AI bot user.
//
// The issue must belong to repo and must have a pull request attached;
// otherwise an error is returned before the review is requested. On success
// the ID of the created comment is stored on opLog.
func handleCodeReview(ctx context.Context, repo *repo_model.Repository, _ *repo_model.AIConfig, opLog *ai_model.OperationLog) error {
	issue, err := issues_model.GetIssueByID(ctx, opLog.TargetID)
	if err != nil {
		return fmt.Errorf("failed to load issue %d: %w", opLog.TargetID, err)
	}
	// issue.Repo is overwritten below, so an inconsistent request would
	// otherwise let the bot review and comment on another repository's PR.
	if issue.RepoID != repo.ID {
		return fmt.Errorf("issue %d does not belong to repo %d", issue.ID, repo.ID)
	}
	if err := issue.LoadPullRequest(ctx); err != nil {
		return fmt.Errorf("failed to load pull request: %w", err)
	}
	// Guard against a target with no pull request attached (presumably a
	// plain issue — confirm against LoadPullRequest); passing nil on would
	// risk a nil dereference in the review code.
	if issue.PullRequest == nil {
		return fmt.Errorf("issue %d has no pull request to review", issue.ID)
	}
	issue.Repo = repo

	reviewResp, err := ReviewPullRequest(ctx, issue.PullRequest)
	if err != nil {
		return fmt.Errorf("AI ReviewPullRequest failed: %w", err)
	}

	// Post the review summary as a comment
	botUser := user_model.NewAIUser()
	comment, err := issue_service.CreateIssueComment(ctx, botUser, repo, issue, reviewResp.Summary, nil)
	if err != nil {
		return fmt.Errorf("failed to create review comment: %w", err)
	}
	opLog.ResultCommentID = comment.ID
	return nil
}
// handleAgentFix starts the tier 2 agent workflow (an Actions workflow for
// Claude Code headless) through triggerAgentWorkflow and stores the returned
// run ID on opLog. A trigger failure is returned wrapped.
func handleAgentFix(ctx context.Context, repo *repo_model.Repository, aiCfg *repo_model.AIConfig, opLog *ai_model.OperationLog) error {
	workflowRunID, triggerErr := triggerAgentWorkflow(ctx, repo, aiCfg, opLog)
	if triggerErr == nil {
		opLog.ActionRunID = workflowRunID
		return nil
	}
	return fmt.Errorf("failed to trigger agent workflow: %w", triggerErr)
}