forgejo-runner

git clone git://git.lin.moe/forgejo-runner.git

  1// Copyright 2023 The Gitea Authors. All rights reserved.
  2// SPDX-License-Identifier: MIT
  3
  4package poll
  5
  6import (
  7	"context"
  8	"errors"
  9	"fmt"
 10	"sync"
 11	"sync/atomic"
 12
 13	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 14	"connectrpc.com/connect"
 15	log "github.com/sirupsen/logrus"
 16	"golang.org/x/time/rate"
 17
 18	"gitea.com/gitea/act_runner/internal/app/run"
 19	"gitea.com/gitea/act_runner/internal/pkg/client"
 20	"gitea.com/gitea/act_runner/internal/pkg/config"
 21)
 22
 23const PollerID = "PollerID"
 24
 25type Poller interface {
 26	Poll()
 27	Shutdown(ctx context.Context) error
 28}
 29
 30type poller struct {
 31	client       client.Client
 32	runner       run.RunnerInterface
 33	cfg          *config.Config
 34	tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
 35
 36	pollingCtx      context.Context
 37	shutdownPolling context.CancelFunc
 38
 39	jobsCtx      context.Context
 40	shutdownJobs context.CancelFunc
 41
 42	done chan any
 43}
 44
 45func New(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
 46	return (&poller{}).init(cfg, client, runner)
 47}
 48
 49func (p *poller) init(cfg *config.Config, client client.Client, runner run.RunnerInterface) Poller {
 50	pollingCtx, shutdownPolling := context.WithCancel(context.Background())
 51
 52	jobsCtx, shutdownJobs := context.WithCancel(context.Background())
 53
 54	done := make(chan any)
 55
 56	p.client = client
 57	p.runner = runner
 58	p.cfg = cfg
 59
 60	p.pollingCtx = pollingCtx
 61	p.shutdownPolling = shutdownPolling
 62
 63	p.jobsCtx = jobsCtx
 64	p.shutdownJobs = shutdownJobs
 65	p.done = done
 66
 67	return p
 68}
 69
 70func (p *poller) Poll() {
 71	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
 72	wg := &sync.WaitGroup{}
 73	for i := 0; i < p.cfg.Runner.Capacity; i++ {
 74		wg.Add(1)
 75		go p.poll(i, wg, limiter)
 76	}
 77	wg.Wait()
 78
 79	// signal the poller is finished
 80	close(p.done)
 81}
 82
 83func (p *poller) Shutdown(ctx context.Context) error {
 84	p.shutdownPolling()
 85
 86	select {
 87	case <-p.done:
 88		log.Trace("all jobs are complete")
 89		return nil
 90
 91	case <-ctx.Done():
 92		log.Trace("forcing the jobs to shutdown")
 93		p.shutdownJobs()
 94		<-p.done
 95		log.Trace("all jobs have been shutdown")
 96		return ctx.Err()
 97	}
 98}
 99
100func (p *poller) poll(id int, wg *sync.WaitGroup, limiter *rate.Limiter) {
101	log.Infof("[poller %d] launched", id)
102	defer wg.Done()
103	for {
104		if err := limiter.Wait(p.pollingCtx); err != nil {
105			log.Infof("[poller %d] shutdown", id)
106			return
107		}
108		task, ok := p.fetchTask(p.pollingCtx)
109		if !ok {
110			continue
111		}
112		p.runTaskWithRecover(p.jobsCtx, task)
113	}
114}
115
116func (p *poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
117	defer func() {
118		if r := recover(); r != nil {
119			err := fmt.Errorf("panic: %v", r)
120			log.WithError(err).Error("panic in runTaskWithRecover")
121		}
122	}()
123
124	if err := p.runner.Run(ctx, task); err != nil {
125		log.WithError(err).Error("failed to run task")
126	}
127}
128
129func (p *poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
130	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
131	defer cancel()
132
133	// Load the version value that was in the cache when the request was sent.
134	v := p.tasksVersion.Load()
135	resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
136		TasksVersion: v,
137	}))
138	if errors.Is(err, context.DeadlineExceeded) {
139		log.Trace("deadline exceeded")
140		err = nil
141	}
142	if err != nil {
143		if errors.Is(err, context.Canceled) {
144			log.WithError(err).Debugf("shutdown, fetch task canceled")
145		} else {
146			log.WithError(err).Error("failed to fetch task")
147		}
148		return nil, false
149	}
150
151	if resp == nil || resp.Msg == nil {
152		return nil, false
153	}
154
155	if resp.Msg.TasksVersion > v {
156		p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
157	}
158
159	if resp.Msg.Task == nil {
160		return nil, false
161	}
162
163	// got a task, set `tasksVersion` to zero to focre query db in next request.
164	p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
165
166	return resp.Msg.Task, true
167}