forgejo-runner

git clone git://git.lin.moe/forgejo-runner.git

  1// Copyright 2022 The Gitea Authors. All rights reserved.
  2// SPDX-License-Identifier: MIT
  3
  4package run
  5
  6import (
  7	"context"
  8	"encoding/json"
  9	"fmt"
 10	"path/filepath"
 11	"strings"
 12	"sync"
 13	"time"
 14
 15	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 16	"connectrpc.com/connect"
 17	"github.com/docker/docker/api/types/container"
 18	"github.com/nektos/act/pkg/artifactcache"
 19	"github.com/nektos/act/pkg/common"
 20	"github.com/nektos/act/pkg/model"
 21	"github.com/nektos/act/pkg/runner"
 22	log "github.com/sirupsen/logrus"
 23
 24	"gitea.com/gitea/act_runner/internal/pkg/client"
 25	"gitea.com/gitea/act_runner/internal/pkg/config"
 26	"gitea.com/gitea/act_runner/internal/pkg/labels"
 27	"gitea.com/gitea/act_runner/internal/pkg/report"
 28	"gitea.com/gitea/act_runner/internal/pkg/ver"
 29)
 30
 31// Runner runs the pipeline.
 32type Runner struct {
 33	name string
 34
 35	cfg *config.Config
 36
 37	client client.Client
 38	labels labels.Labels
 39	envs   map[string]string
 40
 41	runningTasks sync.Map
 42}
 43
 44type RunnerInterface interface {
 45	Run(ctx context.Context, task *runnerv1.Task) error
 46}
 47
 48func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner {
 49	ls := labels.Labels{}
 50	for _, v := range reg.Labels {
 51		if l, err := labels.Parse(v); err == nil {
 52			ls = append(ls, l)
 53		}
 54	}
 55
 56	if cfg.Runner.Envs == nil {
 57		cfg.Runner.Envs = make(map[string]string, 10)
 58	}
 59
 60	cfg.Runner.Envs["GITHUB_SERVER_URL"] = reg.Address
 61
 62	envs := make(map[string]string, len(cfg.Runner.Envs))
 63	for k, v := range cfg.Runner.Envs {
 64		envs[k] = v
 65	}
 66	if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled {
 67		if cfg.Cache.ExternalServer != "" {
 68			envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer
 69		} else {
 70			cacheHandler, err := artifactcache.StartHandler(
 71				cfg.Cache.Dir,
 72				cfg.Cache.Host,
 73				cfg.Cache.Port,
 74				log.StandardLogger().WithField("module", "cache_request"),
 75			)
 76			if err != nil {
 77				log.Errorf("cannot init cache server, it will be disabled: %v", err)
 78				// go on
 79			} else {
 80				envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/"
 81			}
 82		}
 83	}
 84
 85	// set artifact gitea api
 86	artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/"
 87	envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI
 88	envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/")
 89
 90	// Set specific environments to distinguish between Gitea and GitHub
 91	envs["GITEA_ACTIONS"] = "true"
 92	envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version()
 93
 94	return &Runner{
 95		name:   reg.Name,
 96		cfg:    cfg,
 97		client: cli,
 98		labels: ls,
 99		envs:   envs,
100	}
101}
102
103func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
104	if _, ok := r.runningTasks.Load(task.Id); ok {
105		return fmt.Errorf("task %d is already running", task.Id)
106	}
107	r.runningTasks.Store(task.Id, struct{}{})
108	defer r.runningTasks.Delete(task.Id)
109
110	ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
111	defer cancel()
112	reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg.Runner.ReportInterval)
113	var runErr error
114	defer func() {
115		lastWords := ""
116		if runErr != nil {
117			lastWords = runErr.Error()
118		}
119		_ = reporter.Close(lastWords)
120	}()
121	reporter.RunDaemon()
122	runErr = r.run(ctx, task, reporter)
123
124	return nil
125}
126
127func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) {
128	defer func() {
129		if r := recover(); r != nil {
130			err = fmt.Errorf("panic: %v", r)
131		}
132	}()
133
134	reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
135
136	workflow, jobID, err := generateWorkflow(task)
137	if err != nil {
138		return err
139	}
140
141	plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
142	if err != nil {
143		return err
144	}
145	job := workflow.GetJob(jobID)
146	reporter.ResetSteps(len(job.Steps))
147
148	taskContext := task.Context.Fields
149
150	log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(),
151		taskContext["gitea_default_actions_url"].GetStringValue(),
152		r.client.Address())
153
154	preset := &model.GithubContext{
155		Event:           taskContext["event"].GetStructValue().AsMap(),
156		RunID:           taskContext["run_id"].GetStringValue(),
157		RunNumber:       taskContext["run_number"].GetStringValue(),
158		Actor:           taskContext["actor"].GetStringValue(),
159		Repository:      taskContext["repository"].GetStringValue(),
160		EventName:       taskContext["event_name"].GetStringValue(),
161		Sha:             taskContext["sha"].GetStringValue(),
162		Ref:             taskContext["ref"].GetStringValue(),
163		RefName:         taskContext["ref_name"].GetStringValue(),
164		RefType:         taskContext["ref_type"].GetStringValue(),
165		HeadRef:         taskContext["head_ref"].GetStringValue(),
166		BaseRef:         taskContext["base_ref"].GetStringValue(),
167		Token:           taskContext["token"].GetStringValue(),
168		RepositoryOwner: taskContext["repository_owner"].GetStringValue(),
169		RetentionDays:   taskContext["retention_days"].GetStringValue(),
170	}
171	if t := task.Secrets["GITEA_TOKEN"]; t != "" {
172		preset.Token = t
173	} else if t := task.Secrets["GITHUB_TOKEN"]; t != "" {
174		preset.Token = t
175	}
176
177	giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue()
178	if giteaRuntimeToken == "" {
179		// use task token to action api token for previous Gitea Server Versions
180		giteaRuntimeToken = preset.Token
181	}
182	r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken
183
184	eventJSON, err := json.Marshal(preset.Event)
185	if err != nil {
186		return err
187	}
188
189	maxLifetime := 3 * time.Hour
190	if deadline, ok := ctx.Deadline(); ok {
191		maxLifetime = time.Until(deadline)
192	}
193
194	var inputs map[string]string
195	if preset.EventName == "workflow_dispatch" {
196		if inputsRaw, ok := preset.Event["inputs"]; ok {
197			inputs, _ = inputsRaw.(map[string]string)
198		}
199	}
200
201	runnerConfig := &runner.Config{
202		// On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>"
203		// On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>"
204		Workdir:        filepath.FromSlash(filepath.Clean(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository))),
205		BindWorkdir:    false,
206		ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
207
208		ReuseContainers:            false,
209		ForcePull:                  r.cfg.Container.ForcePull,
210		ForceRebuild:               false,
211		LogOutput:                  true,
212		JSONLogger:                 false,
213		Env:                        r.envs,
214		Secrets:                    task.Secrets,
215		GitHubInstance:             strings.TrimSuffix(r.client.Address(), "/"),
216		AutoRemove:                 true,
217		NoSkipCheckout:             true,
218		PresetGitHubContext:        preset,
219		EventJSON:                  string(eventJSON),
220		ContainerNamePrefix:        fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id),
221		ContainerMaxLifetime:       maxLifetime,
222		ContainerNetworkMode:       container.NetworkMode(r.cfg.Container.Network),
223		ContainerNetworkEnableIPv6: r.cfg.Container.EnableIPv6,
224		ContainerOptions:           r.cfg.Container.Options,
225		ContainerDaemonSocket:      r.cfg.Container.DockerHost,
226		Privileged:                 r.cfg.Container.Privileged,
227		DefaultActionInstance:      taskContext["gitea_default_actions_url"].GetStringValue(),
228		PlatformPicker:             r.labels.PickPlatform,
229		Vars:                       task.Vars,
230		ValidVolumes:               r.cfg.Container.ValidVolumes,
231		InsecureSkipTLS:            r.cfg.Runner.Insecure,
232		Inputs:                     inputs,
233	}
234
235	rr, err := runner.New(runnerConfig)
236	if err != nil {
237		return err
238	}
239	executor := rr.NewPlanExecutor(plan)
240
241	reporter.Logf("workflow prepared")
242
243	// add logger recorders
244	ctx = common.WithLoggerHook(ctx, reporter)
245
246	execErr := executor(ctx)
247	reporter.SetOutputs(job.Outputs)
248	return execErr
249}
250
251func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) {
252	return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
253		Version: ver.Version(),
254		Labels:  labels,
255	}))
256}
257
258func (r *Runner) Update(ctx context.Context, labels labels.Labels) {
259	r.labels = labels
260}