1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819// Package imapsql implements SQL-based storage module20// using go-imap-sql library (github.com/foxcpp/go-imap-sql).21//22// Interfaces implemented:23// - module.StorageBackend24// - module.PlainAuth25// - module.DeliveryTarget26package imapsql2728import (29 "context"30 "crypto/sha1"31 "encoding/hex"32 "errors"33 "fmt"34 "path/filepath"35 "runtime/debug"36 "strconv"37 "strings"3839 "github.com/emersion/go-imap"40 sortthread "github.com/emersion/go-imap-sortthread"41 "github.com/emersion/go-imap/backend"42 mess "github.com/foxcpp/go-imap-mess"43 imapsql "github.com/foxcpp/go-imap-sql"44 "github.com/foxcpp/maddy/framework/config"45 modconfig "github.com/foxcpp/maddy/framework/config/module"46 "github.com/foxcpp/maddy/framework/dns"47 "github.com/foxcpp/maddy/framework/log"48 "github.com/foxcpp/maddy/framework/module"49 "github.com/foxcpp/maddy/internal/authz"50 "github.com/foxcpp/maddy/internal/updatepipe"51 "github.com/foxcpp/maddy/internal/updatepipe/pubsub"5253 _ "github.com/go-sql-driver/mysql"54 _ "github.com/lib/pq"55)5657type Storage struct {58 Back *imapsql.Backend59 instName string60 Log log.Logger6162 junkMbox string6364 driver string65 dsn []string6667 resolver dns.Resolver6869 updPipe updatepipe.P70 updPushStop chan struct{}71 outboundUpds chan mess.Update7273 filters module.IMAPFilter7475 deliveryMap module.Table76 deliveryNormalize func(context.Context, string) (string, error)77 authMap module.Table78 authNormalize func(context.Context, string) (string, error)79}8081func (store *Storage) Name() string {82 return "imapsql"83}8485func (store *Storage) InstanceName() string {86 return store.instName87}8889func New(_, instName string, _, inlineArgs []string) (module.Module, error) {90 store := &Storage{91 instName: instName,92 Log: log.Logger{Name: "imapsql"},93 resolver: dns.DefaultResolver(),94 }95 if len(inlineArgs) != 0 {96 if len(inlineArgs) == 1 {97 return nil, errors.New("imapsql: expected at least 2 arguments")98 }99100 store.driver = inlineArgs[0]101 store.dsn = inlineArgs[1:]102 }103 return store, nil104}105106func (store *Storage) Init(cfg *config.Map) error {107 var (108 driver string109 dsn []string110 appendlimitVal int64 = -1111 compression []string112 authNormalize string113 deliveryNormalize string114115 blobStore module.BlobStore116 )117118 opts := imapsql.Opts{}119 cfg.String("driver", false, false, store.driver, &driver)120 cfg.StringList("dsn", false, false, store.dsn, &dsn)121 cfg.Callback("fsstore", func(m *config.Map, node config.Node) error {122 store.Log.Msg("'fsstore' directive is deprecated, use 'msg_store fs' instead")123 return modconfig.ModuleFromNode("storage.blob", append([]string{"fs"}, node.Args...),124 node, m.Globals, &blobStore)125 })126 cfg.Custom("msg_store", false, false, func() (interface{}, error) {127 var store module.BlobStore128 err := modconfig.ModuleFromNode("storage.blob", []string{"fs", "messages"},129 config.Node{}, nil, &store)130 return store, err131 }, func(m *config.Map, node config.Node) (interface{}, error) {132 var store module.BlobStore133 err := modconfig.ModuleFromNode("storage.blob", node.Args,134 node, m.Globals, &store)135 return store, err136 }, &blobStore)137 cfg.StringList("compression", false, false, []string{"off"}, &compression)138 cfg.DataSize("appendlimit", false, false, 32*1024*1024, &appendlimitVal)139 cfg.Bool("debug", true, false, &store.Log.Debug)140 cfg.Int("sqlite3_cache_size", false, false, 0, &opts.CacheSize)141 cfg.Int("sqlite3_busy_timeout", false, false, 5000, &opts.BusyTimeout)142 cfg.Bool("disable_recent", false, true, &opts.DisableRecent)143 cfg.String("junk_mailbox", false, false, "Junk", &store.junkMbox)144 cfg.Custom("imap_filter", false, false, func() (interface{}, error) {145 return nil, nil146 }, func(m *config.Map, node config.Node) (interface{}, error) {147 var filter module.IMAPFilter148 err := modconfig.GroupFromNode("imap_filters", node.Args, node, m.Globals, &filter)149 return filter, err150 }, &store.filters)151 cfg.Custom("auth_map", false, false, func() (interface{}, error) {152 return nil, nil153 }, modconfig.TableDirective, &store.authMap)154 cfg.String("auth_normalize", false, false, "auto", &authNormalize)155 cfg.Custom("delivery_map", false, false, func() (interface{}, error) {156 return nil, nil157 }, modconfig.TableDirective, &store.deliveryMap)158 cfg.String("delivery_normalize", false, false, "precis_casefold_email", &deliveryNormalize)159160 if _, err := cfg.Process(); err != nil {161 return err162 }163164 if dsn == nil {165 return errors.New("imapsql: dsn is required")166 }167 if driver == "" {168 return errors.New("imapsql: driver is required")169 }170171 if driver == "sqlite3" {172 if sqliteImpl == "modernc" {173 store.Log.Println("using transpiled SQLite (modernc.org/sqlite), this is experimental")174 driver = "sqlite"175 } else if sqliteImpl == "cgo" {176 store.Log.Debugln("using cgo SQLite")177 } else if sqliteImpl == "missing" {178 return errors.New("imapsql: SQLite is not supported, recompile without no_sqlite3 tag set")179 }180 }181182 deliveryNormFunc, ok := authz.NormalizeFuncs[deliveryNormalize]183 if !ok {184 return errors.New("imapsql: unknown normalization function: " + deliveryNormalize)185 }186 store.deliveryNormalize = func(ctx context.Context, s string) (string, error) {187 return deliveryNormFunc(s)188 }189 if store.deliveryMap != nil {190 store.deliveryNormalize = func(ctx context.Context, email string) (string, error) {191 email, err := deliveryNormFunc(email)192 if err != nil {193 return "", err194 }195 mapped, ok, err := store.deliveryMap.Lookup(ctx, email)196 if err != nil || !ok {197 return "", userDoesNotExist(err)198 }199 return mapped, nil200 }201 }202203 if authNormalize != "auto" {204 store.Log.Msg("auth_normalize in storage.imapsql is deprecated and will be removed in the next release, use storage_map in imap config instead")205 }206 authNormFunc, ok := authz.NormalizeFuncs[authNormalize]207 if !ok {208 return errors.New("imapsql: unknown normalization function: " + authNormalize)209 }210 store.authNormalize = func(ctx context.Context, s string) (string, error) {211 return authNormFunc(s)212 }213 if store.authMap != nil {214 store.Log.Msg("auth_map in storage.imapsql is deprecated and will be removed in the next release, use storage_map in imap config instead")215 store.authNormalize = func(ctx context.Context, username string) (string, error) {216 username, err := authNormFunc(username)217 if err != nil {218 return "", err219 }220 mapped, ok, err := store.authMap.Lookup(ctx, username)221 if err != nil || !ok {222 return "", userDoesNotExist(err)223 }224 return mapped, nil225 }226 }227228 opts.Log = &store.Log229230 if appendlimitVal == -1 {231 opts.MaxMsgBytes = nil232 } else {233 // int is 32-bit on some platforms, so cut off values we can't actually234 // use.235 if int64(uint32(appendlimitVal)) != appendlimitVal {236 return errors.New("imapsql: appendlimit value is too big")237 }238 opts.MaxMsgBytes = new(uint32)239 *opts.MaxMsgBytes = uint32(appendlimitVal)240 }241 var err error242243 dsnStr := strings.Join(dsn, " ")244245 if len(compression) != 0 {246 switch compression[0] {247 case "zstd", "lz4":248 opts.CompressAlgo = compression[0]249 if len(compression) == 2 {250 opts.CompressAlgoParams = compression[1]251 if _, err := strconv.Atoi(compression[1]); err != nil {252 return errors.New("imapsql: first argument for lz4 and zstd is compression level")253 }254 }255 if len(compression) > 2 {256 return errors.New("imapsql: expected at most 2 arguments")257 }258 case "off":259 if len(compression) > 1 {260 return errors.New("imapsql: expected at most 1 arguments")261 }262 default:263 return errors.New("imapsql: unknown compression algorithm")264 }265 }266267 store.Back, err = imapsql.New(driver, dsnStr, ExtBlobStore{Base: blobStore}, opts)268 if err != nil {269 return fmt.Errorf("imapsql: %s", err)270 }271272 store.Log.Debugln("go-imap-sql version", imapsql.VersionStr)273274 store.driver = driver275 store.dsn = dsn276277 return nil278}279280func (store *Storage) EnableUpdatePipe(mode updatepipe.BackendMode) error {281 if store.updPipe != nil {282 return nil283 }284285 switch store.driver {286 case "sqlite3":287 dbId := sha1.Sum([]byte(strings.Join(store.dsn, " ")))288 sockPath := filepath.Join(289 config.RuntimeDirectory,290 fmt.Sprintf("sql-%s.sock", hex.EncodeToString(dbId[:])))291 store.Log.DebugMsg("using unix socket for external updates", "path", sockPath)292 store.updPipe = &updatepipe.UnixSockPipe{293 SockPath: sockPath,294 Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},295 }296 case "postgres":297 store.Log.DebugMsg("using PostgreSQL broker for external updates")298 ps, err := pubsub.NewPQ(strings.Join(store.dsn, " "))299 if err != nil {300 return fmt.Errorf("enable_update_pipe: %w", err)301 }302 ps.Log = log.Logger{Name: "storage.imapsql/updpipe/pubsub", Debug: store.Log.Debug}303 pipe := &updatepipe.PubSubPipe{304 PubSub: ps,305 Log: log.Logger{Name: "storage.imapsql/updpipe", Debug: store.Log.Debug},306 }307 store.Back.UpdateManager().ExternalUnsubscribe = pipe.Unsubscribe308 store.Back.UpdateManager().ExternalSubscribe = pipe.Subscribe309 store.updPipe = pipe310 default:311 return errors.New("imapsql: driver does not have an update pipe implementation")312 }313314 inbound := make(chan mess.Update, 32)315 outbound := make(chan mess.Update, 10)316 store.outboundUpds = outbound317318 if mode == updatepipe.ModeReplicate {319 if err := store.updPipe.Listen(inbound); err != nil {320 store.updPipe = nil321 return err322 }323 }324325 if err := store.updPipe.InitPush(); err != nil {326 store.updPipe = nil327 return err328 }329330 store.Back.UpdateManager().SetExternalSink(outbound)331332 store.updPushStop = make(chan struct{}, 1)333 go func() {334 defer func() {335 // Ensure we sent all outbound updates.336 for upd := range outbound {337 if err := store.updPipe.Push(upd); err != nil {338 store.Log.Error("IMAP update pipe push failed", err)339 }340 }341 store.updPushStop <- struct{}{}342343 if err := recover(); err != nil {344 stack := debug.Stack()345 log.Printf("panic during imapsql update push: %v\n%s", err, stack)346 }347 }()348349 for {350 select {351 case u := <-inbound:352 store.Log.DebugMsg("external update received", "type", u.Type, "key", u.Key)353 store.Back.UpdateManager().ExternalUpdate(u)354 case u, ok := <-outbound:355 if !ok {356 return357 }358 store.Log.DebugMsg("sending external update", "type", u.Type, "key", u.Key)359 if err := store.updPipe.Push(u); err != nil {360 store.Log.Error("IMAP update pipe push failed", err)361 }362 }363 }364 }()365366 return nil367}368369func (store *Storage) I18NLevel() int {370 return 1371}372373func (store *Storage) IMAPExtensions() []string {374 return []string{"APPENDLIMIT", "MOVE", "CHILDREN", "SPECIAL-USE", "I18NLEVEL=1", "SORT", "THREAD=ORDEREDSUBJECT"}375}376377func (store *Storage) CreateMessageLimit() *uint32 {378 return store.Back.CreateMessageLimit()379}380381func (store *Storage) GetOrCreateIMAPAcct(username string) (backend.User, error) {382 accountName, err := store.authNormalize(context.TODO(), username)383 if err != nil {384 return nil, backend.ErrInvalidCredentials385 }386387 return store.Back.GetOrCreateUser(accountName)388}389390func (store *Storage) Lookup(ctx context.Context, key string) (string, bool, error) {391 accountName, err := store.authNormalize(ctx, key)392 if err != nil {393 return "", false, nil394 }395396 usr, err := store.Back.GetUser(accountName)397 if err != nil {398 if errors.Is(err, imapsql.ErrUserDoesntExists) {399 return "", false, nil400 }401 return "", false, err402 }403 if err := usr.Logout(); err != nil {404 store.Log.Error("logout failed", err, "username", accountName)405 }406407 return "", true, nil408}409410func (store *Storage) Close() error {411 // Stop backend from generating new updates.412 store.Back.Close()413414 // Wait for 'updates replicate' goroutine to actually stop so we will send415 // all updates before shutting down (this is especially important for416 // maddy subcommands).417 if store.updPipe != nil {418 close(store.outboundUpds)419 <-store.updPushStop420421 store.updPipe.Close()422 }423424 return nil425}426427func (store *Storage) Login(_ *imap.ConnInfo, usenrame, password string) (backend.User, error) {428 panic("This method should not be called and is added only to satisfy backend.Backend interface")429}430431func (store *Storage) SupportedThreadAlgorithms() []sortthread.ThreadAlgorithm {432 return []sortthread.ThreadAlgorithm{sortthread.OrderedSubject}433}434435func init() {436 module.Register("storage.imapsql", New)437 module.Register("target.imapsql", New)438}