2
0
Files
logikonline 17f78a5e4c
Some checks failed
CI / build-and-test (push) Has been cancelled
Release / build (amd64, darwin) (push) Successful in 49s
Release / build (amd64, linux) (push) Successful in 1m4s
Release / build (amd64, windows) (push) Successful in 52s
Release / build (arm64, darwin) (push) Successful in 54s
Release / build (arm64, linux) (push) Successful in 47s
Release / release (push) Successful in 21s
feat(runner): merge admin-added labels from server on declare
Adds MergeServerLabels method to sync labels added by admins in Gitea UI with runner's local configuration. Called after successful declare response to incorporate any server-side label changes. Skips duplicate labels and logs invalid entries. Enables dynamic label management without requiring runner restart or config file edits.
2026-02-09 02:15:23 -05:00

441 lines
14 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package cmd
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"time"
"connectrpc.com/connect"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/poll"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/labels"
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/ver"
)
const (
	// DiskSpaceWarningThreshold is the used-disk percentage at which to warn
	// about low disk space. It must stay strictly below
	// DiskSpaceAutoCleanupThreshold; otherwise the warning-only case in
	// checkDiskSpaceAndCleanup can never be reached.
	DiskSpaceWarningThreshold = 75.0
	// DiskSpaceCriticalThreshold is the percentage at which to log critical warnings
	DiskSpaceCriticalThreshold = 95.0
	// DiskSpaceAutoCleanupThreshold is the percentage at which to trigger automatic cleanup
	DiskSpaceAutoCleanupThreshold = 85.0
	// CleanupCooldown is the minimum time between automatic cleanups
	CleanupCooldown = 10 * time.Minute
	// CapabilitiesUpdateInterval is how often to update capabilities (including disk space)
	CapabilitiesUpdateInterval = 5 * time.Minute
	// BandwidthTestInterval is how often to run bandwidth tests (hourly)
	BandwidthTestInterval = 1 * time.Hour
)
// Process-wide daemon state shared between runDaemon and its helpers.
var (
	// bandwidthManager is created by runDaemon and kept global so that
	// manual bandwidth tests can be triggered from elsewhere.
	bandwidthManager *envcheck.BandwidthManager

	// lastCleanupTime records when triggerAutoCleanup last started a cleanup;
	// it is read and written only while cleanupMutex is held.
	lastCleanupTime time.Time
	cleanupMutex    sync.Mutex

	// globalConfig is the configuration loaded by runDaemon, stored so the
	// automatic cleanup path can reach it.
	globalConfig *config.Config
)
// runDaemon builds the handler that runs the runner daemon.
//
// The returned handler:
//   - loads the configuration from *configFile and the registration file it
//     names, storing the configuration in globalConfig;
//   - resolves the runner labels (cfg.Runner.Labels, when non-empty, override
//     the labels stored in the registration file) and persists them to the
//     registration file if they changed;
//   - when a label or the config requires Docker, waits up to
//     cfg.Container.DockerTimeout for the daemon and then requires it to be
//     reachable;
//   - declares the labels and detected capabilities to the server and merges
//     back the labels the server reports;
//   - starts the periodic capabilities-update and stale-cache-cleanup goroutines;
//   - polls for tasks until ctx is cancelled (or runs a single PollOnce when
//     daemArgs.Once is set or the registration is ephemeral);
//   - finally gives running jobs up to cfg.Runner.ShutdownTimeout to finish.
func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error {
	return func(_ *cobra.Command, _ []string) error {
		cfg, err := config.LoadDefault(*configFile)
		if err != nil {
			return fmt.Errorf("invalid configuration: %w", err)
		}
		// Store config globally for auto-cleanup
		globalConfig = cfg
		initLogging(cfg)
		log.Infoln("Starting runner daemon")

		reg, err := config.LoadRegistration(cfg.Runner.File)
		// errors.Is (unlike os.IsNotExist) also recognises a not-exist error
		// that has been wrapped on its way up.
		if errors.Is(err, os.ErrNotExist) {
			log.Error("registration file not found, please register the runner first")
			return err
		} else if err != nil {
			return fmt.Errorf("failed to load registration file: %w", err)
		}

		// Labels from the config file take precedence over the registered ones.
		lbls := reg.Labels
		if len(cfg.Runner.Labels) > 0 {
			lbls = cfg.Runner.Labels
		}
		ls := labels.Labels{}
		for _, l := range lbls {
			label, err := labels.Parse(l)
			if err != nil {
				log.WithError(err).Warnf("ignored invalid label %q", l)
				continue
			}
			ls = append(ls, label)
		}
		if len(ls) == 0 {
			log.Warn("no labels configured, runner may not be able to pick up jobs")
		}

		if ls.RequireDocker() || cfg.Container.RequireDocker {
			// Wait for dockerd be ready
			if timeout := cfg.Container.DockerTimeout; timeout > 0 {
				tctx, cancel := context.WithTimeout(ctx, timeout)
				defer cancel()
				keepRunning := true
				for keepRunning {
					// tctx ends with DeadlineExceeded when the timeout elapses and
					// with Canceled when the parent ctx is cancelled; both mean
					// "stop waiting".
					dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
					if err != nil {
						log.Errorf("Failed to get socket path: %s", err.Error())
					} else if err = envcheck.CheckIfDockerRunning(tctx, dockerSocketPath); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
						log.Infof("Docker wait timeout of %s expired", timeout.String())
						break
					} else if err != nil {
						log.Errorf("Docker connection failed: %s", err.Error())
					} else {
						log.Infof("Docker is ready")
						break
					}
					select {
					case <-time.After(time.Second):
					case <-tctx.Done():
						log.Infof("Docker wait timeout of %s expired", timeout.String())
						keepRunning = false
					}
				}
			}
			// Require dockerd be ready
			dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
			if err != nil {
				return err
			}
			if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
				return err
			}
			// if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
			_ = os.Setenv("DOCKER_HOST", dockerSocketPath)
			// empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
			// and assign the path to cfg.Container.DockerHost
			if cfg.Container.DockerHost == "" {
				cfg.Container.DockerHost = dockerSocketPath
			}
			// check the scheme, if the scheme is not npipe or unix
			// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
			if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
				scheme := cfg.Container.DockerHost[:protoIndex]
				if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
					cfg.Container.DockerHost = "-"
				}
			}
		}

		// Persist the effective labels so the registration file reflects them.
		if labelStrings := ls.ToStrings(); !slices.Equal(reg.Labels, labelStrings) {
			reg.Labels = labelStrings
			if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
				return fmt.Errorf("failed to save runner config: %w", err)
			}
			log.Infof("labels updated to: %v", reg.Labels)
		}

		cli := client.New(
			reg.Address,
			cfg.Runner.Insecure,
			reg.UUID,
			reg.Token,
			ver.Version(),
		)
		runner := run.NewRunner(cfg, reg, cli)

		// Detect runner capabilities for AI-friendly workflow generation
		dockerHost := cfg.Container.DockerHost
		if dockerHost == "" {
			if dh, err := getDockerSocketPath(""); err == nil {
				dockerHost = dh
			}
		}
		// Initialize bandwidth manager with the Gitea server address
		bandwidthManager = envcheck.NewBandwidthManager(reg.Address, BandwidthTestInterval)
		bandwidthManager.Start(ctx)
		log.Infof("bandwidth manager started, testing against: %s", reg.Address)
		capabilities := envcheck.DetectCapabilities(ctx, dockerHost, cfg.Container.WorkdirParent, cfg.Runner.Capacity)
		// Include initial bandwidth result if available
		capabilities.Bandwidth = bandwidthManager.GetLastResult()
		capabilitiesJSON := capabilities.ToJSON()
		log.Infof("detected capabilities: %s", capabilitiesJSON)
		// Check disk space and warn if low
		checkDiskSpaceAndCleanup(ctx, capabilities)

		// declare the labels of the runner before fetching tasks
		resp, err := runner.Declare(ctx, ls.Names(), capabilitiesJSON)
		switch {
		case err != nil && connect.CodeOf(err) == connect.CodeUnimplemented:
			log.Errorf("Your GitCaddy version is too old to support runner declare, please upgrade to v1.21 or later")
			return err
		case err != nil:
			log.WithError(err).Error("fail to invoke Declare")
			return err
		default:
			log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully",
				resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
			// Merge any admin-added labels from the server
			runner.MergeServerLabels(resp.Msg.Runner.Labels)
		}

		// Start periodic capabilities update goroutine.
		// NOTE(review): ls.Names() does not include labels merged from the server
		// above; if Declare replaces server-side labels, the periodic re-declare
		// could drop admin-added labels — confirm against the server behaviour.
		go periodicCapabilitiesUpdate(ctx, runner, ls.Names(), dockerHost, cfg.Container.WorkdirParent)

		// Start periodic stale job cache cleanup (every hour, remove caches older than 2 hours)
		go func() {
			ticker := time.NewTicker(1 * time.Hour)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					runner.CleanStaleJobCaches(2 * time.Hour)
				}
			}
		}()

		poller := poll.New(cfg, cli, runner)
		poller.SetBandwidthManager(bandwidthManager)
		if daemArgs.Once || reg.Ephemeral {
			done := make(chan struct{})
			go func() {
				defer close(done)
				poller.PollOnce()
			}()
			// shutdown when we complete a job or cancel is requested
			select {
			case <-ctx.Done():
			case <-done:
			}
		} else {
			go poller.Poll()
			<-ctx.Done()
		}

		log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
		// A fresh context is used because ctx is typically already cancelled here.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
		defer cancel()
		if err := poller.Shutdown(shutdownCtx); err != nil {
			log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
		}
		return nil
	}
}
// checkDiskSpaceAndCleanup inspects the disk usage reported in capabilities and
// escalates by threshold, checking the most severe level first:
//   - at or above DiskSpaceCriticalThreshold: log an error and try a cleanup;
//   - at or above DiskSpaceAutoCleanupThreshold: log a warning and try a cleanup;
//   - at or above DiskSpaceWarningThreshold: log a warning only.
//
// It does nothing when no disk information is available. Cleanups go through
// triggerAutoCleanup, which applies the cooldown.
func checkDiskSpaceAndCleanup(ctx context.Context, capabilities *envcheck.RunnerCapabilities) {
	disk := capabilities.Disk
	if disk == nil {
		return
	}

	const bytesPerGB = 1024 * 1024 * 1024
	pct := disk.UsedPercent
	freeGB := float64(disk.Free) / bytesPerGB

	if pct >= DiskSpaceCriticalThreshold {
		log.Errorf("CRITICAL: Disk space critically low! %.1f%% used, only %.2f GB free. Runner may fail to execute jobs!", pct, freeGB)
		// Always try cleanup at critical level
		triggerAutoCleanup(ctx)
		return
	}
	if pct >= DiskSpaceAutoCleanupThreshold {
		log.Warnf("WARNING: Disk space at %.1f%% used (%.2f GB free). Triggering automatic cleanup.", pct, freeGB)
		triggerAutoCleanup(ctx)
		return
	}
	if pct >= DiskSpaceWarningThreshold {
		log.Warnf("WARNING: Disk space running low. %.1f%% used, %.2f GB free. Consider cleaning up disk space.", pct, freeGB)
	}
}
// triggerAutoCleanup starts a background disk cleanup unless one was started
// less than CleanupCooldown ago (the very first call is never throttled).
//
// The cooldown timestamp is updated when the cleanup is started, not when it
// finishes. The cleanup itself runs in its own goroutine using ctx and the
// configuration captured here. Its result and any per-item errors are only
// logged. Nothing is done (besides a warning) when globalConfig is unset.
func triggerAutoCleanup(ctx context.Context) {
	cleanupMutex.Lock()
	defer cleanupMutex.Unlock()

	// Check cooldown (except for first run)
	if !lastCleanupTime.IsZero() && time.Since(lastCleanupTime) < CleanupCooldown {
		log.Debugf("Skipping auto-cleanup, cooldown not expired (last cleanup: %s ago)", time.Since(lastCleanupTime))
		return
	}

	// Snapshot the package-level config under the lock so the goroutine below
	// uses exactly the value that passed the nil check.
	cfg := globalConfig
	if cfg == nil {
		log.Warn("Cannot run auto-cleanup: config not available")
		return
	}

	log.Info("Starting automatic disk cleanup...")
	lastCleanupTime = time.Now()

	go func() {
		result, err := cleanup.RunCleanup(ctx, cfg)
		if err != nil {
			log.WithError(err).Error("Auto-cleanup failed")
			return
		}
		log.Infof("Auto-cleanup completed: freed %d bytes, deleted %d files in %s",
			result.BytesFreed, result.FilesDeleted, result.Duration)
		for _, e := range result.Errors {
			log.WithError(e).Warn("Cleanup error")
		}
	}()
}
// periodicCapabilitiesUpdate runs until ctx is cancelled. Every
// CapabilitiesUpdateInterval it:
//  1. re-detects the runner capabilities;
//  2. attaches the latest bandwidth result;
//  3. runs the disk-space check;
//  4. re-declares labelNames together with the fresh capabilities to the server.
//
// Declare failures are logged at debug level and retried on the next tick. On
// cancellation it stops the global bandwidth manager before returning.
//
// NOTE(review): labelNames is fixed when the goroutine starts, so labels merged
// from the server after the initial declare are not included here — confirm
// this is intended.
func periodicCapabilitiesUpdate(ctx context.Context, runner *run.Runner, labelNames []string, dockerHost string, workingDir string) {
	ticker := time.NewTicker(CapabilitiesUpdateInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Debug("stopping periodic capabilities update")
			if bandwidthManager != nil {
				bandwidthManager.Stop()
			}
			return
		case <-ticker.C:
			// Detect updated capabilities (disk space changes over time)
			capabilities := envcheck.DetectCapabilities(ctx, dockerHost, workingDir, globalConfig.Runner.Capacity)
			// Include latest bandwidth result
			if bandwidthManager != nil {
				capabilities.Bandwidth = bandwidthManager.GetLastResult()
			}
			capabilitiesJSON := capabilities.ToJSON()
			// Check for disk space warnings
			checkDiskSpaceAndCleanup(ctx, capabilities)
			// Send updated capabilities to server
			if _, err := runner.Declare(ctx, labelNames, capabilitiesJSON); err != nil {
				log.WithError(err).Debug("failed to update capabilities")
				continue
			}
			bandwidthInfo := ""
			if capabilities.Bandwidth != nil {
				bandwidthInfo = fmt.Sprintf(", bandwidth: %.1f Mbps", capabilities.Bandwidth.DownloadMbps)
			}
			// Disk can be nil (checkDiskSpaceAndCleanup guards for it as well);
			// dereferencing it unconditionally would panic this goroutine and
			// crash the daemon.
			if capabilities.Disk == nil {
				log.Debugf("capabilities updated: disk usage unknown%s", bandwidthInfo)
				continue
			}
			log.Debugf("capabilities updated: disk %.1f%% used, %.2f GB free%s",
				capabilities.Disk.UsedPercent,
				float64(capabilities.Disk.Free)/(1024*1024*1024),
				bandwidthInfo)
		}
	}
}
// daemonArgs carries the options that control how runDaemon behaves.
type daemonArgs struct {
	// Once makes runDaemon perform a single PollOnce round and then shut
	// down instead of polling continuously.
	Once bool
}
// initLogging sets up the global logrus logger from cfg.
//
// Formatter: a text formatter with colours only when stdout is a terminal and
// full timestamps. Caller information is rendered as "[func]" and
// "[file:line]".
//
// Level: cfg.Log.Level is then applied. An empty level leaves the current
// level untouched. An unparsable level is reported as an error and also leaves
// the current level untouched. Caller reporting is only enabled for the debug
// and trace levels because of its performance cost.
func initLogging(cfg *config.Config) {
	callPrettyfier := func(f *runtime.Frame) (string, string) {
		// get function name
		s := strings.Split(f.Function, ".")
		funcname := "[" + s[len(s)-1] + "]"
		// get file name and line number
		_, filename := path.Split(f.File)
		filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]"
		return funcname, filename
	}
	isTerm := isatty.IsTerminal(os.Stdout.Fd())
	format := &log.TextFormatter{
		DisableColors:    !isTerm,
		FullTimestamp:    true,
		CallerPrettyfier: callPrettyfier,
	}
	log.SetFormatter(format)

	l := cfg.Log.Level
	if l == "" {
		log.Infof("Log level not set, sticking to info")
		return
	}
	level, err := log.ParseLevel(l)
	if err != nil {
		// ParseLevel returns the zero Level (PanicLevel) together with the
		// error; applying it would suppress almost all logging, so keep the
		// current level instead.
		log.WithError(err).
			Errorf("invalid log level: %q", l)
		return
	}
	// debug level
	switch level {
	case log.DebugLevel, log.TraceLevel:
		log.SetReportCaller(true) // Only in debug or trace because it takes a performance toll
		log.Infof("Log level %s requested, setting up report caller for further debugging", level)
	}
	if log.GetLevel() != level {
		log.Infof("log level set to %v", level)
		log.SetLevel(level)
	}
}
// commonSocketPaths lists well-known Docker/Podman API endpoints that
// getDockerSocketPath probes, in order, when neither the config nor
// DOCKER_HOST names one. Entries may reference environment variables, which
// are expanded with os.ExpandEnv before probing.
var commonSocketPaths = []string{
	"/var/run/docker.sock",
	"/run/podman/podman.sock",
	"$HOME/.colima/docker.sock",
	"$XDG_RUNTIME_DIR/docker.sock",
	"$XDG_RUNTIME_DIR/podman/podman.sock",
	`\\.\pipe\docker_engine`,
	"$HOME/.docker/run/docker.sock",
}

// getDockerSocketPath resolves the Docker daemon address to use.
//
// Resolution order:
//  1. configDockerHost, unless it is empty or "-" (a "-" means don't mount the
//     docker socket to job containers).
//  2. The DOCKER_HOST environment variable, when set to a non-empty value.
//  3. The first entry of commonSocketPaths that exists on disk. Windows named
//     pipes are returned as npipe:// URLs, everything else as unix:// URLs.
//
// An error is returned when none of these yields an address.
func getDockerSocketPath(configDockerHost string) (string, error) {
	if configDockerHost != "" && configDockerHost != "-" {
		return configDockerHost, nil
	}
	// A set-but-empty DOCKER_HOST is treated as unset (the Docker client does
	// the same) rather than being returned as an empty, unusable address.
	if socket := os.Getenv("DOCKER_HOST"); socket != "" {
		return socket, nil
	}
	for _, p := range commonSocketPaths {
		expanded := os.ExpandEnv(p)
		if _, err := os.Lstat(expanded); err != nil {
			continue
		}
		scheme := "unix://"
		if strings.HasPrefix(p, `\\.\`) {
			scheme = "npipe://"
		}
		return scheme + filepath.ToSlash(expanded), nil
	}
	return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid")
}