Some checks failed
CI / build-and-test (push) Has been cancelled
Release / build (amd64, darwin) (push) Successful in 49s
Release / build (amd64, linux) (push) Successful in 1m4s
Release / build (amd64, windows) (push) Successful in 52s
Release / build (arm64, darwin) (push) Successful in 54s
Release / build (arm64, linux) (push) Successful in 47s
Release / release (push) Successful in 21s
Adds MergeServerLabels method to sync labels added by admins in Gitea UI with runner's local configuration. Called after successful declare response to incorporate any server-side label changes. Skips duplicate labels and logs invalid entries. Enables dynamic label management without requiring runner restart or config file edits.
441 lines · 14 KiB · Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"runtime"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
"github.com/mattn/go-isatty"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/spf13/cobra"
|
|
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/poll"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/app/run"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/cleanup"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/client"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/config"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/envcheck"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/labels"
|
|
"git.marketally.com/gitcaddy/gitcaddy-runner/internal/pkg/ver"
|
|
)
|
|
|
|
const (
	// DiskSpaceWarningThreshold is the percentage at which to warn about low disk space.
	// NOTE(review): equal to DiskSpaceAutoCleanupThreshold, which makes the
	// warn-only branch in checkDiskSpaceAndCleanup unreachable — confirm intended.
	DiskSpaceWarningThreshold = 85.0
	// DiskSpaceCriticalThreshold is the percentage at which to log critical warnings.
	DiskSpaceCriticalThreshold = 95.0
	// DiskSpaceAutoCleanupThreshold is the percentage at which to trigger automatic cleanup.
	DiskSpaceAutoCleanupThreshold = 85.0
	// CleanupCooldown is the minimum time between automatic cleanups.
	CleanupCooldown = 10 * time.Minute
	// CapabilitiesUpdateInterval is how often to update capabilities (including disk space).
	CapabilitiesUpdateInterval = 5 * time.Minute
	// BandwidthTestInterval is how often to run bandwidth tests (hourly).
	BandwidthTestInterval = 1 * time.Hour
)
|
|
|
|
// Global bandwidth manager - accessible for triggering manual tests.
// Created and started in runDaemon; stopped by periodicCapabilitiesUpdate
// when the daemon context is cancelled.
var bandwidthManager *envcheck.BandwidthManager

// Global cleanup state, shared between triggerAutoCleanup and the
// capabilities-update helpers.
var (
	// lastCleanupTime records when the last automatic cleanup started;
	// the zero value means no cleanup has run yet.
	lastCleanupTime time.Time
	// cleanupMutex guards lastCleanupTime.
	cleanupMutex sync.Mutex
	// globalConfig is assigned once in runDaemon and read by
	// triggerAutoCleanup and periodicCapabilitiesUpdate.
	globalConfig *config.Config
)
|
|
|
|
// runDaemon builds the cobra RunE handler for the daemon command. It loads
// configuration and registration, waits for Docker when labels require it,
// declares the runner (labels + capabilities) to the server, starts background
// maintenance goroutines, and then polls for jobs until ctx is cancelled.
func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error {
	return func(_ *cobra.Command, _ []string) error {
		cfg, err := config.LoadDefault(*configFile)
		if err != nil {
			return fmt.Errorf("invalid configuration: %w", err)
		}

		// Store config globally for auto-cleanup (read by triggerAutoCleanup
		// and periodicCapabilitiesUpdate).
		globalConfig = cfg

		initLogging(cfg)
		log.Infoln("Starting runner daemon")

		reg, err := config.LoadRegistration(cfg.Runner.File)
		if os.IsNotExist(err) {
			log.Error("registration file not found, please register the runner first")
			return err
		} else if err != nil {
			return fmt.Errorf("failed to load registration file: %w", err)
		}

		// Labels from the config file override the ones stored at registration.
		lbls := reg.Labels
		if len(cfg.Runner.Labels) > 0 {
			lbls = cfg.Runner.Labels
		}

		// Parse label strings; invalid entries are skipped with a warning.
		ls := labels.Labels{}
		for _, l := range lbls {
			label, err := labels.Parse(l)
			if err != nil {
				log.WithError(err).Warnf("ignored invalid label %q", l)
				continue
			}
			ls = append(ls, label)
		}
		if len(ls) == 0 {
			log.Warn("no labels configured, runner may not be able to pick up jobs")
		}

		if ls.RequireDocker() || cfg.Container.RequireDocker {
			// Wait for dockerd be ready, polling once per second until the
			// configured timeout expires.
			if timeout := cfg.Container.DockerTimeout; timeout > 0 {
				tctx, cancel := context.WithTimeout(ctx, timeout)
				defer cancel()
				keepRunning := true
				for keepRunning {
					dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
					if err != nil {
						log.Errorf("Failed to get socket path: %s", err.Error())
					} else if err = envcheck.CheckIfDockerRunning(tctx, dockerSocketPath); errors.Is(err, context.Canceled) {
						// NOTE(review): a timed-out tctx yields DeadlineExceeded,
						// not Canceled, so this branch presumably fires on
						// parent-ctx cancellation despite the "timeout expired"
						// message — confirm intended semantics.
						log.Infof("Docker wait timeout of %s expired", timeout.String())
						break
					} else if err != nil {
						log.Errorf("Docker connection failed: %s", err.Error())
					} else {
						log.Infof("Docker is ready")
						break
					}
					select {
					case <-time.After(time.Second):
					case <-tctx.Done():
						log.Infof("Docker wait timeout of %s expired", timeout.String())
						keepRunning = false
					}
				}
			}
			// Require dockerd be ready: fail hard if it is still unreachable.
			dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
			if err != nil {
				return err
			}
			if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
				return err
			}
			// if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
			_ = os.Setenv("DOCKER_HOST", dockerSocketPath)
			// empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
			// and assign the path to cfg.Container.DockerHost
			if cfg.Container.DockerHost == "" {
				cfg.Container.DockerHost = dockerSocketPath
			}
			// check the scheme, if the scheme is not npipe or unix
			// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
			if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
				scheme := cfg.Container.DockerHost[:protoIndex]
				if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
					cfg.Container.DockerHost = "-"
				}
			}
		}

		// Persist the effective label set back to the registration file when
		// it differs from what is stored there.
		if !slices.Equal(reg.Labels, ls.ToStrings()) {
			reg.Labels = ls.ToStrings()
			if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
				return fmt.Errorf("failed to save runner config: %w", err)
			}
			log.Infof("labels updated to: %v", reg.Labels)
		}

		cli := client.New(
			reg.Address,
			cfg.Runner.Insecure,
			reg.UUID,
			reg.Token,
			ver.Version(),
		)

		runner := run.NewRunner(cfg, reg, cli)

		// Detect runner capabilities for AI-friendly workflow generation.
		dockerHost := cfg.Container.DockerHost
		if dockerHost == "" {
			if dh, err := getDockerSocketPath(""); err == nil {
				dockerHost = dh
			}
		}

		// Initialize bandwidth manager with the Gitea server address.
		bandwidthManager = envcheck.NewBandwidthManager(reg.Address, BandwidthTestInterval)
		bandwidthManager.Start(ctx)
		log.Infof("bandwidth manager started, testing against: %s", reg.Address)

		capabilities := envcheck.DetectCapabilities(ctx, dockerHost, cfg.Container.WorkdirParent, globalConfig.Runner.Capacity)
		// Include initial bandwidth result if available.
		capabilities.Bandwidth = bandwidthManager.GetLastResult()
		capabilitiesJSON := capabilities.ToJSON()
		log.Infof("detected capabilities: %s", capabilitiesJSON)

		// Check disk space and warn (or auto-clean) if low.
		checkDiskSpaceAndCleanup(ctx, capabilities)

		// declare the labels of the runner before fetching tasks
		resp, err := runner.Declare(ctx, ls.Names(), capabilitiesJSON)
		switch {
		case err != nil && connect.CodeOf(err) == connect.CodeUnimplemented:
			log.Errorf("Your GitCaddy version is too old to support runner declare, please upgrade to v1.21 or later")
			return err
		case err != nil:
			log.WithError(err).Error("fail to invoke Declare")
			return err
		default:
			log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully",
				resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
			// Merge any admin-added labels from the server so label edits made
			// in the UI take effect without restarting the runner.
			runner.MergeServerLabels(resp.Msg.Runner.Labels)
		}

		// Start periodic capabilities update goroutine (disk space, bandwidth).
		go periodicCapabilitiesUpdate(ctx, runner, ls.Names(), dockerHost, cfg.Container.WorkdirParent)
		// Start periodic stale job cache cleanup (every hour, remove caches older than 2 hours).
		go func() {
			ticker := time.NewTicker(1 * time.Hour)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					runner.CleanStaleJobCaches(2 * time.Hour)
				}
			}
		}()

		poller := poll.New(cfg, cli, runner)
		poller.SetBandwidthManager(bandwidthManager)

		if daemArgs.Once || reg.Ephemeral {
			// Single-shot mode: poll exactly one job, then exit.
			done := make(chan struct{})
			go func() {
				defer close(done)
				poller.PollOnce()
			}()

			// shutdown when we complete a job or cancel is requested
			select {
			case <-ctx.Done():
			case <-done:
			}
		} else {
			go poller.Poll()

			<-ctx.Done()
		}

		log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)

		// Fresh context for shutdown: the original ctx is already cancelled
		// at this point.
		ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
		defer cancel()

		err = poller.Shutdown(ctx)
		if err != nil {
			log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
		}

		return nil
	}
}
|
|
|
|
// checkDiskSpaceAndCleanup logs warnings if disk space is low and triggers cleanup if needed
|
|
func checkDiskSpaceAndCleanup(ctx context.Context, capabilities *envcheck.RunnerCapabilities) {
|
|
if capabilities.Disk == nil {
|
|
return
|
|
}
|
|
|
|
usedPercent := capabilities.Disk.UsedPercent
|
|
freeGB := float64(capabilities.Disk.Free) / (1024 * 1024 * 1024)
|
|
|
|
switch {
|
|
case usedPercent >= DiskSpaceCriticalThreshold:
|
|
log.Errorf("CRITICAL: Disk space critically low! %.1f%% used, only %.2f GB free. Runner may fail to execute jobs!", usedPercent, freeGB)
|
|
// Always try cleanup at critical level
|
|
triggerAutoCleanup(ctx)
|
|
case usedPercent >= DiskSpaceAutoCleanupThreshold:
|
|
log.Warnf("WARNING: Disk space at %.1f%% used (%.2f GB free). Triggering automatic cleanup.", usedPercent, freeGB)
|
|
triggerAutoCleanup(ctx)
|
|
case usedPercent >= DiskSpaceWarningThreshold:
|
|
log.Warnf("WARNING: Disk space running low. %.1f%% used, %.2f GB free. Consider cleaning up disk space.", usedPercent, freeGB)
|
|
}
|
|
}
|
|
|
|
// triggerAutoCleanup runs cleanup if cooldown has passed
|
|
func triggerAutoCleanup(ctx context.Context) {
|
|
cleanupMutex.Lock()
|
|
defer cleanupMutex.Unlock()
|
|
|
|
// Check cooldown (except for first run)
|
|
if !lastCleanupTime.IsZero() && time.Since(lastCleanupTime) < CleanupCooldown {
|
|
log.Debugf("Skipping auto-cleanup, cooldown not expired (last cleanup: %s ago)", time.Since(lastCleanupTime))
|
|
return
|
|
}
|
|
|
|
if globalConfig == nil {
|
|
log.Warn("Cannot run auto-cleanup: config not available")
|
|
return
|
|
}
|
|
|
|
log.Info("Starting automatic disk cleanup...")
|
|
lastCleanupTime = time.Now()
|
|
|
|
go func() {
|
|
result, err := cleanup.RunCleanup(ctx, globalConfig)
|
|
if err != nil {
|
|
log.WithError(err).Error("Auto-cleanup failed")
|
|
return
|
|
}
|
|
log.Infof("Auto-cleanup completed: freed %d bytes, deleted %d files in %s",
|
|
result.BytesFreed, result.FilesDeleted, result.Duration)
|
|
if len(result.Errors) > 0 {
|
|
for _, e := range result.Errors {
|
|
log.WithError(e).Warn("Cleanup error")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// periodicCapabilitiesUpdate periodically updates capabilities including disk space and bandwidth
|
|
func periodicCapabilitiesUpdate(ctx context.Context, runner *run.Runner, labelNames []string, dockerHost string, workingDir string) {
|
|
ticker := time.NewTicker(CapabilitiesUpdateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug("stopping periodic capabilities update")
|
|
if bandwidthManager != nil {
|
|
bandwidthManager.Stop()
|
|
}
|
|
return
|
|
case <-ticker.C:
|
|
// Detect updated capabilities (disk space changes over time)
|
|
capabilities := envcheck.DetectCapabilities(ctx, dockerHost, workingDir, globalConfig.Runner.Capacity)
|
|
|
|
// Include latest bandwidth result
|
|
if bandwidthManager != nil {
|
|
capabilities.Bandwidth = bandwidthManager.GetLastResult()
|
|
}
|
|
|
|
capabilitiesJSON := capabilities.ToJSON()
|
|
|
|
// Check for disk space warnings
|
|
checkDiskSpaceAndCleanup(ctx, capabilities)
|
|
|
|
// Send updated capabilities to server
|
|
_, err := runner.Declare(ctx, labelNames, capabilitiesJSON)
|
|
if err != nil {
|
|
log.WithError(err).Debug("failed to update capabilities")
|
|
} else {
|
|
bandwidthInfo := ""
|
|
if capabilities.Bandwidth != nil {
|
|
bandwidthInfo = fmt.Sprintf(", bandwidth: %.1f Mbps", capabilities.Bandwidth.DownloadMbps)
|
|
}
|
|
log.Debugf("capabilities updated: disk %.1f%% used, %.2f GB free%s",
|
|
capabilities.Disk.UsedPercent,
|
|
float64(capabilities.Disk.Free)/(1024*1024*1024),
|
|
bandwidthInfo)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// daemonArgs holds the command-line flags for the daemon command.
type daemonArgs struct {
	// Once makes the daemon poll a single job and then exit (the same
	// single-shot path is also taken for ephemeral registrations).
	Once bool
}
|
|
|
|
// initLogging setup the global logrus logger.
|
|
func initLogging(cfg *config.Config) {
|
|
callPrettyfier := func(f *runtime.Frame) (string, string) {
|
|
// get function name
|
|
s := strings.Split(f.Function, ".")
|
|
funcname := "[" + s[len(s)-1] + "]"
|
|
// get file name and line number
|
|
_, filename := path.Split(f.File)
|
|
filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]"
|
|
return funcname, filename
|
|
}
|
|
|
|
isTerm := isatty.IsTerminal(os.Stdout.Fd())
|
|
format := &log.TextFormatter{
|
|
DisableColors: !isTerm,
|
|
FullTimestamp: true,
|
|
CallerPrettyfier: callPrettyfier,
|
|
}
|
|
log.SetFormatter(format)
|
|
|
|
l := cfg.Log.Level
|
|
if l == "" {
|
|
log.Infof("Log level not set, sticking to info")
|
|
return
|
|
}
|
|
|
|
level, err := log.ParseLevel(l)
|
|
if err != nil {
|
|
log.WithError(err).
|
|
Errorf("invalid log level: %q", l)
|
|
}
|
|
|
|
// debug level
|
|
switch level {
|
|
case log.DebugLevel, log.TraceLevel:
|
|
log.SetReportCaller(true) // Only in debug or trace because it takes a performance toll
|
|
log.Infof("Log level %s requested, setting up report caller for further debugging", level)
|
|
}
|
|
|
|
if log.GetLevel() != level {
|
|
log.Infof("log level set to %v", level)
|
|
log.SetLevel(level)
|
|
}
|
|
}
|
|
|
|
// commonSocketPaths lists well-known Docker/Podman socket locations, probed in
// order when neither the config nor DOCKER_HOST specifies a host. Entries may
// contain environment variables ($HOME, $XDG_RUNTIME_DIR) that are expanded at
// probe time.
var commonSocketPaths = []string{
	"/var/run/docker.sock",
	"/run/podman/podman.sock",
	"$HOME/.colima/docker.sock",
	"$XDG_RUNTIME_DIR/docker.sock",
	"$XDG_RUNTIME_DIR/podman/podman.sock",
	`\\.\pipe\docker_engine`,
	"$HOME/.docker/run/docker.sock",
}

// getDockerSocketPath resolves the Docker host to use. Precedence: the
// explicit config value (unless empty or "-"), then the DOCKER_HOST
// environment variable, then the first existing path from commonSocketPaths
// (returned with a unix:// or npipe:// scheme). Returns an error when nothing
// is found.
func getDockerSocketPath(configDockerHost string) (string, error) {
	// a `-` means don't mount the docker socket to job containers
	if configDockerHost != "" && configDockerHost != "-" {
		return configDockerHost, nil
	}

	if socket, found := os.LookupEnv("DOCKER_HOST"); found {
		return socket, nil
	}

	for _, p := range commonSocketPaths {
		// Expand once and reuse (was re-expanded up to three times per entry).
		expanded := os.ExpandEnv(p)
		if _, err := os.Lstat(expanded); err != nil {
			continue
		}
		if strings.HasPrefix(p, `\\.\`) {
			return "npipe://" + filepath.ToSlash(expanded), nil
		}
		return "unix://" + filepath.ToSlash(expanded), nil
	}

	// errors.New instead of fmt.Errorf: no format verbs are used.
	return "", errors.New("daemon Docker Engine socket not found and docker_host config was invalid")
}