forgejo-runner

git clone git://git.lin.moe/forgejo-runner.git

  1// Copyright 2022 The Gitea Authors. All rights reserved.
  2// SPDX-License-Identifier: MIT
  3
  4package cmd
  5
  6import (
  7	"context"
  8	"fmt"
  9	"os"
 10	"path"
 11	"path/filepath"
 12	"runtime"
 13	"strconv"
 14	"strings"
 15
 16	"connectrpc.com/connect"
 17	"github.com/mattn/go-isatty"
 18	log "github.com/sirupsen/logrus"
 19	"github.com/spf13/cobra"
 20
 21	"gitea.com/gitea/act_runner/internal/app/poll"
 22	"gitea.com/gitea/act_runner/internal/app/run"
 23	"gitea.com/gitea/act_runner/internal/pkg/client"
 24	"gitea.com/gitea/act_runner/internal/pkg/config"
 25	"gitea.com/gitea/act_runner/internal/pkg/envcheck"
 26	"gitea.com/gitea/act_runner/internal/pkg/labels"
 27	"gitea.com/gitea/act_runner/internal/pkg/ver"
 28)
 29
 30func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
 31	return func(cmd *cobra.Command, args []string) error {
 32		cfg, err := config.LoadDefault(*configFile)
 33		if err != nil {
 34			return fmt.Errorf("invalid configuration: %w", err)
 35		}
 36
 37		initLogging(cfg)
 38		log.Infoln("Starting runner daemon")
 39
 40		reg, err := config.LoadRegistration(cfg.Runner.File)
 41		if os.IsNotExist(err) {
 42			log.Error("registration file not found, please register the runner first")
 43			return err
 44		} else if err != nil {
 45			return fmt.Errorf("failed to load registration file: %w", err)
 46		}
 47
 48		cfg.Tune(reg.Address)
 49
 50		lbls := reg.Labels
 51		if len(cfg.Runner.Labels) > 0 {
 52			lbls = cfg.Runner.Labels
 53		}
 54
 55		ls := labels.Labels{}
 56		for _, l := range lbls {
 57			label, err := labels.Parse(l)
 58			if err != nil {
 59				log.WithError(err).Warnf("ignored invalid label %q", l)
 60				continue
 61			}
 62			ls = append(ls, label)
 63		}
 64		if len(ls) == 0 {
 65			log.Warn("no labels configured, runner may not be able to pick up jobs")
 66		}
 67
 68		if ls.RequireDocker() {
 69			dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
 70			if err != nil {
 71				return err
 72			}
 73			if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
 74				return err
 75			}
 76			// if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
 77			os.Setenv("DOCKER_HOST", dockerSocketPath)
 78			// empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
 79			// and assign the path to cfg.Container.DockerHost
 80			if cfg.Container.DockerHost == "" {
 81				cfg.Container.DockerHost = dockerSocketPath
 82			}
 83			// check the scheme, if the scheme is not npipe or unix
 84			// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
 85			if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
 86				scheme := cfg.Container.DockerHost[:protoIndex]
 87				if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
 88					cfg.Container.DockerHost = "-"
 89				}
 90			}
 91		}
 92
 93		cli := client.New(
 94			reg.Address,
 95			cfg.Runner.Insecure,
 96			reg.UUID,
 97			reg.Token,
 98			ver.Version(),
 99		)
100
101		runner := run.NewRunner(cfg, reg, cli)
102		// declare the labels of the runner before fetching tasks
103		resp, err := runner.Declare(ctx, ls.Names())
104		if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
105			// Gitea instance is older version. skip declare step.
106			log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.")
107		} else if err != nil {
108			log.WithError(err).Error("fail to invoke Declare")
109			return err
110		} else {
111			log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully",
112				resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
113			// if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified)
114			runner.Update(ctx, ls)
115			reg.Labels = ls.ToStrings()
116			if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
117				return fmt.Errorf("failed to save runner config: %w", err)
118			}
119		}
120
121		poller := poll.New(cfg, cli, runner)
122
123		go poller.Poll()
124
125		<-ctx.Done()
126		log.Infof("runner: %s shutdown initiated, waiting [runner].shutdown_timeout=%s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
127
128		ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
129		defer cancel()
130
131		err = poller.Shutdown(ctx)
132		if err != nil {
133			log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
134		}
135		return nil
136	}
137}
138
139// initLogging setup the global logrus logger.
140func initLogging(cfg *config.Config) {
141	isTerm := isatty.IsTerminal(os.Stdout.Fd())
142	format := &log.TextFormatter{
143		DisableColors: !isTerm,
144		FullTimestamp: true,
145	}
146	log.SetFormatter(format)
147
148	if l := cfg.Log.Level; l != "" {
149		level, err := log.ParseLevel(l)
150		if err != nil {
151			log.WithError(err).
152				Errorf("invalid log level: %q", l)
153		}
154
155		// debug level
156		if level == log.DebugLevel {
157			log.SetReportCaller(true)
158			format.CallerPrettyfier = func(f *runtime.Frame) (string, string) {
159				// get function name
160				s := strings.Split(f.Function, ".")
161				funcname := "[" + s[len(s)-1] + "]"
162				// get file name and line number
163				_, filename := path.Split(f.File)
164				filename = "[" + filename + ":" + strconv.Itoa(f.Line) + "]"
165				return funcname, filename
166			}
167			log.SetFormatter(format)
168		}
169
170		if log.GetLevel() != level {
171			log.Infof("log level changed to %v", level)
172			log.SetLevel(level)
173		}
174	}
175}
176
177var commonSocketPaths = []string{
178	"/var/run/docker.sock",
179	"/run/podman/podman.sock",
180	"$HOME/.colima/docker.sock",
181	"$XDG_RUNTIME_DIR/docker.sock",
182	"$XDG_RUNTIME_DIR/podman/podman.sock",
183	`\\.\pipe\docker_engine`,
184	"$HOME/.docker/run/docker.sock",
185}
186
187func getDockerSocketPath(configDockerHost string) (string, error) {
188	// a `-` means don't mount the docker socket to job containers
189	if configDockerHost != "" && configDockerHost != "-" {
190		return configDockerHost, nil
191	}
192
193	socket, found := os.LookupEnv("DOCKER_HOST")
194	if found {
195		return socket, nil
196	}
197
198	for _, p := range commonSocketPaths {
199		if _, err := os.Lstat(os.ExpandEnv(p)); err == nil {
200			if strings.HasPrefix(p, `\\.\`) {
201				return "npipe://" + filepath.ToSlash(os.ExpandEnv(p)), nil
202			}
203			return "unix://" + filepath.ToSlash(os.ExpandEnv(p)), nil
204		}
205	}
206
207	return "", fmt.Errorf("daemon Docker Engine socket not found and docker_host config was invalid")
208}