2
0
Files
gitcaddy-runner/internal/app/poll/poller.go
logikonline 899ca015b1
All checks were successful
CI / build-and-test (push) Successful in 1m0s
Release / build (amd64, linux) (push) Successful in 1m8s
Release / build (amd64, darwin) (push) Successful in 1m25s
Release / build (amd64, windows) (push) Successful in 51s
Release / build (arm64, darwin) (push) Successful in 1m0s
Release / build (arm64, linux) (push) Successful in 1m9s
Release / release (push) Successful in 26s
fix(poll): revert context inheritance to prevent deadlock
The v1.0.3 change that made poller contexts inherit from the parent
context caused a deadlock where runners would start but never poll
for tasks.

Reverted to using context.Background() for pollingCtx and jobsCtx.
Graceful shutdown still works via explicit Shutdown() call which
cancels the polling context.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 13:51:47 -05:00

242 lines
6.1 KiB
Go

// Copyright 2023 The Gitea Authors and MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT
// Package poll provides task polling functionality for CI runners.
package poll
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
)
// Poller handles task polling from the Gitea server.
//
// Lifecycle: New creates two independent cancellable contexts; Poll (or
// PollOnce) runs until Shutdown cancels the polling context, and done is
// closed once all polling goroutines have returned.
type Poller struct {
	client client.Client // RPC client used to fetch tasks from the server
	runner *run.Runner   // executes fetched tasks
	cfg    *config.Config

	tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.

	bandwidthManager *envcheck.BandwidthManager // optional; set via SetBandwidthManager

	// pollingCtx governs task fetching; cancelled first during shutdown so
	// no new work is picked up.
	pollingCtx      context.Context
	shutdownPolling context.CancelFunc

	// jobsCtx governs running jobs; cancelled only when a graceful
	// shutdown exceeds its deadline (see Shutdown).
	jobsCtx      context.Context
	shutdownJobs context.CancelFunc

	// done is closed by Poll/PollOnce after all polling goroutines exit.
	done chan struct{}
}
// New creates a new Poller instance.
//
// Both contexts are deliberately rooted at context.Background() rather than
// a caller-supplied parent: shutdown is driven explicitly through Shutdown,
// which cancels them in order (polling first, then jobs).
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
	pollCtx, cancelPoll := context.WithCancel(context.Background())
	jobCtx, cancelJobs := context.WithCancel(context.Background())

	p := &Poller{
		client:          client,
		runner:          runner,
		cfg:             cfg,
		pollingCtx:      pollCtx,
		shutdownPolling: cancelPoll,
		jobsCtx:         jobCtx,
		shutdownJobs:    cancelJobs,
		done:            make(chan struct{}),
	}
	return p
}
// SetBandwidthManager installs the bandwidth manager used for on-demand
// bandwidth tests requested by the server. Passing nil disables the feature.
func (p *Poller) SetBandwidthManager(bm *envcheck.BandwidthManager) {
	p.bandwidthManager = bm
}
// Poll starts polling for tasks with the configured capacity.
//
// It spawns one polling goroutine per capacity slot, sharing a single rate
// limiter, blocks until every goroutine has returned, and finally closes
// done so Shutdown can observe completion.
func (p *Poller) Poll() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)

	var wg sync.WaitGroup
	for n := p.cfg.Runner.Capacity; n > 0; n-- {
		wg.Add(1)
		go p.poll(&wg, limiter)
	}
	wg.Wait()

	// signal that we shut down
	close(p.done)
}
// PollOnce fetches and runs a single task, then exits.
//
// Like Poll, it closes done on return so Shutdown can observe completion.
func (p *Poller) PollOnce() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)

	p.pollOnce(limiter)

	// signal that we're done
	close(p.done)
}
// Shutdown gracefully stops the poller.
//
// It first stops fetching new tasks and waits for in-flight jobs to finish.
// If ctx expires before they do, the running jobs are cancelled as well,
// Shutdown waits for them to report their status to Gitea, and ctx.Err()
// is returned.
func (p *Poller) Shutdown(ctx context.Context) error {
	p.shutdownPolling()

	select {
	// graceful shutdown completed successfully
	case <-p.done:
		return nil

	// our timeout for shutting down ran out
	case <-ctx.Done():
		// When both the timeout fires and the graceful shutdown completed
		// successfully, this branch of the select may fire. Do a
		// non-blocking check here against the graceful shutdown status to
		// avoid sending an error if we don't need to.
		//
		// BUGFIX: the previous code used a bare receive (`_, ok := <-p.done`)
		// here, which BLOCKS until done is closed whenever the graceful
		// shutdown has not completed — exactly the case where we must force
		// job cancellation below. A select with a default branch is the
		// actual non-blocking check the original comment intended.
		select {
		case <-p.done:
			return nil
		default:
		}

		// force a shutdown of all running jobs
		p.shutdownJobs()
		// wait for running jobs to report their status to Gitea
		<-p.done
		return ctx.Err()
	}
}
// poll is the body of one polling goroutine: it repeatedly fetches and runs
// tasks until the polling context is cancelled, then marks wg done.
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
	defer wg.Done()
	for {
		p.pollOnce(limiter)

		// Stop once shutdown has cancelled the polling context; otherwise
		// immediately go around for the next task.
		if p.pollingCtx.Err() != nil {
			return
		}
	}
}
// pollOnce blocks until exactly one task has been fetched and executed, or
// until the polling context is cancelled (via the rate limiter wait).
func (p *Poller) pollOnce(limiter *rate.Limiter) {
	for {
		// Wait respects both the fetch-interval rate limit and cancellation
		// of the polling context.
		err := limiter.Wait(p.pollingCtx)
		if err != nil {
			if p.pollingCtx.Err() != nil {
				log.WithError(err).Debug("limiter wait failed")
			}
			return
		}

		// Run the task under jobsCtx so a shutdown timeout can cancel it
		// independently of polling.
		if task, ok := p.fetchTask(p.pollingCtx); ok {
			p.runTaskWithRecover(p.jobsCtx, task)
			return
		}
	}
}
// runTaskWithRecover executes a single task, converting any panic from the
// runner into a logged error so one bad task cannot crash the poller.
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		err := fmt.Errorf("panic: %v", r)
		log.WithError(err).Error("panic in runTaskWithRecover")
	}()

	err := p.runner.Run(ctx, task)
	if err != nil {
		log.WithError(err).Error("failed to run task")
	}
}
// fetchTask asks the server for one task, reporting current runner
// capabilities alongside the request. It returns (task, true) when a task
// was assigned, and (nil, false) when there is nothing to do or the fetch
// failed. As side effects it may launch fire-and-forget goroutines for a
// server-requested bandwidth test or cleanup, and it maintains the
// tasksVersion cache used to avoid redundant server-side DB queries.
func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
	// Bound the RPC itself; the capability probe and side-effect goroutines
	// below deliberately use the longer-lived ctx instead.
	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
	defer cancel()

	// Detect capabilities including current disk space
	caps := envcheck.DetectCapabilities(ctx, p.cfg.Container.DockerHost, p.cfg.Container.WorkdirParent, p.cfg.Runner.Capacity)

	// Include latest bandwidth result if available
	if p.bandwidthManager != nil {
		caps.Bandwidth = p.bandwidthManager.GetLastResult()
	}
	capsJSON := caps.ToJSON()

	// Load the version value that was in the cache when the request was sent.
	v := p.tasksVersion.Load()
	fetchReq := &runnerv1.FetchTaskRequest{
		TasksVersion:     v,
		CapabilitiesJson: capsJSON,
	}
	resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(fetchReq))
	// A deadline expiry just means "no task this round" — treat it as a
	// quiet miss rather than an error worth logging.
	if errors.Is(err, context.DeadlineExceeded) {
		err = nil
	}
	if err != nil {
		log.WithError(err).Error("failed to fetch task")
		return nil, false
	}

	if resp == nil || resp.Msg == nil {
		return nil, false
	}

	// Check if server requested a bandwidth test
	if resp.Msg.RequestBandwidthTest && p.bandwidthManager != nil {
		log.Info("Server requested bandwidth test, running now...")
		// Fire-and-forget: the test runs in the background so polling is
		// not delayed; its result is picked up on a later fetch via
		// GetLastResult above.
		go func() {
			result := p.bandwidthManager.RunTest(ctx)
			if result != nil {
				log.Infof("Bandwidth test completed: %.1f Mbps download, %.0f ms latency",
					result.DownloadMbps, result.Latency)
			}
		}()
	}

	// Check if server requested a cleanup
	if resp.Msg.RequestCleanup {
		log.Info("Server requested cleanup, running now...")
		// Fire-and-forget for the same reason as the bandwidth test.
		go func() {
			result, err := cleanup.RunCleanup(ctx, p.cfg)
			if err != nil {
				log.Errorf("Cleanup failed: %v", err)
			} else if result != nil {
				log.Infof("Cleanup completed: freed %d bytes, deleted %d files in %s",
					result.BytesFreed, result.FilesDeleted, result.Duration)
			}
		}()
	}

	// Advance the cached version only from the value we sent; CompareAndSwap
	// keeps a concurrent poller goroutine from clobbering a newer version
	// with an older response.
	if resp.Msg.TasksVersion > v {
		p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
	}

	if resp.Msg.Task == nil {
		return nil, false
	}

	// got a task, set tasksVersion to zero to force query db in next request.
	p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)

	return resp.Msg.Task, true
}