maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19// Package remote implements module which does outgoing
 20// message delivery using servers discovered using DNS MX records.
 21//
 22// Implemented interfaces:
 23// - module.DeliveryTarget
 24package remote
 25
 26import (
 27	"context"
 28	"crypto/tls"
 29	"errors"
 30	"fmt"
 31	"net"
 32	"runtime/trace"
 33	"strings"
 34	"sync"
 35	"time"
 36
 37	"github.com/emersion/go-message/textproto"
 38	"github.com/emersion/go-smtp"
 39	"github.com/foxcpp/maddy/framework/address"
 40	"github.com/foxcpp/maddy/framework/buffer"
 41	"github.com/foxcpp/maddy/framework/config"
 42	modconfig "github.com/foxcpp/maddy/framework/config/module"
 43	tls2 "github.com/foxcpp/maddy/framework/config/tls"
 44	"github.com/foxcpp/maddy/framework/dns"
 45	"github.com/foxcpp/maddy/framework/exterrors"
 46	"github.com/foxcpp/maddy/framework/log"
 47	"github.com/foxcpp/maddy/framework/module"
 48	"github.com/foxcpp/maddy/internal/limits"
 49	"github.com/foxcpp/maddy/internal/smtpconn/pool"
 50	"github.com/foxcpp/maddy/internal/target"
 51	"golang.org/x/net/idna"
 52)
 53
 54var smtpPort = "25"
 55
 56func moduleError(err error) error {
 57	return exterrors.WithFields(err, map[string]interface{}{
 58		"target": "remote",
 59	})
 60}
 61
 62type Target struct {
 63	name      string
 64	hostname  string
 65	localIP   string
 66	ipv4      bool
 67	tlsConfig *tls.Config
 68
 69	resolver    dns.Resolver
 70	dialer      func(ctx context.Context, network, addr string) (net.Conn, error)
 71	extResolver *dns.ExtResolver
 72
 73	policies          []module.MXAuthPolicy
 74	limits            *limits.Group
 75	allowSecOverride  bool
 76	relaxedREQUIRETLS bool
 77
 78	pool           *pool.P
 79	connReuseLimit int
 80
 81	Log log.Logger
 82
 83	connectTimeout    time.Duration
 84	commandTimeout    time.Duration
 85	submissionTimeout time.Duration
 86}
 87
 88var _ module.DeliveryTarget = &Target{}
 89
 90func New(_, instName string, _, inlineArgs []string) (module.Module, error) {
 91	if len(inlineArgs) != 0 {
 92		return nil, errors.New("remote: inline arguments are not used")
 93	}
 94	// Keep this synchronized with testTarget.
 95	return &Target{
 96		name:     instName,
 97		resolver: dns.DefaultResolver(),
 98		dialer:   (&net.Dialer{}).DialContext,
 99		Log:      log.Logger{Name: "remote"},
100	}, nil
101}
102
103func (rt *Target) Init(cfg *config.Map) error {
104	var err error
105	rt.extResolver, err = dns.NewExtResolver()
106	if err != nil {
107		rt.Log.Error("cannot initialize DNSSEC-aware resolver, DNSSEC and DANE are not available", err)
108	}
109
110	cfg.String("hostname", true, true, "", &rt.hostname)
111	cfg.String("local_ip", false, false, "", &rt.localIP)
112	cfg.Bool("force_ipv4", false, false, &rt.ipv4)
113	cfg.Bool("debug", true, false, &rt.Log.Debug)
114	cfg.Custom("tls_client", true, false, func() (interface{}, error) {
115		return &tls.Config{}, nil
116	}, tls2.TLSClientBlock, &rt.tlsConfig)
117	cfg.Custom("mx_auth", false, false, func() (interface{}, error) {
118		// Default is "no policies" to follow the principles of explicit
119		// configuration (if it is not requested - it is not done).
120		return nil, nil
121	}, func(cfg *config.Map, n config.Node) (interface{}, error) {
122		// Module instance is &PolicyGroup.
123		var p *PolicyGroup
124		if err := modconfig.GroupFromNode("mx_auth", n.Args, n, cfg.Globals, &p); err != nil {
125			return nil, err
126		}
127		return p.L, nil
128	}, &rt.policies)
129	cfg.Custom("limits", false, false, func() (interface{}, error) {
130		return &limits.Group{}, nil
131	}, func(cfg *config.Map, n config.Node) (interface{}, error) {
132		var g *limits.Group
133		if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil {
134			return nil, err
135		}
136		return g, nil
137	}, &rt.limits)
138	cfg.Bool("requiretls_override", false, true, &rt.allowSecOverride)
139	cfg.Bool("relaxed_requiretls", false, true, &rt.relaxedREQUIRETLS)
140	cfg.Int("conn_reuse_limit", false, false, 10, &rt.connReuseLimit)
141	cfg.Duration("connect_timeout", false, false, 5*time.Minute, &rt.connectTimeout)
142	cfg.Duration("command_timeout", false, false, 5*time.Minute, &rt.commandTimeout)
143	cfg.Duration("submission_timeout", false, false, 5*time.Minute, &rt.submissionTimeout)
144
145	poolCfg := pool.Config{
146		MaxKeys:             5000,
147		MaxConnsPerKey:      5,      // basically, max. amount of idle connections in cache
148		MaxConnLifetimeSec:  150,    // 2.5 mins, half of recommended idle time from RFC 5321
149		StaleKeyLifetimeSec: 60 * 5, // should be bigger than MaxConnLifetimeSec
150	}
151	cfg.Int("conn_max_idle_count", false, false, 5, &poolCfg.MaxConnsPerKey)
152	cfg.Int64("conn_max_idle_time", false, false, 150, &poolCfg.MaxConnLifetimeSec)
153
154	if _, err := cfg.Process(); err != nil {
155		return err
156	}
157	rt.pool = pool.New(poolCfg)
158
159	// INTERNATIONALIZATION: See RFC 6531 Section 3.7.1.
160	rt.hostname, err = idna.ToASCII(rt.hostname)
161	if err != nil {
162		return fmt.Errorf("remote: cannot represent the hostname as an A-label name: %w", err)
163	}
164
165	if rt.localIP != "" {
166		addr, err := net.ResolveTCPAddr("tcp", rt.localIP+":0")
167		if err != nil {
168			return fmt.Errorf("remote: failed to parse local IP: %w", err)
169		}
170		rt.dialer = (&net.Dialer{
171			LocalAddr: addr,
172		}).DialContext
173	}
174	if rt.ipv4 {
175		dial := rt.dialer
176		rt.dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
177			if network == "tcp" {
178				network = "tcp4"
179			}
180			return dial(ctx, network, addr)
181		}
182	}
183
184	return nil
185}
186
187func (rt *Target) Close() error {
188	rt.pool.Close()
189
190	return nil
191}
192
193func (rt *Target) Name() string {
194	return "remote"
195}
196
197func (rt *Target) InstanceName() string {
198	return rt.name
199}
200
201type remoteDelivery struct {
202	rt       *Target
203	mailFrom string
204	msgMeta  *module.MsgMetadata
205	Log      log.Logger
206
207	recipients  []string
208	connections map[string]*mxConn
209
210	policies []module.DeliveryMXAuthPolicy
211}
212
213func (rt *Target) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
214	policies := make([]module.DeliveryMXAuthPolicy, 0, len(rt.policies))
215	if !(msgMeta.TLSRequireOverride && rt.allowSecOverride) {
216		for _, p := range rt.policies {
217			policies = append(policies, p.Start(msgMeta))
218		}
219	}
220
221	var (
222		ratelimitDomain string
223		err             error
224	)
225	// This will leave ratelimitDomain = "" for null return path which is fine
226	// for purposes of ratelimiting.
227	if mailFrom != "" {
228		_, ratelimitDomain, err = address.Split(mailFrom)
229		if err != nil {
230			return nil, &exterrors.SMTPError{
231				Code:         501,
232				EnhancedCode: exterrors.EnhancedCode{5, 1, 8},
233				Message:      "Malformed sender address",
234				TargetName:   "remote",
235				Err:          err,
236			}
237		}
238	}
239
240	// Domain is already should be normalized by the message source (e.g.
241	// endpoint/smtp).
242	region := trace.StartRegion(ctx, "remote/limits.Take")
243	addr := net.IPv4(127, 0, 0, 1)
244	if msgMeta.Conn != nil && msgMeta.Conn.RemoteAddr != nil {
245		tcpAddr, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr)
246		if ok {
247			addr = tcpAddr.IP
248		}
249	}
250	if err := rt.limits.TakeMsg(ctx, addr, ratelimitDomain); err != nil {
251		region.End()
252		return nil, &exterrors.SMTPError{
253			Code:         451,
254			EnhancedCode: exterrors.EnhancedCode{4, 4, 5},
255			Message:      "High load, try again later",
256			Reason:       "Global limit timeout",
257			TargetName:   "remote",
258			Err:          err,
259		}
260	}
261	region.End()
262
263	return &remoteDelivery{
264		rt:          rt,
265		mailFrom:    mailFrom,
266		msgMeta:     msgMeta,
267		Log:         target.DeliveryLogger(rt.Log, msgMeta),
268		connections: map[string]*mxConn{},
269		policies:    policies,
270	}, nil
271}
272
273func (rd *remoteDelivery) AddRcpt(ctx context.Context, to string, opts smtp.RcptOptions) error {
274	defer trace.StartRegion(ctx, "remote/AddRcpt").End()
275
276	if rd.msgMeta.Quarantine {
277		return &exterrors.SMTPError{
278			Code:         550,
279			EnhancedCode: exterrors.EnhancedCode{5, 7, 0},
280			Message:      "Refusing to deliver a quarantined message",
281			TargetName:   "remote",
282		}
283	}
284
285	_, domain, err := address.Split(to)
286	if err != nil {
287		return err
288	}
289
290	// Special-case for <postmaster> address. If it is not handled by a rewrite rule before
291	// - we should not attempt to do anything with it and reject it as invalid.
292	if domain == "" {
293		return &exterrors.SMTPError{
294			Code:         550,
295			EnhancedCode: exterrors.EnhancedCode{5, 1, 1},
296			Message:      "<postmaster> address it no supported",
297			TargetName:   "remote",
298		}
299	}
300
301	if strings.HasPrefix(domain, "[") {
302		return &exterrors.SMTPError{
303			Code:         550,
304			EnhancedCode: exterrors.EnhancedCode{5, 1, 1},
305			Message:      "IP address literals are not supported",
306			TargetName:   "remote",
307		}
308	}
309
310	conn, err := rd.connectionForDomain(ctx, domain)
311	if err != nil {
312		return err
313	}
314
315	if err := conn.Rcpt(ctx, to, opts); err != nil {
316		return moduleError(err)
317	}
318	conn.lastUseAt = time.Now()
319
320	rd.recipients = append(rd.recipients, to)
321	return nil
322}
323
324type multipleErrs struct {
325	errs      map[string]error
326	statusLck sync.Mutex
327}
328
329func (m *multipleErrs) Error() string {
330	m.statusLck.Lock()
331	defer m.statusLck.Unlock()
332	return fmt.Sprintf("Partial delivery failure, per-rcpt info: %+v", m.errs)
333}
334
335func (m *multipleErrs) Fields() map[string]interface{} {
336	m.statusLck.Lock()
337	defer m.statusLck.Unlock()
338
339	// If there are any temporary errors - the sender should retry to make sure
340	// all recipients will get the message. However, since we can't tell it
341	// which recipients got the message, this will generate duplicates for
342	// them.
343	//
344	// We favor delivery with duplicates over incomplete delivery here.
345
346	var (
347		code     = 550
348		enchCode = exterrors.EnhancedCode{5, 0, 0}
349	)
350	for _, err := range m.errs {
351		if exterrors.IsTemporary(err) {
352			code = 451
353			enchCode = exterrors.EnhancedCode{4, 0, 0}
354		}
355	}
356
357	return map[string]interface{}{
358		"smtp_code":     code,
359		"smtp_enchcode": enchCode,
360		"smtp_msg":      "Partial delivery failure, additional attempts may result in duplicates",
361		"target":        "remote",
362		"errs":          m.errs,
363	}
364}
365
366func (m *multipleErrs) SetStatus(rcptTo string, err error) {
367	m.statusLck.Lock()
368	defer m.statusLck.Unlock()
369	m.errs[rcptTo] = err
370}
371
372func (rd *remoteDelivery) Body(ctx context.Context, header textproto.Header, buffer buffer.Buffer) error {
373	defer trace.StartRegion(ctx, "remote/Body").End()
374
375	merr := multipleErrs{
376		errs: make(map[string]error),
377	}
378	rd.BodyNonAtomic(ctx, &merr, header, buffer)
379
380	for _, v := range merr.errs {
381		if v != nil {
382			if len(merr.errs) == 1 {
383				return v
384			}
385			return &merr
386		}
387	}
388	return nil
389}
390
391func (rd *remoteDelivery) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, b buffer.Buffer) {
392	defer trace.StartRegion(ctx, "remote/BodyNonAtomic").End()
393
394	if rd.msgMeta.Quarantine {
395		for _, rcpt := range rd.recipients {
396			c.SetStatus(rcpt, &exterrors.SMTPError{
397				Code:         550,
398				EnhancedCode: exterrors.EnhancedCode{5, 7, 0},
399				Message:      "Refusing to deliver quarantined message",
400				TargetName:   "remote",
401			})
402		}
403		return
404	}
405
406	var wg sync.WaitGroup
407
408	for i, conn := range rd.connections {
409		wg.Add(1)
410		go func() {
411			defer wg.Done()
412
413			bodyR, err := b.Open()
414			if err != nil {
415				for _, rcpt := range conn.Rcpts() {
416					c.SetStatus(rcpt, err)
417				}
418				return
419			}
420			defer bodyR.Close()
421
422			err = conn.Data(ctx, header, bodyR)
423			for _, rcpt := range conn.Rcpts() {
424				c.SetStatus(rcpt, err)
425			}
426			rd.connections[i].errored = err != nil
427			conn.lastUseAt = time.Now()
428		}()
429	}
430
431	wg.Wait()
432}
433
434func (rd *remoteDelivery) Abort(ctx context.Context) error {
435	return rd.Close()
436}
437
438func (rd *remoteDelivery) Commit(ctx context.Context) error {
439	// It is not possible to implement it atomically, so users of remoteDelivery have to
440	// take care of partial failures.
441	return rd.Close()
442}
443
444func (rd *remoteDelivery) Close() error {
445	for _, conn := range rd.connections {
446		rd.rt.limits.ReleaseDest(conn.domain)
447		conn.transactions++
448
449		if !conn.Usable() {
450			rd.Log.Debugf("disconnected %v from %s (errored=%v,transactions=%v,disconnected before=%v)",
451				conn.LocalAddr(), conn.ServerName(), conn.errored, conn.transactions, conn.C.Client() == nil)
452			conn.Close()
453		} else {
454			rd.Log.Debugf("returning connection %v for %s to pool", conn.LocalAddr(), conn.ServerName())
455			rd.rt.pool.Return(conn.domain, conn)
456		}
457	}
458
459	var (
460		ratelimitDomain string
461		err             error
462	)
463	if rd.mailFrom != "" {
464		_, ratelimitDomain, err = address.Split(rd.mailFrom)
465		if err != nil {
466			return err
467		}
468	}
469
470	addr := net.IPv4(127, 0, 0, 1)
471	if rd.msgMeta.Conn != nil && rd.msgMeta.Conn.RemoteAddr != nil {
472		tcpAddr, ok := rd.msgMeta.Conn.RemoteAddr.(*net.TCPAddr)
473		if ok {
474			addr = tcpAddr.IP
475		}
476	}
477	rd.rt.limits.ReleaseMsg(addr, ratelimitDomain)
478
479	return nil
480}
481
482func init() {
483	module.Register("target.remote", New)
484}