maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package limiters
 20
 21import (
 22	"context"
 23	"sync"
 24	"time"
 25)
 26
 27// BucketSet combines a group of Ls into a single key-indexed structure.
 28// Basically, each unique key gets its own counter. The main use case for
 29// BucketSet is to apply per-resource rate limiting.
 30//
 31// Amount of buckets is limited to a certain value. When the size of internal
 32// map is around or equal to that value, next Take call will attempt to remove
 33// any stale buckets from the group. If it is not possible to do so (all
 34// buckets are in active use), Take will return false. Alternatively, in some
 35// rare cases, some other (undefined) waiting Take can return false.
 36//
 37// A BucksetSet without a New function assigned is no-op: Take and TakeContext
 38// always succeed and Release does nothing.
 39type BucketSet struct {
 40	// New function is used to construct underlying L instances.
 41	//
 42	// It is safe to change it only when BucketSet is not used by any
 43	// goroutine.
 44	New func() L
 45
 46	// Time after which bucket is considered stale and can be removed from the
 47	// set. For safe use with Rate limiter, it should be at least as twice as
 48	// big as Rate refill interval.
 49	ReapInterval time.Duration
 50
 51	MaxBuckets int
 52
 53	mLck sync.Mutex
 54	m    map[string]*struct {
 55		r       L
 56		lastUse time.Time
 57	}
 58}
 59
 60func NewBucketSet(new_ func() L, reapInterval time.Duration, maxBuckets int) *BucketSet {
 61	return &BucketSet{
 62		New:          new_,
 63		ReapInterval: reapInterval,
 64		MaxBuckets:   maxBuckets,
 65		m: map[string]*struct {
 66			r       L
 67			lastUse time.Time
 68		}{},
 69	}
 70}
 71
 72func (r *BucketSet) Close() {
 73	r.mLck.Lock()
 74	defer r.mLck.Unlock()
 75
 76	for _, v := range r.m {
 77		v.r.Close()
 78	}
 79}
 80
 81func (r *BucketSet) take(key string) L {
 82	r.mLck.Lock()
 83	defer r.mLck.Unlock()
 84
 85	if len(r.m) > r.MaxBuckets {
 86		now := time.Now()
 87		// Attempt to get rid of stale buckets.
 88		for k, v := range r.m {
 89			if v.lastUse.Sub(now) > r.ReapInterval {
 90				// Drop the bucket, if there happen to be any waiting Take for it.
 91				// It will return 'false', but this is fine for us since this
 92				// whole 'reaping' process will run only when we are under a
 93				// high load and dropping random requests in this case is a
 94				// more or less reasonable thing to do.
 95				v.r.Close()
 96				delete(r.m, k)
 97			}
 98		}
 99
100		// Still full? E.g. all buckets are in use.
101		if len(r.m) > r.MaxBuckets {
102			return nil
103		}
104	}
105
106	bucket, ok := r.m[key]
107	if !ok {
108		r.m[key] = &struct {
109			r       L
110			lastUse time.Time
111		}{
112			r:       r.New(),
113			lastUse: time.Now(),
114		}
115		bucket = r.m[key]
116	}
117	r.m[key].lastUse = time.Now()
118
119	return bucket.r
120}
121
122func (r *BucketSet) Take(key string) bool {
123	if r.New == nil {
124		return true
125	}
126
127	bucket := r.take(key)
128	return bucket.Take()
129}
130
131func (r *BucketSet) Release(key string) {
132	if r.New == nil {
133		return
134	}
135
136	r.mLck.Lock()
137	defer r.mLck.Unlock()
138
139	bucket, ok := r.m[key]
140	if !ok {
141		return
142	}
143	bucket.r.Release()
144}
145
146func (r *BucketSet) TakeContext(ctx context.Context, key string) error {
147	if r.New == nil {
148		return nil
149	}
150
151	bucket := r.take(key)
152	return bucket.TakeContext(ctx)
153}