maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package queue
 20
 21import (
 22	"bytes"
 23	"context"
 24	"crypto/sha1"
 25	"encoding/hex"
 26	"errors"
 27	"io"
 28	"os"
 29	"path/filepath"
 30	"reflect"
 31	"strings"
 32	"testing"
 33	"time"
 34
 35	"github.com/emersion/go-message/textproto"
 36	"github.com/emersion/go-smtp"
 37	"github.com/foxcpp/maddy/framework/buffer"
 38	"github.com/foxcpp/maddy/framework/exterrors"
 39	"github.com/foxcpp/maddy/framework/log"
 40	"github.com/foxcpp/maddy/framework/module"
 41	"github.com/foxcpp/maddy/internal/testutils"
 42)
 43
 44// newTestQueue returns properly initialized Queue object usable for testing.
 45//
 46// See newTestQueueDir to create testing queue from an existing directory.
 47// It is called responsibility to remove queue directory created by this function.
 48func newTestQueue(t *testing.T, target module.DeliveryTarget) *Queue {
 49	return newTestQueueDir(t, target, t.TempDir())
 50}
 51
 52func cleanQueue(t *testing.T, q *Queue) {
 53	t.Log("--- queue.Close")
 54	if err := q.Close(); err != nil {
 55		t.Fatal("queue.Close:", err)
 56	}
 57}
 58
 59func newTestQueueDir(t *testing.T, target module.DeliveryTarget, dir string) *Queue {
 60	mod, _ := NewQueue("", "queue", nil, nil)
 61	q := mod.(*Queue)
 62	q.initialRetryTime = 0
 63	q.retryTimeScale = 1
 64	q.postInitDelay = 0
 65	q.maxTries = 5
 66	q.location = dir
 67	q.Target = target
 68
 69	if testing.Verbose() {
 70		q.Log = testutils.Logger(t, "queue")
 71	} else {
 72		q.Log = log.Logger{Out: log.NopOutput{}}
 73	}
 74
 75	if err := q.start(1); err != nil {
 76		panic(err)
 77	}
 78
 79	return q
 80}
 81
 82// unreliableTarget is a module.DeliveryTarget implementation that stores
 83// messages to a slice and sometimes fails with the specified error.
 84type unreliableTarget struct {
 85	committed chan testutils.Msg
 86	aborted   chan testutils.Msg
 87
 88	// Amount of completed deliveries (both failed and succeeded)
 89	passedMessages int
 90
 91	// To make unreliableTarget fail Commit for N-th delivery, set N-1-th
 92	// element of this slice to wanted error object. If slice is
 93	// nil/empty or N is bigger than its size - delivery will succeed.
 94	bodyFailures        []error
 95	bodyFailuresPartial []map[string]error
 96	rcptFailures        []map[string]error
 97}
 98
 99type unreliableTargetDelivery struct {
100	ut  *unreliableTarget
101	msg testutils.Msg
102}
103
104type unreliableTargetDeliveryPartial struct {
105	*unreliableTargetDelivery
106}
107
108func (utd *unreliableTargetDelivery) AddRcpt(ctx context.Context, rcptTo string, _ smtp.RcptOptions) error {
109	if len(utd.ut.rcptFailures) > utd.ut.passedMessages {
110		rcptErrs := utd.ut.rcptFailures[utd.ut.passedMessages]
111		if err := rcptErrs[rcptTo]; err != nil {
112			return err
113		}
114	}
115
116	utd.msg.RcptTo = append(utd.msg.RcptTo, rcptTo)
117	return nil
118}
119
120func (utd *unreliableTargetDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {
121	if utd.ut.bodyFailuresPartial != nil {
122		return errors.New("partial failure occurred, no additional information available")
123	}
124
125	r, _ := body.Open()
126	utd.msg.Body, _ = io.ReadAll(r)
127
128	if len(utd.ut.bodyFailures) > utd.ut.passedMessages {
129		return utd.ut.bodyFailures[utd.ut.passedMessages]
130	}
131
132	return nil
133}
134
135func (utd *unreliableTargetDeliveryPartial) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, body buffer.Buffer) {
136	r, _ := body.Open()
137	utd.msg.Body, _ = io.ReadAll(r)
138
139	if len(utd.ut.bodyFailuresPartial) > utd.ut.passedMessages {
140		for rcpt, err := range utd.ut.bodyFailuresPartial[utd.ut.passedMessages] {
141			c.SetStatus(rcpt, err)
142		}
143	}
144}
145
146func (utd *unreliableTargetDelivery) Abort(ctx context.Context) error {
147	utd.ut.passedMessages++
148	if utd.ut.aborted != nil {
149		utd.ut.aborted <- utd.msg
150	}
151	return nil
152}
153
154func (utd *unreliableTargetDelivery) Commit(ctx context.Context) error {
155	utd.ut.passedMessages++
156	if utd.ut.committed != nil {
157		utd.ut.committed <- utd.msg
158	}
159	return nil
160}
161
162func (ut *unreliableTarget) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
163	if ut.bodyFailuresPartial != nil {
164		return &unreliableTargetDeliveryPartial{
165			&unreliableTargetDelivery{
166				ut: ut,
167				msg: testutils.Msg{
168					MsgMeta:  msgMeta,
169					MailFrom: mailFrom,
170				},
171			},
172		}, nil
173	}
174	return &unreliableTargetDelivery{
175		ut: ut,
176		msg: testutils.Msg{
177			MsgMeta:  msgMeta,
178			MailFrom: mailFrom,
179		},
180	}, nil
181}
182
183func readMsgChanTimeout(t *testing.T, ch <-chan testutils.Msg, timeout time.Duration) *testutils.Msg {
184	t.Helper()
185	timer := time.NewTimer(timeout)
186	select {
187	case msg := <-ch:
188		return &msg
189	case <-timer.C:
190		t.Fatal("chan read timed out")
191		return nil
192	}
193}
194
195func checkQueueDir(t *testing.T, q *Queue, expectedIDs []string) {
196	t.Helper()
197	// We use the map to lookups and also to mark messages we found
198	// we can report missing entries.
199	expectedMap := make(map[string]bool, len(expectedIDs))
200	for _, id := range expectedIDs {
201		expectedMap[id] = false
202	}
203
204	dir, err := os.ReadDir(q.location)
205	if err != nil {
206		t.Fatalf("failed to read queue directory: %v", err)
207	}
208
209	// Queue implementation uses file names in the following format:
210	// DELIVERY_ID.SOMETHING
211	for _, file := range dir {
212		if file.IsDir() {
213			t.Fatalf("queue should not create subdirectories in the store, but there is %s dir in it", file.Name())
214		}
215
216		nameParts := strings.Split(file.Name(), ".")
217		if len(nameParts) != 2 {
218			t.Fatalf("did the queue files name format changed? got %s", file.Name())
219		}
220
221		_, ok := expectedMap[nameParts[0]]
222		if !ok {
223			t.Errorf("message with unexpected Msg ID %s is stored in queue store", nameParts[0])
224			continue
225		}
226
227		expectedMap[nameParts[0]] = true
228	}
229
230	for id, found := range expectedMap {
231		if !found {
232			t.Errorf("expected message with Msg ID %s is missing from queue store", id)
233		}
234	}
235}
236
237func TestQueueDelivery(t *testing.T) {
238	t.Parallel()
239
240	dt := unreliableTarget{committed: make(chan testutils.Msg, 10)}
241	q := newTestQueue(t, &dt)
242	defer cleanQueue(t, q)
243
244	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
245
246	// Wait for the delivery to complete and stop processing.
247	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
248	q.Close()
249
250	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
251
252	// There should be no queued messages.
253	checkQueueDir(t, q, []string{})
254}
255
256func TestQueueDelivery_PermanentFail_NonPartial(t *testing.T) {
257	t.Parallel()
258
259	dt := unreliableTarget{
260		bodyFailures: []error{
261			exterrors.WithTemporary(errors.New("you shall not pass"), false),
262		},
263		aborted: make(chan testutils.Msg, 10),
264	}
265	q := newTestQueue(t, &dt)
266	defer cleanQueue(t, q)
267
268	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
269
270	// Queue will abort a delivery if it fails for all recipients.
271	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
272	q.Close()
273
274	// Delivery is failed permanently, hence no retry should be rescheduled.
275	checkQueueDir(t, q, []string{})
276}
277
278func TestQueueDelivery_PermanentFail_Partial(t *testing.T) {
279	t.Parallel()
280
281	dt := unreliableTarget{
282		bodyFailuresPartial: []map[string]error{
283			{
284				"tester1@example.org": exterrors.WithTemporary(errors.New("you shall not pass"), false),
285				"tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass"), false),
286			},
287		},
288		aborted: make(chan testutils.Msg, 10),
289	}
290	q := newTestQueue(t, &dt)
291	defer cleanQueue(t, q)
292
293	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
294
295	// This this is similar to the previous test, but checks PartialDelivery processing logic.
296	// Here delivery fails for recipients too, but this is reported using PartialDelivery.
297
298	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
299	q.Close()
300	checkQueueDir(t, q, []string{})
301}
302
303func TestQueueDelivery_TemporaryFail(t *testing.T) {
304	t.Parallel()
305
306	dt := unreliableTarget{
307		bodyFailures: []error{
308			exterrors.WithTemporary(errors.New("you shall not pass"), true),
309		},
310		aborted:   make(chan testutils.Msg, 10),
311		committed: make(chan testutils.Msg, 10),
312	}
313	q := newTestQueue(t, &dt)
314	defer cleanQueue(t, q)
315
316	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
317
318	// Delivery should be aborted, because it failed for all recipients.
319	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
320
321	// Second retry, should work fine.
322	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
323	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
324
325	q.Close()
326	// No more retries scheduled, queue storage is clear.
327	defer checkQueueDir(t, q, []string{})
328}
329
330func TestQueueDelivery_TemporaryFail_Partial(t *testing.T) {
331	t.Parallel()
332
333	dt := unreliableTarget{
334		bodyFailuresPartial: []map[string]error{
335			{
336				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),
337			},
338		},
339		aborted:   make(chan testutils.Msg, 10),
340		committed: make(chan testutils.Msg, 10),
341	}
342	q := newTestQueue(t, &dt)
343	defer cleanQueue(t, q)
344
345	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
346
347	// Committed, tester1@example.org - ok.
348	msg := readMsgChanTimeout(t, dt.committed, 5000*time.Second)
349	// Side note: unreliableTarget adds recipients to the msg object even if they were rejected
350	// later using a partial error. So slice below is all recipients that were submitted by
351	// the queue.
352	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"}, "")
353
354	// committed #2, tester2@example.org - ok
355	msg = readMsgChanTimeout(t, dt.committed, 5000*time.Second)
356	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
357
358	q.Close()
359	// No more retries scheduled, queue storage is clear.
360	checkQueueDir(t, q, []string{})
361}
362
363func TestQueueDelivery_MultipleAttempts(t *testing.T) {
364	t.Parallel()
365
366	dt := unreliableTarget{
367		bodyFailuresPartial: []map[string]error{
368			{
369				"tester1@example.org": exterrors.WithTemporary(errors.New("you shall not pass 1"), false),
370				"tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass 2"), true),
371			},
372			{
373				"tester2@example.org": exterrors.WithTemporary(errors.New("you shall not pass 3"), true),
374			},
375		},
376		committed: make(chan testutils.Msg, 10),
377		aborted:   make(chan testutils.Msg, 10),
378	}
379	q := newTestQueue(t, &dt)
380	defer cleanQueue(t, q)
381
382	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"})
383
384	// Committed because delivery to tester3@example.org is succeeded.
385	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
386	// Side note: This slice contains all recipients submitted by the queue, even if
387	// they were rejected later using partialError.
388	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org", "tester2@example.org", "tester3@example.org"}, "")
389
390	// tester1 is failed permanently, should not be retried.
391	// tester2 is failed temporary, should be retried.
392	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
393
394	// Third attempt... tester2 delivered.
395	msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
396	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
397
398	q.Close()
399	// No more retries should be scheduled.
400	checkQueueDir(t, q, []string{})
401}
402
403func TestQueueDelivery_PermanentRcptReject(t *testing.T) {
404	t.Parallel()
405
406	dt := unreliableTarget{
407		rcptFailures: []map[string]error{
408			{
409				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),
410			},
411		},
412		committed: make(chan testutils.Msg, 10),
413	}
414	q := newTestQueue(t, &dt)
415	defer cleanQueue(t, q)
416
417	testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})
418
419	// Committed, tester2@example.org succeeded.
420	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
421	testutils.CheckMsgID(t, msg, "tester@example.org", []string{"tester2@example.org"}, "")
422
423	q.Close()
424	// No more retries should be scheduled.
425	checkQueueDir(t, q, []string{})
426}
427
428func TestQueueDelivery_TemporaryRcptReject(t *testing.T) {
429	t.Parallel()
430
431	dt := unreliableTarget{
432		rcptFailures: []map[string]error{
433			{
434				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),
435			},
436		},
437		committed: make(chan testutils.Msg, 10),
438	}
439	q := newTestQueue(t, &dt)
440	defer cleanQueue(t, q)
441
442	// First attempt:
443	//  tester1 - temp. fail
444	//  tester2 - ok
445	// Second attempt:
446	//  tester1 - ok
447	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
448
449	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
450	// Unlike previous tests where unreliableTarget rejected recipients by partialError, here they are rejected
451	// by AddRcpt directly, so they are NOT saved by the target.
452	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
453
454	msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
455	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")
456
457	q.Close()
458	// No more retries should be scheduled.
459	checkQueueDir(t, q, []string{})
460}
461
462func TestQueueDelivery_SerializationRoundtrip(t *testing.T) {
463	t.Parallel()
464
465	dt := unreliableTarget{
466		rcptFailures: []map[string]error{
467			{
468				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),
469			},
470		},
471		committed: make(chan testutils.Msg, 10),
472	}
473	q := newTestQueue(t, &dt)
474	defer cleanQueue(t, q)
475
476	// This is the most tricky test because it is racy and I have no idea what can be done to avoid it.
477	// It relies on us calling Close before queue msgpipeline decides to retry delivery.
478	// Hence retry delay is increased from 0ms used in other tests to make it reliable.
479	q.initialRetryTime = 1 * time.Second
480
481	// To make sure we will not time out due to post-init delay.
482	q.postInitDelay = 0
483
484	deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
485
486	// Standard partial delivery, retry will be scheduled for tester1@example.org.
487	msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
488	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
489
490	// Then stop it.
491	q.Close()
492
493	// Make sure it is saved.
494	checkQueueDir(t, q, []string{deliveryID})
495
496	// Then reinit it.
497	q = newTestQueueDir(t, &dt, q.location)
498
499	// Wait for retry and check it.
500	msg = readMsgChanTimeout(t, dt.committed, 5*time.Second)
501	testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester1@example.org"}, "")
502
503	// Close it again.
504	q.Close()
505	// No more retries should be scheduled.
506	checkQueueDir(t, q, []string{})
507}
508
509func TestQueueDelivery_DeserlizationCleanUp(t *testing.T) {
510	t.Parallel()
511
512	test := func(t *testing.T, fileSuffix string) {
513		dt := unreliableTarget{
514			rcptFailures: []map[string]error{
515				{
516					"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),
517				},
518			},
519			committed: make(chan testutils.Msg, 10),
520		}
521		q := newTestQueue(t, &dt)
522		defer cleanQueue(t, q)
523
524		// This is the most tricky test because it is racy and I have no idea what can be done to avoid it.
525		// It relies on us calling Close before queue msgpipeline decides to retry delivery.
526		// Hence retry delay is increased from 0ms used in other tests to make it reliable.
527		q.initialRetryTime = 1 * time.Second
528
529		// To make sure we will not time out due to post-init delay.
530		q.postInitDelay = 0
531
532		deliveryID := testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
533
534		// Standard partial delivery, retry will be scheduled for tester1@example.org.
535		msg := readMsgChanTimeout(t, dt.committed, 5*time.Second)
536		testutils.CheckMsgID(t, msg, "tester@example.com", []string{"tester2@example.org"}, "")
537
538		q.Close()
539
540		if err := os.Remove(filepath.Join(q.location, deliveryID+fileSuffix)); err != nil {
541			t.Fatal(err)
542		}
543
544		// Dangling files should be removed during load.
545		q = newTestQueueDir(t, &dt, q.location)
546		q.Close()
547
548		// Nothing should be left.
549		checkQueueDir(t, q, []string{})
550	}
551
552	t.Run("NoMeta", func(t *testing.T) {
553		t.Skip("Not implemented")
554		test(t, ".meta")
555	})
556	t.Run("NoBody", func(t *testing.T) {
557		test(t, ".body")
558	})
559	t.Run("NoHeader", func(t *testing.T) {
560		test(t, ".header")
561	})
562}
563
564func TestQueueDelivery_AbortIfNoRecipients(t *testing.T) {
565	t.Parallel()
566
567	dt := unreliableTarget{
568		rcptFailures: []map[string]error{
569			{
570				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),
571				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),
572			},
573		},
574		committed: make(chan testutils.Msg, 10),
575		aborted:   make(chan testutils.Msg, 10),
576	}
577	q := newTestQueue(t, &dt)
578	defer cleanQueue(t, q)
579
580	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
581	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
582}
583
584func TestQueueDelivery_AbortNoDangling(t *testing.T) {
585	t.Parallel()
586
587	dt := unreliableTarget{
588		rcptFailures: []map[string]error{
589			{
590				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), true),
591				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), true),
592			},
593		},
594		committed: make(chan testutils.Msg, 10),
595		aborted:   make(chan testutils.Msg, 10),
596	}
597	q := newTestQueue(t, &dt)
598	defer cleanQueue(t, q)
599
600	// Copied from testutils.DoTestDelivery.
601	IDRaw := sha1.Sum([]byte(t.Name()))
602	encodedID := hex.EncodeToString(IDRaw[:])
603
604	body := buffer.MemoryBuffer{Slice: []byte("foobar\r\n")}
605	ctx := module.MsgMetadata{
606		DontTraceSender: true,
607		ID:              encodedID,
608	}
609	delivery, err := q.Start(context.Background(), &ctx, "test3@example.org")
610	if err != nil {
611		t.Fatalf("unexpected Start err: %v", err)
612	}
613	for _, rcpt := range [...]string{"test@example.org", "test2@example.org"} {
614		if err := delivery.AddRcpt(context.Background(), rcpt, smtp.RcptOptions{}); err != nil {
615			t.Fatalf("unexpected AddRcpt err for %s: %v", rcpt, err)
616		}
617	}
618	if err := delivery.Body(context.Background(), textproto.Header{}, body); err != nil {
619		t.Fatalf("unexpected Body err: %v", err)
620	}
621	if err := delivery.Abort(context.Background()); err != nil {
622		t.Fatalf("unexpected Abort err: %v", err)
623	}
624
625	checkQueueDir(t, q, []string{})
626}
627
628func TestQueueDSN(t *testing.T) {
629	t.Parallel()
630
631	dsnTarget := unreliableTarget{
632		committed: make(chan testutils.Msg, 10),
633		aborted:   make(chan testutils.Msg, 10),
634	}
635
636	dt := unreliableTarget{
637		rcptFailures: []map[string]error{
638			{
639				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),
640				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),
641			},
642		},
643		committed: make(chan testutils.Msg, 10),
644		aborted:   make(chan testutils.Msg, 10),
645	}
646	q := newTestQueue(t, &dt)
647	q.hostname = "mx.example.org"
648	q.autogenMsgDomain = "example.org"
649	q.dsnPipeline = &dsnTarget
650	defer cleanQueue(t, q)
651
652	testutils.DoTestDelivery(t, q, "tester@example.com", []string{"tester1@example.org", "tester2@example.org"})
653
654	// Wait for message delivery attempt to complete (aborted because all recipients fail).
655	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
656
657	// Wait for DSN.
658	msg := readMsgChanTimeout(t, dsnTarget.committed, 5*time.Second)
659
660	if msg.MailFrom != "" {
661		t.Fatalf("wrong MAIL FROM address in DSN: %v", msg.MailFrom)
662	}
663	if !reflect.DeepEqual(msg.RcptTo, []string{"tester@example.com"}) {
664		t.Fatalf("wrong RCPT TO address in DSN: %v", msg.RcptTo)
665	}
666}
667
668func TestQueueDSN_FromEmptyAddr(t *testing.T) {
669	t.Parallel()
670
671	dsnTarget := unreliableTarget{
672		committed: make(chan testutils.Msg, 10),
673		aborted:   make(chan testutils.Msg, 10),
674	}
675
676	dt := unreliableTarget{
677		rcptFailures: []map[string]error{
678			{
679				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),
680				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),
681			},
682		},
683		committed: make(chan testutils.Msg, 10),
684		aborted:   make(chan testutils.Msg, 10),
685	}
686	q := newTestQueue(t, &dt)
687	q.hostname = "mx.example.org"
688	q.autogenMsgDomain = "example.org"
689	q.dsnPipeline = &dsnTarget
690	defer cleanQueue(t, q)
691
692	testutils.DoTestDelivery(t, q, "", []string{"tester1@example.org", "tester2@example.org"})
693
694	// Wait for message delivery attempt to complete (aborted because all recipients fail).
695	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
696
697	time.Sleep(1 * time.Second)
698
699	// There should be no DSN for it.
700	if dsnTarget.passedMessages != 0 {
701		t.Errorf("dsnTarget accepted %d messages", dsnTarget.passedMessages)
702	}
703	checkQueueDir(t, q, []string{})
704}
705
706func TestQueueDSN_NoDSNforDSN(t *testing.T) {
707	t.Parallel()
708
709	dsnTarget := unreliableTarget{
710		rcptFailures: []map[string]error{
711			{
712				"tester@example.org": exterrors.WithTemporary(errors.New("go away"), false),
713			},
714		},
715		committed: make(chan testutils.Msg, 10),
716		aborted:   make(chan testutils.Msg, 10),
717	}
718
719	dt := unreliableTarget{
720		rcptFailures: []map[string]error{
721			{
722				"tester1@example.org": exterrors.WithTemporary(errors.New("go away"), false),
723				"tester2@example.org": exterrors.WithTemporary(errors.New("go away"), false),
724			},
725		},
726		committed: make(chan testutils.Msg, 10),
727		aborted:   make(chan testutils.Msg, 10),
728	}
729	q := newTestQueue(t, &dt)
730	q.hostname = "mx.example.org"
731	q.autogenMsgDomain = "example.org"
732	q.dsnPipeline = &dsnTarget
733	defer cleanQueue(t, q)
734
735	testutils.DoTestDelivery(t, q, "tester@example.org", []string{"tester1@example.org", "tester2@example.org"})
736
737	// Wait for message delivery attempt to complete (aborted because all recipients fail).
738	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
739
740	// DSN will be emitted but will fail, so 'aborted'
741	readMsgChanTimeout(t, dsnTarget.aborted, 5*time.Second)
742
743	time.Sleep(1 * time.Second)
744
745	// There should be no DSN for DSN (dsnTarget handled one message - the DSN itself).
746	if dsnTarget.passedMessages != 1 {
747		t.Errorf("dsnTarget accepted %d messages", dsnTarget.passedMessages)
748	}
749	checkQueueDir(t, q, []string{})
750}
751
752func TestQueueDSN_RcptRewrite(t *testing.T) {
753	t.Parallel()
754
755	dsnTarget := unreliableTarget{
756		committed: make(chan testutils.Msg, 10),
757		aborted:   make(chan testutils.Msg, 10),
758	}
759
760	dt := unreliableTarget{
761		rcptFailures: []map[string]error{
762			{
763				"test@example.org":  exterrors.WithTemporary(errors.New("go away"), false),
764				"test2@example.org": exterrors.WithTemporary(errors.New("go away"), false),
765			},
766		},
767		committed: make(chan testutils.Msg, 10),
768		aborted:   make(chan testutils.Msg, 10),
769	}
770	q := newTestQueue(t, &dt)
771	q.hostname = "mx.example.org"
772	q.autogenMsgDomain = "example.org"
773	q.dsnPipeline = &dsnTarget
774	defer cleanQueue(t, q)
775
776	IDRaw := sha1.Sum([]byte(t.Name()))
777	encodedID := hex.EncodeToString(IDRaw[:])
778
779	body := buffer.MemoryBuffer{Slice: []byte("foobar\r\n")}
780	ctx := module.MsgMetadata{
781		DontTraceSender: true,
782		OriginalFrom:    "test3@example.org",
783		OriginalRcpts: map[string]string{
784			"test@example.org":  "test+public@example.com",
785			"test2@example.org": "test2+public@example.com",
786		},
787		ID: encodedID,
788	}
789	delivery, err := q.Start(context.Background(), &ctx, "test3@example.org")
790	if err != nil {
791		t.Fatalf("unexpected Start err: %v", err)
792	}
793	for _, rcpt := range [...]string{"test@example.org", "test2@example.org"} {
794		if err := delivery.AddRcpt(context.Background(), rcpt, smtp.RcptOptions{}); err != nil {
795			t.Fatalf("unexpected AddRcpt err for %s: %v", rcpt, err)
796		}
797	}
798	if err := delivery.Body(context.Background(), textproto.Header{}, body); err != nil {
799		t.Fatalf("unexpected Body err: %v", err)
800	}
801	if err := delivery.Commit(context.Background()); err != nil {
802		t.Fatalf("unexpected Commit err: %v", err)
803	}
804
805	// Wait for message delivery attempt to complete (aborted because all recipients fail).
806	readMsgChanTimeout(t, dt.aborted, 5*time.Second)
807
808	// Wait for DSN.
809	msg := readMsgChanTimeout(t, dsnTarget.committed, 5*time.Second)
810
811	if msg.MailFrom != "" {
812		t.Fatalf("wrong MAIL FROM address in DSN: %v", msg.MailFrom)
813	}
814	if !reflect.DeepEqual(msg.RcptTo, []string{"test3@example.org"}) {
815		t.Fatalf("wrong RCPT TO address in DSN: %v", msg.RcptTo)
816	}
817
818	if bytes.Contains(msg.Body, []byte("test@example.org")) || bytes.Contains(msg.Body, []byte("test2@example.org")) {
819		t.Errorf("DSN contents mention real final addresses")
820	}
821	if !bytes.Contains(msg.Body, []byte("test+public@example.com")) || !bytes.Contains(msg.Body, []byte("test2+public@example.com")) {
822		t.Errorf("DSN contents do not mention original addresses")
823	}
824}
825
826func init() {
827	dontRecover = true
828}