1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package smtp2021import (22 "bytes"23 "context"24 "crypto/tls"25 "fmt"26 "io"27 "net"28 "os"29 "path/filepath"30 "strings"31 "sync"32 "sync/atomic"33 "time"3435 "github.com/emersion/go-smtp"36 "github.com/foxcpp/maddy/framework/buffer"37 "github.com/foxcpp/maddy/framework/config"38 modconfig "github.com/foxcpp/maddy/framework/config/module"39 tls2 "github.com/foxcpp/maddy/framework/config/tls"40 "github.com/foxcpp/maddy/framework/dns"41 "github.com/foxcpp/maddy/framework/future"42 "github.com/foxcpp/maddy/framework/log"43 "github.com/foxcpp/maddy/framework/module"44 "github.com/foxcpp/maddy/internal/auth"45 "github.com/foxcpp/maddy/internal/authz"46 "github.com/foxcpp/maddy/internal/limits"47 "github.com/foxcpp/maddy/internal/msgpipeline"48 "github.com/foxcpp/maddy/internal/proxy_protocol"49 "golang.org/x/net/idna"50)5152type Endpoint struct {53 saslAuth auth.SASLAuth54 serv *smtp.Server55 name string56 addrs []string57 listeners []net.Listener58 proxyProtocol *proxy_protocol.ProxyProtocol59 pipeline *msgpipeline.MsgPipeline60 resolver dns.Resolver61 limits *limits.Group6263 buffer func(r io.Reader) (buffer.Buffer, error)6465 authAlwaysRequired bool66 submission bool67 lmtp bool68 deferServerReject bool69 maxLoggedRcptErrors int70 maxReceived int71 maxHeaderBytes int647273 sessionCnt atomic.Int327475 listenersWg sync.WaitGroup7677 Log log.Logger78}7980func (endp *Endpoint) Name() string {81 return endp.name82}8384func (endp *Endpoint) InstanceName() string {85 return endp.name86}8788func New(modName string, addrs []string) (module.Module, error) {89 endp := &Endpoint{90 name: modName,91 addrs: addrs,92 submission: modName == "submission",93 lmtp: modName == "lmtp",94 resolver: dns.DefaultResolver(),95 buffer: buffer.BufferInMemory,96 Log: log.Logger{Name: modName},97 saslAuth: auth.SASLAuth{98 Log: log.Logger{Name: modName + "/sasl"},99 },100 }101 return endp, nil102}103104func (endp *Endpoint) Init(cfg *config.Map) error {105 endp.serv = smtp.NewServer(endp)106 endp.serv.ErrorLog = endp.Log107 endp.serv.LMTP = endp.lmtp108 endp.serv.EnableSMTPUTF8 = true109 endp.serv.EnableREQUIRETLS = true110 if err := endp.setConfig(cfg); err != nil {111 return err112 }113114 addresses := make([]config.Endpoint, 0, len(endp.addrs))115 for _, addr := range endp.addrs {116 saddr, err := config.ParseEndpoint(addr)117 if err != nil {118 return fmt.Errorf("%s: invalid address: %s", addr, endp.name)119 }120121 addresses = append(addresses, saddr)122 }123124 if err := endp.setupListeners(addresses); err != nil {125 for _, l := range endp.listeners {126 l.Close()127 }128 return err129 }130131 allLocal := true132 for _, addr := range addresses {133 if addr.Scheme != "unix" && !strings.HasPrefix(addr.Host, "127.0.0.") {134 allLocal = false135 }136 }137138 if endp.serv.AllowInsecureAuth && !allLocal {139 endp.Log.Println("authentication over unencrypted connections is allowed, this is insecure configuration and should be used only for testing!")140 }141 if endp.serv.TLSConfig == nil {142 if !allLocal {143 endp.Log.Println("TLS is disabled, this is insecure configuration and should be used only for testing!")144 }145146 endp.serv.AllowInsecureAuth = true147 }148149 return nil150}151152func autoBufferMode(maxSize int, dir string) func(io.Reader) (buffer.Buffer, error) {153 return func(r io.Reader) (buffer.Buffer, error) {154 // First try to read up to N bytes.155 initial := make([]byte, maxSize)156 actualSize, err := io.ReadFull(r, initial)157 if err != nil {158 if err == io.ErrUnexpectedEOF {159 log.Debugln("autobuffer: keeping the message in RAM (read", actualSize, "bytes, got EOF)")160 return buffer.MemoryBuffer{Slice: initial[:actualSize]}, nil161 }162 if err == io.EOF {163 // Special case: message with empty body.164 return buffer.MemoryBuffer{}, nil165 }166 // Some I/O error happened, bail out.167 return nil, err168 }169 if actualSize < maxSize {170 // Ok, the message is smaller than N. Make a MemoryBuffer and171 // handle it in RAM.172 log.Debugln("autobuffer: keeping the message in RAM (read", actualSize, "bytes, got short read)")173 return buffer.MemoryBuffer{Slice: initial[:actualSize]}, nil174 }175176 log.Debugln("autobuffer: spilling the message to the FS")177 // The message is big. Dump what we got to the disk and continue writing it there.178 return buffer.BufferInFile(179 io.MultiReader(bytes.NewReader(initial[:actualSize]), r),180 dir)181 }182}183184func bufferModeDirective(_ *config.Map, node config.Node) (interface{}, error) {185 if len(node.Args) < 1 {186 return nil, config.NodeErr(node, "at least one argument required")187 }188 switch node.Args[0] {189 case "ram":190 if len(node.Args) > 1 {191 return nil, config.NodeErr(node, "no additional arguments for 'ram' mode")192 }193 return buffer.BufferInMemory, nil194 case "fs":195 path := filepath.Join(config.StateDirectory, "buffer")196 if err := os.MkdirAll(path, 0o700); err != nil {197 return nil, err198 }199 switch len(node.Args) {200 case 2:201 path = node.Args[1]202 fallthrough203 case 1:204 return func(r io.Reader) (buffer.Buffer, error) {205 return buffer.BufferInFile(r, path)206 }, nil207 default:208 return nil, config.NodeErr(node, "too many arguments for 'fs' mode")209 }210 case "auto":211 path := filepath.Join(config.StateDirectory, "buffer")212 if err := os.MkdirAll(path, 0o700); err != nil {213 return nil, err214 }215216 maxSize := 1 * 1024 * 1024 // 1 MiB217 switch len(node.Args) {218 case 3:219 path = node.Args[2]220 fallthrough221 case 2:222 var err error223 maxSize, err = config.ParseDataSize(node.Args[1])224 if err != nil {225 return nil, config.NodeErr(node, "%v", err)226 }227 fallthrough228 case 1:229 return autoBufferMode(maxSize, path), nil230 default:231 return nil, config.NodeErr(node, "too many arguments for 'auto' mode")232 }233 default:234 return nil, config.NodeErr(node, "unknown buffer mode: %v", node.Args[0])235 }236}237238func (endp *Endpoint) setConfig(cfg *config.Map) error {239 var (240 hostname string241 err error242 ioDebug bool243 )244245 cfg.Callback("auth", func(m *config.Map, node config.Node) error {246 return endp.saslAuth.AddProvider(m, node)247 })248 cfg.Bool("sasl_login", false, false, &endp.saslAuth.EnableLogin)249 cfg.String("hostname", true, true, "", &hostname)250 config.EnumMapped(cfg, "auth_map_normalize", true, false, authz.NormalizeFuncs, authz.NormalizeAuto,251 &endp.saslAuth.AuthNormalize)252 modconfig.Table(cfg, "auth_map", true, false, nil, &endp.saslAuth.AuthMap)253 cfg.Duration("write_timeout", false, false, 1*time.Minute, &endp.serv.WriteTimeout)254 cfg.Duration("read_timeout", false, false, 10*time.Minute, &endp.serv.ReadTimeout)255 cfg.DataSize("max_message_size", false, false, 32*1024*1024, &endp.serv.MaxMessageBytes)256 cfg.DataSize("max_header_size", false, false, 1*1024*1024, &endp.maxHeaderBytes)257 cfg.Int("max_recipients", false, false, 20000, &endp.serv.MaxRecipients)258 cfg.Int("max_received", false, false, 50, &endp.maxReceived)259 cfg.Custom("buffer", false, false, func() (interface{}, error) {260 path := filepath.Join(config.StateDirectory, "buffer")261 if err := os.MkdirAll(path, 0o700); err != nil {262 return nil, err263 }264 return autoBufferMode(1*1024*1024 /* 1 MiB */, path), nil265 }, bufferModeDirective, &endp.buffer)266 cfg.Custom("tls", true, endp.name != "lmtp", nil, tls2.TLSDirective, &endp.serv.TLSConfig)267 cfg.Custom("proxy_protocol", false, false, nil, proxy_protocol.ProxyProtocolDirective, &endp.proxyProtocol)268 cfg.Bool("insecure_auth", endp.name == "lmtp", false, &endp.serv.AllowInsecureAuth)269 cfg.Int("smtp_max_line_length", false, false, 4000, &endp.serv.MaxLineLength)270 cfg.Bool("io_debug", false, false, &ioDebug)271 cfg.Bool("debug", true, false, &endp.Log.Debug)272 cfg.Bool("defer_sender_reject", false, true, &endp.deferServerReject)273 cfg.Int("max_logged_rcpt_errors", false, false, 5, &endp.maxLoggedRcptErrors)274 cfg.Custom("limits", false, false, func() (interface{}, error) {275 return &limits.Group{}, nil276 }, func(cfg *config.Map, n config.Node) (interface{}, error) {277 var g *limits.Group278 if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil {279 return nil, err280 }281 return g, nil282 }, &endp.limits)283 cfg.AllowUnknown()284 unknown, err := cfg.Process()285 if err != nil {286 return err287 }288289 endp.saslAuth.Log.Debug = endp.Log.Debug290291 // INTERNATIONALIZATION: See RFC 6531 Section 3.3.292 endp.serv.Domain, err = idna.ToASCII(hostname)293 if err != nil {294 return fmt.Errorf("%s: cannot represent the hostname as an A-label name: %w", endp.name, err)295 }296297 endp.pipeline, err = msgpipeline.New(cfg.Globals, unknown)298 if err != nil {299 return err300 }301 endp.pipeline.Hostname = endp.serv.Domain302 endp.pipeline.Resolver = endp.resolver303 endp.pipeline.Log = log.Logger{Name: "smtp/pipeline", Debug: endp.Log.Debug}304 endp.pipeline.FirstPipeline = true305306 if endp.submission {307 endp.authAlwaysRequired = true308 if len(endp.saslAuth.SASLMechanisms()) == 0 {309 return fmt.Errorf("%s: auth. provider must be set for submission endpoint", endp.name)310 }311 }312313 if ioDebug {314 endp.serv.Debug = endp.Log.DebugWriter()315 endp.Log.Println("I/O debugging is on! It may leak passwords in logs, be careful!")316 }317318 return nil319}320321func (endp *Endpoint) setupListeners(addresses []config.Endpoint) error {322 for _, addr := range addresses {323 var l net.Listener324 var err error325 l, err = net.Listen(addr.Network(), addr.Address())326 if err != nil {327 return fmt.Errorf("%s: %w", endp.name, err)328 }329 endp.Log.Printf("listening on %v", addr)330331 if addr.IsTLS() {332 if endp.serv.TLSConfig == nil {333 return fmt.Errorf("%s: can't bind on SMTPS endpoint without TLS configuration", endp.name)334 }335 l = tls.NewListener(l, endp.serv.TLSConfig)336 }337338 if endp.proxyProtocol != nil {339 l = proxy_protocol.NewListener(l, endp.proxyProtocol, endp.Log)340 }341342 endp.listeners = append(endp.listeners, l)343344 endp.listenersWg.Add(1)345 go func() {346 if err := endp.serv.Serve(l); err != nil {347 endp.Log.Printf("failed to serve %s: %s", addr, err)348 }349 endp.listenersWg.Done()350 }()351 }352353 return nil354}355356func (endp *Endpoint) NewSession(conn *smtp.Conn) (smtp.Session, error) {357 sess := endp.newSession(conn)358359 // Executed before authentication and session initialization.360 if err := endp.pipeline.RunEarlyChecks(context.TODO(), &sess.connState); err != nil {361 if err := sess.Logout(); err != nil {362 endp.Log.Error("early checks logout failed", err)363 }364 return nil, endp.wrapErr("", true, "EHLO", err)365 }366367 endp.sessionCnt.Add(1)368369 return sess, nil370}371372func (endp *Endpoint) newSession(conn *smtp.Conn) *Session {373 s := &Session{374 endp: endp,375 log: endp.Log,376 sessionCtx: context.Background(),377 }378379 // Used in tests.380 if conn == nil {381 return s382 }383384 s.connState = module.ConnState{385 Hostname: conn.Hostname(),386 LocalAddr: conn.Conn().LocalAddr(),387 RemoteAddr: conn.Conn().RemoteAddr(),388 }389 if tlsState, ok := conn.TLSConnectionState(); ok {390 s.connState.TLS = tlsState391 }392393 if endp.serv.LMTP {394 s.connState.Proto = "LMTP"395 } else {396 // Check if TLS connection conn struct is poplated.397 // If it is - we are ssing TLS.398 if s.connState.TLS.HandshakeComplete {399 s.connState.Proto = "ESMTPS"400 } else {401 s.connState.Proto = "ESMTP"402 }403 }404405 if endp.resolver != nil {406 rdnsCtx, cancelRDNS := context.WithCancel(s.sessionCtx)407 s.connState.RDNSName = future.New()408 s.cancelRDNS = cancelRDNS409 go s.fetchRDNSName(rdnsCtx)410 }411412 return s413}414415func (endp *Endpoint) ConnectionCount() int {416 return int(endp.sessionCnt.Load())417}418419func (endp *Endpoint) Close() error {420 endp.serv.Close()421 endp.listenersWg.Wait()422 return nil423}424425func init() {426 module.RegisterEndpoint("smtp", New)427 module.RegisterEndpoint("submission", New)428 module.RegisterEndpoint("lmtp", New)429}