maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package msgpipeline
 20
 21import (
 22	"context"
 23
 24	"github.com/emersion/go-message/textproto"
 25	"github.com/emersion/go-smtp"
 26	"github.com/foxcpp/maddy/framework/address"
 27	"github.com/foxcpp/maddy/framework/buffer"
 28	"github.com/foxcpp/maddy/framework/config"
 29	"github.com/foxcpp/maddy/framework/dns"
 30	"github.com/foxcpp/maddy/framework/exterrors"
 31	"github.com/foxcpp/maddy/framework/log"
 32	"github.com/foxcpp/maddy/framework/module"
 33	"github.com/foxcpp/maddy/internal/modify"
 34	"github.com/foxcpp/maddy/internal/target"
 35	"golang.org/x/sync/errgroup"
 36)
 37
 38// MsgPipeline is a object that is responsible for selecting delivery targets
 39// for the message and running necessary checks and modifiers.
 40//
 41// It implements module.DeliveryTarget.
 42//
 43// It is not a "module object" and is intended to be used as part of message
 44// source (Submission, SMTP, JMAP modules) implementation.
 45type MsgPipeline struct {
 46	msgpipelineCfg
 47	Hostname string
 48	Resolver dns.Resolver
 49
 50	// Used to indicate the pipeline is handling messages received from the
 51	// external source and not from any other module. That is, this MsgPipeline
 52	// is an instance embedded in endpoint/smtp implementation, for example.
 53	//
 54	// This is a hack since only MsgPipeline can execute some operations at the
 55	// right time but it is not a good idea to execute them multiple multiple
 56	// times for a single message that might be actually handled my multiple
 57	// pipelines via 'msgpipeline' module or 'reroute' directive.
 58	//
 59	// At the moment, the only such operation is the addition of the Received
 60	// header field. See where it happens for explanation on why it is done
 61	// exactly in this place.
 62	FirstPipeline bool
 63
 64	Log log.Logger
 65}
 66
 67type rcptIn struct {
 68	t     module.Table
 69	block *rcptBlock
 70}
 71
 72type sourceBlock struct {
 73	checks      []module.Check
 74	modifiers   modify.Group
 75	rejectErr   error
 76	rcptIn      []rcptIn
 77	perRcpt     map[string]*rcptBlock
 78	defaultRcpt *rcptBlock
 79}
 80
 81type rcptBlock struct {
 82	checks    []module.Check
 83	modifiers modify.Group
 84	rejectErr error
 85	targets   []module.DeliveryTarget
 86}
 87
 88func New(globals map[string]interface{}, cfg []config.Node) (*MsgPipeline, error) {
 89	parsedCfg, err := parseMsgPipelineRootCfg(globals, cfg)
 90	return &MsgPipeline{
 91		msgpipelineCfg: parsedCfg,
 92		Resolver:       dns.DefaultResolver(),
 93	}, err
 94}
 95
 96func (d *MsgPipeline) RunEarlyChecks(ctx context.Context, state *module.ConnState) error {
 97	eg, checkCtx := errgroup.WithContext(ctx)
 98
 99	// TODO: See if there is some point in parallelization of this
100	// function.
101	for _, check := range d.globalChecks {
102		earlyCheck, ok := check.(module.EarlyCheck)
103		if !ok {
104			continue
105		}
106
107		eg.Go(func() error {
108			return earlyCheck.CheckConnection(checkCtx, state)
109		})
110	}
111	return eg.Wait()
112}
113
114// Start starts new message delivery, runs connection and sender checks, sender modifiers
115// and selects source block from config to use for handling.
116//
117// Returned module.Delivery implements PartialDelivery. If underlying target doesn't
118// support it, msgpipeline will copy the returned error for all recipients handled
119// by target.
120func (d *MsgPipeline) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
121	dd := msgpipelineDelivery{
122		d:                  d,
123		rcptModifiersState: make(map[*rcptBlock]module.ModifierState),
124		deliveries:         make(map[module.DeliveryTarget]*delivery),
125		msgMeta:            msgMeta,
126		log:                target.DeliveryLogger(d.Log, msgMeta),
127	}
128	dd.checkRunner = newCheckRunner(msgMeta, dd.log, d.Resolver)
129	dd.checkRunner.doDMARC = d.doDMARC
130
131	if msgMeta.OriginalRcpts == nil {
132		msgMeta.OriginalRcpts = map[string]string{}
133	}
134
135	if err := dd.start(ctx, msgMeta, mailFrom); err != nil {
136		dd.close()
137		return nil, err
138	}
139
140	return &dd, nil
141}
142
143func (dd *msgpipelineDelivery) start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) error {
144	var err error
145
146	if err := dd.checkRunner.checkConnSender(ctx, dd.d.globalChecks, mailFrom); err != nil {
147		return err
148	}
149
150	if mailFrom, err = dd.initRunGlobalModifiers(ctx, msgMeta, mailFrom); err != nil {
151		return err
152	}
153
154	sourceBlock, err := dd.srcBlockForAddr(ctx, mailFrom)
155	if err != nil {
156		return err
157	}
158	if sourceBlock.rejectErr != nil {
159		dd.log.Debugf("sender %s rejected with error: %v", mailFrom, sourceBlock.rejectErr)
160		return sourceBlock.rejectErr
161	}
162	dd.sourceBlock = sourceBlock
163
164	if err := dd.checkRunner.checkConnSender(ctx, sourceBlock.checks, mailFrom); err != nil {
165		return err
166	}
167
168	sourceModifiersState, err := sourceBlock.modifiers.ModStateForMsg(ctx, msgMeta)
169	if err != nil {
170		return err
171	}
172	mailFrom, err = sourceModifiersState.RewriteSender(ctx, mailFrom)
173	if err != nil {
174		return err
175	}
176	dd.sourceModifiersState = sourceModifiersState
177
178	dd.sourceAddr = mailFrom
179	return nil
180}
181
182func (dd *msgpipelineDelivery) initRunGlobalModifiers(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (string, error) {
183	globalModifiersState, err := dd.d.globalModifiers.ModStateForMsg(ctx, msgMeta)
184	if err != nil {
185		return "", err
186	}
187	mailFrom, err = globalModifiersState.RewriteSender(ctx, mailFrom)
188	if err != nil {
189		globalModifiersState.Close()
190		return "", err
191	}
192	dd.globalModifiersState = globalModifiersState
193	return mailFrom, nil
194}
195
196func (dd *msgpipelineDelivery) srcBlockForAddr(ctx context.Context, mailFrom string) (sourceBlock, error) {
197	cleanFrom := mailFrom
198	if mailFrom != "" {
199		var err error
200		cleanFrom, err = address.ForLookup(mailFrom)
201		if err != nil {
202			return sourceBlock{}, &exterrors.SMTPError{
203				Code:         501,
204				EnhancedCode: exterrors.EnhancedCode{5, 1, 7},
205				Message:      "Unable to normalize the sender address",
206				Err:          err,
207			}
208		}
209	}
210
211	for _, srcIn := range dd.d.sourceIn {
212		_, ok, err := srcIn.t.Lookup(ctx, cleanFrom)
213		if err != nil {
214			dd.log.Error("source_in lookup failed", err, "key", cleanFrom)
215			continue
216		}
217		if !ok {
218			continue
219		}
220		return srcIn.block, nil
221	}
222
223	// First try to match against complete address.
224	srcBlock, ok := dd.d.perSource[cleanFrom]
225	if !ok {
226		// Then try domain-only.
227		_, domain, err := address.Split(cleanFrom)
228		// mailFrom != "" is added as a special condition
229		// instead of extending address.Split because ""
230		// is not a valid RFC 282 address and only a special
231		// value for SMTP.
232		if err != nil && cleanFrom != "" {
233			return sourceBlock{}, &exterrors.SMTPError{
234				Code:         501,
235				EnhancedCode: exterrors.EnhancedCode{5, 1, 3},
236				Message:      "Invalid sender address",
237				Err:          err,
238				Reason:       "Can't extract local-part and host-part",
239			}
240		}
241
242		// domain is already case-folded and normalized by the message source.
243		srcBlock, ok = dd.d.perSource[domain]
244		if !ok {
245			// Fallback to the default source block.
246			srcBlock = dd.d.defaultSource
247			dd.log.Debugf("sender %s matched by default rule", mailFrom)
248		} else {
249			dd.log.Debugf("sender %s matched by domain rule '%s'", mailFrom, domain)
250		}
251	} else {
252		dd.log.Debugf("sender %s matched by address rule '%s'", mailFrom, cleanFrom)
253	}
254	return srcBlock, nil
255}
256
257type delivery struct {
258	module.Delivery
259	// Recipient addresses this delivery object is used for, original values (not modified by RewriteRcpt).
260	recipients []string
261}
262
263type msgpipelineDelivery struct {
264	d *MsgPipeline
265
266	globalModifiersState module.ModifierState
267	sourceModifiersState module.ModifierState
268	rcptModifiersState   map[*rcptBlock]module.ModifierState
269
270	log log.Logger
271
272	sourceAddr  string
273	sourceBlock sourceBlock
274
275	deliveries  map[module.DeliveryTarget]*delivery
276	msgMeta     *module.MsgMetadata
277	checkRunner *checkRunner
278}
279
280func (dd *msgpipelineDelivery) AddRcpt(ctx context.Context, to string, opts smtp.RcptOptions) error {
281	if err := dd.checkRunner.checkRcpt(ctx, dd.d.globalChecks, to); err != nil {
282		return err
283	}
284	if err := dd.checkRunner.checkRcpt(ctx, dd.sourceBlock.checks, to); err != nil {
285		return err
286	}
287
288	originalTo := to
289
290	newTo, err := dd.globalModifiersState.RewriteRcpt(ctx, to)
291	if err != nil {
292		return err
293	}
294	dd.log.Debugln("global rcpt modifiers:", to, "=>", newTo)
295	resultTo := newTo
296	newTo = []string{}
297
298	for _, to = range resultTo {
299		var tempTo []string
300		tempTo, err = dd.sourceModifiersState.RewriteRcpt(ctx, to)
301		if err != nil {
302			return err
303		}
304		newTo = append(newTo, tempTo...)
305	}
306	dd.log.Debugln("per-source rcpt modifiers:", to, "=>", newTo)
307	resultTo = newTo
308
309	for _, to = range resultTo {
310		wrapErr := func(err error) error {
311			return exterrors.WithFields(err, map[string]interface{}{
312				"effective_rcpt": to,
313			})
314		}
315
316		rcptBlock, err := dd.rcptBlockForAddr(ctx, to)
317		if err != nil {
318			return wrapErr(err)
319		}
320
321		if rcptBlock.rejectErr != nil {
322			return wrapErr(rcptBlock.rejectErr)
323		}
324
325		if err := dd.checkRunner.checkRcpt(ctx, rcptBlock.checks, to); err != nil {
326			return wrapErr(err)
327		}
328
329		rcptModifiersState, err := dd.getRcptModifiers(ctx, rcptBlock, to)
330		if err != nil {
331			return wrapErr(err)
332		}
333
334		newTo, err = rcptModifiersState.RewriteRcpt(ctx, to)
335		if err != nil {
336			rcptModifiersState.Close()
337			return wrapErr(err)
338		}
339		dd.log.Debugln("per-rcpt modifiers:", to, "=>", newTo)
340
341		for _, to = range newTo {
342			wrapErr = func(err error) error {
343				return exterrors.WithFields(err, map[string]interface{}{
344					"effective_rcpt": to,
345				})
346			}
347
348			if originalTo != to {
349				dd.msgMeta.OriginalRcpts[to] = originalTo
350			}
351
352			for _, tgt := range rcptBlock.targets {
353				// Do not wrap errors coming from nested pipeline target delivery since
354				// that pipeline itself will insert effective_rcpt field and could do
355				// its own rewriting - we do not want to hide it from the admin in
356				// error messages.
357				wrapErr := wrapErr
358				if _, ok := tgt.(*MsgPipeline); ok {
359					wrapErr = func(err error) error { return err }
360				}
361
362				delivery, err := dd.getDelivery(ctx, tgt)
363				if err != nil {
364					return wrapErr(err)
365				}
366
367				if err := delivery.AddRcpt(ctx, to, opts); err != nil {
368					return wrapErr(err)
369				}
370				delivery.recipients = append(delivery.recipients, originalTo)
371			}
372		}
373	}
374
375	return nil
376}
377
378func (dd *msgpipelineDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {
379	if err := dd.checkRunner.checkBody(ctx, dd.d.globalChecks, header, body); err != nil {
380		return err
381	}
382	if err := dd.checkRunner.checkBody(ctx, dd.sourceBlock.checks, header, body); err != nil {
383		return err
384	}
385	for blk := range dd.rcptModifiersState {
386		if err := dd.checkRunner.checkBody(ctx, blk.checks, header, body); err != nil {
387			return err
388		}
389	}
390
391	if dd.d.FirstPipeline {
392		// Add Received *after* checks to make sure they see the message literally
393		// how we received it BUT place it below any other field that might be
394		// added by applyResults (including Authentication-Results)
395		// per recommendation in RFC 7001, Section 4 (see GH issue #135).
396		received, err := target.GenerateReceived(ctx, dd.msgMeta, dd.d.Hostname, dd.msgMeta.OriginalFrom)
397		if err != nil {
398			return err
399		}
400		header.Add("Received", received)
401	}
402
403	if err := dd.checkRunner.applyResults(dd.d.Hostname, &header); err != nil {
404		return err
405	}
406
407	// Run modifiers after Authentication-Results addition to make
408	// sure signatures, etc will cover it.
409	if err := dd.globalModifiersState.RewriteBody(ctx, &header, body); err != nil {
410		return err
411	}
412	if err := dd.sourceModifiersState.RewriteBody(ctx, &header, body); err != nil {
413		return err
414	}
415	for _, modifiers := range dd.rcptModifiersState {
416		if err := modifiers.RewriteBody(ctx, &header, body); err != nil {
417			return err
418		}
419	}
420
421	for _, delivery := range dd.deliveries {
422		if err := delivery.Body(ctx, header, body); err != nil {
423			return err
424		}
425		dd.log.Debugf("delivery.Body ok, Delivery object = %T", delivery)
426	}
427	return nil
428}
429
430// statusCollector wraps StatusCollector and adds reverse translation
431// of recipients for all statuses.]
432//
433// We can't let delivery targets set statuses directly because they see
434// modified addresses (RewriteRcpt) and we are supposed to report
435// statuses using original values. Additionally, we should still avoid
436// collect-and-them-report approach since statuses should be reported
437// as soon as possible (that is required by LMTP).
438type statusCollector struct {
439	originalRcpts map[string]string
440	wrapped       module.StatusCollector
441}
442
443func (sc statusCollector) SetStatus(rcptTo string, err error) {
444	original, ok := sc.originalRcpts[rcptTo]
445	if ok {
446		rcptTo = original
447	}
448	sc.wrapped.SetStatus(rcptTo, err)
449}
450
451func (dd *msgpipelineDelivery) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, body buffer.Buffer) {
452	setStatusAll := func(err error) {
453		for _, delivery := range dd.deliveries {
454			for _, rcpt := range delivery.recipients {
455				c.SetStatus(rcpt, err)
456			}
457		}
458	}
459
460	if err := dd.checkRunner.checkBody(ctx, dd.d.globalChecks, header, body); err != nil {
461		setStatusAll(err)
462		return
463	}
464	if err := dd.checkRunner.checkBody(ctx, dd.sourceBlock.checks, header, body); err != nil {
465		setStatusAll(err)
466		return
467	}
468
469	// Run modifiers after Authentication-Results addition to make
470	// sure signatures, etc will cover it.
471	if err := dd.globalModifiersState.RewriteBody(ctx, &header, body); err != nil {
472		setStatusAll(err)
473		return
474	}
475	if err := dd.sourceModifiersState.RewriteBody(ctx, &header, body); err != nil {
476		setStatusAll(err)
477		return
478	}
479	for _, modifiers := range dd.rcptModifiersState {
480		if err := modifiers.RewriteBody(ctx, &header, body); err != nil {
481			setStatusAll(err)
482			return
483		}
484	}
485
486	for _, delivery := range dd.deliveries {
487		partDelivery, ok := delivery.Delivery.(module.PartialDelivery)
488		if ok {
489			partDelivery.BodyNonAtomic(ctx, statusCollector{
490				originalRcpts: dd.msgMeta.OriginalRcpts,
491				wrapped:       c,
492			}, header, body)
493			continue
494		}
495
496		if err := delivery.Body(ctx, header, body); err != nil {
497			for _, rcpt := range delivery.recipients {
498				c.SetStatus(rcpt, err)
499			}
500		}
501	}
502}
503
504func (dd msgpipelineDelivery) Commit(ctx context.Context) error {
505	dd.close()
506
507	for _, delivery := range dd.deliveries {
508		if err := delivery.Commit(ctx); err != nil {
509			// No point in Committing remaining deliveries, everything is broken already.
510			return err
511		}
512	}
513	return nil
514}
515
516func (dd *msgpipelineDelivery) close() {
517	dd.checkRunner.close()
518
519	if dd.globalModifiersState != nil {
520		dd.globalModifiersState.Close()
521	}
522	if dd.sourceModifiersState != nil {
523		dd.sourceModifiersState.Close()
524	}
525	for _, modifiers := range dd.rcptModifiersState {
526		modifiers.Close()
527	}
528}
529
530func (dd msgpipelineDelivery) Abort(ctx context.Context) error {
531	dd.close()
532
533	var lastErr error
534	for _, delivery := range dd.deliveries {
535		if err := delivery.Abort(ctx); err != nil {
536			dd.log.Debugf("delivery.Abort failure, Delivery object = %T: %v", delivery, err)
537			lastErr = err
538			// Continue anyway and try to Abort all remaining delivery objects.
539		}
540	}
541	return lastErr
542}
543
544func (dd *msgpipelineDelivery) rcptBlockForAddr(ctx context.Context, rcptTo string) (*rcptBlock, error) {
545	cleanRcpt, err := address.ForLookup(rcptTo)
546	if err != nil {
547		return nil, &exterrors.SMTPError{
548			Code:         553,
549			EnhancedCode: exterrors.EnhancedCode{5, 1, 2},
550			Message:      "Unable to normalize the recipient address",
551			Err:          err,
552		}
553	}
554
555	for _, rcptIn := range dd.sourceBlock.rcptIn {
556		_, ok, err := rcptIn.t.Lookup(ctx, cleanRcpt)
557		if err != nil {
558			dd.log.Error("destination_in lookup failed", err, "key", cleanRcpt)
559			continue
560		}
561		if !ok {
562			continue
563		}
564		return rcptIn.block, nil
565	}
566
567	// First try to match against complete address.
568	rcptBlock, ok := dd.sourceBlock.perRcpt[cleanRcpt]
569	if !ok {
570		// Then try domain-only.
571		_, domain, err := address.Split(cleanRcpt)
572		if err != nil {
573			return nil, &exterrors.SMTPError{
574				Code:         501,
575				EnhancedCode: exterrors.EnhancedCode{5, 1, 3},
576				Message:      "Invalid recipient address",
577				Err:          err,
578				Reason:       "Can't extract local-part and host-part",
579			}
580		}
581
582		// domain is already case-folded and normalized because it is a part of
583		// cleanRcpt.
584		rcptBlock, ok = dd.sourceBlock.perRcpt[domain]
585		if !ok {
586			// Fallback to the default source block.
587			rcptBlock = dd.sourceBlock.defaultRcpt
588			dd.log.Debugf("recipient %s matched by default rule (clean = %s)", rcptTo, cleanRcpt)
589		} else {
590			dd.log.Debugf("recipient %s matched by domain rule '%s'", rcptTo, domain)
591		}
592	} else {
593		dd.log.Debugf("recipient %s matched by address rule '%s'", rcptTo, cleanRcpt)
594	}
595	return rcptBlock, nil
596}
597
598func (dd *msgpipelineDelivery) getRcptModifiers(ctx context.Context, rcptBlock *rcptBlock, rcptTo string) (module.ModifierState, error) {
599	rcptModifiersState, ok := dd.rcptModifiersState[rcptBlock]
600	if ok {
601		return rcptModifiersState, nil
602	}
603
604	rcptModifiersState, err := rcptBlock.modifiers.ModStateForMsg(ctx, dd.msgMeta)
605	if err != nil {
606		return nil, err
607	}
608
609	newSender, err := rcptModifiersState.RewriteSender(ctx, dd.sourceAddr)
610	if err == nil && newSender != dd.sourceAddr {
611		dd.log.Msg("Per-recipient modifier changed sender address. This is not supported and will "+
612			"be ignored.", "rcpt", rcptTo, "originalFrom", dd.sourceAddr, "modifiedFrom", newSender)
613	}
614
615	dd.rcptModifiersState[rcptBlock] = rcptModifiersState
616	return rcptModifiersState, nil
617}
618
619func (dd *msgpipelineDelivery) getDelivery(ctx context.Context, tgt module.DeliveryTarget) (*delivery, error) {
620	delivery_, ok := dd.deliveries[tgt]
621	if ok {
622		return delivery_, nil
623	}
624
625	deliveryObj, err := tgt.Start(ctx, dd.msgMeta, dd.sourceAddr)
626	if err != nil {
627		dd.log.Debugf("tgt.Start(%s) failure, target = %s: %v", dd.sourceAddr, objectName(tgt), err)
628		return nil, err
629	}
630	delivery_ = &delivery{Delivery: deliveryObj}
631
632	dd.log.Debugf("tgt.Start(%s) ok, target = %s", dd.sourceAddr, objectName(tgt))
633
634	dd.deliveries[tgt] = delivery_
635	return delivery_, nil
636}
637
638// Mock returns a MsgPipeline that merely delivers messages to a specified target
639// and runs a set of checks.
640//
641// It is meant for use in tests for modules that embed a pipeline object.
642func Mock(tgt module.DeliveryTarget, globalChecks []module.Check) *MsgPipeline {
643	return &MsgPipeline{
644		msgpipelineCfg: msgpipelineCfg{
645			globalChecks: globalChecks,
646			perSource:    map[string]sourceBlock{},
647			defaultSource: sourceBlock{
648				perRcpt: map[string]*rcptBlock{},
649				defaultRcpt: &rcptBlock{
650					targets: []module.DeliveryTarget{tgt},
651				},
652			},
653		},
654	}
655}