// Copyright 2023 The Gitea Authors and MarketAlly. All rights reserved. // SPDX-License-Identifier: MIT package poll import ( "context" "errors" "fmt" "sync" "sync/atomic" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" "connectrpc.com/connect" log "github.com/sirupsen/logrus" "golang.org/x/time/rate" "git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run" "git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup" "git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client" "git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config" "git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck" ) type Poller struct { client client.Client runner *run.Runner cfg *config.Config tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. bandwidthManager *envcheck.BandwidthManager pollingCtx context.Context shutdownPolling context.CancelFunc jobsCtx context.Context shutdownJobs context.CancelFunc done chan struct{} } func New(ctx context.Context, cfg *config.Config, client client.Client, runner *run.Runner) *Poller { // Inherit from parent context so shutdown signals propagate properly pollingCtx, shutdownPolling := context.WithCancel(ctx) jobsCtx, shutdownJobs := context.WithCancel(ctx) done := make(chan struct{}) return &Poller{ client: client, runner: runner, cfg: cfg, pollingCtx: pollingCtx, shutdownPolling: shutdownPolling, jobsCtx: jobsCtx, shutdownJobs: shutdownJobs, done: done, } } // SetBandwidthManager sets the bandwidth manager for on-demand testing func (p *Poller) SetBandwidthManager(bm *envcheck.BandwidthManager) { p.bandwidthManager = bm } func (p *Poller) Poll() { limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) wg := &sync.WaitGroup{} for i := 0; i < p.cfg.Runner.Capacity; i++ { wg.Add(1) go p.poll(wg, limiter) } wg.Wait() // signal that we shutdown close(p.done) } func (p *Poller) PollOnce() { limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) p.pollOnce(limiter) // signal that we're done close(p.done) } func (p *Poller) Shutdown(ctx context.Context) error { p.shutdownPolling() select { // graceful shutdown completed succesfully case <-p.done: return nil // our timeout for shutting down ran out case <-ctx.Done(): // when both the timeout fires and the graceful shutdown // completed succsfully, this branch of the select may // fire. Do a non-blocking check here against the graceful // shutdown status to avoid sending an error if we don't need to. _, ok := <-p.done if !ok { return nil } // force a shutdown of all running jobs p.shutdownJobs() // wait for running jobs to report their status to Gitea _, _ = <-p.done return ctx.Err() } } func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { p.pollOnce(limiter) select { case <-p.pollingCtx.Done(): return default: continue } } } func (p *Poller) pollOnce(limiter *rate.Limiter) { for { if err := limiter.Wait(p.pollingCtx); err != nil { if p.pollingCtx.Err() != nil { log.WithError(err).Debug("limiter wait failed") } return } task, ok := p.fetchTask(p.pollingCtx) if !ok { continue } p.runTaskWithRecover(p.jobsCtx, task) return } } func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) { defer func() { if r := recover(); r != nil { err := fmt.Errorf("panic: %v", r) log.WithError(err).Error("panic in runTaskWithRecover") } }() if err := p.runner.Run(ctx, task); err != nil { log.WithError(err).Error("failed to run task") } } func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout) defer cancel() // Detect capabilities including current disk space caps := envcheck.DetectCapabilities(ctx, p.cfg.Container.DockerHost, p.cfg.Container.WorkdirParent, p.cfg.Runner.Capacity) // Include latest bandwidth result if available if p.bandwidthManager != nil { caps.Bandwidth = p.bandwidthManager.GetLastResult() } capsJson := caps.ToJSON() // Load the version value that was in the cache when the request was sent. v := p.tasksVersion.Load() fetchReq := &runnerv1.FetchTaskRequest{ TasksVersion: v, CapabilitiesJson: capsJson, } resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(fetchReq)) if errors.Is(err, context.DeadlineExceeded) { err = nil } if err != nil { log.WithError(err).Error("failed to fetch task") return nil, false } if resp == nil || resp.Msg == nil { return nil, false } // Check if server requested a bandwidth test if resp.Msg.RequestBandwidthTest && p.bandwidthManager != nil { log.Info("Server requested bandwidth test, running now...") go func() { result := p.bandwidthManager.RunTest(ctx) if result != nil { log.Infof("Bandwidth test completed: %.1f Mbps download, %.0f ms latency", result.DownloadMbps, result.Latency) } }() } // Check if server requested a cleanup if resp.Msg.RequestCleanup { log.Info("Server requested cleanup, running now...") go func() { result, err := cleanup.RunCleanup(ctx, p.cfg) if err != nil { log.Errorf("Cleanup failed: %v", err) } else if result != nil { log.Infof("Cleanup completed: freed %d bytes, deleted %d files in %s", result.BytesFreed, result.FilesDeleted, result.Duration) } }() } if resp.Msg.TasksVersion > v { p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion) } if resp.Msg.Task == nil { return nil, false } // got a task, set tasksVersion to zero to force query db in next request. p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0) return resp.Msg.Task, true }