1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package queue2021import (22 "container/list"23 "sync"24 "sync/atomic"25 "time"26)2728type TimeSlot struct {29 Time time.Time30 Value interface{}31}3233type TimeWheel struct {34 stopped uint323536 slots *list.List37 slotsLock sync.Mutex3839 updateNotify chan time.Time40 stopNotify chan struct{}4142 dispatch func(TimeSlot)43}4445func NewTimeWheel(dispatch func(TimeSlot)) *TimeWheel {46 tw := &TimeWheel{47 slots: list.New(),48 stopNotify: make(chan struct{}),49 updateNotify: make(chan time.Time),50 dispatch: dispatch,51 }52 go tw.tick()53 return tw54}5556func (tw *TimeWheel) Add(target time.Time, value interface{}) {57 if atomic.LoadUint32(&tw.stopped) == 1 {58 // Already stopped, ignore.59 return60 }6162 if value == nil {63 panic("can't insert nil objects into TimeWheel queue")64 }6566 tw.slotsLock.Lock()67 tw.slots.PushBack(TimeSlot{Time: target, Value: value})68 tw.slotsLock.Unlock()6970 tw.updateNotify <- target71}7273func (tw *TimeWheel) Close() {74 atomic.StoreUint32(&tw.stopped, 1)7576 // Idempotent Close is convenient sometimes.77 if tw.stopNotify == nil {78 return79 }8081 tw.stopNotify <- struct{}{}82 <-tw.stopNotify8384 tw.stopNotify = nil8586 close(tw.updateNotify)87}8889func (tw *TimeWheel) tick() {90 for {91 now := time.Now()92 // Look for list element closest to now.93 tw.slotsLock.Lock()94 var closestSlot TimeSlot95 var closestEl *list.Element96 for e := tw.slots.Front(); e != nil; e = e.Next() {97 slot := e.Value.(TimeSlot)98 if slot.Time.Sub(now) < closestSlot.Time.Sub(now) || closestSlot.Value == nil {99 closestSlot = slot100 closestEl = e101 }102 }103 tw.slotsLock.Unlock()104 // Only this goroutine removes elements from TimeWheel so we can be safe using closestSlot.105106 // Queue is empty. Just wait until update.107 if closestEl == nil {108 select {109 case <-tw.updateNotify:110 continue111 case <-tw.stopNotify:112 tw.stopNotify <- struct{}{}113 return114 }115 }116117 timer := time.NewTimer(closestSlot.Time.Sub(now))118119 selectloop:120 for {121 select {122 case <-timer.C:123 tw.slotsLock.Lock()124 tw.slots.Remove(closestEl)125 tw.slotsLock.Unlock()126127 tw.dispatch(closestSlot)128129 break selectloop130 case newTarget := <-tw.updateNotify:131 // Avoid unnecessary restarts if new target is not going to affect our132 // current wait time.133 if closestSlot.Time.Sub(now) <= newTarget.Sub(now) {134 continue135 }136137 timer.Stop()138 // Recalculate new slot time.139 break selectloop140 case <-tw.stopNotify:141 tw.stopNotify <- struct{}{}142 return143 }144 }145 }146}