// Copyright 2022 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT package cmd import ( "context" "errors" "fmt" "os" "path" "path/filepath" "runtime" "slices" "strconv" "strings" "time" "connectrpc.com/connect" "github.com/mattn/go-isatty" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "gitea.com/gitea/act_runner/internal/app/poll" "gitea.com/gitea/act_runner/internal/app/run" "gitea.com/gitea/act_runner/internal/pkg/client" "gitea.com/gitea/act_runner/internal/pkg/config" "gitea.com/gitea/act_runner/internal/pkg/envcheck" "gitea.com/gitea/act_runner/internal/pkg/labels" "gitea.com/gitea/act_runner/internal/pkg/ver" ) const ( // DiskSpaceWarningThreshold is the percentage at which to warn about low disk space DiskSpaceWarningThreshold = 85.0 // DiskSpaceCriticalThreshold is the percentage at which to log critical warnings DiskSpaceCriticalThreshold = 95.0 // CapabilitiesUpdateInterval is how often to update capabilities (including disk space) CapabilitiesUpdateInterval = 5 * time.Minute // BandwidthTestInterval is how often to run bandwidth tests (hourly) BandwidthTestInterval = 1 * time.Hour ) // Global bandwidth manager - accessible for triggering manual tests var bandwidthManager *envcheck.BandwidthManager func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { cfg, err := config.LoadDefault(*configFile) if err != nil { return fmt.Errorf("invalid configuration: %w", err) } initLogging(cfg) log.Infoln("Starting runner daemon") reg, err := config.LoadRegistration(cfg.Runner.File) if os.IsNotExist(err) { log.Error("registration file not found, please register the runner first") return err } else if err != nil { return fmt.Errorf("failed to load registration file: %w", err) } lbls := reg.Labels if len(cfg.Runner.Labels) > 0 { lbls = cfg.Runner.Labels } ls := labels.Labels{} for _, l := range lbls { label, err := labels.Parse(l) if err != nil { log.WithError(err).Warnf("ignored invalid label %q", l) continue } ls = append(ls, label) } if len(ls) == 0 { log.Warn("no labels configured, runner may not be able to pick up jobs") } if ls.RequireDocker() || cfg.Container.RequireDocker { // Wait for dockerd be ready if timeout := cfg.Container.DockerTimeout; timeout > 0 { tctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() keepRunning := true for keepRunning { dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost) if err != nil { log.Errorf("Failed to get socket path: %s", err.Error()) } else if err = envcheck.CheckIfDockerRunning(tctx, dockerSocketPath); errors.Is(err, context.Canceled) { log.Infof("Docker wait timeout of %s expired", timeout.String()) break } else if err != nil { log.Errorf("Docker connection failed: %s", err.Error()) } else { log.Infof("Docker is ready") break } select { case <-time.After(time.Second): case <-tctx.Done(): log.Infof("Docker wait timeout of %s expired", timeout.String()) keepRunning = false } } } // Require dockerd be ready dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost) if err != nil { return err } if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil { return err } // if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath os.Setenv("DOCKER_HOST", dockerSocketPath) // empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically // and assign the path to cfg.Container.DockerHost if cfg.Container.DockerHost == "" { cfg.Container.DockerHost = dockerSocketPath } // check the scheme, if the scheme is not npipe or unix // set cfg.Container.DockerHost to "-" because it can't be mounted to the job container if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 { scheme := cfg.Container.DockerHost[:protoIndex] if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") { cfg.Container.DockerHost = "-" } } } if !slices.Equal(reg.Labels, ls.ToStrings()) { reg.Labels = ls.ToStrings() if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil { return fmt.Errorf("failed to save runner config: %w", err) } log.Infof("labels updated to: %v", reg.Labels) } cli := client.New( reg.Address, cfg.Runner.Insecure, reg.UUID, reg.Token, ver.Version(), ) runner := run.NewRunner(cfg, reg, cli) // Detect runner capabilities for AI-friendly workflow generation dockerHost := cfg.Container.DockerHost if dockerHost == "" { if dh, err := getDockerSocketPath(""); err == nil { dockerHost = dh } } // Initialize bandwidth manager with the Gitea server address bandwidthManager = envcheck.NewBandwidthManager(reg.Address, BandwidthTestInterval) bandwidthManager.Start(ctx) log.Infof("bandwidth manager started, testing against: %s", reg.Address) capabilities := envcheck.DetectCapabilities(ctx, dockerHost, cfg.Container.WorkdirParent) // Include initial bandwidth result if available capabilities.Bandwidth = bandwidthManager.GetLastResult() capabilitiesJson := capabilities.ToJSON() log.Infof("detected capabilities: %s", capabilitiesJson) // Check disk space and warn if low checkDiskSpaceWarnings(capabilities) // declare the labels of the runner before fetching tasks resp, err := runner.Declare(ctx, ls.Names(), capabilitiesJson) if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented { log.Errorf("Your Gitea version is too old to support runner declare, please upgrade to v1.21 or later") return err } else if err != nil { log.WithError(err).Error("fail to invoke Declare") return err } else { log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully", resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels) } // Start periodic capabilities update goroutine go periodicCapabilitiesUpdate(ctx, runner, ls.Names(), dockerHost, cfg.Container.WorkdirParent) // Start periodic stale job cache cleanup (every hour, remove caches older than 2 hours) go func() { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: runner.CleanStaleJobCaches(2 * time.Hour) } } }() poller := poll.New(cfg, cli, runner) poller.SetBandwidthManager(bandwidthManager) if daemArgs.Once || reg.Ephemeral { done := make(chan struct{}) go func() { defer close(done) poller.PollOnce() }() // shutdown when we complete a job or cancel is requested select { case <-ctx.Done(): case <-done: } } else { go poller.Poll() <-ctx.Done() } log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout) ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout) defer cancel() err = poller.Shutdown(ctx) if err != nil { log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name) } return nil } } // checkDiskSpaceWarnings logs warnings if disk space is low func checkDiskSpaceWarnings(capabilities *envcheck.RunnerCapabilities) { if capabilities.Disk == nil { return } usedPercent := capabilities.Disk.UsedPercent freeGB := float64(capabilities.Disk.Free) / (1024 * 1024 * 1024) if usedPercent >= DiskSpaceCriticalThreshold { log.Errorf("CRITICAL: Disk space critically low! %.1f%% used, only %.2f GB free. Runner may fail to execute jobs!", usedPercent, freeGB) } else if usedPercent >= DiskSpaceWarningThreshold { log.Warnf("WARNING: Disk space running low. %.1f%% used, %.2f GB free. Consider cleaning up disk space.", usedPercent, freeGB) } } // periodicCapabilitiesUpdate periodically updates capabilities including disk space and bandwidth func periodicCapabilitiesUpdate(ctx context.Context, runner *run.Runner, labelNames []string, dockerHost string, workingDir string) { ticker := time.NewTicker(CapabilitiesUpdateInterval) defer ticker.Stop() for { select { case <-ctx.Done(): log.Debug("stopping periodic capabilities update") if bandwidthManager != nil { bandwidthManager.Stop() } return case <-ticker.C: // Detect updated capabilities (disk space changes over time) capabilities := envcheck.DetectCapabilities(ctx, dockerHost, workingDir) // Include latest bandwidth result if bandwidthManager != nil { capabilities.Bandwidth = bandwidthManager.GetLastResult() } capabilitiesJson := capabilities.ToJSON() // Check for disk space warnings checkDiskSpaceWarnings(capabilities) // Send updated capabilities to server _, err := runner.Declare(ctx, labelNames, capabilitiesJson) if err != nil { log.WithError(err).Debug("failed to update capabilities") } else { bandwidthInfo := "" if capabilities.Bandwidth != nil { bandwidthInfo = fmt.Sprintf(", bandwidth: %.1f Mbps", capabilities.Bandwidth.DownloadMbps) } log.Debugf("capabilities updated: disk %.1f%% used, %.2f GB free%s", capabilities.Disk.UsedPercent, float64(capabilities.Disk.Free)/(1024*1024*1024), bandwidthInfo) } } } } type daemonArgs struct { Once bool } // initLogging setup the global logrus logger. func initLogging(cfg *config.Config) { callPrettyfier := func(f *runtime.Frame) (string, string) { // get function name s := strings.Split(f.Function, ".") funcname := "[" + s[len(s)-1] + "]" // get file name and line number _, filename := path.Split(f.File) filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]" return funcname, filename } isTerm := isatty.IsTerminal(os.Stdout.Fd()) format := &log.TextFormatter{ DisableColors: !isTerm, FullTimestamp: true, CallerPrettyfier: callPrettyfier, } log.SetFormatter(format) l := cfg.Log.Level if l == "" { log.Infof("Log level not set, sticking to info") return } level, err := log.ParseLevel(l) if err != nil { log.WithError(err). Errorf("invalid log level: %q", l) } // debug level switch level { case log.DebugLevel, log.TraceLevel: log.SetReportCaller(true) // Only in debug or trace because it takes a performance toll log.Infof("Log level %s requested, setting up report caller for further debugging", level) } if log.GetLevel() != level { log.Infof("log level set to %v", level) log.SetLevel(level) } } var commonSocketPaths = []string{ "/var/run/docker.sock", "/run/podman/podman.sock", "$HOME/.colima/docker.sock", "$XDG_RUNTIME_DIR/docker.sock", "$XDG_RUNTIME_DIR/podman/podman.sock", `\\.\pipe\docker_engine`, "$HOME/.docker/run/docker.sock", } func getDockerSocketPath(configDockerHost string) (string, error) { // a `-` means don't mount the docker socket to job containers if configDockerHost != "" && configDockerHost != "-" { return configDockerHost, nil } socket, found := os.LookupEnv("DOCKER_HOST") if found { return socket, nil } for _, p := range commonSocketPaths { if _, err := os.Lstat(os.ExpandEnv(p)); err == nil { if strings.HasPrefix(p, `\\.\`) { return "npipe://" + filepath.ToSlash(os.ExpandEnv(p)), nil } return "unix://" + filepath.ToSlash(os.ExpandEnv(p)), nil } } return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid") }