All checks were successful
CI / build-and-test (push) Successful in 1m0s
Release / build (amd64, linux) (push) Successful in 1m8s
Release / build (amd64, darwin) (push) Successful in 1m25s
Release / build (amd64, windows) (push) Successful in 51s
Release / build (arm64, darwin) (push) Successful in 1m0s
Release / build (arm64, linux) (push) Successful in 1m9s
Release / release (push) Successful in 26s
The v1.0.3 change that made poller contexts inherit from the parent context caused a deadlock where runners would start but never poll for tasks. Reverted to using context.Background() for pollingCtx and jobsCtx. Graceful shutdown still works via the explicit Shutdown() call, which cancels the polling context.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
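For reference, here is a minimal sketch of how a daemon might drive the poller under this reverted design. The runPoller helper, the stop channel, the 30-second timeout, and the import path for this poll package are illustrative assumptions; only New, Poll, and Shutdown come from the file below, and the poller's cfg, client, and runner are presumed to be constructed elsewhere by the daemon.

// Illustrative sketch only: runPoller, stop, and the poll import path are
// assumptions made for this example, not part of the repository.
package main

import (
	"context"
	"time"

	log "github.com/sirupsen/logrus"

	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/poll"
)

func runPoller(poller *poll.Poller, stop <-chan struct{}) {
	// Poll blocks until every polling goroutine exits, so run it in the background.
	go poller.Poll()

	<-stop // e.g. closed by the daemon on SIGTERM/SIGINT

	// Shutdown cancels the polling context immediately; if jobs are still
	// running when the timeout expires, it cancels the jobs context as well
	// and waits for them to report their status.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := poller.Shutdown(ctx); err != nil {
		log.WithError(err).Warn("graceful shutdown timed out; running jobs were cancelled")
	}
}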
242 lines
6.1 KiB
Go
// Copyright 2023 The Gitea Authors and MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT

// Package poll provides task polling functionality for CI runners.
package poll

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	"connectrpc.com/connect"
	log "github.com/sirupsen/logrus"
	"golang.org/x/time/rate"

	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
	"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
)

// Poller handles task polling from the Gitea server.
type Poller struct {
	client           client.Client
	runner           *run.Runner
	cfg              *config.Config
	tasksVersion     atomic.Int64 // version of the last task fetched from the Gitea server
	bandwidthManager *envcheck.BandwidthManager

	pollingCtx      context.Context
	shutdownPolling context.CancelFunc

	jobsCtx      context.Context
	shutdownJobs context.CancelFunc

	done chan struct{}
}

// New creates a new Poller instance.
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
	// Use independent contexts - shutdown is handled explicitly via Shutdown()
	pollingCtx, shutdownPolling := context.WithCancel(context.Background())
	jobsCtx, shutdownJobs := context.WithCancel(context.Background())

	done := make(chan struct{})

	return &Poller{
		client: client,
		runner: runner,
		cfg:    cfg,

		pollingCtx:      pollingCtx,
		shutdownPolling: shutdownPolling,

		jobsCtx:      jobsCtx,
		shutdownJobs: shutdownJobs,

		done: done,
	}
}

// SetBandwidthManager sets the bandwidth manager for on-demand testing
func (p *Poller) SetBandwidthManager(bm *envcheck.BandwidthManager) {
	p.bandwidthManager = bm
}

// Poll starts polling for tasks with the configured capacity.
func (p *Poller) Poll() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
	wg := &sync.WaitGroup{}
	for i := 0; i < p.cfg.Runner.Capacity; i++ {
		wg.Add(1)
		go p.poll(wg, limiter)
	}
	wg.Wait()

	// signal that we have shut down
	close(p.done)
}

// PollOnce polls for a single task and then exits.
func (p *Poller) PollOnce() {
	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)

	p.pollOnce(limiter)

	// signal that we're done
	close(p.done)
}

// Shutdown gracefully stops the poller.
func (p *Poller) Shutdown(ctx context.Context) error {
	p.shutdownPolling()

	select {
	// graceful shutdown completed successfully
	case <-p.done:
		return nil

	// our timeout for shutting down ran out
	case <-ctx.Done():
		// When both the timeout fires and the graceful shutdown completes,
		// this branch of the select may fire. Do a non-blocking check against
		// the graceful shutdown status to avoid returning an error if we
		// don't need to. (A plain receive here would block, and the forced
		// shutdown below would never run.)
		select {
		case <-p.done:
			return nil
		default:
		}

		// force a shutdown of all running jobs
		p.shutdownJobs()

		// wait for running jobs to report their status to Gitea
		<-p.done

		return ctx.Err()
	}
}

// poll fetches and runs tasks in a loop until the polling context is cancelled.
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
	defer wg.Done()
	for {
		p.pollOnce(limiter)

		select {
		case <-p.pollingCtx.Done():
			return
		default:
			continue
		}
	}
}

// pollOnce blocks until a single task has been fetched and run, or until
// polling is cancelled.
func (p *Poller) pollOnce(limiter *rate.Limiter) {
	for {
		if err := limiter.Wait(p.pollingCtx); err != nil {
			if p.pollingCtx.Err() != nil {
				log.WithError(err).Debug("limiter wait failed")
			}
			return
		}
		task, ok := p.fetchTask(p.pollingCtx)
		if !ok {
			continue
		}

		p.runTaskWithRecover(p.jobsCtx, task)
		return
	}
}

// runTaskWithRecover runs a task and recovers from any panic so a single bad
// job cannot crash the poller.
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
	defer func() {
		if r := recover(); r != nil {
			err := fmt.Errorf("panic: %v", r)
			log.WithError(err).Error("panic in runTaskWithRecover")
		}
	}()

	if err := p.runner.Run(ctx, task); err != nil {
		log.WithError(err).Error("failed to run task")
	}
}

func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
	defer cancel()

	// Detect capabilities, including current disk space
	caps := envcheck.DetectCapabilities(ctx, p.cfg.Container.DockerHost, p.cfg.Container.WorkdirParent, p.cfg.Runner.Capacity)

	// Include the latest bandwidth result if available
	if p.bandwidthManager != nil {
		caps.Bandwidth = p.bandwidthManager.GetLastResult()
	}

	capsJSON := caps.ToJSON()

	// Load the version value that was in the cache when the request was sent.
	v := p.tasksVersion.Load()
	fetchReq := &runnerv1.FetchTaskRequest{
		TasksVersion:     v,
		CapabilitiesJson: capsJSON,
	}
	resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(fetchReq))
	if errors.Is(err, context.DeadlineExceeded) {
		err = nil
	}
	if err != nil {
		log.WithError(err).Error("failed to fetch task")
		return nil, false
	}

	if resp == nil || resp.Msg == nil {
		return nil, false
	}

	// Check whether the server requested a bandwidth test
	if resp.Msg.RequestBandwidthTest && p.bandwidthManager != nil {
		log.Info("Server requested bandwidth test, running now...")
		go func() {
			result := p.bandwidthManager.RunTest(ctx)
			if result != nil {
				log.Infof("Bandwidth test completed: %.1f Mbps download, %.0f ms latency",
					result.DownloadMbps, result.Latency)
			}
		}()
	}

	// Check whether the server requested a cleanup
	if resp.Msg.RequestCleanup {
		log.Info("Server requested cleanup, running now...")
		go func() {
			result, err := cleanup.RunCleanup(ctx, p.cfg)
			if err != nil {
				log.Errorf("Cleanup failed: %v", err)
			} else if result != nil {
				log.Infof("Cleanup completed: freed %d bytes, deleted %d files in %s",
					result.BytesFreed, result.FilesDeleted, result.Duration)
			}
		}()
	}

	if resp.Msg.TasksVersion > v {
		p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
	}

	if resp.Msg.Task == nil {
		return nil, false
	}

	// Got a task: reset tasksVersion to zero to force a db query on the next request.
	p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)

	return resp.Msg.Task, true
}