forgejo-runner

git clone git://git.lin.moe/forgejo-runner.git

  1// Copyright 2022 The Gitea Authors. All rights reserved.
  2// SPDX-License-Identifier: MIT
  3
  4package report
  5
  6import (
  7	"context"
  8	"fmt"
  9	"regexp"
 10	"strings"
 11	"sync"
 12	"time"
 13
 14	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 15	"connectrpc.com/connect"
 16	retry "github.com/avast/retry-go/v4"
 17	log "github.com/sirupsen/logrus"
 18	"google.golang.org/protobuf/proto"
 19	"google.golang.org/protobuf/types/known/timestamppb"
 20
 21	"gitea.com/gitea/act_runner/internal/pkg/client"
 22)
 23
// Reporter collects log rows and task state for a single task and pushes them
// to the server through client. Its Levels and Fire methods give it the method
// set of a logrus hook, so it can be fed directly from a logger.
type Reporter struct {
	// ctx is passed to every UpdateLog/UpdateTask call; cancel is invoked when
	// the server answers that the task has been cancelled.
	ctx    context.Context
	cancel context.CancelFunc

	// closed is set by Close and makes RunDaemon stop rescheduling itself.
	closed bool
	// client performs the RPCs; clientM serializes ReportLog and ReportState.
	client  client.Client
	clientM sync.Mutex

	// logOffset is the index, within the whole task log, of logRows[0]; it is
	// advanced to the server's ack index after each successful ReportLog.
	logOffset int
	// logRows holds the rows that have not been acknowledged yet.
	logRows []*runnerv1.LogRow
	// logReplacer masks sensitive values in log content; it is rebuilt from
	// oldnew (old/new string pairs) whenever a mask is added.
	logReplacer *strings.Replacer
	oldnew      []string
	// reportInterval is the delay between two RunDaemon iterations.
	reportInterval time.Duration

	// state is the task state sent by ReportState; stateMu guards state and
	// the log buffer.
	state   *runnerv1.TaskState
	stateMu sync.RWMutex
	// outputs maps an output name to its string value while it is pending, and
	// to struct{}{} once the server has confirmed receiving it.
	outputs sync.Map

	// debugOutputEnabled controls whether "::debug::" lines are kept.
	debugOutputEnabled bool
	// stopCommandEndToken is non-empty while workflow command processing is
	// suspended by "::stop-commands::"; it holds the token that resumes it.
	stopCommandEndToken string
}
 45
 46func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, reportInterval time.Duration) *Reporter {
 47	var oldnew []string
 48	if v := task.Context.Fields["token"].GetStringValue(); v != "" {
 49		oldnew = append(oldnew, v, "***")
 50	}
 51	if v := task.Context.Fields["gitea_runtime_token"].GetStringValue(); v != "" {
 52		oldnew = append(oldnew, v, "***")
 53	}
 54	for _, v := range task.Secrets {
 55		oldnew = append(oldnew, v, "***")
 56	}
 57
 58	rv := &Reporter{
 59		ctx:            ctx,
 60		cancel:         cancel,
 61		client:         client,
 62		oldnew:         oldnew,
 63		reportInterval: reportInterval,
 64		logReplacer:    strings.NewReplacer(oldnew...),
 65		state: &runnerv1.TaskState{
 66			Id: task.Id,
 67		},
 68	}
 69
 70	if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
 71		rv.debugOutputEnabled = true
 72	}
 73
 74	return rv
 75}
 76
 77func (r *Reporter) ResetSteps(l int) {
 78	r.stateMu.Lock()
 79	defer r.stateMu.Unlock()
 80	for i := 0; i < l; i++ {
 81		r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
 82			Id: int64(i),
 83		})
 84	}
 85}
 86
// Levels returns every logrus level, so Fire is invoked for entries of any
// severity.
func (r *Reporter) Levels() []log.Level {
	return log.AllLevels
}
 90
// appendIfNotNil returns s with v appended, or s unchanged when v is nil.
func appendIfNotNil[T any](s []*T, v *T) []*T {
	if v == nil {
		return s
	}
	return append(s, v)
}
 97
 98func (r *Reporter) Fire(entry *log.Entry) error {
 99	r.stateMu.Lock()
100	defer r.stateMu.Unlock()
101
102	log.WithFields(entry.Data).Trace(entry.Message)
103
104	timestamp := entry.Time
105	if r.state.StartedAt == nil {
106		r.state.StartedAt = timestamppb.New(timestamp)
107	}
108
109	stage := entry.Data["stage"]
110
111	if stage != "Main" {
112		if v, ok := entry.Data["jobResult"]; ok {
113			if jobResult, ok := r.parseResult(v); ok {
114				r.state.Result = jobResult
115				r.state.StoppedAt = timestamppb.New(timestamp)
116				for _, s := range r.state.Steps {
117					if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {
118						s.Result = runnerv1.Result_RESULT_CANCELLED
119						if jobResult == runnerv1.Result_RESULT_SKIPPED {
120							s.Result = runnerv1.Result_RESULT_SKIPPED
121						}
122					}
123				}
124			}
125		}
126		if !r.duringSteps() {
127			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
128		}
129		return nil
130	}
131
132	var step *runnerv1.StepState
133	if v, ok := entry.Data["stepNumber"]; ok {
134		if v, ok := v.(int); ok && len(r.state.Steps) > v {
135			step = r.state.Steps[v]
136		}
137	}
138	if step == nil {
139		if !r.duringSteps() {
140			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
141		}
142		return nil
143	}
144
145	if step.StartedAt == nil {
146		step.StartedAt = timestamppb.New(timestamp)
147	}
148	if v, ok := entry.Data["raw_output"]; ok {
149		if rawOutput, ok := v.(bool); ok && rawOutput {
150			if row := r.parseLogRow(entry); row != nil {
151				if step.LogLength == 0 {
152					step.LogIndex = int64(r.logOffset + len(r.logRows))
153				}
154				step.LogLength++
155				r.logRows = append(r.logRows, row)
156			}
157		}
158	} else if !r.duringSteps() {
159		r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
160	}
161	if v, ok := entry.Data["stepResult"]; ok {
162		if stepResult, ok := r.parseResult(v); ok {
163			if step.LogLength == 0 {
164				step.LogIndex = int64(r.logOffset + len(r.logRows))
165			}
166			step.Result = stepResult
167			step.StoppedAt = timestamppb.New(timestamp)
168		}
169	}
170
171	return nil
172}
173
174func (r *Reporter) RunDaemon() {
175	if r.closed {
176		return
177	}
178	if r.ctx.Err() != nil {
179		return
180	}
181
182	_ = r.ReportLog(false)
183	_ = r.ReportState()
184
185	time.AfterFunc(r.reportInterval, r.RunDaemon)
186}
187
188func (r *Reporter) Logf(format string, a ...interface{}) {
189	r.stateMu.Lock()
190	defer r.stateMu.Unlock()
191
192	r.logf(format, a...)
193}
194
195func (r *Reporter) logf(format string, a ...interface{}) {
196	if !r.duringSteps() {
197		r.logRows = append(r.logRows, &runnerv1.LogRow{
198			Time:    timestamppb.Now(),
199			Content: fmt.Sprintf(format, a...),
200		})
201	}
202}
203
204func (r *Reporter) SetOutputs(outputs map[string]string) {
205	r.stateMu.Lock()
206	defer r.stateMu.Unlock()
207
208	for k, v := range outputs {
209		if len(k) > 255 {
210			r.logf("ignore output because the key is too long: %q", k)
211			continue
212		}
213		if l := len(v); l > 1024*1024 {
214			log.Println("ignore output because the value is too long:", k, l)
215			r.logf("ignore output because the value %q is too long: %d", k, l)
216		}
217		if _, ok := r.outputs.Load(k); ok {
218			continue
219		}
220		r.outputs.Store(k, v)
221	}
222}
223
224func (r *Reporter) Close(lastWords string) error {
225	r.closed = true
226
227	r.stateMu.Lock()
228	if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
229		if lastWords == "" {
230			lastWords = "Early termination"
231		}
232		for _, v := range r.state.Steps {
233			if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
234				v.Result = runnerv1.Result_RESULT_CANCELLED
235			}
236		}
237		r.state.Result = runnerv1.Result_RESULT_FAILURE
238		r.logRows = append(r.logRows, &runnerv1.LogRow{
239			Time:    timestamppb.Now(),
240			Content: lastWords,
241		})
242		r.state.StoppedAt = timestamppb.Now()
243	} else if lastWords != "" {
244		r.logRows = append(r.logRows, &runnerv1.LogRow{
245			Time:    timestamppb.Now(),
246			Content: lastWords,
247		})
248	}
249	r.stateMu.Unlock()
250
251	return retry.Do(func() error {
252		if err := r.ReportLog(true); err != nil {
253			return err
254		}
255		return r.ReportState()
256	}, retry.Context(r.ctx))
257}
258
259func (r *Reporter) ReportLog(noMore bool) error {
260	r.clientM.Lock()
261	defer r.clientM.Unlock()
262
263	r.stateMu.RLock()
264	rows := r.logRows
265	r.stateMu.RUnlock()
266
267	resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
268		TaskId: r.state.Id,
269		Index:  int64(r.logOffset),
270		Rows:   rows,
271		NoMore: noMore,
272	}))
273	if err != nil {
274		return err
275	}
276
277	ack := int(resp.Msg.AckIndex)
278	if ack < r.logOffset {
279		return fmt.Errorf("submitted logs are lost")
280	}
281
282	r.stateMu.Lock()
283	r.logRows = r.logRows[ack-r.logOffset:]
284	r.logOffset = ack
285	r.stateMu.Unlock()
286
287	if noMore && ack < r.logOffset+len(rows) {
288		return fmt.Errorf("not all logs are submitted")
289	}
290
291	return nil
292}
293
294func (r *Reporter) ReportState() error {
295	r.clientM.Lock()
296	defer r.clientM.Unlock()
297
298	r.stateMu.RLock()
299	state := proto.Clone(r.state).(*runnerv1.TaskState)
300	r.stateMu.RUnlock()
301
302	outputs := make(map[string]string)
303	r.outputs.Range(func(k, v interface{}) bool {
304		if val, ok := v.(string); ok {
305			outputs[k.(string)] = val
306		}
307		return true
308	})
309
310	resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
311		State:   state,
312		Outputs: outputs,
313	}))
314	if err != nil {
315		return err
316	}
317
318	for _, k := range resp.Msg.SentOutputs {
319		r.outputs.Store(k, struct{}{})
320	}
321
322	if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
323		r.cancel()
324	}
325
326	var noSent []string
327	r.outputs.Range(func(k, v interface{}) bool {
328		if _, ok := v.(string); ok {
329			noSent = append(noSent, k.(string))
330		}
331		return true
332	})
333	if len(noSent) > 0 {
334		return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
335	}
336
337	return nil
338}
339
340func (r *Reporter) duringSteps() bool {
341	if steps := r.state.Steps; len(steps) == 0 {
342		return false
343	} else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {
344		return false
345	} else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {
346		return false
347	}
348	return true
349}
350
// stringToResult maps the textual results accepted by parseResult to their
// protocol values. Strings that are not listed here are not valid results.
var stringToResult = map[string]runnerv1.Result{
	"success":   runnerv1.Result_RESULT_SUCCESS,
	"failure":   runnerv1.Result_RESULT_FAILURE,
	"skipped":   runnerv1.Result_RESULT_SKIPPED,
	"cancelled": runnerv1.Result_RESULT_CANCELLED,
}
357
358func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) {
359	str := ""
360	if v, ok := result.(string); ok { // for jobResult
361		str = v
362	} else if v, ok := result.(fmt.Stringer); ok { // for stepResult
363		str = v.String()
364	}
365
366	ret, ok := stringToResult[str]
367	return ret, ok
368}
369
// cmdRegex matches a whole line of the form "::command parameters::value".
// Group 1 is the command name (no spaces or colons), group 2 the optional
// parameters including their leading space, and group 3 the value.
var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`)
371
372func (r *Reporter) handleCommand(originalContent, command, parameters, value string) *string {
373	if r.stopCommandEndToken != "" && command != r.stopCommandEndToken {
374		return &originalContent
375	}
376
377	switch command {
378	case "add-mask":
379		r.addMask(value)
380		return nil
381	case "debug":
382		if r.debugOutputEnabled {
383			return &value
384		}
385		return nil
386
387	case "notice":
388		// Not implemented yet, so just return the original content.
389		return &originalContent
390	case "warning":
391		// Not implemented yet, so just return the original content.
392		return &originalContent
393	case "error":
394		// Not implemented yet, so just return the original content.
395		return &originalContent
396	case "group":
397		// Rewriting into ##[] syntax which the frontend understands
398		content := "##[group]" + value
399		return &content
400	case "endgroup":
401		// Ditto
402		content := "##[endgroup]"
403		return &content
404	case "stop-commands":
405		r.stopCommandEndToken = value
406		return nil
407	case r.stopCommandEndToken:
408		r.stopCommandEndToken = ""
409		return nil
410	}
411	return &originalContent
412}
413
414func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {
415	content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })
416
417	matches := cmdRegex.FindStringSubmatch(content)
418	if matches != nil {
419		if output := r.handleCommand(content, matches[1], matches[2], matches[3]); output != nil {
420			content = *output
421		} else {
422			return nil
423		}
424	}
425
426	content = r.logReplacer.Replace(content)
427
428	return &runnerv1.LogRow{
429		Time:    timestamppb.New(entry.Time),
430		Content: strings.ToValidUTF8(content, "?"),
431	}
432}
433
434func (r *Reporter) addMask(msg string) {
435	r.oldnew = append(r.oldnew, msg, "***")
436	r.logReplacer = strings.NewReplacer(r.oldnew...)
437}