1// Copyright 2023 The Gitea Authors. All rights reserved.2// SPDX-License-Identifier: MIT34package poll56import (7 "context"8 "errors"9 "fmt"10 "sync"11 "sync/atomic"1213 runnerv1 "code.gitea.io/actions-proto-go/runner/v1"14 "connectrpc.com/connect"15 log "github.com/sirupsen/logrus"16 "golang.org/x/time/rate"1718 "gitea.com/gitea/act_runner/internal/app/run"19 "gitea.com/gitea/act_runner/internal/pkg/client"20 "gitea.com/gitea/act_runner/internal/pkg/config"21)2223const PollerID = "PollerID"2425type Poller interface {26 Poll()27 Shutdown(ctx context.Context) error28}2930type poller struct {31 client client.Client32 runner run.RunnerInterface33 cfg *config.Config34 tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.3536 pollingCtx context.Context37 shutdownPolling context.CancelFunc3839 jobsCtx context.Context40 shutdownJobs context.CancelFunc4142 done chan any43}4445func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {46 return (&poller{}).init(cfg, client, runner)47}4849func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {50 pollingCtx, shutdownPolling := context.WithCancel(context.Background())5152 jobsCtx, shutdownJobs := context.WithCancel(context.Background())5354 done := make(chan any)5556 p.client = client57 p.runner = runner58 p.cfg = cfg5960 p.pollingCtx = pollingCtx61 p.shutdownPolling = shutdownPolling6263 p.jobsCtx = jobsCtx64 p.shutdownJobs = shutdownJobs65 p.done = done6667 return p68}6970func (p *poller) Poll() {71 limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)72 wg := &sync.WaitGroup{}73 for i := 0; i < p.cfg.Runner.Capacity; i++ {74 wg.Add(1)75 go p.poll(i, wg, limiter)76 }77 wg.Wait()7879 // signal the poller is finished80 close(p.done)81}8283func (p *poller) Shutdown(ctx context.Context) error {84 p.shutdownPolling()8586 select {87 case <-p.done:88 log.Trace("all jobs are complete")89 return nil9091 case <-ctx.Done():92 log.Trace("forcing the jobs to shutdown")93 p.shutdownJobs()94 <-p.done95 log.Trace("all jobs have been shutdown")96 return ctx.Err()97 }98}99100func (p *poller) poll(id int, wg *sync.WaitGroup, limiter *rate.Limiter) {101 log.Infof("[poller %d] launched", id)102 defer wg.Done()103 for {104 if err := limiter.Wait(p.pollingCtx); err != nil {105 log.Infof("[poller %d] shutdown", id)106 return107 }108 task, ok := p.fetchTask(p.pollingCtx)109 if !ok {110 continue111 }112 p.runTaskWithRecover(p.jobsCtx, task)113 }114}115116func (p *poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {117 defer func() {118 if r := recover(); r != nil {119 err := fmt.Errorf("panic: %v", r)120 log.WithError(err).Error("panic in runTaskWithRecover")121 }122 }()123124 if err := p.runner.Run(ctx, task); err != nil {125 log.WithError(err).Error("failed to run task")126 }127}128129func (p *poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {130 reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)131 defer cancel()132133 // Load the version value that was in the cache when the request was sent.134 v := p.tasksVersion.Load()135 resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{136 TasksVersion: v,137 }))138 if errors.Is(err, context.DeadlineExceeded) {139 log.Trace("deadline exceeded")140 err = nil141 }142 if err != nil {143 if errors.Is(err, context.Canceled) {144 log.WithError(err).Debugf("shutdown, fetch task canceled")145 } else {146 log.WithError(err).Error("failed to fetch task")147 }148 return nil, false149 }150151 if resp == nil || resp.Msg == nil {152 return nil, false153 }154155 if resp.Msg.TasksVersion > v {156 p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)157 }158159 if resp.Msg.Task == nil {160 return nil, false161 }162163 // got a task, set `tasksVersion` to zero to focre query db in next request.164 p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)165166 return resp.Msg.Task, true167}