1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package queue2021import (22 "bytes"23 "context"24 "crypto/sha1"25 "encoding/hex"26 "errors"27 "io"28 "os"29 "path/filepath"30 "reflect"31 "strings"32 "testing"33 "time"3435 "github.com/emersion/go-message/textproto"36 "github.com/emersion/go-smtp"37 "github.com/foxcpp/maddy/framework/buffer"38 "github.com/foxcpp/maddy/framework/exterrors"39 "github.com/foxcpp/maddy/framework/log"40 "github.com/foxcpp/maddy/framework/module"41 "github.com/foxcpp/maddy/internal/testutils"42)4344// newTestQueue returns properly initialized Queue object usable for testing.45//46// See newTestQueueDir to create testing queue from an existing directory.47// It is called responsibility to remove queue directory created by this function.48func newTestQueue(t *testing.T, target module.DeliveryTarget) *Queue {49 return newTestQueueDir(t, target, t.TempDir())50}5152func cleanQueue(t *testing.T, q *Queue) {53 t.Log("--- queue.Close")54 if err := q.Close(); err != nil {55 t.Fatal("queue.Close:", err)56 }57}5859func newTestQueueDir(t *testing.T, target module.DeliveryTarget, dir string) *Queue {60 mod, _ := NewQueue("", "queue", nil, nil)61 q := mod.(*Queue)62 q.initialRetryTime = 063 q.retryTimeScale = 164 q.postInitDelay = 065 q.maxTries = 566 q.location = dir67 q.Target = target6869 if testing.Verbose() {70 q.Log = testutils.Logger(t, "queue")71 } else {72 q.Log = log.Logger{Out: log.NopOutput{}}73 }7475 if err := q.start(1); err != nil {76 panic(err)77 }7879 return q80}8182// unreliableTarget is a module.DeliveryTarget implementation that stores83// messages to a slice and sometimes fails with the specified error.84type unreliableTarget struct {85 committed chan testutils.Msg86 aborted chan testutils.Msg8788 // Amount of completed deliveries (both failed and succeeded)89 passedMessages int9091 // To make unreliableTarget fail Commit for N-th delivery, set N-1-th92 // element of this slice to wanted error object. If slice is93 // nil/empty or N is bigger than its size - delivery will succeed.94 bodyFailures []error95 bodyFailuresPartial []map[string]error96 rcptFailures []map[string]error97}9899type unreliableTargetDelivery struct {100 ut *unreliableTarget101 msg testutils.Msg102}103104type unreliableTargetDeliveryPartial struct {105 *unreliableTargetDelivery106}107108func (utd *unreliableTargetDelivery) AddRcpt(ctx context.Context, rcptTo string, _ smtp.RcptOptions) error {109 if len(utd.ut.rcptFailures) > utd.ut.passedMessages {110 rcptErrs := utd.ut.rcptFailures[utd.ut.passedMessages]111 if err := rcptErrs[rcptTo]; err != nil {112 return err113 }114 }115116 utd.msg.RcptTo = append(utd.msg.RcptTo, rcptTo)117 return nil118}119120func (utd *unreliableTargetDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {121 if utd.ut.bodyFailuresPartial != nil {122 return errors.New("partial failure occurred, no additional information available")123 }124125 r, _ := body.Open()126 utd.msg.Body, _ = io.ReadAll(r)127128 if len(utd.ut.bodyFailures) > utd.ut.passedMessages {129 return utd.ut.bodyFailures[utd.ut.passedMessages]130 }131132 return nil133}134135func (utd *unreliableTargetDeliveryPartial) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, body buffer.Buffer) {136 r, _ := body.Open()137 utd.msg.Body, _ = io.ReadAll(r)138139 if len(utd.ut.bodyFailuresPartial) > utd.ut.passedMessages {140 for rcpt, err := range utd.ut.bodyFailuresPartial[utd.ut.passedMessages] {141 c.SetStatus(rcpt, err)142 }143 }144}145146func (utd *unreliableTargetDelivery) Abort(ctx context.Context) error {147 utd.ut.passedMessages++148 if utd.ut.aborted != nil {149 utd.ut.aborted <- utd.msg150 }151 return nil152}153154func (utd *unreliableTargetDelivery) Commit(ctx context.Context) error {155 utd.ut.passedMessages++156 if utd.ut.committed != nil {157 utd.ut.committed <- utd.msg158 }159 return nil160}161162func (ut *unreliableTarget) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {163 if ut.bodyFailuresPartial != nil {164 return &unreliableTargetDeliveryPartial{165 &unreliableTargetDelivery{166 ut: ut,167 msg: testutils.Msg{168 MsgMeta: msgMeta,169 MailFrom: mailFrom,170 },171 },172 }, nil173 }174 return &unreliableTargetDelivery{175 ut: ut,176 msg: testutils.Msg{177 MsgMeta: msgMeta,178 MailFrom: mailFrom,179 },180 }, nil181}182183func readMsgChanTimeout(t *testing.T, ch <-chan testutils.Msg, timeout time.Duration) *testutils.Msg {184 t.Helper()185 timer := time.NewTimer(timeout)186 select {187 case msg := <-ch:188 return &msg189 case <-timer.C:190 t.Fatal("chan read timed out")191 return nil192 }193}194195func checkQueueDir(t *testing.T, q *Queue, expectedIDs []string) {196 t.Helper()197 // We use the map to lookups and also to mark messages we found198 // we can report missing entries.199 expectedMap := make(map[string]bool, len(expectedIDs))200 for _, id := range expectedIDs {201 expectedMap[id] = false202 }203204 dir, err := os.ReadDir(q.location)205 if err != nil {206 t.Fatalf("failed to read queue directory: %v", err)207 }208209 // Queue implementation uses file names in the following format:210 // DELIVERY_ID.SOMETHING211 for _, file := range dir {212 if file.IsDir() {213 t.Fatalf("queue should not create subdirectories in the store, but there is %s dir in it", file.Name())214 }215216 nameParts := strings.Split(file.Name(), ".")217 if len(nameParts) != 2 {218 t.Fatalf("did the queue files name format changed? got %s", file.Name())219 }220221 _, ok := expectedMap[nameParts[0]]222 if !ok {223 t.Errorf("message with unexpected Msg ID %s is stored in queue store", nameParts[0])224 continue225 }226227 expectedMap[nameParts[0]] = true228 }229230 for id, found := range expectedMap {231 if !found {232 t.Errorf("expected message with Msg ID %s is missing from queue store", id)233 }234 }235}236237func TestQueueDelivery(t *testing.T) {238 t.Parallel()239240 dt := unreliableTarget{committed: make(chan testutils.Msg, 10)}241 q := newTestQueue(t, &dt)242 defer cleanQueue(t, q)243244 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})245246 // Wait for the delivery to complete and stop processing.247 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)248 q.Close()249250 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")251252 // There should be no queued messages.253 checkQueueDir(t, q, []string{})254}255256func TestQueueDelivery_PermanentFail_NonPartial(t *testing.T) {257 t.Parallel()258259 dt := unreliableTarget{260 bodyFailures: []error{261 exterrors.WithTemporary(errors.New("you shall not pass"), false),262 },263 aborted: make(chan testutils.Msg, 10),264 }265 q := newTestQueue(t, &dt)266 defer cleanQueue(t, q)267268 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})269270 // Queue will abort a delivery if it fails for all recipients.271 readMsgChanTimeout(t, dt.aborted, 5*time.Second)272 q.Close()273274 // Delivery is failed permanently, hence no retry should be rescheduled.275 checkQueueDir(t, q, []string{})276}277278func TestQueueDelivery_PermanentFail_Partial(t *testing.T) {279 t.Parallel()280281 dt := unreliableTarget{282 bodyFailuresPartial: []map[string]error{283 {284 "tester1@example.org": exterrors.WithTemporary(errors.New("you shall not pass"), false),285 "tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass"), false),286 },287 },288 aborted: make(chan testutils.Msg, 10),289 }290 q := newTestQueue(t, &dt)291 defer cleanQueue(t, q)292293 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})294295 // This this is similar to the previous test, but checks PartialDelivery processing logic.296 // Here delivery fails for recipients too, but this is reported using PartialDelivery.297298 readMsgChanTimeout(t, dt.aborted, 5*time.Second)299 q.Close()300 checkQueueDir(t, q, []string{})301}302303func TestQueueDelivery_TemporaryFail(t *testing.T) {304 t.Parallel()305306 dt := unreliableTarget{307 bodyFailures: []error{308 exterrors.WithTemporary(errors.New("you shall not pass"), true),309 },310 aborted: make(chan testutils.Msg, 10),311 committed: make(chan testutils.Msg, 10),312 }313 q := newTestQueue(t, &dt)314 defer cleanQueue(t, q)315316 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})317318 // Delivery should be aborted, because it failed for all recipients.319 readMsgChanTimeout(t, dt.aborted, 5*time.Second)320321 // Second retry, should work fine.322 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)323 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")324325 q.Close()326 // No more retries scheduled, queue storage is clear.327 defer checkQueueDir(t, q, []string{})328}329330func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) {331 t.Parallel()332333 dt := unreliableTarget{334 bodyFailuresPartial: []map[string]error{335 {336 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),337 },338 },339 aborted: make(chan testutils.Msg, 10),340 committed: make(chan testutils.Msg, 10),341 }342 q := newTestQueue(t, &dt)343 defer cleanQueue(t, q)344345 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})346347 // Committed, tester1@example.org - ok.348 msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second)349 // Side note: unreliableTarget adds recipients to the msg object even if they were rejected350 // later using a partial error. So slice below is all recipients that were submitted by351 // the queue.352 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")353354 // committed #2, tester2@example.org - ok355 msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second)356 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")357358 q.Close()359 // No more retries scheduled, queue storage is clear.360 checkQueueDir(t, q, []string{})361}362363func TestQueueDelivery_MultipleAttempts(t *testing.T) {364 t.Parallel()365366 dt := unreliableTarget{367 bodyFailuresPartial: []map[string]error{368 {369 "tester1@example.org": exterrors.WithTemporary(errors.New("you shall not pass 1"), false),370 "tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass 2"), true),371 },372 {373 "tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass 3"), true),374 },375 },376 committed: make(chan testutils.Msg, 10),377 aborted: make(chan testutils.Msg, 10),378 }379 q := newTestQueue(t, &dt)380 defer cleanQueue(t, q)381382 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"})383384 // Committed because delivery to tester3@example.org is succeeded.385 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)386 // Side note: This slice contains all recipients submitted by the queue, even if387 // they were rejected later using partialError.388 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, "")389390 // tester1 is failed permanently, should not be retried.391 // tester2 is failed temporary, should be retried.392 readMsgChanTimeout(t, dt.aborted, 5*time.Second)393394 // Third attempt... tester2 delivered.395 msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)396 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")397398 q.Close()399 // No more retries should be scheduled.400 checkQueueDir(t, q, []string{})401}402403func TestQueueDelivery_PermanentRcptReject(t *testing.T) {404 t.Parallel()405406 dt := unreliableTarget{407 rcptFailures: []map[string]error{408 {409 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),410 },411 },412 committed: make(chan testutils.Msg, 10),413 }414 q := newTestQueue(t, &dt)415 defer cleanQueue(t, q)416417 testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})418419 // Committed, tester2@example.org succeeded.420 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)421 testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, "")422423 q.Close()424 // No more retries should be scheduled.425 checkQueueDir(t, q, []string{})426}427428func TestQueueDelivery_TemporaryRcptReject(t *testing.T) {429 t.Parallel()430431 dt := unreliableTarget{432 rcptFailures: []map[string]error{433 {434 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),435 },436 },437 committed: make(chan testutils.Msg, 10),438 }439 q := newTestQueue(t, &dt)440 defer cleanQueue(t, q)441442 // First attempt:443 // tester1 - temp. fail444 // tester2 - ok445 // Second attempt:446 // tester1 - ok447 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})448449 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)450 // Unlike previous tests where unreliableTarget rejected recipients by partialError, here they are rejected451 // by AddRcpt directly, so they are NOT saved by the target.452 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")453454 msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)455 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")456457 q.Close()458 // No more retries should be scheduled.459 checkQueueDir(t, q, []string{})460}461462func TestQueueDelivery_SerializationRoundtrip(t *testing.T) {463 t.Parallel()464465 dt := unreliableTarget{466 rcptFailures: []map[string]error{467 {468 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),469 },470 },471 committed: make(chan testutils.Msg, 10),472 }473 q := newTestQueue(t, &dt)474 defer cleanQueue(t, q)475476 // This is the most tricky test because it is racy and I have no idea what can be done to avoid it.477 // It relies on us calling Close before queue msgpipeline decides to retry delivery.478 // Hence retry delay is increased from 0ms used in other tests to make it reliable.479 q.initialRetryTime = 1 * time.Second480481 // To make sure we will not time out due to post-init delay.482 q.postInitDelay = 0483484 deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})485486 // Standard partial delivery, retry will be scheduled for tester1@example.org.487 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)488 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")489490 // Then stop it.491 q.Close()492493 // Make sure it is saved.494 checkQueueDir(t, q, []string{deliveryID})495496 // Then reinit it.497 q = newTestQueueDir(t, &dt, q.location)498499 // Wait for retry and check it.500 msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)501 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")502503 // Close it again.504 q.Close()505 // No more retries should be scheduled.506 checkQueueDir(t, q, []string{})507}508509func TestQueueDelivery_DeserlizationCleanUp(t *testing.T) {510 t.Parallel()511512 test := func(t *testing.T, fileSuffix string) {513 dt := unreliableTarget{514 rcptFailures: []map[string]error{515 {516 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),517 },518 },519 committed: make(chan testutils.Msg, 10),520 }521 q := newTestQueue(t, &dt)522 defer cleanQueue(t, q)523524 // This is the most tricky test because it is racy and I have no idea what can be done to avoid it.525 // It relies on us calling Close before queue msgpipeline decides to retry delivery.526 // Hence retry delay is increased from 0ms used in other tests to make it reliable.527 q.initialRetryTime = 1 * time.Second528529 // To make sure we will not time out due to post-init delay.530 q.postInitDelay = 0531532 deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})533534 // Standard partial delivery, retry will be scheduled for tester1@example.org.535 msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)536 testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")537538 q.Close()539540 if err := os.Remove(filepath.Join(q.location, deliveryID+fileSuffix)); err != nil {541 t.Fatal(err)542 }543544 // Dangling files should be removed during load.545 q = newTestQueueDir(t, &dt, q.location)546 q.Close()547548 // Nothing should be left.549 checkQueueDir(t, q, []string{})550 }551552 t.Run("NoMeta", func(t *testing.T) {553 t.Skip("Not implemented")554 test(t, ".meta")555 })556 t.Run("NoBody", func(t *testing.T) {557 test(t, ".body")558 })559 t.Run("NoHeader", func(t *testing.T) {560 test(t, ".header")561 })562}563564func TestQueueDelivery_AbortIfNoRecipients(t *testing.T) {565 t.Parallel()566567 dt := unreliableTarget{568 rcptFailures: []map[string]error{569 {570 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),571 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),572 },573 },574 committed: make(chan testutils.Msg, 10),575 aborted: make(chan testutils.Msg, 10),576 }577 q := newTestQueue(t, &dt)578 defer cleanQueue(t, q)579580 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})581 readMsgChanTimeout(t, dt.aborted, 5*time.Second)582}583584func TestQueueDelivery_AbortNoDangling(t *testing.T) {585 t.Parallel()586587 dt := unreliableTarget{588 rcptFailures: []map[string]error{589 {590 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),591 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),592 },593 },594 committed: make(chan testutils.Msg, 10),595 aborted: make(chan testutils.Msg, 10),596 }597 q := newTestQueue(t, &dt)598 defer cleanQueue(t, q)599600 // Copied from testutils.DoTestDelivery.601 IDRaw := sha1.Sum([]byte(t.Name()))602 encodedID := hex.EncodeToString(IDRaw[:])603604 body := buffer.MemoryBuffer{Slice: []byte("foobar\r\n")}605 ctx := module.MsgMetadata{606 DontTraceSender: true,607 ID: encodedID,608 }609 delivery, err := q.Start(context.Background(), &ctx, "test3@example.org")610 if err != nil {611 t.Fatalf("unexpected Start err: %v", err)612 }613 for _, rcpt := range [...]string{"test@example.org", "test2@example.org"} {614 if err := delivery.AddRcpt(context.Background(), rcpt, smtp.RcptOptions{}); err != nil {615 t.Fatalf("unexpected AddRcpt err for %s: %v", rcpt, err)616 }617 }618 if err := delivery.Body(context.Background(), textproto.Header{}, body); err != nil {619 t.Fatalf("unexpected Body err: %v", err)620 }621 if err := delivery.Abort(context.Background()); err != nil {622 t.Fatalf("unexpected Abort err: %v", err)623 }624625 checkQueueDir(t, q, []string{})626}627628func TestQueueDSN(t *testing.T) {629 t.Parallel()630631 dsnTarget := unreliableTarget{632 committed: make(chan testutils.Msg, 10),633 aborted: make(chan testutils.Msg, 10),634 }635636 dt := unreliableTarget{637 rcptFailures: []map[string]error{638 {639 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),640 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),641 },642 },643 committed: make(chan testutils.Msg, 10),644 aborted: make(chan testutils.Msg, 10),645 }646 q := newTestQueue(t, &dt)647 q.hostname = "mx.example.org"648 q.autogenMsgDomain = "example.org"649 q.dsnPipeline = &dsnTarget650 defer cleanQueue(t, q)651652 testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})653654 // Wait for message delivery attempt to complete (aborted because all recipients fail).655 readMsgChanTimeout(t, dt.aborted, 5*time.Second)656657 // Wait for DSN.658 msg := readMsgChanTimeout(t, dsnTarget.committed, 5*time.Second)659660 if msg.MailFrom != "" {661 t.Fatalf("wrong MAIL FROM address in DSN: %v", msg.MailFrom)662 }663 if !reflect.DeepEqual(msg.RcptTo, []string{"tester@example.com"}) {664 t.Fatalf("wrong RCPT TO address in DSN: %v", msg.RcptTo)665 }666}667668func TestQueueDSN_FromEmptyAddr(t *testing.T) {669 t.Parallel()670671 dsnTarget := unreliableTarget{672 committed: make(chan testutils.Msg, 10),673 aborted: make(chan testutils.Msg, 10),674 }675676 dt := unreliableTarget{677 rcptFailures: []map[string]error{678 {679 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),680 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),681 },682 },683 committed: make(chan testutils.Msg, 10),684 aborted: make(chan testutils.Msg, 10),685 }686 q := newTestQueue(t, &dt)687 q.hostname = "mx.example.org"688 q.autogenMsgDomain = "example.org"689 q.dsnPipeline = &dsnTarget690 defer cleanQueue(t, q)691692 testutils.DoTestDelivery(t, q, "", []string{"tester1@example.org", "tester2@example.org"})693694 // Wait for message delivery attempt to complete (aborted because all recipients fail).695 readMsgChanTimeout(t, dt.aborted, 5*time.Second)696697 time.Sleep(1 * time.Second)698699 // There should be no DSN for it.700 if dsnTarget.passedMessages != 0 {701 t.Errorf("dsnTarget accepted %d messages", dsnTarget.passedMessages)702 }703 checkQueueDir(t, q, []string{})704}705706func TestQueueDSN_NoDSNforDSN(t *testing.T) {707 t.Parallel()708709 dsnTarget := unreliableTarget{710 rcptFailures: []map[string]error{711 {712 "tester@example.org": exterrors.WithTemporary(errors.New("go away"), false),713 },714 },715 committed: make(chan testutils.Msg, 10),716 aborted: make(chan testutils.Msg, 10),717 }718719 dt := unreliableTarget{720 rcptFailures: []map[string]error{721 {722 "tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),723 "tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),724 },725 },726 committed: make(chan testutils.Msg, 10),727 aborted: make(chan testutils.Msg, 10),728 }729 q := newTestQueue(t, &dt)730 q.hostname = "mx.example.org"731 q.autogenMsgDomain = "example.org"732 q.dsnPipeline = &dsnTarget733 defer cleanQueue(t, q)734735 testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})736737 // Wait for message delivery attempt to complete (aborted because all recipients fail).738 readMsgChanTimeout(t, dt.aborted, 5*time.Second)739740 // DSN will be emitted but will fail, so 'aborted'741 readMsgChanTimeout(t, dsnTarget.aborted, 5*time.Second)742743 time.Sleep(1 * time.Second)744745 // There should be no DSN for DSN (dsnTarget handled one message - the DSN itself).746 if dsnTarget.passedMessages != 1 {747 t.Errorf("dsnTarget accepted %d messages", dsnTarget.passedMessages)748 }749 checkQueueDir(t, q, []string{})750}751752func TestQueueDSN_RcptRewrite(t *testing.T) {753 t.Parallel()754755 dsnTarget := unreliableTarget{756 committed: make(chan testutils.Msg, 10),757 aborted: make(chan testutils.Msg, 10),758 }759760 dt := unreliableTarget{761 rcptFailures: []map[string]error{762 {763 "test@example.org": exterrors.WithTemporary(errors.New("go away"), false),764 "test2@example.org": exterrors.WithTemporary(errors.New("go away"), false),765 },766 },767 committed: make(chan testutils.Msg, 10),768 aborted: make(chan testutils.Msg, 10),769 }770 q := newTestQueue(t, &dt)771 q.hostname = "mx.example.org"772 q.autogenMsgDomain = "example.org"773 q.dsnPipeline = &dsnTarget774 defer cleanQueue(t, q)775776 IDRaw := sha1.Sum([]byte(t.Name()))777 encodedID := hex.EncodeToString(IDRaw[:])778779 body := buffer.MemoryBuffer{Slice: []byte("foobar\r\n")}780 ctx := module.MsgMetadata{781 DontTraceSender: true,782 OriginalFrom: "test3@example.org",783 OriginalRcpts: map[string]string{784 "test@example.org": "test+public@example.com",785 "test2@example.org": "test2+public@example.com",786 },787 ID: encodedID,788 }789 delivery, err := q.Start(context.Background(), &ctx, "test3@example.org")790 if err != nil {791 t.Fatalf("unexpected Start err: %v", err)792 }793 for _, rcpt := range [...]string{"test@example.org", "test2@example.org"} {794 if err := delivery.AddRcpt(context.Background(), rcpt, smtp.RcptOptions{}); err != nil {795 t.Fatalf("unexpected AddRcpt err for %s: %v", rcpt, err)796 }797 }798 if err := delivery.Body(context.Background(), textproto.Header{}, body); err != nil {799 t.Fatalf("unexpected Body err: %v", err)800 }801 if err := delivery.Commit(context.Background()); err != nil {802 t.Fatalf("unexpected Commit err: %v", err)803 }804805 // Wait for message delivery attempt to complete (aborted because all recipients fail).806 readMsgChanTimeout(t, dt.aborted, 5*time.Second)807808 // Wait for DSN.809 msg := readMsgChanTimeout(t, dsnTarget.committed, 5*time.Second)810811 if msg.MailFrom != "" {812 t.Fatalf("wrong MAIL FROM address in DSN: %v", msg.MailFrom)813 }814 if !reflect.DeepEqual(msg.RcptTo, []string{"test3@example.org"}) {815 t.Fatalf("wrong RCPT TO address in DSN: %v", msg.RcptTo)816 }817818 if bytes.Contains(msg.Body, []byte("test@example.org")) || bytes.Contains(msg.Body, []byte("test2@example.org")) {819 t.Errorf("DSN contents mention real final addresses")820 }821 if !bytes.Contains(msg.Body, []byte("test+public@example.com")) || !bytes.Contains(msg.Body, []byte("test2+public@example.com")) {822 t.Errorf("DSN contents do not mention original addresses")823 }824}825826func init() {827 dontRecover = true828}