maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19// Package imapsql implements SQL-based storage module
 20// using go-imap-sql library (github.com/foxcpp/go-imap-sql).
 21//
 22// Interfaces implemented:
 23// - module.StorageBackend
 24// - module.PlainAuth
 25// - module.DeliveryTarget
 26package imapsql
 27
 28import (
 29	"context"
 30	"crypto/sha1"
 31	"encoding/hex"
 32	"errors"
 33	"fmt"
 34	"path/filepath"
 35	"runtime/debug"
 36	"strconv"
 37	"strings"
 38
 39	"github.com/emersion/go-imap"
 40	sortthread "github.com/emersion/go-imap-sortthread"
 41	"github.com/emersion/go-imap/backend"
 42	mess "github.com/foxcpp/go-imap-mess"
 43	imapsql "github.com/foxcpp/go-imap-sql"
 44	"github.com/foxcpp/maddy/framework/config"
 45	modconfig "github.com/foxcpp/maddy/framework/config/module"
 46	"github.com/foxcpp/maddy/framework/dns"
 47	"github.com/foxcpp/maddy/framework/log"
 48	"github.com/foxcpp/maddy/framework/module"
 49	"github.com/foxcpp/maddy/internal/authz"
 50	"github.com/foxcpp/maddy/internal/updatepipe"
 51	"github.com/foxcpp/maddy/internal/updatepipe/pubsub"
 52
 53	_ "github.com/go-sql-driver/mysql"
 54	_ "github.com/lib/pq"
 55)
 56
// Storage is the imapsql module instance. It wraps a go-imap-sql backend and
// is registered under both the "storage.imapsql" and "target.imapsql" names.
type Storage struct {
	// Back is the underlying go-imap-sql backend; it is created by Init.
	Back *imapsql.Backend
	// instName is the configuration instance name returned by InstanceName.
	instName string
	// Log is the module logger; Init also hands it to the backend via opts.Log.
	Log log.Logger

	// junkMbox is the value of the "junk_mailbox" directive (default "Junk").
	junkMbox string

	// driver and dsn hold the SQL driver name and DSN elements. They are
	// first populated from inline arguments in New and then overwritten with
	// the effective values at the end of Init.
	driver string
	dsn    []string

	// resolver is set to dns.DefaultResolver() in New.
	resolver dns.Resolver

	// updPipe is the transport for external updates; it stays nil until
	// EnableUpdatePipe succeeds.
	updPipe updatepipe.P
	// updPushStop receives a value once the update goroutine has finished
	// pushing outbound updates; Close waits on it.
	updPushStop chan struct{}
	// outboundUpds is the channel installed as the backend's external sink;
	// Close closes it to stop the update goroutine.
	outboundUpds chan mess.Update

	// filters is the value of the "imap_filter" directive (nil if unset).
	filters module.IMAPFilter

	// deliveryMap and authMap are the optional tables from the
	// "delivery_map" and "auth_map" directives. deliveryNormalize and
	// authNormalize are built in Init: they apply the configured
	// normalization function and, when the matching table is set, map the
	// result through it.
	deliveryMap       module.Table
	deliveryNormalize func(context.Context, string) (string, error)
	authMap           module.Table
	authNormalize     func(context.Context, string) (string, error)
}
 80
// Name returns the module name, which is always "imapsql".
func (store *Storage) Name() string {
	return "imapsql"
}
 84
// InstanceName returns the instance name that was passed to New.
func (store *Storage) InstanceName() string {
	return store.instName
}
 88
 89func New(_, instName string, _, inlineArgs []string) (module.Module, error) {
 90	store := &Storage{
 91		instName: instName,
 92		Log:      log.Logger{Name: "imapsql"},
 93		resolver: dns.DefaultResolver(),
 94	}
 95	if len(inlineArgs) != 0 {
 96		if len(inlineArgs) == 1 {
 97			return nil, errors.New("imapsql: expected at least 2 arguments")
 98		}
 99
100		store.driver = inlineArgs[0]
101		store.dsn = inlineArgs[1:]
102	}
103	return store, nil
104}
105
106func (store *Storage) Init(cfg *config.Map) error {
107	var (
108		driver            string
109		dsn               []string
110		appendlimitVal    int64 = -1
111		compression       []string
112		authNormalize     string
113		deliveryNormalize string
114
115		blobStore module.BlobStore
116	)
117
118	opts := imapsql.Opts{}
119	cfg.String("driver", false, false, store.driver, &driver)
120	cfg.StringList("dsn", false, false, store.dsn, &dsn)
121	cfg.Callback("fsstore", func(m *config.Map, node config.Node) error {
122		store.Log.Msg("'fsstore' directive is deprecated, use 'msg_store fs' instead")
123		return modconfig.ModuleFromNode("storage.blob", append([]string{"fs"}, node.Args...),
124			node, m.Globals, &blobStore)
125	})
126	cfg.Custom("msg_store", false, false, func() (interface{}, error) {
127		var store module.BlobStore
128		err := modconfig.ModuleFromNode("storage.blob", []string{"fs", "messages"},
129			config.Node{}, nil, &store)
130		return store, err
131	}, func(m *config.Map, node config.Node) (interface{}, error) {
132		var store module.BlobStore
133		err := modconfig.ModuleFromNode("storage.blob", node.Args,
134			node, m.Globals, &store)
135		return store, err
136	}, &blobStore)
137	cfg.StringList("compression", false, false, []string{"off"}, &compression)
138	cfg.DataSize("appendlimit", false, false, 32*1024*1024, &appendlimitVal)
139	cfg.Bool("debug", true, false, &store.Log.Debug)
140	cfg.Int("sqlite3_cache_size", false, false, 0, &opts.CacheSize)
141	cfg.Int("sqlite3_busy_timeout", false, false, 5000, &opts.BusyTimeout)
142	cfg.Bool("disable_recent", false, true, &opts.DisableRecent)
143	cfg.String("junk_mailbox", false, false, "Junk", &store.junkMbox)
144	cfg.Custom("imap_filter", false, false, func() (interface{}, error) {
145		return nil, nil
146	}, func(m *config.Map, node config.Node) (interface{}, error) {
147		var filter module.IMAPFilter
148		err := modconfig.GroupFromNode("imap_filters", node.Args, node, m.Globals, &filter)
149		return filter, err
150	}, &store.filters)
151	cfg.Custom("auth_map", false, false, func() (interface{}, error) {
152		return nil, nil
153	}, modconfig.TableDirective, &store.authMap)
154	cfg.String("auth_normalize", false, false, "auto", &authNormalize)
155	cfg.Custom("delivery_map", false, false, func() (interface{}, error) {
156		return nil, nil
157	}, modconfig.TableDirective, &store.deliveryMap)
158	cfg.String("delivery_normalize", false, false, "precis_casefold_email", &deliveryNormalize)
159
160	if _, err := cfg.Process(); err != nil {
161		return err
162	}
163
164	if dsn == nil {
165		return errors.New("imapsql: dsn is required")
166	}
167	if driver == "" {
168		return errors.New("imapsql: driver is required")
169	}
170
171	if driver == "sqlite3" {
172		if sqliteImpl == "modernc" {
173			store.Log.Println("using transpiled SQLite (modernc.org/sqlite), this is experimental")
174			driver = "sqlite"
175		} else if sqliteImpl == "cgo" {
176			store.Log.Debugln("using cgo SQLite")
177		} else if sqliteImpl == "missing" {
178			return errors.New("imapsql: SQLite is not supported, recompile without no_sqlite3 tag set")
179		}
180	}
181
182	deliveryNormFunc, ok := authz.NormalizeFuncs[deliveryNormalize]
183	if !ok {
184		return errors.New("imapsql: unknown normalization function: " + deliveryNormalize)
185	}
186	store.deliveryNormalize = func(ctx context.Context, s string) (string, error) {
187		return deliveryNormFunc(s)
188	}
189	if store.deliveryMap != nil {
190		store.deliveryNormalize = func(ctx context.Context, email string) (string, error) {
191			email, err := deliveryNormFunc(email)
192			if err != nil {
193				return "", err
194			}
195			mapped, ok, err := store.deliveryMap.Lookup(ctx, email)
196			if err != nil || !ok {
197				return "", userDoesNotExist(err)
198			}
199			return mapped, nil
200		}
201	}
202
203	if authNormalize != "auto" {
204		store.Log.Msg("auth_normalize in storage.imapsql is deprecated and will be removed in the next release, use storage_map in imap config instead")
205	}
206	authNormFunc, ok := authz.NormalizeFuncs[authNormalize]
207	if !ok {
208		return errors.New("imapsql: unknown normalization function: " + authNormalize)
209	}
210	store.authNormalize = func(ctx context.Context, s string) (string, error) {
211		return authNormFunc(s)
212	}
213	if store.authMap != nil {
214		store.Log.Msg("auth_map in storage.imapsql is deprecated and will be removed in the next release, use storage_map in imap config instead")
215		store.authNormalize = func(ctx context.Context, username string) (string, error) {
216			username, err := authNormFunc(username)
217			if err != nil {
218				return "", err
219			}
220			mapped, ok, err := store.authMap.Lookup(ctx, username)
221			if err != nil || !ok {
222				return "", userDoesNotExist(err)
223			}
224			return mapped, nil
225		}
226	}
227
228	opts.Log = &store.Log
229
230	if appendlimitVal == -1 {
231		opts.MaxMsgBytes = nil
232	} else {
233		// int is 32-bit on some platforms, so cut off values we can't actually
234		// use.
235		if int64(uint32(appendlimitVal)) != appendlimitVal {
236			return errors.New("imapsql: appendlimit value is too big")
237		}
238		opts.MaxMsgBytes = new(uint32)
239		*opts.MaxMsgBytes = uint32(appendlimitVal)
240	}
241	var err error
242
243	dsnStr := strings.Join(dsn, " ")
244
245	if len(compression) != 0 {
246		switch compression[0] {
247		case "zstd", "lz4":
248			opts.CompressAlgo = compression[0]
249			if len(compression) == 2 {
250				opts.CompressAlgoParams = compression[1]
251				if _, err := strconv.Atoi(compression[1]); err != nil {
252					return errors.New("imapsql: first argument for lz4 and zstd is compression level")
253				}
254			}
255			if len(compression) > 2 {
256				return errors.New("imapsql: expected at most 2 arguments")
257			}
258		case "off":
259			if len(compression) > 1 {
260				return errors.New("imapsql: expected at most 1 arguments")
261			}
262		default:
263			return errors.New("imapsql: unknown compression algorithm")
264		}
265	}
266
267	store.Back, err = imapsql.New(driver, dsnStr, ExtBlobStore{Base: blobStore}, opts)
268	if err != nil {
269		return fmt.Errorf("imapsql: %s", err)
270	}
271
272	store.Log.Debugln("go-imap-sql version", imapsql.VersionStr)
273
274	store.driver = driver
275	store.dsn = dsn
276
277	return nil
278}
279
280func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error {
281	if store.updPipe != nil {
282		return nil
283	}
284
285	switch store.driver {
286	case "sqlite3":
287		dbId := sha1.Sum([]byte(strings.Join(store.dsn, " ")))
288		sockPath := filepath.Join(
289			config.RuntimeDirectory,
290			fmt.Sprintf("sql-%s.sock", hex.EncodeToString(dbId[:])))
291		store.Log.DebugMsg("using unix socket for external updates", "path", sockPath)
292		store.updPipe = &updatepipe.UnixSockPipe{
293			SockPath: sockPath,
294			Log:      log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},
295		}
296	case "postgres":
297		store.Log.DebugMsg("using PostgreSQL broker for external updates")
298		ps, err := pubsub.NewPQ(strings.Join(store.dsn, " "))
299		if err != nil {
300			return fmt.Errorf("enable_update_pipe: %w", err)
301		}
302		ps.Log = log.Logger{Name: "storage.imapsql/updpipe/pubsub", Debug: store.Log.Debug}
303		pipe := &updatepipe.PubSubPipe{
304			PubSub: ps,
305			Log:    log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},
306		}
307		store.Back.UpdateManager().ExternalUnsubscribe = pipe.Unsubscribe
308		store.Back.UpdateManager().ExternalSubscribe = pipe.Subscribe
309		store.updPipe = pipe
310	default:
311		return errors.New("imapsql: driver does not have an update pipe implementation")
312	}
313
314	inbound := make(chan mess.Update, 32)
315	outbound := make(chan mess.Update, 10)
316	store.outboundUpds = outbound
317
318	if mode == updatepipe.ModeReplicate {
319		if err := store.updPipe.Listen(inbound); err != nil {
320			store.updPipe = nil
321			return err
322		}
323	}
324
325	if err := store.updPipe.InitPush(); err != nil {
326		store.updPipe = nil
327		return err
328	}
329
330	store.Back.UpdateManager().SetExternalSink(outbound)
331
332	store.updPushStop = make(chan struct{}, 1)
333	go func() {
334		defer func() {
335			// Ensure we sent all outbound updates.
336			for upd := range outbound {
337				if err := store.updPipe.Push(upd); err != nil {
338					store.Log.Error("IMAP update pipe push failed", err)
339				}
340			}
341			store.updPushStop <- struct{}{}
342
343			if err := recover(); err != nil {
344				stack := debug.Stack()
345				log.Printf("panic during imapsql update push: %v\n%s", err, stack)
346			}
347		}()
348
349		for {
350			select {
351			case u := <-inbound:
352				store.Log.DebugMsg("external update received", "type", u.Type, "key", u.Key)
353				store.Back.UpdateManager().ExternalUpdate(u)
354			case u, ok := <-outbound:
355				if !ok {
356					return
357				}
358				store.Log.DebugMsg("sending external update", "type", u.Type, "key", u.Key)
359				if err := store.updPipe.Push(u); err != nil {
360					store.Log.Error("IMAP update pipe push failed", err)
361				}
362			}
363		}
364	}()
365
366	return nil
367}
368
// I18NLevel returns 1, matching the "I18NLEVEL=1" entry returned by
// IMAPExtensions.
func (store *Storage) I18NLevel() int {
	return 1
}
372
// IMAPExtensions returns the fixed list of IMAP extension names reported by
// this storage. A new slice is returned on every call.
func (store *Storage) IMAPExtensions() []string {
	return []string{"APPENDLIMIT", "MOVE", "CHILDREN", "SPECIAL-USE", "I18NLEVEL=1", "SORT", "THREAD=ORDEREDSUBJECT"}
}
376
// CreateMessageLimit returns the message size limit reported by the
// underlying backend (store.Back.CreateMessageLimit).
// NOTE(review): presumably nil means "no limit", mirroring how Init leaves
// opts.MaxMsgBytes nil for appendlimit -1 — confirm against go-imap-sql.
func (store *Storage) CreateMessageLimit() *uint32 {
	return store.Back.CreateMessageLimit()
}
380
381func (store *Storage) GetOrCreateIMAPAcct(username string) (backend.User, error) {
382	accountName, err := store.authNormalize(context.TODO(), username)
383	if err != nil {
384		return nil, backend.ErrInvalidCredentials
385	}
386
387	return store.Back.GetOrCreateUser(accountName)
388}
389
390func (store *Storage) Lookup(ctx context.Context, key string) (string, bool, error) {
391	accountName, err := store.authNormalize(ctx, key)
392	if err != nil {
393		return "", false, nil
394	}
395
396	usr, err := store.Back.GetUser(accountName)
397	if err != nil {
398		if errors.Is(err, imapsql.ErrUserDoesntExists) {
399			return "", false, nil
400		}
401		return "", false, err
402	}
403	if err := usr.Logout(); err != nil {
404		store.Log.Error("logout failed", err, "username", accountName)
405	}
406
407	return "", true, nil
408}
409
410func (store *Storage) Close() error {
411	// Stop backend from generating new updates.
412	store.Back.Close()
413
414	// Wait for 'updates replicate' goroutine to actually stop so we will send
415	// all updates before shutting down (this is especially important for
416	// maddy subcommands).
417	if store.updPipe != nil {
418		close(store.outboundUpds)
419		<-store.updPushStop
420
421		store.updPipe.Close()
422	}
423
424	return nil
425}
426
427func (store *Storage) Login(_ *imap.ConnInfo, usenrame, password string) (backend.User, error) {
428	panic("This method should not be called and is added only to satisfy backend.Backend interface")
429}
430
// SupportedThreadAlgorithms returns the thread algorithms available for this
// storage; only sortthread.OrderedSubject is listed, which corresponds to the
// "THREAD=ORDEREDSUBJECT" entry in IMAPExtensions.
func (store *Storage) SupportedThreadAlgorithms() []sortthread.ThreadAlgorithm {
	return []sortthread.ThreadAlgorithm{sortthread.OrderedSubject}
}
434
435func init() {
436	module.Register("storage.imapsql", New)
437	module.Register("target.imapsql", New)
438}