maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package pool
 20
 21import (
 22	"context"
 23	"sync"
 24	"time"
 25)
 26
// Conn is the interface a connection has to implement to be kept in a pool.
type Conn interface {
	// Usable is consulted by Get before handing out a pooled connection:
	// connections for which it returns false are closed and skipped.
	Usable() bool
	// LastUseAt is compared by Get against Config.MaxConnLifetimeSec:
	// a connection whose LastUseAt is further in the past than that many
	// seconds is closed and skipped.
	LastUseAt() time.Time
	// Close is called by the pool whenever it discards the connection.
	// The pool ignores the returned error.
	Close() error
}
 32
// Config holds the settings of a pool created with New.
//
// New is called by Get with the requested key whenever no pooled connection
// can be reused. If it is nil, the pool constructor replaces it with a stub
// that returns (nil, nil).
//
// MaxKeys is used as the initial size hint of the key map and as the key
// count at which Return garbage-collects stale buckets before adding a
// bucket for a new key.
//
// MaxConnsPerKey is the buffer capacity of the channel that holds the idle
// connections of one key.
//
// MaxConnLifetimeSec is a duration in seconds. Get drops a whole bucket whose
// lastUse timestamp is older than this, and discards individual connections
// whose LastUseAt is older than this.
//
// StaleKeyLifetimeSec is a duration in seconds. A bucket whose lastUse
// timestamp is at least this old is considered stale and is removed by
// CleanUp and by the garbage collection in Return.
type Config struct {
	New                 func(ctx context.Context, key string) (Conn, error)
	MaxKeys             int
	MaxConnsPerKey      int
	MaxConnLifetimeSec  int64
	StaleKeyLifetimeSec int64
}
 40
// slot is the bucket of idle connections pooled for a single key.
type slot struct {
	// c buffers the idle connections. It is closed when the bucket is
	// removed from the pool so that the remaining connections can be drained.
	c chan Conn
	// lastUse is compared against Config.MaxConnLifetimeSec and
	// Config.StaleKeyLifetimeSec to decide when the bucket is dropped.
	// To keep slot size smaller it is just a unix timestamp (seconds).
	lastUse int64
}
 46
// P is a pool of idle connections grouped by string key.
//
// keys maps each key to its slot and is guarded by keysLock. Slots are stored
// by value, so a modified slot has to be written back to the map to take
// effect. Close sets keys to nil; Return checks for that and does not pool
// anything afterwards.
type P struct {
	cfg      Config
	keys     map[string]slot
	keysLock sync.Mutex

	// cleanupStop tells the cleanUpTick goroutine started by New to exit.
	cleanupStop chan struct{}
}
 54
 55func New(cfg Config) *P {
 56	if cfg.New == nil {
 57		cfg.New = func(context.Context, string) (Conn, error) {
 58			return nil, nil
 59		}
 60	}
 61
 62	p := &P{
 63		cfg:         cfg,
 64		keys:        make(map[string]slot, cfg.MaxKeys),
 65		cleanupStop: make(chan struct{}),
 66	}
 67
 68	go p.cleanUpTick(p.cleanupStop)
 69
 70	return p
 71}
 72
 73func (p *P) cleanUpTick(stop chan struct{}) {
 74	ctx := context.Background()
 75	tick := time.NewTicker(time.Minute)
 76	defer tick.Stop()
 77
 78	for {
 79		select {
 80		case <-tick.C:
 81			p.CleanUp(ctx)
 82		case <-stop:
 83			return
 84		}
 85	}
 86}
 87
 88func (p *P) CleanUp(ctx context.Context) {
 89	p.keysLock.Lock()
 90	defer p.keysLock.Unlock()
 91
 92	for k, v := range p.keys {
 93		if v.lastUse+p.cfg.StaleKeyLifetimeSec > time.Now().Unix() {
 94			continue
 95		}
 96
 97		close(v.c)
 98		for conn := range v.c {
 99			go conn.Close()
100		}
101		delete(p.keys, k)
102	}
103}
104
105func (p *P) Get(ctx context.Context, key string) (Conn, error) {
106	p.keysLock.Lock()
107
108	bucket, ok := p.keys[key]
109	if !ok {
110		p.keysLock.Unlock()
111		return p.cfg.New(ctx, key)
112	}
113
114	if time.Now().Unix()-bucket.lastUse > p.cfg.MaxConnLifetimeSec {
115		// Drop bucket.
116		delete(p.keys, key)
117		close(bucket.c)
118
119		// Close might take some time, unlock early.
120		p.keysLock.Unlock()
121
122		for conn := range bucket.c {
123			conn.Close()
124		}
125
126		return p.cfg.New(ctx, key)
127	}
128
129	p.keysLock.Unlock()
130
131	for {
132		var conn Conn
133		select {
134		case conn, ok = <-bucket.c:
135			if !ok {
136				return p.cfg.New(ctx, key)
137			}
138		default:
139			return p.cfg.New(ctx, key)
140		}
141
142		if !conn.Usable() {
143			// Close might take some time, run in parallel.
144			go conn.Close()
145			continue
146		}
147		if conn.LastUseAt().Add(time.Duration(p.cfg.MaxConnLifetimeSec) * time.Second).Before(time.Now()) {
148			go conn.Close()
149			continue
150		}
151
152		return conn, nil
153	}
154}
155
156func (p *P) Return(key string, c Conn) {
157	p.keysLock.Lock()
158	defer p.keysLock.Unlock()
159
160	if p.keys == nil {
161		return
162	}
163
164	bucket, ok := p.keys[key]
165	if !ok {
166		// Garbage-collect stale buckets.
167		if len(p.keys) == p.cfg.MaxKeys {
168			for k, v := range p.keys {
169				if v.lastUse+p.cfg.StaleKeyLifetimeSec > time.Now().Unix() {
170					continue
171				}
172				delete(p.keys, k)
173				close(v.c)
174
175				for conn := range v.c {
176					conn.Close()
177				}
178			}
179		}
180
181		bucket = slot{
182			c:       make(chan Conn, p.cfg.MaxConnsPerKey),
183			lastUse: time.Now().Unix(),
184		}
185		p.keys[key] = bucket
186	}
187
188	select {
189	case bucket.c <- c:
190		bucket.lastUse = time.Now().Unix()
191	default:
192		// Let it go, let it go...
193		go c.Close()
194	}
195}
196
197func (p *P) Close() {
198	p.cleanupStop <- struct{}{}
199
200	p.keysLock.Lock()
201	defer p.keysLock.Unlock()
202
203	for k, v := range p.keys {
204		close(v.c)
205		for conn := range v.c {
206			conn.Close()
207		}
208		delete(p.keys, k)
209	}
210	p.keys = nil
211}