From 43adbaeffe0958c33f0efbee7e95e7b03ecd8d99 Mon Sep 17 00:00:00 2001 From: logikonline Date: Tue, 27 Jan 2026 09:00:36 -0500 Subject: [PATCH] feat(actions): add stuck job rescue mechanism Introduce a cron task that rescues waiting jobs stuck due to version-sync issues by bumping the task version for affected scopes. Also bump version after each successful job pick to ensure runners re-poll for remaining waiting jobs. Configurable via STUCK_JOB_TIMEOUT (default: 5 minutes). --- modules/setting/actions.go | 2 ++ services/actions/clear_tasks.go | 35 +++++++++++++++++++++++++++++++++ services/actions/task.go | 9 +++++++++ services/cron/tasks_actions.go | 11 +++++++++++ 4 files changed, 57 insertions(+) diff --git a/modules/setting/actions.go b/modules/setting/actions.go index 28a32930e6..fee232be9d 100644 --- a/modules/setting/actions.go +++ b/modules/setting/actions.go @@ -25,6 +25,7 @@ var ( ZombieTaskTimeout time.Duration `ini:"ZOMBIE_TASK_TIMEOUT"` EndlessTaskTimeout time.Duration `ini:"ENDLESS_TASK_TIMEOUT"` AbandonedJobTimeout time.Duration `ini:"ABANDONED_JOB_TIMEOUT"` + StuckJobTimeout time.Duration `ini:"STUCK_JOB_TIMEOUT"` SkipWorkflowStrings []string `ini:"SKIP_WORKFLOW_STRINGS"` // Runner health settings @@ -165,6 +166,7 @@ func loadActionsFrom(rootCfg ConfigProvider) error { Actions.ZombieTaskTimeout = sec.Key("ZOMBIE_TASK_TIMEOUT").MustDuration(10 * time.Minute) Actions.EndlessTaskTimeout = sec.Key("ENDLESS_TASK_TIMEOUT").MustDuration(3 * time.Hour) Actions.AbandonedJobTimeout = sec.Key("ABANDONED_JOB_TIMEOUT").MustDuration(24 * time.Hour) + Actions.StuckJobTimeout = sec.Key("STUCK_JOB_TIMEOUT").MustDuration(5 * time.Minute) if !Actions.LogCompression.IsValid() { return fmt.Errorf("invalid [actions] LOG_COMPRESSION: %q", Actions.LogCompression) diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index cbc05cb10f..e1c40b1ec9 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -231,3 +231,38 @@ func CancelAbandonedJobs(ctx context.Context) error { return nil } + +// RescueStuckJobs bumps the tasks version for any scopes that have waiting +// jobs older than StuckJobTimeout. This forces runners to re-poll and attempt +// to pick up jobs that were orphaned due to version-sync issues (e.g. runner +// restarted and synced to the latest version before picking all waiting jobs, +// or bandwidth routing deferred a job that was never retried). +func RescueStuckJobs(ctx context.Context) error { + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + Statuses: []actions_model.Status{actions_model.StatusWaiting}, + UpdatedBefore: timeutil.TimeStampNow().AddDuration(-setting.Actions.StuckJobTimeout), + }) + if err != nil { + return fmt.Errorf("find stuck waiting jobs: %w", err) + } + if len(jobs) == 0 { + return nil + } + + // Collect unique scopes that need a version bump + type scope struct{ ownerID, repoID int64 } + seen := make(map[scope]bool) + for _, job := range jobs { + s := scope{job.OwnerID, job.RepoID} + if !seen[s] { + seen[s] = true + if err := actions_model.IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + log.Error("RescueStuckJobs: failed to bump version for owner=%d repo=%d: %v", job.OwnerID, job.RepoID, err) + } else { + log.Info("RescueStuckJobs: bumped tasks version for owner=%d repo=%d (%d stuck jobs)", job.OwnerID, job.RepoID, len(jobs)) + } + } + } + + return nil +} diff --git a/services/actions/task.go b/services/actions/task.go index 9bd3c5ef97..78f7cf50b7 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -11,6 +11,7 @@ import ( actions_model "code.gitcaddy.com/server/v3/models/actions" "code.gitcaddy.com/server/v3/models/db" secret_model "code.gitcaddy.com/server/v3/models/secret" + "code.gitcaddy.com/server/v3/modules/log" notify_service "code.gitcaddy.com/server/v3/services/notify" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" @@ -97,6 +98,14 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv return nil, false, nil } + // Bump the tasks version after a successful pick so that runners re-poll + // and pick up any remaining waiting jobs. Without this, the version stays + // unchanged after a Waiting→Running transition, and runners that have + // already synced to the latest version will never attempt to pick again. + if err := actions_model.IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { + log.Error("PickTask: failed to increase task version after pick: %v", err) + } + CreateCommitStatusForRunJobs(ctx, job.Run, job) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, actionTask) diff --git a/services/cron/tasks_actions.go b/services/cron/tasks_actions.go index eb53c84ea1..de60187871 100644 --- a/services/cron/tasks_actions.go +++ b/services/cron/tasks_actions.go @@ -18,6 +18,7 @@ func initActionsTasks() { registerStopZombieTasks() registerStopEndlessTasks() registerCancelAbandonedJobs() + registerRescueStuckJobs() registerScheduleTasks() registerActionsCleanup() } @@ -52,6 +53,16 @@ func registerCancelAbandonedJobs() { }) } +func registerRescueStuckJobs() { + RegisterTaskFatal("rescue_stuck_jobs", &BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 5m", + }, func(ctx context.Context, _ *user_model.User, cfg Config) error { + return actions_service.RescueStuckJobs(ctx) + }) +} + // registerScheduleTasks registers a scheduled task that runs every minute to start any due schedule tasks. func registerScheduleTasks() { // Register the task with a unique name, enabled status, and schedule for every minute.