2
0
Files
gitcaddy-runner/internal/app/cmd/daemon.go
logikonline 899ca015b1
All checks were successful
CI / build-and-test (push) Successful in 1m0s
Release / build (amd64, linux) (push) Successful in 1m8s
Release / build (amd64, darwin) (push) Successful in 1m25s
Release / build (amd64, windows) (push) Successful in 51s
Release / build (arm64, darwin) (push) Successful in 1m0s
Release / build (arm64, linux) (push) Successful in 1m9s
Release / release (push) Successful in 26s
fix(poll): revert context inheritance to prevent deadlock
The v1.0.3 change that made poller contexts inherit from the parent
context caused a deadlock where runners would start but never poll
for tasks.

Reverted to using context.Background() for pollingCtx and jobsCtx.
Graceful shutdown still works via explicit Shutdown() call which
cancels the polling context.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 13:51:47 -05:00

439 lines
14 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package cmd
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"time"
"connectrpc.com/connect"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/poll"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/labels"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/ver"
)
// Thresholds and intervals that drive disk-space monitoring, automatic
// cleanup, and the periodic capabilities/bandwidth reporting in this file.
const (
// DiskSpaceWarningThreshold is the disk usage percentage at which to warn about low disk space.
//
// NOTE: this currently equals DiskSpaceAutoCleanupThreshold. Because
// checkDiskSpaceAndCleanup tests the auto-cleanup threshold first, its
// warning-only branch is never reached with these values.
DiskSpaceWarningThreshold = 85.0
// DiskSpaceCriticalThreshold is the disk usage percentage at which a critical
// error is logged (cleanup is attempted at this level as well).
DiskSpaceCriticalThreshold = 95.0
// DiskSpaceAutoCleanupThreshold is the disk usage percentage at which automatic cleanup is triggered.
DiskSpaceAutoCleanupThreshold = 85.0
// CleanupCooldown is the minimum time between two automatic cleanups.
CleanupCooldown = 10 * time.Minute
// CapabilitiesUpdateInterval is how often capabilities (including disk space)
// are re-detected and re-declared to the server.
CapabilitiesUpdateInterval = 5 * time.Minute
// BandwidthTestInterval is the interval handed to the bandwidth manager (hourly).
BandwidthTestInterval = 1 * time.Hour
)
// bandwidthManager is the package-level bandwidth manager. It is assigned by
// runDaemon and read by periodicCapabilitiesUpdate (which also stops it on
// shutdown). It lives at package scope so it is accessible for triggering
// manual tests.
var bandwidthManager *envcheck.BandwidthManager
// Global state shared by the automatic disk cleanup logic.
var (
// lastCleanupTime records when the last automatic cleanup was started; the
// zero value means no cleanup has been started yet.
lastCleanupTime time.Time
// cleanupMutex is held by triggerAutoCleanup while it checks the cooldown and
// updates lastCleanupTime.
cleanupMutex sync.Mutex
// globalConfig is the loaded configuration, stored by runDaemon so that
// auto-cleanup and capability detection can reach it.
globalConfig *config.Config
)
// runDaemon builds the cobra run handler for the daemon command.
//
// When invoked, the returned handler:
//  1. loads the configuration from *configFile and the runner registration,
//  2. resolves the effective labels (config labels override registration labels),
//  3. if Docker is required, optionally waits for it and then verifies it is reachable,
//  4. persists the labels when they differ from the stored registration,
//  5. detects capabilities and declares the runner to the server,
//  6. starts the background goroutines (capabilities update, stale cache cleanup),
//  7. polls for tasks until ctx is done (or, in once/ephemeral mode, until
//     PollOnce returns), and
//  8. shuts the poller down, allowing cfg.Runner.ShutdownTimeout for running jobs.
//
// ctx bounds the lifetime of the daemon and its background goroutines.
// daemArgs carries the daemon options; configFile is dereferenced only when
// the handler runs.
func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error {
return func(_ *cobra.Command, _ []string) error {
cfg, err := config.LoadDefault(*configFile)
if err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}
// Store config globally for auto-cleanup
globalConfig = cfg
initLogging(cfg)
log.Infoln("Starting runner daemon")
reg, err := config.LoadRegistration(cfg.Runner.File)
if os.IsNotExist(err) {
log.Error("registration file not found, please register the runner first")
return err
} else if err != nil {
return fmt.Errorf("failed to load registration file: %w", err)
}
// Labels from the config file, when present, take precedence over the ones
// stored in the registration file.
lbls := reg.Labels
if len(cfg.Runner.Labels) > 0 {
lbls = cfg.Runner.Labels
}
// Parse the labels; invalid ones are logged and skipped rather than aborting startup.
ls := labels.Labels{}
for _, l := range lbls {
label, err := labels.Parse(l)
if err != nil {
log.WithError(err).Warnf("ignored invalid label %q", l)
continue
}
ls = append(ls, label)
}
if len(ls) == 0 {
log.Warn("no labels configured, runner may not be able to pick up jobs")
}
if ls.RequireDocker() || cfg.Container.RequireDocker {
// Wait for dockerd to be ready, retrying once per second until it responds
// or the configured DockerTimeout elapses. The wait is skipped when the
// timeout is not positive.
if timeout := cfg.Container.DockerTimeout; timeout > 0 {
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
keepRunning := true
for keepRunning {
dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
if err != nil {
log.Errorf("Failed to get socket path: %s", err.Error())
// NOTE(review): expiry of a context.WithTimeout context is reported as
// context.DeadlineExceeded, not context.Canceled, so this branch looks like
// it only matches cancellation of the parent ctx; the timeout itself appears
// to be caught by the tctx.Done() case below — confirm against
// envcheck.CheckIfDockerRunning.
} else if err = envcheck.CheckIfDockerRunning(tctx, dockerSocketPath); errors.Is(err, context.Canceled) {
log.Infof("Docker wait timeout of %s expired", timeout.String())
break
} else if err != nil {
log.Errorf("Docker connection failed: %s", err.Error())
} else {
log.Infof("Docker is ready")
break
}
// Pause for a second before the next attempt, or stop waiting once tctx is done.
select {
case <-time.After(time.Second):
case <-tctx.Done():
log.Infof("Docker wait timeout of %s expired", timeout.String())
keepRunning = false
}
}
}
// Require dockerd to be ready: regardless of the outcome of the wait above,
// a failure here aborts startup.
dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
if err != nil {
return err
}
if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
return err
}
// if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
_ = os.Setenv("DOCKER_HOST", dockerSocketPath)
// empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
// and assign the path to cfg.Container.DockerHost
if cfg.Container.DockerHost == "" {
cfg.Container.DockerHost = dockerSocketPath
}
// check the scheme, if the scheme is not npipe or unix
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
scheme := cfg.Container.DockerHost[:protoIndex]
if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
cfg.Container.DockerHost = "-"
}
}
}
// Persist the effective labels when they differ from what the registration file holds.
if !slices.Equal(reg.Labels, ls.ToStrings()) {
reg.Labels = ls.ToStrings()
if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
return fmt.Errorf("failed to save runner config: %w", err)
}
log.Infof("labels updated to: %v", reg.Labels)
}
cli := client.New(
reg.Address,
cfg.Runner.Insecure,
reg.UUID,
reg.Token,
ver.Version(),
)
runner := run.NewRunner(cfg, reg, cli)
// Detect runner capabilities for AI-friendly workflow generation
// When no Docker host is configured, fall back to auto-detection; a detection
// failure is ignored and dockerHost stays empty.
dockerHost := cfg.Container.DockerHost
if dockerHost == "" {
if dh, err := getDockerSocketPath(""); err == nil {
dockerHost = dh
}
}
// Initialize bandwidth manager with the Gitea server address
bandwidthManager = envcheck.NewBandwidthManager(reg.Address, BandwidthTestInterval)
bandwidthManager.Start(ctx)
log.Infof("bandwidth manager started, testing against: %s", reg.Address)
capabilities := envcheck.DetectCapabilities(ctx, dockerHost, cfg.Container.WorkdirParent, globalConfig.Runner.Capacity)
// Include initial bandwidth result if available
capabilities.Bandwidth = bandwidthManager.GetLastResult()
capabilitiesJSON := capabilities.ToJSON()
log.Infof("detected capabilities: %s", capabilitiesJSON)
// Check disk space and warn if low
checkDiskSpaceAndCleanup(ctx, capabilities)
// declare the labels of the runner before fetching tasks
resp, err := runner.Declare(ctx, ls.Names(), capabilitiesJSON)
switch {
case err != nil && connect.CodeOf(err) == connect.CodeUnimplemented:
log.Errorf("Your GitCaddy version is too old to support runner declare, please upgrade to v1.21 or later")
return err
case err != nil:
log.WithError(err).Error("fail to invoke Declare")
return err
default:
log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully",
resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
}
// Start periodic capabilities update goroutine
go periodicCapabilitiesUpdate(ctx, runner, ls.Names(), dockerHost, cfg.Container.WorkdirParent)
// Start periodic stale job cache cleanup (every hour, remove caches older than 2 hours)
go func() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
runner.CleanStaleJobCaches(2 * time.Hour)
}
}
}()
poller := poll.New(cfg, cli, runner)
poller.SetBandwidthManager(bandwidthManager)
// Once/ephemeral mode runs a single PollOnce; otherwise poll continuously
// until ctx is done.
if daemArgs.Once || reg.Ephemeral {
done := make(chan struct{})
go func() {
defer close(done)
poller.PollOnce()
}()
// shutdown when we complete a job or cancel is requested
select {
case <-ctx.Done():
case <-done:
}
} else {
go poller.Poll()
<-ctx.Done()
}
log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
// The shutdown deadline is derived from context.Background() rather than the
// daemon ctx, which may already be cancelled at this point. Note that this
// declaration shadows the outer ctx for the remainder of the handler.
ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
defer cancel()
// A Shutdown error is only logged; the handler still returns nil.
err = poller.Shutdown(ctx)
if err != nil {
log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
}
return nil
}
}
// checkDiskSpaceAndCleanup inspects the disk usage reported in capabilities,
// logs a message matching its severity, and starts an automatic cleanup at
// the auto-cleanup and critical levels. It is a no-op when capabilities
// carries no disk information.
//
// Severity is evaluated from most to least severe: critical, auto-cleanup,
// then warning. With the thresholds currently declared in this file the
// warning and auto-cleanup thresholds are equal, so the warning-only message
// is not reached.
func checkDiskSpaceAndCleanup(ctx context.Context, capabilities *envcheck.RunnerCapabilities) {
	disk := capabilities.Disk
	if disk == nil {
		return
	}

	const bytesPerGB = 1024 * 1024 * 1024
	pct := disk.UsedPercent
	gbFree := float64(disk.Free) / bytesPerGB

	if pct >= DiskSpaceCriticalThreshold {
		log.Errorf("CRITICAL: Disk space critically low! %.1f%% used, only %.2f GB free. Runner may fail to execute jobs!", pct, gbFree)
		// Cleanup is always attempted at the critical level.
		triggerAutoCleanup(ctx)
		return
	}

	if pct >= DiskSpaceAutoCleanupThreshold {
		log.Warnf("WARNING: Disk space at %.1f%% used (%.2f GB free). Triggering automatic cleanup.", pct, gbFree)
		triggerAutoCleanup(ctx)
		return
	}

	if pct >= DiskSpaceWarningThreshold {
		log.Warnf("WARNING: Disk space running low. %.1f%% used, %.2f GB free. Consider cleaning up disk space.", pct, gbFree)
	}
}
// triggerAutoCleanup starts an asynchronous disk cleanup unless one was
// started less than CleanupCooldown ago or no configuration is available.
//
// The cooldown check and the lastCleanupTime update happen under
// cleanupMutex; the cleanup itself runs in its own goroutine, and its
// outcome is only reported through the log.
func triggerAutoCleanup(ctx context.Context) {
	cleanupMutex.Lock()
	defer cleanupMutex.Unlock()

	// The very first run (zero lastCleanupTime) is exempt from the cooldown.
	if !lastCleanupTime.IsZero() {
		if sinceLast := time.Since(lastCleanupTime); sinceLast < CleanupCooldown {
			log.Debugf("Skipping auto-cleanup, cooldown not expired (last cleanup: %s ago)", sinceLast)
			return
		}
	}

	if globalConfig == nil {
		log.Warn("Cannot run auto-cleanup: config not available")
		return
	}

	log.Info("Starting automatic disk cleanup...")
	// The cooldown is measured from the start of the cleanup, not its completion.
	lastCleanupTime = time.Now()

	runAndReport := func() {
		summary, cleanupErr := cleanup.RunCleanup(ctx, globalConfig)
		if cleanupErr != nil {
			log.WithError(cleanupErr).Error("Auto-cleanup failed")
			return
		}
		log.Infof("Auto-cleanup completed: freed %d bytes, deleted %d files in %s",
			summary.BytesFreed, summary.FilesDeleted, summary.Duration)
		// Individual errors do not fail the cleanup; they are surfaced as warnings.
		for _, itemErr := range summary.Errors {
			log.WithError(itemErr).Warn("Cleanup error")
		}
	}
	go runAndReport()
}
// periodicCapabilitiesUpdate re-detects the runner capabilities every
// CapabilitiesUpdateInterval, runs the disk-space check, and re-declares the
// runner (labels plus capabilities JSON) to the server.
//
// It blocks until ctx is done, so it is meant to run in its own goroutine.
// On shutdown it also stops the package-level bandwidth manager.
//
// Parameters:
//   - ctx: cancels the loop and is passed to detection and Declare calls.
//   - runner: used to declare the updated capabilities.
//   - labelNames: label names sent with every declaration.
//   - dockerHost, workingDir: forwarded to envcheck.DetectCapabilities.
func periodicCapabilitiesUpdate(ctx context.Context, runner *run.Runner, labelNames []string, dockerHost string, workingDir string) {
	ticker := time.NewTicker(CapabilitiesUpdateInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Debug("stopping periodic capabilities update")
			if bandwidthManager != nil {
				bandwidthManager.Stop()
			}
			return
		case <-ticker.C:
			// Detect updated capabilities (disk space changes over time).
			capabilities := envcheck.DetectCapabilities(ctx, dockerHost, workingDir, globalConfig.Runner.Capacity)

			// Include the latest bandwidth result.
			if bandwidthManager != nil {
				capabilities.Bandwidth = bandwidthManager.GetLastResult()
			}
			capabilitiesJSON := capabilities.ToJSON()

			// Check for disk space warnings (handles missing disk info itself).
			checkDiskSpaceAndCleanup(ctx, capabilities)

			// Send updated capabilities to the server; a failure is not fatal,
			// the next tick simply tries again.
			if _, err := runner.Declare(ctx, labelNames, capabilitiesJSON); err != nil {
				log.WithError(err).Debug("failed to update capabilities")
				continue
			}

			// Disk information may be absent; guard the dereference because
			// log arguments are evaluated even when debug logging is disabled,
			// and a nil pointer here would panic this goroutine.
			diskInfo := "disk usage unavailable"
			if capabilities.Disk != nil {
				diskInfo = fmt.Sprintf("disk %.1f%% used, %.2f GB free",
					capabilities.Disk.UsedPercent,
					float64(capabilities.Disk.Free)/(1024*1024*1024))
			}
			bandwidthInfo := ""
			if capabilities.Bandwidth != nil {
				bandwidthInfo = fmt.Sprintf(", bandwidth: %.1f Mbps", capabilities.Bandwidth.DownloadMbps)
			}
			log.Debugf("capabilities updated: %s%s", diskInfo, bandwidthInfo)
		}
	}
}
// daemonArgs holds the options passed to runDaemon.
type daemonArgs struct {
// Once makes the daemon perform a single PollOnce and then shut down instead
// of polling continuously.
Once bool
}
// initLogging configures the global logrus logger from cfg.
//
// It installs a text formatter (colors only when stdout is a terminal, full
// timestamps, compact caller info) and then applies cfg.Log.Level. An empty
// level leaves the logger at its current level. An invalid level is reported
// and also leaves the current level untouched: logrus.ParseLevel returns the
// zero Level (PanicLevel) alongside its error, and applying that value would
// silence practically all output.
func initLogging(cfg *config.Config) {
	// callPrettyfier shortens caller info to "[func]" and "[file:line]".
	callPrettyfier := func(f *runtime.Frame) (string, string) {
		// Keep only the last dot-separated component of the qualified function name.
		s := strings.Split(f.Function, ".")
		funcname := "[" + s[len(s)-1] + "]"
		// Keep only the base file name, followed by the line number.
		_, filename := path.Split(f.File)
		filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]"
		return funcname, filename
	}

	isTerm := isatty.IsTerminal(os.Stdout.Fd())
	format := &log.TextFormatter{
		DisableColors:    !isTerm,
		FullTimestamp:    true,
		CallerPrettyfier: callPrettyfier,
	}
	log.SetFormatter(format)

	l := cfg.Log.Level
	if l == "" {
		log.Infof("Log level not set, sticking to info")
		return
	}

	level, err := log.ParseLevel(l)
	if err != nil {
		log.WithError(err).
			Errorf("invalid log level: %q", l)
		// Do not fall through: level is the zero value (PanicLevel) here.
		return
	}

	// Caller reporting is enabled only for debug/trace because it takes a
	// performance toll.
	switch level {
	case log.DebugLevel, log.TraceLevel:
		log.SetReportCaller(true)
		log.Infof("Log level %s requested, setting up report caller for further debugging", level)
	}

	if log.GetLevel() != level {
		log.Infof("log level set to %v", level)
		log.SetLevel(level)
	}
}
// commonSocketPaths lists well-known container engine socket locations that
// getDockerSocketPath probes, in order, when neither the configuration nor
// DOCKER_HOST names a host. Entries may contain environment variable
// references, which are expanded before probing.
var commonSocketPaths = []string{
	"/var/run/docker.sock",
	"/run/podman/podman.sock",
	"$HOME/.colima/docker.sock",
	"$XDG_RUNTIME_DIR/docker.sock",
	"$XDG_RUNTIME_DIR/podman/podman.sock",
	`\\.\pipe\docker_engine`,
	"$HOME/.docker/run/docker.sock",
}

// getDockerSocketPath resolves the Docker host to use.
//
// Resolution order:
//  1. configDockerHost, unless it is empty or "-" ("-" means the socket must
//     not be mounted into job containers, so a host still has to be found);
//  2. the DOCKER_HOST environment variable, when set to a non-empty value;
//  3. the first entry of commonSocketPaths that exists on disk, returned with
//     an "npipe://" scheme for Windows named pipes and "unix://" otherwise.
//
// It returns an error when none of the above yields a host.
func getDockerSocketPath(configDockerHost string) (string, error) {
	if configDockerHost != "" && configDockerHost != "-" {
		return configDockerHost, nil
	}

	// An empty DOCKER_HOST is treated like an unset one; returning "" with a
	// nil error would hand callers an unusable host.
	if socket := os.Getenv("DOCKER_HOST"); socket != "" {
		return socket, nil
	}

	for _, p := range commonSocketPaths {
		expanded := os.ExpandEnv(p)
		if _, err := os.Lstat(expanded); err != nil {
			continue
		}
		scheme := "unix://"
		if strings.HasPrefix(p, `\\.\`) {
			scheme = "npipe://"
		}
		return scheme + filepath.ToSlash(expanded), nil
	}

	return "", errors.New("daemon Docker Engine socket not found and docker_host config was invalid")
}