1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package pool2021import (22 "context"23 "sync"24 "time"25)2627type Conn interface {28 Usable() bool29 LastUseAt() time.Time30 Close() error31}3233type Config struct {34 New func(ctx context.Context, key string) (Conn, error)35 MaxKeys int36 MaxConnsPerKey int37 MaxConnLifetimeSec int6438 StaleKeyLifetimeSec int6439}4041type slot struct {42 c chan Conn43 // To keep slot size smaller it is just a unix timestamp.44 lastUse int6445}4647type P struct {48 cfg Config49 keys map[string]slot50 keysLock sync.Mutex5152 cleanupStop chan struct{}53}5455func New(cfg Config) *P {56 if cfg.New == nil {57 cfg.New = func(context.Context, string) (Conn, error) {58 return nil, nil59 }60 }6162 p := &P{63 cfg: cfg,64 keys: make(map[string]slot, cfg.MaxKeys),65 cleanupStop: make(chan struct{}),66 }6768 go p.cleanUpTick(p.cleanupStop)6970 return p71}7273func (p *P) cleanUpTick(stop chan struct{}) {74 ctx := context.Background()75 tick := time.NewTicker(time.Minute)76 defer tick.Stop()7778 for {79 select {80 case <-tick.C:81 p.CleanUp(ctx)82 case <-stop:83 return84 }85 }86}8788func (p *P) CleanUp(ctx context.Context) {89 p.keysLock.Lock()90 defer p.keysLock.Unlock()9192 for k, v := range p.keys {93 if v.lastUse+p.cfg.StaleKeyLifetimeSec > time.Now().Unix() {94 continue95 }9697 close(v.c)98 for conn := range v.c {99 go conn.Close()100 }101 delete(p.keys, k)102 }103}104105func (p *P) Get(ctx context.Context, key string) (Conn, error) {106 p.keysLock.Lock()107108 bucket, ok := p.keys[key]109 if !ok {110 p.keysLock.Unlock()111 return p.cfg.New(ctx, key)112 }113114 if time.Now().Unix()-bucket.lastUse > p.cfg.MaxConnLifetimeSec {115 // Drop bucket.116 delete(p.keys, key)117 close(bucket.c)118119 // Close might take some time, unlock early.120 p.keysLock.Unlock()121122 for conn := range bucket.c {123 conn.Close()124 }125126 return p.cfg.New(ctx, key)127 }128129 p.keysLock.Unlock()130131 for {132 var conn Conn133 select {134 case conn, ok = <-bucket.c:135 if !ok {136 return p.cfg.New(ctx, key)137 }138 default:139 return p.cfg.New(ctx, key)140 }141142 if !conn.Usable() {143 // Close might take some time, run in parallel.144 go conn.Close()145 continue146 }147 if conn.LastUseAt().Add(time.Duration(p.cfg.MaxConnLifetimeSec) * time.Second).Before(time.Now()) {148 go conn.Close()149 continue150 }151152 return conn, nil153 }154}155156func (p *P) Return(key string, c Conn) {157 p.keysLock.Lock()158 defer p.keysLock.Unlock()159160 if p.keys == nil {161 return162 }163164 bucket, ok := p.keys[key]165 if !ok {166 // Garbage-collect stale buckets.167 if len(p.keys) == p.cfg.MaxKeys {168 for k, v := range p.keys {169 if v.lastUse+p.cfg.StaleKeyLifetimeSec > time.Now().Unix() {170 continue171 }172 delete(p.keys, k)173 close(v.c)174175 for conn := range v.c {176 conn.Close()177 }178 }179 }180181 bucket = slot{182 c: make(chan Conn, p.cfg.MaxConnsPerKey),183 lastUse: time.Now().Unix(),184 }185 p.keys[key] = bucket186 }187188 select {189 case bucket.c <- c:190 bucket.lastUse = time.Now().Unix()191 default:192 // Let it go, let it go...193 go c.Close()194 }195}196197func (p *P) Close() {198 p.cleanupStop <- struct{}{}199200 p.keysLock.Lock()201 defer p.keysLock.Unlock()202203 for k, v := range p.keys {204 close(v.c)205 for conn := range v.c {206 conn.Close()207 }208 delete(p.keys, k)209 }210 p.keys = nil211}