maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package smtp
 20
 21import (
 22	"bytes"
 23	"context"
 24	"crypto/tls"
 25	"fmt"
 26	"io"
 27	"net"
 28	"os"
 29	"path/filepath"
 30	"strings"
 31	"sync"
 32	"sync/atomic"
 33	"time"
 34
 35	"github.com/emersion/go-smtp"
 36	"github.com/foxcpp/maddy/framework/buffer"
 37	"github.com/foxcpp/maddy/framework/config"
 38	modconfig "github.com/foxcpp/maddy/framework/config/module"
 39	tls2 "github.com/foxcpp/maddy/framework/config/tls"
 40	"github.com/foxcpp/maddy/framework/dns"
 41	"github.com/foxcpp/maddy/framework/future"
 42	"github.com/foxcpp/maddy/framework/log"
 43	"github.com/foxcpp/maddy/framework/module"
 44	"github.com/foxcpp/maddy/internal/auth"
 45	"github.com/foxcpp/maddy/internal/authz"
 46	"github.com/foxcpp/maddy/internal/limits"
 47	"github.com/foxcpp/maddy/internal/msgpipeline"
 48	"github.com/foxcpp/maddy/internal/proxy_protocol"
 49	"golang.org/x/net/idna"
 50)
 51
 52type Endpoint struct {
 53	saslAuth      auth.SASLAuth
 54	serv          *smtp.Server
 55	name          string
 56	addrs         []string
 57	listeners     []net.Listener
 58	proxyProtocol *proxy_protocol.ProxyProtocol
 59	pipeline      *msgpipeline.MsgPipeline
 60	resolver      dns.Resolver
 61	limits        *limits.Group
 62
 63	buffer func(r io.Reader) (buffer.Buffer, error)
 64
 65	authAlwaysRequired  bool
 66	submission          bool
 67	lmtp                bool
 68	deferServerReject   bool
 69	maxLoggedRcptErrors int
 70	maxReceived         int
 71	maxHeaderBytes      int64
 72
 73	sessionCnt atomic.Int32
 74
 75	listenersWg sync.WaitGroup
 76
 77	Log log.Logger
 78}
 79
 80func (endp *Endpoint) Name() string {
 81	return endp.name
 82}
 83
 84func (endp *Endpoint) InstanceName() string {
 85	return endp.name
 86}
 87
 88func New(modName string, addrs []string) (module.Module, error) {
 89	endp := &Endpoint{
 90		name:       modName,
 91		addrs:      addrs,
 92		submission: modName == "submission",
 93		lmtp:       modName == "lmtp",
 94		resolver:   dns.DefaultResolver(),
 95		buffer:     buffer.BufferInMemory,
 96		Log:        log.Logger{Name: modName},
 97		saslAuth: auth.SASLAuth{
 98			Log: log.Logger{Name: modName + "/sasl"},
 99		},
100	}
101	return endp, nil
102}
103
104func (endp *Endpoint) Init(cfg *config.Map) error {
105	endp.serv = smtp.NewServer(endp)
106	endp.serv.ErrorLog = endp.Log
107	endp.serv.LMTP = endp.lmtp
108	endp.serv.EnableSMTPUTF8 = true
109	endp.serv.EnableREQUIRETLS = true
110	if err := endp.setConfig(cfg); err != nil {
111		return err
112	}
113
114	addresses := make([]config.Endpoint, 0, len(endp.addrs))
115	for _, addr := range endp.addrs {
116		saddr, err := config.ParseEndpoint(addr)
117		if err != nil {
118			return fmt.Errorf("%s: invalid address: %s", addr, endp.name)
119		}
120
121		addresses = append(addresses, saddr)
122	}
123
124	if err := endp.setupListeners(addresses); err != nil {
125		for _, l := range endp.listeners {
126			l.Close()
127		}
128		return err
129	}
130
131	allLocal := true
132	for _, addr := range addresses {
133		if addr.Scheme != "unix" && !strings.HasPrefix(addr.Host, "127.0.0.") {
134			allLocal = false
135		}
136	}
137
138	if endp.serv.AllowInsecureAuth && !allLocal {
139		endp.Log.Println("authentication over unencrypted connections is allowed, this is insecure configuration and should be used only for testing!")
140	}
141	if endp.serv.TLSConfig == nil {
142		if !allLocal {
143			endp.Log.Println("TLS is disabled, this is insecure configuration and should be used only for testing!")
144		}
145
146		endp.serv.AllowInsecureAuth = true
147	}
148
149	return nil
150}
151
152func autoBufferMode(maxSize int, dir string) func(io.Reader) (buffer.Buffer, error) {
153	return func(r io.Reader) (buffer.Buffer, error) {
154		// First try to read up to N bytes.
155		initial := make([]byte, maxSize)
156		actualSize, err := io.ReadFull(r, initial)
157		if err != nil {
158			if err == io.ErrUnexpectedEOF {
159				log.Debugln("autobuffer: keeping the message in RAM (read", actualSize, "bytes, got EOF)")
160				return buffer.MemoryBuffer{Slice: initial[:actualSize]}, nil
161			}
162			if err == io.EOF {
163				// Special case: message with empty body.
164				return buffer.MemoryBuffer{}, nil
165			}
166			// Some I/O error happened, bail out.
167			return nil, err
168		}
169		if actualSize < maxSize {
170			// Ok, the message is smaller than N. Make a MemoryBuffer and
171			// handle it in RAM.
172			log.Debugln("autobuffer: keeping the message in RAM (read", actualSize, "bytes, got short read)")
173			return buffer.MemoryBuffer{Slice: initial[:actualSize]}, nil
174		}
175
176		log.Debugln("autobuffer: spilling the message to the FS")
177		// The message is big. Dump what we got to the disk and continue writing it there.
178		return buffer.BufferInFile(
179			io.MultiReader(bytes.NewReader(initial[:actualSize]), r),
180			dir)
181	}
182}
183
184func bufferModeDirective(_ *config.Map, node config.Node) (interface{}, error) {
185	if len(node.Args) < 1 {
186		return nil, config.NodeErr(node, "at least one argument required")
187	}
188	switch node.Args[0] {
189	case "ram":
190		if len(node.Args) > 1 {
191			return nil, config.NodeErr(node, "no additional arguments for 'ram' mode")
192		}
193		return buffer.BufferInMemory, nil
194	case "fs":
195		path := filepath.Join(config.StateDirectory, "buffer")
196		if err := os.MkdirAll(path, 0o700); err != nil {
197			return nil, err
198		}
199		switch len(node.Args) {
200		case 2:
201			path = node.Args[1]
202			fallthrough
203		case 1:
204			return func(r io.Reader) (buffer.Buffer, error) {
205				return buffer.BufferInFile(r, path)
206			}, nil
207		default:
208			return nil, config.NodeErr(node, "too many arguments for 'fs' mode")
209		}
210	case "auto":
211		path := filepath.Join(config.StateDirectory, "buffer")
212		if err := os.MkdirAll(path, 0o700); err != nil {
213			return nil, err
214		}
215
216		maxSize := 1 * 1024 * 1024 // 1 MiB
217		switch len(node.Args) {
218		case 3:
219			path = node.Args[2]
220			fallthrough
221		case 2:
222			var err error
223			maxSize, err = config.ParseDataSize(node.Args[1])
224			if err != nil {
225				return nil, config.NodeErr(node, "%v", err)
226			}
227			fallthrough
228		case 1:
229			return autoBufferMode(maxSize, path), nil
230		default:
231			return nil, config.NodeErr(node, "too many arguments for 'auto' mode")
232		}
233	default:
234		return nil, config.NodeErr(node, "unknown buffer mode: %v", node.Args[0])
235	}
236}
237
238func (endp *Endpoint) setConfig(cfg *config.Map) error {
239	var (
240		hostname string
241		err      error
242		ioDebug  bool
243	)
244
245	cfg.Callback("auth", func(m *config.Map, node config.Node) error {
246		return endp.saslAuth.AddProvider(m, node)
247	})
248	cfg.Bool("sasl_login", false, false, &endp.saslAuth.EnableLogin)
249	cfg.String("hostname", true, true, "", &hostname)
250	config.EnumMapped(cfg, "auth_map_normalize", true, false, authz.NormalizeFuncs, authz.NormalizeAuto,
251		&endp.saslAuth.AuthNormalize)
252	modconfig.Table(cfg, "auth_map", true, false, nil, &endp.saslAuth.AuthMap)
253	cfg.Duration("write_timeout", false, false, 1*time.Minute, &endp.serv.WriteTimeout)
254	cfg.Duration("read_timeout", false, false, 10*time.Minute, &endp.serv.ReadTimeout)
255	cfg.DataSize("max_message_size", false, false, 32*1024*1024, &endp.serv.MaxMessageBytes)
256	cfg.DataSize("max_header_size", false, false, 1*1024*1024, &endp.maxHeaderBytes)
257	cfg.Int("max_recipients", false, false, 20000, &endp.serv.MaxRecipients)
258	cfg.Int("max_received", false, false, 50, &endp.maxReceived)
259	cfg.Custom("buffer", false, false, func() (interface{}, error) {
260		path := filepath.Join(config.StateDirectory, "buffer")
261		if err := os.MkdirAll(path, 0o700); err != nil {
262			return nil, err
263		}
264		return autoBufferMode(1*1024*1024 /* 1 MiB */, path), nil
265	}, bufferModeDirective, &endp.buffer)
266	cfg.Custom("tls", true, endp.name != "lmtp", nil, tls2.TLSDirective, &endp.serv.TLSConfig)
267	cfg.Custom("proxy_protocol", false, false, nil, proxy_protocol.ProxyProtocolDirective, &endp.proxyProtocol)
268	cfg.Bool("insecure_auth", endp.name == "lmtp", false, &endp.serv.AllowInsecureAuth)
269	cfg.Int("smtp_max_line_length", false, false, 4000, &endp.serv.MaxLineLength)
270	cfg.Bool("io_debug", false, false, &ioDebug)
271	cfg.Bool("debug", true, false, &endp.Log.Debug)
272	cfg.Bool("defer_sender_reject", false, true, &endp.deferServerReject)
273	cfg.Int("max_logged_rcpt_errors", false, false, 5, &endp.maxLoggedRcptErrors)
274	cfg.Custom("limits", false, false, func() (interface{}, error) {
275		return &limits.Group{}, nil
276	}, func(cfg *config.Map, n config.Node) (interface{}, error) {
277		var g *limits.Group
278		if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil {
279			return nil, err
280		}
281		return g, nil
282	}, &endp.limits)
283	cfg.AllowUnknown()
284	unknown, err := cfg.Process()
285	if err != nil {
286		return err
287	}
288
289	endp.saslAuth.Log.Debug = endp.Log.Debug
290
291	// INTERNATIONALIZATION: See RFC 6531 Section 3.3.
292	endp.serv.Domain, err = idna.ToASCII(hostname)
293	if err != nil {
294		return fmt.Errorf("%s: cannot represent the hostname as an A-label name: %w", endp.name, err)
295	}
296
297	endp.pipeline, err = msgpipeline.New(cfg.Globals, unknown)
298	if err != nil {
299		return err
300	}
301	endp.pipeline.Hostname = endp.serv.Domain
302	endp.pipeline.Resolver = endp.resolver
303	endp.pipeline.Log = log.Logger{Name: "smtp/pipeline", Debug: endp.Log.Debug}
304	endp.pipeline.FirstPipeline = true
305
306	if endp.submission {
307		endp.authAlwaysRequired = true
308		if len(endp.saslAuth.SASLMechanisms()) == 0 {
309			return fmt.Errorf("%s: auth. provider must be set for submission endpoint", endp.name)
310		}
311	}
312
313	if ioDebug {
314		endp.serv.Debug = endp.Log.DebugWriter()
315		endp.Log.Println("I/O debugging is on! It may leak passwords in logs, be careful!")
316	}
317
318	return nil
319}
320
321func (endp *Endpoint) setupListeners(addresses []config.Endpoint) error {
322	for _, addr := range addresses {
323		var l net.Listener
324		var err error
325		l, err = net.Listen(addr.Network(), addr.Address())
326		if err != nil {
327			return fmt.Errorf("%s: %w", endp.name, err)
328		}
329		endp.Log.Printf("listening on %v", addr)
330
331		if addr.IsTLS() {
332			if endp.serv.TLSConfig == nil {
333				return fmt.Errorf("%s: can't bind on SMTPS endpoint without TLS configuration", endp.name)
334			}
335			l = tls.NewListener(l, endp.serv.TLSConfig)
336		}
337
338		if endp.proxyProtocol != nil {
339			l = proxy_protocol.NewListener(l, endp.proxyProtocol, endp.Log)
340		}
341
342		endp.listeners = append(endp.listeners, l)
343
344		endp.listenersWg.Add(1)
345		go func() {
346			if err := endp.serv.Serve(l); err != nil {
347				endp.Log.Printf("failed to serve %s: %s", addr, err)
348			}
349			endp.listenersWg.Done()
350		}()
351	}
352
353	return nil
354}
355
356func (endp *Endpoint) NewSession(conn *smtp.Conn) (smtp.Session, error) {
357	sess := endp.newSession(conn)
358
359	// Executed before authentication and session initialization.
360	if err := endp.pipeline.RunEarlyChecks(context.TODO(), &sess.connState); err != nil {
361		if err := sess.Logout(); err != nil {
362			endp.Log.Error("early checks logout failed", err)
363		}
364		return nil, endp.wrapErr("", true, "EHLO", err)
365	}
366
367	endp.sessionCnt.Add(1)
368
369	return sess, nil
370}
371
372func (endp *Endpoint) newSession(conn *smtp.Conn) *Session {
373	s := &Session{
374		endp:       endp,
375		log:        endp.Log,
376		sessionCtx: context.Background(),
377	}
378
379	// Used in tests.
380	if conn == nil {
381		return s
382	}
383
384	s.connState = module.ConnState{
385		Hostname:   conn.Hostname(),
386		LocalAddr:  conn.Conn().LocalAddr(),
387		RemoteAddr: conn.Conn().RemoteAddr(),
388	}
389	if tlsState, ok := conn.TLSConnectionState(); ok {
390		s.connState.TLS = tlsState
391	}
392
393	if endp.serv.LMTP {
394		s.connState.Proto = "LMTP"
395	} else {
396		// Check if TLS connection conn struct is poplated.
397		// If it is - we are ssing TLS.
398		if s.connState.TLS.HandshakeComplete {
399			s.connState.Proto = "ESMTPS"
400		} else {
401			s.connState.Proto = "ESMTP"
402		}
403	}
404
405	if endp.resolver != nil {
406		rdnsCtx, cancelRDNS := context.WithCancel(s.sessionCtx)
407		s.connState.RDNSName = future.New()
408		s.cancelRDNS = cancelRDNS
409		go s.fetchRDNSName(rdnsCtx)
410	}
411
412	return s
413}
414
415func (endp *Endpoint) ConnectionCount() int {
416	return int(endp.sessionCnt.Load())
417}
418
419func (endp *Endpoint) Close() error {
420	endp.serv.Close()
421	endp.listenersWg.Wait()
422	return nil
423}
424
425func init() {
426	module.RegisterEndpoint("smtp", New)
427	module.RegisterEndpoint("submission", New)
428	module.RegisterEndpoint("lmtp", New)
429}