1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package msgpipeline2021import (22 "context"2324 "github.com/emersion/go-message/textproto"25 "github.com/emersion/go-smtp"26 "github.com/foxcpp/maddy/framework/address"27 "github.com/foxcpp/maddy/framework/buffer"28 "github.com/foxcpp/maddy/framework/config"29 "github.com/foxcpp/maddy/framework/dns"30 "github.com/foxcpp/maddy/framework/exterrors"31 "github.com/foxcpp/maddy/framework/log"32 "github.com/foxcpp/maddy/framework/module"33 "github.com/foxcpp/maddy/internal/modify"34 "github.com/foxcpp/maddy/internal/target"35 "golang.org/x/sync/errgroup"36)3738// MsgPipeline is a object that is responsible for selecting delivery targets39// for the message and running necessary checks and modifiers.40//41// It implements module.DeliveryTarget.42//43// It is not a "module object" and is intended to be used as part of message44// source (Submission, SMTP, JMAP modules) implementation.45type MsgPipeline struct {46 msgpipelineCfg47 Hostname string48 Resolver dns.Resolver4950 // Used to indicate the pipeline is handling messages received from the51 // external source and not from any other module. That is, this MsgPipeline52 // is an instance embedded in endpoint/smtp implementation, for example.53 //54 // This is a hack since only MsgPipeline can execute some operations at the55 // right time but it is not a good idea to execute them multiple multiple56 // times for a single message that might be actually handled my multiple57 // pipelines via 'msgpipeline' module or 'reroute' directive.58 //59 // At the moment, the only such operation is the addition of the Received60 // header field. See where it happens for explanation on why it is done61 // exactly in this place.62 FirstPipeline bool6364 Log log.Logger65}6667type rcptIn struct {68 t module.Table69 block *rcptBlock70}7172type sourceBlock struct {73 checks []module.Check74 modifiers modify.Group75 rejectErr error76 rcptIn []rcptIn77 perRcpt map[string]*rcptBlock78 defaultRcpt *rcptBlock79}8081type rcptBlock struct {82 checks []module.Check83 modifiers modify.Group84 rejectErr error85 targets []module.DeliveryTarget86}8788func New(globals map[string]interface{}, cfg []config.Node) (*MsgPipeline, error) {89 parsedCfg, err := parseMsgPipelineRootCfg(globals, cfg)90 return &MsgPipeline{91 msgpipelineCfg: parsedCfg,92 Resolver: dns.DefaultResolver(),93 }, err94}9596func (d *MsgPipeline) RunEarlyChecks(ctx context.Context, state *module.ConnState) error {97 eg, checkCtx := errgroup.WithContext(ctx)9899 // TODO: See if there is some point in parallelization of this100 // function.101 for _, check := range d.globalChecks {102 earlyCheck, ok := check.(module.EarlyCheck)103 if !ok {104 continue105 }106107 eg.Go(func() error {108 return earlyCheck.CheckConnection(checkCtx, state)109 })110 }111 return eg.Wait()112}113114// Start starts new message delivery, runs connection and sender checks, sender modifiers115// and selects source block from config to use for handling.116//117// Returned module.Delivery implements PartialDelivery. If underlying target doesn't118// support it, msgpipeline will copy the returned error for all recipients handled119// by target.120func (d *MsgPipeline) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {121 dd := msgpipelineDelivery{122 d: d,123 rcptModifiersState: make(map[*rcptBlock]module.ModifierState),124 deliveries: make(map[module.DeliveryTarget]*delivery),125 msgMeta: msgMeta,126 log: target.DeliveryLogger(d.Log, msgMeta),127 }128 dd.checkRunner = newCheckRunner(msgMeta, dd.log, d.Resolver)129 dd.checkRunner.doDMARC = d.doDMARC130131 if msgMeta.OriginalRcpts == nil {132 msgMeta.OriginalRcpts = map[string]string{}133 }134135 if err := dd.start(ctx, msgMeta, mailFrom); err != nil {136 dd.close()137 return nil, err138 }139140 return &dd, nil141}142143func (dd *msgpipelineDelivery) start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) error {144 var err error145146 if err := dd.checkRunner.checkConnSender(ctx, dd.d.globalChecks, mailFrom); err != nil {147 return err148 }149150 if mailFrom, err = dd.initRunGlobalModifiers(ctx, msgMeta, mailFrom); err != nil {151 return err152 }153154 sourceBlock, err := dd.srcBlockForAddr(ctx, mailFrom)155 if err != nil {156 return err157 }158 if sourceBlock.rejectErr != nil {159 dd.log.Debugf("sender %s rejected with error: %v", mailFrom, sourceBlock.rejectErr)160 return sourceBlock.rejectErr161 }162 dd.sourceBlock = sourceBlock163164 if err := dd.checkRunner.checkConnSender(ctx, sourceBlock.checks, mailFrom); err != nil {165 return err166 }167168 sourceModifiersState, err := sourceBlock.modifiers.ModStateForMsg(ctx, msgMeta)169 if err != nil {170 return err171 }172 mailFrom, err = sourceModifiersState.RewriteSender(ctx, mailFrom)173 if err != nil {174 return err175 }176 dd.sourceModifiersState = sourceModifiersState177178 dd.sourceAddr = mailFrom179 return nil180}181182func (dd *msgpipelineDelivery) initRunGlobalModifiers(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (string, error) {183 globalModifiersState, err := dd.d.globalModifiers.ModStateForMsg(ctx, msgMeta)184 if err != nil {185 return "", err186 }187 mailFrom, err = globalModifiersState.RewriteSender(ctx, mailFrom)188 if err != nil {189 globalModifiersState.Close()190 return "", err191 }192 dd.globalModifiersState = globalModifiersState193 return mailFrom, nil194}195196func (dd *msgpipelineDelivery) srcBlockForAddr(ctx context.Context, mailFrom string) (sourceBlock, error) {197 cleanFrom := mailFrom198 if mailFrom != "" {199 var err error200 cleanFrom, err = address.ForLookup(mailFrom)201 if err != nil {202 return sourceBlock{}, &exterrors.SMTPError{203 Code: 501,204 EnhancedCode: exterrors.EnhancedCode{5, 1, 7},205 Message: "Unable to normalize the sender address",206 Err: err,207 }208 }209 }210211 for _, srcIn := range dd.d.sourceIn {212 _, ok, err := srcIn.t.Lookup(ctx, cleanFrom)213 if err != nil {214 dd.log.Error("source_in lookup failed", err, "key", cleanFrom)215 continue216 }217 if !ok {218 continue219 }220 return srcIn.block, nil221 }222223 // First try to match against complete address.224 srcBlock, ok := dd.d.perSource[cleanFrom]225 if !ok {226 // Then try domain-only.227 _, domain, err := address.Split(cleanFrom)228 // mailFrom != "" is added as a special condition229 // instead of extending address.Split because ""230 // is not a valid RFC 282 address and only a special231 // value for SMTP.232 if err != nil && cleanFrom != "" {233 return sourceBlock{}, &exterrors.SMTPError{234 Code: 501,235 EnhancedCode: exterrors.EnhancedCode{5, 1, 3},236 Message: "Invalid sender address",237 Err: err,238 Reason: "Can't extract local-part and host-part",239 }240 }241242 // domain is already case-folded and normalized by the message source.243 srcBlock, ok = dd.d.perSource[domain]244 if !ok {245 // Fallback to the default source block.246 srcBlock = dd.d.defaultSource247 dd.log.Debugf("sender %s matched by default rule", mailFrom)248 } else {249 dd.log.Debugf("sender %s matched by domain rule '%s'", mailFrom, domain)250 }251 } else {252 dd.log.Debugf("sender %s matched by address rule '%s'", mailFrom, cleanFrom)253 }254 return srcBlock, nil255}256257type delivery struct {258 module.Delivery259 // Recipient addresses this delivery object is used for, original values (not modified by RewriteRcpt).260 recipients []string261}262263type msgpipelineDelivery struct {264 d *MsgPipeline265266 globalModifiersState module.ModifierState267 sourceModifiersState module.ModifierState268 rcptModifiersState map[*rcptBlock]module.ModifierState269270 log log.Logger271272 sourceAddr string273 sourceBlock sourceBlock274275 deliveries map[module.DeliveryTarget]*delivery276 msgMeta *module.MsgMetadata277 checkRunner *checkRunner278}279280func (dd *msgpipelineDelivery) AddRcpt(ctx context.Context, to string, opts smtp.RcptOptions) error {281 if err := dd.checkRunner.checkRcpt(ctx, dd.d.globalChecks, to); err != nil {282 return err283 }284 if err := dd.checkRunner.checkRcpt(ctx, dd.sourceBlock.checks, to); err != nil {285 return err286 }287288 originalTo := to289290 newTo, err := dd.globalModifiersState.RewriteRcpt(ctx, to)291 if err != nil {292 return err293 }294 dd.log.Debugln("global rcpt modifiers:", to, "=>", newTo)295 resultTo := newTo296 newTo = []string{}297298 for _, to = range resultTo {299 var tempTo []string300 tempTo, err = dd.sourceModifiersState.RewriteRcpt(ctx, to)301 if err != nil {302 return err303 }304 newTo = append(newTo, tempTo...)305 }306 dd.log.Debugln("per-source rcpt modifiers:", to, "=>", newTo)307 resultTo = newTo308309 for _, to = range resultTo {310 wrapErr := func(err error) error {311 return exterrors.WithFields(err, map[string]interface{}{312 "effective_rcpt": to,313 })314 }315316 rcptBlock, err := dd.rcptBlockForAddr(ctx, to)317 if err != nil {318 return wrapErr(err)319 }320321 if rcptBlock.rejectErr != nil {322 return wrapErr(rcptBlock.rejectErr)323 }324325 if err := dd.checkRunner.checkRcpt(ctx, rcptBlock.checks, to); err != nil {326 return wrapErr(err)327 }328329 rcptModifiersState, err := dd.getRcptModifiers(ctx, rcptBlock, to)330 if err != nil {331 return wrapErr(err)332 }333334 newTo, err = rcptModifiersState.RewriteRcpt(ctx, to)335 if err != nil {336 rcptModifiersState.Close()337 return wrapErr(err)338 }339 dd.log.Debugln("per-rcpt modifiers:", to, "=>", newTo)340341 for _, to = range newTo {342 wrapErr = func(err error) error {343 return exterrors.WithFields(err, map[string]interface{}{344 "effective_rcpt": to,345 })346 }347348 if originalTo != to {349 dd.msgMeta.OriginalRcpts[to] = originalTo350 }351352 for _, tgt := range rcptBlock.targets {353 // Do not wrap errors coming from nested pipeline target delivery since354 // that pipeline itself will insert effective_rcpt field and could do355 // its own rewriting - we do not want to hide it from the admin in356 // error messages.357 wrapErr := wrapErr358 if _, ok := tgt.(*MsgPipeline); ok {359 wrapErr = func(err error) error { return err }360 }361362 delivery, err := dd.getDelivery(ctx, tgt)363 if err != nil {364 return wrapErr(err)365 }366367 if err := delivery.AddRcpt(ctx, to, opts); err != nil {368 return wrapErr(err)369 }370 delivery.recipients = append(delivery.recipients, originalTo)371 }372 }373 }374375 return nil376}377378func (dd *msgpipelineDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {379 if err := dd.checkRunner.checkBody(ctx, dd.d.globalChecks, header, body); err != nil {380 return err381 }382 if err := dd.checkRunner.checkBody(ctx, dd.sourceBlock.checks, header, body); err != nil {383 return err384 }385 for blk := range dd.rcptModifiersState {386 if err := dd.checkRunner.checkBody(ctx, blk.checks, header, body); err != nil {387 return err388 }389 }390391 if dd.d.FirstPipeline {392 // Add Received *after* checks to make sure they see the message literally393 // how we received it BUT place it below any other field that might be394 // added by applyResults (including Authentication-Results)395 // per recommendation in RFC 7001, Section 4 (see GH issue #135).396 received, err := target.GenerateReceived(ctx, dd.msgMeta, dd.d.Hostname, dd.msgMeta.OriginalFrom)397 if err != nil {398 return err399 }400 header.Add("Received", received)401 }402403 if err := dd.checkRunner.applyResults(dd.d.Hostname, &header); err != nil {404 return err405 }406407 // Run modifiers after Authentication-Results addition to make408 // sure signatures, etc will cover it.409 if err := dd.globalModifiersState.RewriteBody(ctx, &header, body); err != nil {410 return err411 }412 if err := dd.sourceModifiersState.RewriteBody(ctx, &header, body); err != nil {413 return err414 }415 for _, modifiers := range dd.rcptModifiersState {416 if err := modifiers.RewriteBody(ctx, &header, body); err != nil {417 return err418 }419 }420421 for _, delivery := range dd.deliveries {422 if err := delivery.Body(ctx, header, body); err != nil {423 return err424 }425 dd.log.Debugf("delivery.Body ok, Delivery object = %T", delivery)426 }427 return nil428}429430// statusCollector wraps StatusCollector and adds reverse translation431// of recipients for all statuses.]432//433// We can't let delivery targets set statuses directly because they see434// modified addresses (RewriteRcpt) and we are supposed to report435// statuses using original values. Additionally, we should still avoid436// collect-and-them-report approach since statuses should be reported437// as soon as possible (that is required by LMTP).438type statusCollector struct {439 originalRcpts map[string]string440 wrapped module.StatusCollector441}442443func (sc statusCollector) SetStatus(rcptTo string, err error) {444 original, ok := sc.originalRcpts[rcptTo]445 if ok {446 rcptTo = original447 }448 sc.wrapped.SetStatus(rcptTo, err)449}450451func (dd *msgpipelineDelivery) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, body buffer.Buffer) {452 setStatusAll := func(err error) {453 for _, delivery := range dd.deliveries {454 for _, rcpt := range delivery.recipients {455 c.SetStatus(rcpt, err)456 }457 }458 }459460 if err := dd.checkRunner.checkBody(ctx, dd.d.globalChecks, header, body); err != nil {461 setStatusAll(err)462 return463 }464 if err := dd.checkRunner.checkBody(ctx, dd.sourceBlock.checks, header, body); err != nil {465 setStatusAll(err)466 return467 }468469 // Run modifiers after Authentication-Results addition to make470 // sure signatures, etc will cover it.471 if err := dd.globalModifiersState.RewriteBody(ctx, &header, body); err != nil {472 setStatusAll(err)473 return474 }475 if err := dd.sourceModifiersState.RewriteBody(ctx, &header, body); err != nil {476 setStatusAll(err)477 return478 }479 for _, modifiers := range dd.rcptModifiersState {480 if err := modifiers.RewriteBody(ctx, &header, body); err != nil {481 setStatusAll(err)482 return483 }484 }485486 for _, delivery := range dd.deliveries {487 partDelivery, ok := delivery.Delivery.(module.PartialDelivery)488 if ok {489 partDelivery.BodyNonAtomic(ctx, statusCollector{490 originalRcpts: dd.msgMeta.OriginalRcpts,491 wrapped: c,492 }, header, body)493 continue494 }495496 if err := delivery.Body(ctx, header, body); err != nil {497 for _, rcpt := range delivery.recipients {498 c.SetStatus(rcpt, err)499 }500 }501 }502}503504func (dd msgpipelineDelivery) Commit(ctx context.Context) error {505 dd.close()506507 for _, delivery := range dd.deliveries {508 if err := delivery.Commit(ctx); err != nil {509 // No point in Committing remaining deliveries, everything is broken already.510 return err511 }512 }513 return nil514}515516func (dd *msgpipelineDelivery) close() {517 dd.checkRunner.close()518519 if dd.globalModifiersState != nil {520 dd.globalModifiersState.Close()521 }522 if dd.sourceModifiersState != nil {523 dd.sourceModifiersState.Close()524 }525 for _, modifiers := range dd.rcptModifiersState {526 modifiers.Close()527 }528}529530func (dd msgpipelineDelivery) Abort(ctx context.Context) error {531 dd.close()532533 var lastErr error534 for _, delivery := range dd.deliveries {535 if err := delivery.Abort(ctx); err != nil {536 dd.log.Debugf("delivery.Abort failure, Delivery object = %T: %v", delivery, err)537 lastErr = err538 // Continue anyway and try to Abort all remaining delivery objects.539 }540 }541 return lastErr542}543544func (dd *msgpipelineDelivery) rcptBlockForAddr(ctx context.Context, rcptTo string) (*rcptBlock, error) {545 cleanRcpt, err := address.ForLookup(rcptTo)546 if err != nil {547 return nil, &exterrors.SMTPError{548 Code: 553,549 EnhancedCode: exterrors.EnhancedCode{5, 1, 2},550 Message: "Unable to normalize the recipient address",551 Err: err,552 }553 }554555 for _, rcptIn := range dd.sourceBlock.rcptIn {556 _, ok, err := rcptIn.t.Lookup(ctx, cleanRcpt)557 if err != nil {558 dd.log.Error("destination_in lookup failed", err, "key", cleanRcpt)559 continue560 }561 if !ok {562 continue563 }564 return rcptIn.block, nil565 }566567 // First try to match against complete address.568 rcptBlock, ok := dd.sourceBlock.perRcpt[cleanRcpt]569 if !ok {570 // Then try domain-only.571 _, domain, err := address.Split(cleanRcpt)572 if err != nil {573 return nil, &exterrors.SMTPError{574 Code: 501,575 EnhancedCode: exterrors.EnhancedCode{5, 1, 3},576 Message: "Invalid recipient address",577 Err: err,578 Reason: "Can't extract local-part and host-part",579 }580 }581582 // domain is already case-folded and normalized because it is a part of583 // cleanRcpt.584 rcptBlock, ok = dd.sourceBlock.perRcpt[domain]585 if !ok {586 // Fallback to the default source block.587 rcptBlock = dd.sourceBlock.defaultRcpt588 dd.log.Debugf("recipient %s matched by default rule (clean = %s)", rcptTo, cleanRcpt)589 } else {590 dd.log.Debugf("recipient %s matched by domain rule '%s'", rcptTo, domain)591 }592 } else {593 dd.log.Debugf("recipient %s matched by address rule '%s'", rcptTo, cleanRcpt)594 }595 return rcptBlock, nil596}597598func (dd *msgpipelineDelivery) getRcptModifiers(ctx context.Context, rcptBlock *rcptBlock, rcptTo string) (module.ModifierState, error) {599 rcptModifiersState, ok := dd.rcptModifiersState[rcptBlock]600 if ok {601 return rcptModifiersState, nil602 }603604 rcptModifiersState, err := rcptBlock.modifiers.ModStateForMsg(ctx, dd.msgMeta)605 if err != nil {606 return nil, err607 }608609 newSender, err := rcptModifiersState.RewriteSender(ctx, dd.sourceAddr)610 if err == nil && newSender != dd.sourceAddr {611 dd.log.Msg("Per-recipient modifier changed sender address. This is not supported and will "+612 "be ignored.", "rcpt", rcptTo, "originalFrom", dd.sourceAddr, "modifiedFrom", newSender)613 }614615 dd.rcptModifiersState[rcptBlock] = rcptModifiersState616 return rcptModifiersState, nil617}618619func (dd *msgpipelineDelivery) getDelivery(ctx context.Context, tgt module.DeliveryTarget) (*delivery, error) {620 delivery_, ok := dd.deliveries[tgt]621 if ok {622 return delivery_, nil623 }624625 deliveryObj, err := tgt.Start(ctx, dd.msgMeta, dd.sourceAddr)626 if err != nil {627 dd.log.Debugf("tgt.Start(%s) failure, target = %s: %v", dd.sourceAddr, objectName(tgt), err)628 return nil, err629 }630 delivery_ = &delivery{Delivery: deliveryObj}631632 dd.log.Debugf("tgt.Start(%s) ok, target = %s", dd.sourceAddr, objectName(tgt))633634 dd.deliveries[tgt] = delivery_635 return delivery_, nil636}637638// Mock returns a MsgPipeline that merely delivers messages to a specified target639// and runs a set of checks.640//641// It is meant for use in tests for modules that embed a pipeline object.642func Mock(tgt module.DeliveryTarget, globalChecks []module.Check) *MsgPipeline {643 return &MsgPipeline{644 msgpipelineCfg: msgpipelineCfg{645 globalChecks: globalChecks,646 perSource: map[string]sourceBlock{},647 defaultSource: sourceBlock{648 perRcpt: map[string]*rcptBlock{},649 defaultRcpt: &rcptBlock{650 targets: []module.DeliveryTarget{tgt},651 },652 },653 },654 }655}