maddy

Fork of https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package queue
 20
 21import (
 22	"container/list"
 23	"sync"
 24	"sync/atomic"
 25	"time"
 26)
 27
// TimeSlot is a single TimeWheel entry: a payload paired with the moment
// it is scheduled for. TimeWheel hands values of this type to its dispatch
// callback.
type TimeSlot struct {
	// Time is the target instant the entry was scheduled for.
	Time time.Time
	// Value is the caller-supplied payload. TimeWheel.Add refuses nil here.
	Value interface{}
}
 32
 33type TimeWheel struct {
 34	stopped uint32
 35
 36	slots     *list.List
 37	slotsLock sync.Mutex
 38
 39	updateNotify chan time.Time
 40	stopNotify   chan struct{}
 41
 42	dispatch func(TimeSlot)
 43}
 44
 45func NewTimeWheel(dispatch func(TimeSlot)) *TimeWheel {
 46	tw := &TimeWheel{
 47		slots:        list.New(),
 48		stopNotify:   make(chan struct{}),
 49		updateNotify: make(chan time.Time),
 50		dispatch:     dispatch,
 51	}
 52	go tw.tick()
 53	return tw
 54}
 55
 56func (tw *TimeWheel) Add(target time.Time, value interface{}) {
 57	if atomic.LoadUint32(&tw.stopped) == 1 {
 58		// Already stopped, ignore.
 59		return
 60	}
 61
 62	if value == nil {
 63		panic("can't insert nil objects into TimeWheel queue")
 64	}
 65
 66	tw.slotsLock.Lock()
 67	tw.slots.PushBack(TimeSlot{Time: target, Value: value})
 68	tw.slotsLock.Unlock()
 69
 70	tw.updateNotify <- target
 71}
 72
 73func (tw *TimeWheel) Close() {
 74	atomic.StoreUint32(&tw.stopped, 1)
 75
 76	// Idempotent Close is convenient sometimes.
 77	if tw.stopNotify == nil {
 78		return
 79	}
 80
 81	tw.stopNotify <- struct{}{}
 82	<-tw.stopNotify
 83
 84	tw.stopNotify = nil
 85
 86	close(tw.updateNotify)
 87}
 88
 89func (tw *TimeWheel) tick() {
 90	for {
 91		now := time.Now()
 92		// Look for list element closest to now.
 93		tw.slotsLock.Lock()
 94		var closestSlot TimeSlot
 95		var closestEl *list.Element
 96		for e := tw.slots.Front(); e != nil; e = e.Next() {
 97			slot := e.Value.(TimeSlot)
 98			if slot.Time.Sub(now) < closestSlot.Time.Sub(now) || closestSlot.Value == nil {
 99				closestSlot = slot
100				closestEl = e
101			}
102		}
103		tw.slotsLock.Unlock()
104		// Only this goroutine removes elements from TimeWheel so we can be safe using closestSlot.
105
106		// Queue is empty. Just wait until update.
107		if closestEl == nil {
108			select {
109			case <-tw.updateNotify:
110				continue
111			case <-tw.stopNotify:
112				tw.stopNotify <- struct{}{}
113				return
114			}
115		}
116
117		timer := time.NewTimer(closestSlot.Time.Sub(now))
118
119	selectloop:
120		for {
121			select {
122			case <-timer.C:
123				tw.slotsLock.Lock()
124				tw.slots.Remove(closestEl)
125				tw.slotsLock.Unlock()
126
127				tw.dispatch(closestSlot)
128
129				break selectloop
130			case newTarget := <-tw.updateNotify:
131				// Avoid unnecessary restarts if new target is not going to affect our
132				// current wait time.
133				if closestSlot.Time.Sub(now) <= newTarget.Sub(now) {
134					continue
135				}
136
137				timer.Stop()
138				// Recalculate new slot time.
139				break selectloop
140			case <-tw.stopNotify:
141				tw.stopNotify <- struct{}{}
142				return
143			}
144		}
145	}
146}