1// Copyright 2022 The Gitea Authors. All rights reserved.2// SPDX-License-Identifier: MIT34package report56import (7 "context"8 "fmt"9 "regexp"10 "strings"11 "sync"12 "time"1314 runnerv1 "code.gitea.io/actions-proto-go/runner/v1"15 "connectrpc.com/connect"16 retry "github.com/avast/retry-go/v4"17 log "github.com/sirupsen/logrus"18 "google.golang.org/protobuf/proto"19 "google.golang.org/protobuf/types/known/timestamppb"2021 "gitea.com/gitea/act_runner/internal/pkg/client"22)2324type Reporter struct {25 ctx context.Context26 cancel context.CancelFunc2728 closed bool29 client client.Client30 clientM sync.Mutex3132 logOffset int33 logRows []*runnerv1.LogRow34 logReplacer *strings.Replacer35 oldnew []string36 reportInterval time.Duration3738 state *runnerv1.TaskState39 stateMu sync.RWMutex40 outputs sync.Map4142 debugOutputEnabled bool43 stopCommandEndToken string44}4546func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, reportInterval time.Duration) *Reporter {47 var oldnew []string48 if v := task.Context.Fields["token"].GetStringValue(); v != "" {49 oldnew = append(oldnew, v, "***")50 }51 if v := task.Context.Fields["gitea_runtime_token"].GetStringValue(); v != "" {52 oldnew = append(oldnew, v, "***")53 }54 for _, v := range task.Secrets {55 oldnew = append(oldnew, v, "***")56 }5758 rv := &Reporter{59 ctx: ctx,60 cancel: cancel,61 client: client,62 oldnew: oldnew,63 reportInterval: reportInterval,64 logReplacer: strings.NewReplacer(oldnew...),65 state: &runnerv1.TaskState{66 Id: task.Id,67 },68 }6970 if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {71 rv.debugOutputEnabled = true72 }7374 return rv75}7677func (r *Reporter) ResetSteps(l int) {78 r.stateMu.Lock()79 defer r.stateMu.Unlock()80 for i := 0; i < l; i++ {81 r.state.Steps = append(r.state.Steps, &runnerv1.StepState{82 Id: int64(i),83 })84 }85}8687func (r *Reporter) Levels() []log.Level {88 return log.AllLevels89}9091func appendIfNotNil[T any](s []*T, v *T) []*T {92 if v != nil {93 return append(s, v)94 }95 return s96}9798func (r *Reporter) Fire(entry *log.Entry) error {99 r.stateMu.Lock()100 defer r.stateMu.Unlock()101102 log.WithFields(entry.Data).Trace(entry.Message)103104 timestamp := entry.Time105 if r.state.StartedAt == nil {106 r.state.StartedAt = timestamppb.New(timestamp)107 }108109 stage := entry.Data["stage"]110111 if stage != "Main" {112 if v, ok := entry.Data["jobResult"]; ok {113 if jobResult, ok := r.parseResult(v); ok {114 r.state.Result = jobResult115 r.state.StoppedAt = timestamppb.New(timestamp)116 for _, s := range r.state.Steps {117 if s.Result == runnerv1.Result_RESULT_UNSPECIFIED {118 s.Result = runnerv1.Result_RESULT_CANCELLED119 if jobResult == runnerv1.Result_RESULT_SKIPPED {120 s.Result = runnerv1.Result_RESULT_SKIPPED121 }122 }123 }124 }125 }126 if !r.duringSteps() {127 r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))128 }129 return nil130 }131132 var step *runnerv1.StepState133 if v, ok := entry.Data["stepNumber"]; ok {134 if v, ok := v.(int); ok && len(r.state.Steps) > v {135 step = r.state.Steps[v]136 }137 }138 if step == nil {139 if !r.duringSteps() {140 r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))141 }142 return nil143 }144145 if step.StartedAt == nil {146 step.StartedAt = timestamppb.New(timestamp)147 }148 if v, ok := entry.Data["raw_output"]; ok {149 if rawOutput, ok := v.(bool); ok && rawOutput {150 if row := r.parseLogRow(entry); row != nil {151 if step.LogLength == 0 {152 step.LogIndex = int64(r.logOffset + len(r.logRows))153 }154 step.LogLength++155 r.logRows = append(r.logRows, row)156 }157 }158 } else if !r.duringSteps() {159 r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))160 }161 if v, ok := entry.Data["stepResult"]; ok {162 if stepResult, ok := r.parseResult(v); ok {163 if step.LogLength == 0 {164 step.LogIndex = int64(r.logOffset + len(r.logRows))165 }166 step.Result = stepResult167 step.StoppedAt = timestamppb.New(timestamp)168 }169 }170171 return nil172}173174func (r *Reporter) RunDaemon() {175 if r.closed {176 return177 }178 if r.ctx.Err() != nil {179 return180 }181182 _ = r.ReportLog(false)183 _ = r.ReportState()184185 time.AfterFunc(r.reportInterval, r.RunDaemon)186}187188func (r *Reporter) Logf(format string, a ...interface{}) {189 r.stateMu.Lock()190 defer r.stateMu.Unlock()191192 r.logf(format, a...)193}194195func (r *Reporter) logf(format string, a ...interface{}) {196 if !r.duringSteps() {197 r.logRows = append(r.logRows, &runnerv1.LogRow{198 Time: timestamppb.Now(),199 Content: fmt.Sprintf(format, a...),200 })201 }202}203204func (r *Reporter) SetOutputs(outputs map[string]string) {205 r.stateMu.Lock()206 defer r.stateMu.Unlock()207208 for k, v := range outputs {209 if len(k) > 255 {210 r.logf("ignore output because the key is too long: %q", k)211 continue212 }213 if l := len(v); l > 1024*1024 {214 log.Println("ignore output because the value is too long:", k, l)215 r.logf("ignore output because the value %q is too long: %d", k, l)216 }217 if _, ok := r.outputs.Load(k); ok {218 continue219 }220 r.outputs.Store(k, v)221 }222}223224func (r *Reporter) Close(lastWords string) error {225 r.closed = true226227 r.stateMu.Lock()228 if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {229 if lastWords == "" {230 lastWords = "Early termination"231 }232 for _, v := range r.state.Steps {233 if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {234 v.Result = runnerv1.Result_RESULT_CANCELLED235 }236 }237 r.state.Result = runnerv1.Result_RESULT_FAILURE238 r.logRows = append(r.logRows, &runnerv1.LogRow{239 Time: timestamppb.Now(),240 Content: lastWords,241 })242 r.state.StoppedAt = timestamppb.Now()243 } else if lastWords != "" {244 r.logRows = append(r.logRows, &runnerv1.LogRow{245 Time: timestamppb.Now(),246 Content: lastWords,247 })248 }249 r.stateMu.Unlock()250251 return retry.Do(func() error {252 if err := r.ReportLog(true); err != nil {253 return err254 }255 return r.ReportState()256 }, retry.Context(r.ctx))257}258259func (r *Reporter) ReportLog(noMore bool) error {260 r.clientM.Lock()261 defer r.clientM.Unlock()262263 r.stateMu.RLock()264 rows := r.logRows265 r.stateMu.RUnlock()266267 resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{268 TaskId: r.state.Id,269 Index: int64(r.logOffset),270 Rows: rows,271 NoMore: noMore,272 }))273 if err != nil {274 return err275 }276277 ack := int(resp.Msg.AckIndex)278 if ack < r.logOffset {279 return fmt.Errorf("submitted logs are lost")280 }281282 r.stateMu.Lock()283 r.logRows = r.logRows[ack-r.logOffset:]284 r.logOffset = ack285 r.stateMu.Unlock()286287 if noMore && ack < r.logOffset+len(rows) {288 return fmt.Errorf("not all logs are submitted")289 }290291 return nil292}293294func (r *Reporter) ReportState() error {295 r.clientM.Lock()296 defer r.clientM.Unlock()297298 r.stateMu.RLock()299 state := proto.Clone(r.state).(*runnerv1.TaskState)300 r.stateMu.RUnlock()301302 outputs := make(map[string]string)303 r.outputs.Range(func(k, v interface{}) bool {304 if val, ok := v.(string); ok {305 outputs[k.(string)] = val306 }307 return true308 })309310 resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{311 State: state,312 Outputs: outputs,313 }))314 if err != nil {315 return err316 }317318 for _, k := range resp.Msg.SentOutputs {319 r.outputs.Store(k, struct{}{})320 }321322 if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {323 r.cancel()324 }325326 var noSent []string327 r.outputs.Range(func(k, v interface{}) bool {328 if _, ok := v.(string); ok {329 noSent = append(noSent, k.(string))330 }331 return true332 })333 if len(noSent) > 0 {334 return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)335 }336337 return nil338}339340func (r *Reporter) duringSteps() bool {341 if steps := r.state.Steps; len(steps) == 0 {342 return false343 } else if first := steps[0]; first.Result == runnerv1.Result_RESULT_UNSPECIFIED && first.LogLength == 0 {344 return false345 } else if last := steps[len(steps)-1]; last.Result != runnerv1.Result_RESULT_UNSPECIFIED {346 return false347 }348 return true349}350351var stringToResult = map[string]runnerv1.Result{352 "success": runnerv1.Result_RESULT_SUCCESS,353 "failure": runnerv1.Result_RESULT_FAILURE,354 "skipped": runnerv1.Result_RESULT_SKIPPED,355 "cancelled": runnerv1.Result_RESULT_CANCELLED,356}357358func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) {359 str := ""360 if v, ok := result.(string); ok { // for jobResult361 str = v362 } else if v, ok := result.(fmt.Stringer); ok { // for stepResult363 str = v.String()364 }365366 ret, ok := stringToResult[str]367 return ret, ok368}369370var cmdRegex = regexp.MustCompile(`^::([^ :]+)( .*)?::(.*)$`)371372func (r *Reporter) handleCommand(originalContent, command, parameters, value string) *string {373 if r.stopCommandEndToken != "" && command != r.stopCommandEndToken {374 return &originalContent375 }376377 switch command {378 case "add-mask":379 r.addMask(value)380 return nil381 case "debug":382 if r.debugOutputEnabled {383 return &value384 }385 return nil386387 case "notice":388 // Not implemented yet, so just return the original content.389 return &originalContent390 case "warning":391 // Not implemented yet, so just return the original content.392 return &originalContent393 case "error":394 // Not implemented yet, so just return the original content.395 return &originalContent396 case "group":397 // Rewriting into ##[] syntax which the frontend understands398 content := "##[group]" + value399 return &content400 case "endgroup":401 // Ditto402 content := "##[endgroup]"403 return &content404 case "stop-commands":405 r.stopCommandEndToken = value406 return nil407 case r.stopCommandEndToken:408 r.stopCommandEndToken = ""409 return nil410 }411 return &originalContent412}413414func (r *Reporter) parseLogRow(entry *log.Entry) *runnerv1.LogRow {415 content := strings.TrimRightFunc(entry.Message, func(r rune) bool { return r == '\r' || r == '\n' })416417 matches := cmdRegex.FindStringSubmatch(content)418 if matches != nil {419 if output := r.handleCommand(content, matches[1], matches[2], matches[3]); output != nil {420 content = *output421 } else {422 return nil423 }424 }425426 content = r.logReplacer.Replace(content)427428 return &runnerv1.LogRow{429 Time: timestamppb.New(entry.Time),430 Content: strings.ToValidUTF8(content, "?"),431 }432}433434func (r *Reporter) addMask(msg string) {435 r.oldnew = append(r.oldnew, msg, "***")436 r.logReplacer = strings.NewReplacer(r.oldnew...)437}