1package task23import (4 "context"5 "errors"6 "sync"7 "sync/atomic"8)910var (11 // ErrNotFound is returned when a process is not found.12 ErrNotFound = errors.New("task not found")1314 // ErrAlreadyStarted is returned when a process is already started.15 ErrAlreadyStarted = errors.New("task already started")16)1718// Task is a task that can be started and stopped.19type Task struct {20 id string21 fn func(context.Context) error22 started atomic.Bool23 ctx context.Context24 cancel context.CancelFunc25 err error26}2728// Manager manages tasks.29type Manager struct {30 m sync.Map31 ctx context.Context32}3334// NewManager returns a new task manager.35func NewManager(ctx context.Context) *Manager {36 return &Manager{37 m: sync.Map{},38 ctx: ctx,39 }40}4142// Add adds a task to the manager.43// If the process already exists, it is a no-op.44func (m *Manager) Add(id string, fn func(context.Context) error) {45 if m.Exists(id) {46 return47 }4849 ctx, cancel := context.WithCancel(m.ctx)50 m.m.Store(id, &Task{51 id: id,52 fn: fn,53 ctx: ctx,54 cancel: cancel,55 })56}5758// Stop stops the task and removes it from the manager.59func (m *Manager) Stop(id string) error {60 v, ok := m.m.Load(id)61 if !ok {62 return ErrNotFound63 }6465 p := v.(*Task)66 p.cancel()6768 m.m.Delete(id)69 return nil70}7172// Exists checks if a task exists.73func (m *Manager) Exists(id string) bool {74 _, ok := m.m.Load(id)75 return ok76}7778// Run starts the task if it exists.79// Otherwise, it waits for the process to finish.80func (m *Manager) Run(id string, done chan<- error) {81 v, ok := m.m.Load(id)82 if !ok {83 done <- ErrNotFound84 return85 }8687 p := v.(*Task)88 if p.started.Load() {89 <-p.ctx.Done()90 if p.err != nil {91 done <- p.err92 return93 }9495 done <- p.ctx.Err()96 }9798 p.started.Store(true)99 m.m.Store(id, p)100 defer p.cancel()101 defer m.m.Delete(id)102103 errc := make(chan error, 1)104 go func(ctx context.Context) {105 errc <- p.fn(ctx)106 }(p.ctx)107108 select {109 case <-m.ctx.Done():110 done <- m.ctx.Err()111 case err := <-errc:112 p.err = err113 m.m.Store(id, p)114 done <- err115 }116}