maddy

Fork of https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

/*
Maddy Mail Server - Composable all-in-one email server.
Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
*/

/*
Package queue implements a module that keeps messages on disk and retries
delivery to the configured target (usually remote) until it succeeds for all
recipients.

Interfaces implemented:
- module.DeliveryTarget

Implementation summary follows.

All scheduled deliveries are attempted against the configured DeliveryTarget.
All metadata is preserved on disk.

Failure status is determined on a per-recipient basis:
  - A Delivery.Start failure is handled as a failure for all recipients.
  - A Delivery.AddRcpt failure is handled as a failure for the corresponding recipient.
  - A Delivery.Body failure is handled as a failure for all recipients.
  - If Delivery implements PartialDelivery, then
    PartialDelivery.BodyNonAtomic is used instead. In this case failures are
    determined based on the StatusCollector.SetStatus calls made by the target.

Each failure is checked to determine whether it is permanent or temporary.
This is done using exterrors.IsTemporaryOrUnspec, so errors are assumed to be
temporary by default. All errors are then converted to SMTPError due to
storage limitations.

If any recipients failed *temporarily*, delivery will be retried after a delay
*only for these* recipients.

The last error for each recipient is saved for reporting in the NDN. An NDN is
generated if any failed recipients remain after the last attempt to deliver
the message.

The number of attempts for each message is limited to a configured maximum.
After the last attempt, all recipients that are still failing temporarily are
assumed to have failed permanently.
*/
package queue

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"runtime"
	"runtime/debug"
	"runtime/trace"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/emersion/go-message/textproto"
	"github.com/emersion/go-smtp"
	"github.com/foxcpp/maddy/framework/buffer"
	"github.com/foxcpp/maddy/framework/config"
	modconfig "github.com/foxcpp/maddy/framework/config/module"
	"github.com/foxcpp/maddy/framework/exterrors"
	"github.com/foxcpp/maddy/framework/log"
	"github.com/foxcpp/maddy/framework/module"
	"github.com/foxcpp/maddy/internal/dsn"
	"github.com/foxcpp/maddy/internal/msgpipeline"
	"github.com/foxcpp/maddy/internal/target"
)

// partialError describes the state of a partially successful message delivery.
type partialError struct {

	// Underlying error objects for each recipient.
	Errs map[string]error

	// Fields can be accessed without holding this lock, but only after
	// target.BodyNonAtomic/Body returns.
	statusLock *sync.Mutex
}

// SetStatus implements module.StatusCollector so partialError can be
// passed directly to PartialDelivery.BodyNonAtomic.
func (pe *partialError) SetStatus(rcptTo string, err error) {
	log.Debugf("PartialError.SetStatus(%s, %v)", rcptTo, err)
	if err == nil {
		return
	}
	pe.statusLock.Lock()
	defer pe.statusLock.Unlock()
	pe.Errs[rcptTo] = err
}

func (pe partialError) Error() string {
	return fmt.Sprintf("delivery failed for some recipients: %v", pe.Errs)
}
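
// The sketch below is illustrative and not part of the original file: it shows
// how a PartialDelivery target would report per-recipient failures through the
// module.StatusCollector interface that partialError implements. The function
// name and the addresses are made up.
func examplePartialErrorUsage() {
	pe := partialError{
		Errs:       map[string]error{},
		statusLock: new(sync.Mutex),
	}
	// A nil error means the recipient succeeded; nothing is recorded.
	pe.SetStatus("ok@example.org", nil)
	// A non-nil error is recorded and later reported in the DSN.
	pe.SetStatus("broken@example.org", errors.New("mailbox full"))
	fmt.Println(len(pe.Errs)) // 1
}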

// dontRecover controls the behavior of panic handlers: if it is set to true,
// they are disabled so that tests panic instead of masking bugs.
var dontRecover = false

type Queue struct {
	name             string
	location         string
	hostname         string
	autogenMsgDomain string
	wheel            *TimeWheel

	dsnPipeline module.DeliveryTarget

	// Retry delay is calculated using the following formula:
	// initialRetryTime * retryTimeScale ^ (TriesCount - 1)

	initialRetryTime time.Duration
	retryTimeScale   float64
	maxTries         int

	// If any delivery is scheduled less than postInitDelay after Init,
	// it is delayed to happen postInitDelay after start-up instead.
	//
	// Say postInitDelay is 10 secs. Then a message scheduled to be delivered
	// 5 seconds after init will actually be delivered 10 seconds after
	// start-up.
	//
	// This delay is added so that if maddy is killed shortly after start-up
	// for whatever reason, the queue is not affected.
	postInitDelay time.Duration

	Log    log.Logger
	Target module.DeliveryTarget

	deliveryWg sync.WaitGroup
	// Buffered channel used to limit the number of deliveries attempted
	// in parallel.
	deliverySemaphore chan struct{}
}
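
// Illustrative sketch, not part of the original file: the retry delay
// computation used by tryDelivery and readDiskQueue below, extracted into a
// standalone helper (nextRetryDelay is a hypothetical name). Note that the
// conversion of math.Pow's float64 result to time.Duration truncates the scale
// factor to a whole number, so with the defaults from NewQueue (15 minutes,
// scale 1.25) the delay stays at 15 minutes for the first few attempts and
// then grows in whole multiples (30m, 45m, ...).
func nextRetryDelay(initial time.Duration, scale float64, triesCount int) time.Duration {
	scaleFactor := time.Duration(math.Pow(scale, float64(triesCount-1)))
	return initial * scaleFactor
}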

type QueueMetadata struct {
	MsgMeta *module.MsgMetadata
	From    string

	// Recipients that should be tried next.
	// May or may not be equal to the set of temporarily failed recipients
	// from the last attempt.
	To []string

	// Information about previous failures.
	// Preserved to be included in a bounce message.
	FailedRcpts          []string
	TemporaryFailedRcpts []string
	// All errors are converted to SMTPError so they can be serialized; it is
	// also directly usable in bounce messages.
	RcptErrs map[string]*smtp.SMTPError

	// Number of times delivery was *already tried* for each recipient.
	TriesCount map[string]int

	FirstAttempt time.Time
	LastAttempt  time.Time
}

type queueSlot struct {
	ID string

	// If nil, Hdr and Body are invalid and all values should be read from
	// disk.
	Meta *QueueMetadata
	Hdr  *textproto.Header
	Body buffer.Buffer
}

func NewQueue(_, instName string, _, inlineArgs []string) (module.Module, error) {
	q := &Queue{
		name:             instName,
		initialRetryTime: 15 * time.Minute,
		retryTimeScale:   1.25,
		postInitDelay:    10 * time.Second,
		Log:              log.Logger{Name: "queue"},
	}
	switch len(inlineArgs) {
	case 0:
		// Not an inline definition.
	case 1:
		q.location = inlineArgs[0]
	default:
		return nil, errors.New("queue: wrong amount of inline arguments")
	}
	return q, nil
}

func (q *Queue) Init(cfg *config.Map) error {
	var maxParallelism int
	cfg.Bool("debug", true, false, &q.Log.Debug)
	cfg.Int("max_tries", false, false, 20, &q.maxTries)
	cfg.Int("max_parallelism", false, false, 16, &maxParallelism)
	cfg.String("location", false, false, q.location, &q.location)
	cfg.Custom("target", false, true, nil, modconfig.DeliveryDirective, &q.Target)
	cfg.String("hostname", true, true, "", &q.hostname)
	cfg.String("autogenerated_msg_domain", true, false, "", &q.autogenMsgDomain)
	cfg.Custom("bounce", false, false, nil, func(m *config.Map, node config.Node) (interface{}, error) {
		return msgpipeline.New(m.Globals, node.Children)
	}, &q.dsnPipeline)
	if _, err := cfg.Process(); err != nil {
		return err
	}

	if q.dsnPipeline != nil {
		if q.autogenMsgDomain == "" {
			return errors.New("queue: autogenerated_msg_domain is required if bounce {} is specified")
		}

		q.dsnPipeline.(*msgpipeline.MsgPipeline).Hostname = q.hostname
		q.dsnPipeline.(*msgpipeline.MsgPipeline).Log = log.Logger{Name: "queue/pipeline", Debug: q.Log.Debug}
	}
	if q.location == "" && q.name == "" {
		return errors.New("queue: need explicit location directive or inline argument if defined inline")
	}
	if q.location == "" {
		q.location = filepath.Join(config.StateDirectory, q.name)
	}

	// TODO: Check location write permissions.
	if err := os.MkdirAll(q.location, os.ModePerm); err != nil {
		return err
	}

	return q.start(maxParallelism)
}
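
// Illustrative configuration sketch for this module. The directive names are
// taken from Init above; the surrounding block syntax, the instance name
// local_queue, the &references, the hostnames and the pipeline directives
// inside bounce are assumptions; consult the maddy documentation for the
// authoritative format.
//
//	target.queue local_queue {
//	    target &remote_delivery
//	    max_tries 10
//	    max_parallelism 16
//	    hostname mx.example.org
//	    autogenerated_msg_domain example.org
//	    bounce {
//	        deliver_to &local_mailboxes
//	    }
//	}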

func (q *Queue) start(maxParallelism int) error {
	q.wheel = NewTimeWheel(q.dispatch)
	q.deliverySemaphore = make(chan struct{}, maxParallelism)

	if err := q.readDiskQueue(); err != nil {
		return err
	}

	q.Log.Debugf("delivery target: %T", q.Target)

	return nil
}

func (q *Queue) Close() error {
	q.wheel.Close()
	q.deliveryWg.Wait()

	return nil
}

// discardBroken renames the message's metadata file to have the .meta_broken
// extension.
//
// Further attempts to deliver the message (due to the timewheel) will then
// fail because the meta-data file no longer exists.
//
// No error handling is done since this function is called from a panic handler.
func (q *Queue) discardBroken(id string) {
	err := os.Rename(filepath.Join(q.location, id+".meta"), filepath.Join(q.location, id+".meta_broken"))
	if err != nil {
		// Note: Global logger is used in case there is something wrong with Queue.Log.
		log.Printf("can't mark the queue message as broken: %v", err)
	}
}

func (q *Queue) dispatch(value TimeSlot) {
	slot := value.Value.(queueSlot)

	q.Log.Debugln("starting delivery for", slot.ID)

	q.deliveryWg.Add(1)
	go func() {
		q.Log.Debugln("waiting on delivery semaphore for", slot.ID)
		q.deliverySemaphore <- struct{}{}
		defer func() {
			<-q.deliverySemaphore
			q.deliveryWg.Done()

			if dontRecover {
				return
			}

			if err := recover(); err != nil {
				stack := debug.Stack()
				log.Printf("panic during queue dispatch %s: %v\n%s", slot.ID, err, stack)
				q.discardBroken(slot.ID)
			}
		}()

		q.Log.Debugln("delivery semaphore acquired for", slot.ID)
		var (
			meta *QueueMetadata
			hdr  textproto.Header
			body buffer.Buffer
		)
		if slot.Meta == nil {
			var err error
			meta, hdr, body, err = q.openMessage(slot.ID)
			if err != nil {
				q.Log.Error("read message", err, slot.ID)
				return
			}
			if meta == nil {
				panic("wtf")
			}
		} else {
			meta = slot.Meta
			hdr = *slot.Hdr
			body = slot.Body
		}

		q.tryDelivery(meta, hdr, body)
	}()
}

func toSMTPErr(err error) *smtp.SMTPError {
	if err == nil {
		return nil
	}

	res := &smtp.SMTPError{
		Code:         554,
		EnhancedCode: smtp.EnhancedCode{5, 0, 0},
		Message:      "Internal server error",
	}

	if exterrors.IsTemporaryOrUnspec(err) {
		res.Code = 451
		res.EnhancedCode = smtp.EnhancedCode{4, 0, 0}
	}

	ctxInfo := exterrors.Fields(err)
	ctxCode, ok := ctxInfo["smtp_code"].(int)
	if ok {
		res.Code = ctxCode
	}
	ctxEnchCode, ok := ctxInfo["smtp_enchcode"].(smtp.EnhancedCode)
	if ok {
		res.EnhancedCode = ctxEnchCode
	}
	ctxMsg, ok := ctxInfo["smtp_msg"].(string)
	if ok {
		res.Message = ctxMsg
	}

	if smtpErr, ok := err.(*smtp.SMTPError); ok {
		log.Printf("plain SMTP error returned, this is deprecated")
		res.Code = smtpErr.Code
		res.EnhancedCode = smtpErr.EnhancedCode
		res.Message = smtpErr.Message
	}

	return res
}
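
// Illustrative sketch, not part of the original file: how toSMTPErr maps
// errors to SMTP status codes. The function name and the error values are
// made up.
func exampleToSMTPErr() {
	// An error with unspecified temporariness is assumed to be temporary,
	// so it maps to 451 4.0.0 with a generic message.
	tempRes := toSMTPErr(errors.New("connection timed out"))
	fmt.Println(tempRes.Code, tempRes.EnhancedCode) // 451 [4 0 0]

	// A plain *smtp.SMTPError is passed through as-is (and a deprecation
	// notice is logged).
	permRes := toSMTPErr(&smtp.SMTPError{
		Code:         550,
		EnhancedCode: smtp.EnhancedCode{5, 1, 1},
		Message:      "User unknown",
	})
	fmt.Println(permRes.Code, permRes.Message) // 550 User unknown
}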

func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) {
	dl := target.DeliveryLogger(q.Log, meta.MsgMeta)

	partialErr := q.deliver(meta, header, body)
	dl.Debugf("errors: %v", partialErr.Errs)

	// While iterating the list of recipients we also pick the smallest tries count
	// and use it to calculate the delay for the next attempt.
	smallestTriesCount := 999999

	if meta.TriesCount == nil {
		meta.TriesCount = make(map[string]int)
	}

	// Check attempted recipients and corresponding errors.
	// Split the list into two parts: recipients that should be retried (newRcpts)
	// and recipients a DSN will be generated for (failedRcpts).
	newRcpts := make([]string, 0, len(partialErr.Errs))
	failedRcpts := make([]string, 0, len(partialErr.Errs))
	for _, rcpt := range meta.To {
		rcptErr, ok := partialErr.Errs[rcpt]
		if !ok {
			dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount[rcpt]+1)
			continue
		}

		// Save the last error (either temporary or permanent) for reporting in the DSN.
		dl.Error("delivery attempt failed", rcptErr, "rcpt", rcpt)
		meta.RcptErrs[rcpt] = toSMTPErr(rcptErr)

		temporary := exterrors.IsTemporaryOrUnspec(rcptErr)
		if !temporary || meta.TriesCount[rcpt]+1 >= q.maxTries {
			delete(meta.TriesCount, rcpt)
			dl.Msg("not delivered, permanent error", "rcpt", rcpt)
			failedRcpts = append(failedRcpts, rcpt)
			continue
		}

		// Temporary error, increase the tries counter and requeue.
		meta.TriesCount[rcpt]++
		newRcpts = append(newRcpts, rcpt)

		// See the smallestTriesCount comment.
		if count := meta.TriesCount[rcpt]; count < smallestTriesCount {
			smallestTriesCount = count
		}
	}

	// Generate a DSN for recipients that failed permanently this time.
	if len(failedRcpts) != 0 {
		q.emitDSN(meta, header, failedRcpts)
	}
	// No recipients left to try: either all failed or all succeeded.
	if len(newRcpts) == 0 {
		q.removeFromDisk(meta.MsgMeta)
		return
	}

	meta.To = newRcpts
	meta.LastAttempt = time.Now()

	if err := q.updateMetadataOnDisk(meta); err != nil {
		dl.Error("meta-data update", err)
	}

	nextTryTime := time.Now()
	// Delay between retries grows exponentially, the formula is:
	// initialRetryTime * retryTimeScale ^ (smallestTriesCount - 1)
	dl.Debugf("delay: %v * %v ^ (%v - 1)", q.initialRetryTime, q.retryTimeScale, smallestTriesCount)
	scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))
	nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)
	dl.Msg("will retry",
		"attempts_count", meta.TriesCount,
		"next_try_delay", time.Until(nextTryTime),
		"rcpts", meta.To)

	q.wheel.Add(nextTryTime, queueSlot{
		ID: meta.MsgMeta.ID,

		// Do not keep (meta-)data in memory to reduce usage. At this point,
		// it is safe on disk and the next try will reread it.
		Meta: nil,
		Hdr:  nil,
		Body: nil,
	})
}

func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) partialError {
	dl := target.DeliveryLogger(q.Log, meta.MsgMeta)
	perr := partialError{
		Errs:       map[string]error{},
		statusLock: new(sync.Mutex),
	}

	msgMeta := meta.MsgMeta.DeepCopy()
	msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16)
	dl.Debugf("using message ID = %s", msgMeta.ID)

	msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery")
	defer msgTask.End()

	mailCtx, mailTask := trace.NewTask(msgCtx, "MAIL FROM")
	delivery, err := q.Target.Start(mailCtx, msgMeta, meta.From)
	mailTask.End()
	if err != nil {
		dl.Debugf("target.Start failed: %v", err)
		for _, rcpt := range meta.To {
			perr.Errs[rcpt] = err
		}
		return perr
	}
	dl.Debugf("target.Start OK")

	var acceptedRcpts []string
	for _, rcpt := range meta.To {
		rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")
		if err := delivery.AddRcpt(rcptCtx, rcpt, smtp.RcptOptions{} /* TODO: DSN support */); err != nil {
			dl.Debugf("delivery.AddRcpt %s failed: %v", rcpt, err)
			perr.Errs[rcpt] = err
		} else {
			dl.Debugf("delivery.AddRcpt %s OK", rcpt)
			acceptedRcpts = append(acceptedRcpts, rcpt)
		}
		rcptTask.End()
	}

	if len(acceptedRcpts) == 0 {
		dl.Debugf("delivery.Abort (no accepted recipients)")
		if err := delivery.Abort(msgCtx); err != nil {
			dl.Error("delivery.Abort failed", err)
		}
		return perr
	}

	expandToPartialErr := func(err error) {
		for _, rcpt := range acceptedRcpts {
			perr.Errs[rcpt] = err
		}
	}

	bodyCtx, bodyTask := trace.NewTask(msgCtx, "DATA")
	defer bodyTask.End()

	partDelivery, ok := delivery.(module.PartialDelivery)
	if ok {
		dl.Debugf("using delivery.BodyNonAtomic")
		partDelivery.BodyNonAtomic(bodyCtx, &perr, header, body)
	} else {
		if err := delivery.Body(bodyCtx, header, body); err != nil {
			dl.Debugf("delivery.Body failed: %v", err)
			expandToPartialErr(err)
		}
		dl.Debugf("delivery.Body OK")
	}

	allFailed := true
	for _, rcpt := range acceptedRcpts {
		if perr.Errs[rcpt] == nil {
			allFailed = false
		}
	}
	if allFailed {
		// No recipients succeeded.
		dl.Debugf("delivery.Abort (all recipients failed)")
		if err := delivery.Abort(bodyCtx); err != nil {
			dl.Error("delivery.Abort failed", err)
		}
		return perr
	}

	if err := delivery.Commit(bodyCtx); err != nil {
		dl.Debugf("delivery.Commit failed: %v", err)
		expandToPartialErr(err)
	}
	dl.Debugf("delivery.Commit OK")

	return perr
}

type queueDelivery struct {
	q    *Queue
	meta *QueueMetadata

	header textproto.Header
	body   buffer.Buffer
}

func (qd *queueDelivery) AddRcpt(ctx context.Context, rcptTo string, _ smtp.RcptOptions) error {
	qd.meta.To = append(qd.meta.To, rcptTo)
	return nil
}

func (qd *queueDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {
	defer trace.StartRegion(ctx, "queue/Body").End()

	// The body buffer initially passed to us may not be valid after the "delivery"
	// to the queue completes. storeNewMessage returns a new buffer object created
	// from the message blob stored on disk.
	storedBody, err := qd.q.storeNewMessage(qd.meta, header, body)
	if err != nil {
		return err
	}

	qd.body = storedBody
	qd.header = header
	return nil
}

func (qd *queueDelivery) Abort(ctx context.Context) error {
	defer trace.StartRegion(ctx, "queue/Abort").End()

	if qd.body != nil {
		qd.q.removeFromDisk(qd.meta.MsgMeta)
	}
	return nil
}

func (qd *queueDelivery) Commit(ctx context.Context) error {
	defer trace.StartRegion(ctx, "queue/Commit").End()

	if qd.meta == nil {
		panic("queue: double Commit")
	}

	qd.q.wheel.Add(time.Time{}, queueSlot{
		ID:   qd.meta.MsgMeta.ID,
		Meta: qd.meta,
		Hdr:  &qd.header,
		Body: qd.body,
	})
	qd.meta = nil
	qd.body = nil
	return nil
}

func (q *Queue) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {
	meta := &QueueMetadata{
		MsgMeta:      msgMeta,
		From:         mailFrom,
		RcptErrs:     map[string]*smtp.SMTPError{},
		FirstAttempt: time.Now(),
		LastAttempt:  time.Now(),
	}
	return &queueDelivery{q: q, meta: meta}, nil
}
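
// Illustrative sketch, not part of the original file: using the Queue as a
// module.DeliveryTarget. The function name, the addresses and the hdr/body
// arguments are made up; error handling is kept minimal.
func exampleEnqueue(q *Queue, msgMeta *module.MsgMetadata, hdr textproto.Header, body buffer.Buffer) error {
	ctx := context.Background()

	delivery, err := q.Start(ctx, msgMeta, "sender@example.org")
	if err != nil {
		return err
	}
	if err := delivery.AddRcpt(ctx, "rcpt@example.org", smtp.RcptOptions{}); err != nil {
		_ = delivery.Abort(ctx)
		return err
	}
	// Body stores the message on disk; Commit schedules it for immediate
	// delivery via the timewheel.
	if err := delivery.Body(ctx, hdr, body); err != nil {
		_ = delivery.Abort(ctx)
		return err
	}
	return delivery.Commit(ctx)
}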

func (q *Queue) removeFromDisk(msgMeta *module.MsgMetadata) {
	id := msgMeta.ID
	dl := target.DeliveryLogger(q.Log, msgMeta)

	// Order is important: if we remove the header and body but can't remove the
	// meta-data now, readDiskQueue will detect and report it.
	headerPath := filepath.Join(q.location, id+".header")
	if err := os.Remove(headerPath); err != nil {
		dl.Error("failed to remove header from disk", err)
	}
	bodyPath := filepath.Join(q.location, id+".body")
	if err := os.Remove(bodyPath); err != nil {
		dl.Error("failed to remove body from disk", err)
	}
	metaPath := filepath.Join(q.location, id+".meta")
	if err := os.Remove(metaPath); err != nil {
		dl.Error("failed to remove meta-data from disk", err)
	}
	dl.Debugf("removed message from disk")
}
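
// For reference, the per-message files managed above (derived from the code in
// this file):
//
//	<location>/<msg ID>.meta         JSON-encoded QueueMetadata
//	<location>/<msg ID>.header       message header
//	<location>/<msg ID>.body         message body blob
//	<location>/<msg ID>.meta_broken  metadata renamed by discardBroken after a panic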

func (q *Queue) readDiskQueue() error {
	dirInfo, err := os.ReadDir(q.location)
	if err != nil {
		return err
	}

	// TODO(GH #209): Rewrite this function to pass all sub-tests in TestQueueDelivery_DeserializationCleanUp/NoMeta.

	loadedCount := 0
	for _, entry := range dirInfo {
		// We start loading from meta-data files and then check whether ID.header and ID.body exist.
		// This allows us to properly detect dangling body files.
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta") {
			continue
		}
		id := entry.Name()[:len(entry.Name())-5]

		meta, err := q.readMessageMeta(id)
		if err != nil {
			q.Log.Printf("failed to read meta-data, skipping: %v (msg ID = %s)", err, id)
			continue
		}

		// Check header file existence.
		if _, err := os.Stat(filepath.Join(q.location, id+".header")); err != nil {
			if os.IsNotExist(err) {
				q.Log.Printf("header file doesn't exist for msg ID = %s", id)
				q.tryRemoveDanglingFile(id + ".meta")
				q.tryRemoveDanglingFile(id + ".body")
			} else {
				q.Log.Printf("skipping nonstat'able header file: %v (msg ID = %s)", err, id)
			}
			continue
		}

		// Check body file existence.
		if _, err := os.Stat(filepath.Join(q.location, id+".body")); err != nil {
			if os.IsNotExist(err) {
				q.Log.Printf("body file doesn't exist for msg ID = %s", id)
				q.tryRemoveDanglingFile(id + ".meta")
				q.tryRemoveDanglingFile(id + ".header")
			} else {
				q.Log.Printf("skipping nonstat'able body file: %v (msg ID = %s)", err, id)
			}
			continue
		}

		smallestTriesCount := 999999
		for _, count := range meta.TriesCount {
			if smallestTriesCount > count {
				smallestTriesCount = count
			}
		}
		nextTryTime := meta.LastAttempt
		scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))
		nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)

		if time.Until(nextTryTime) < q.postInitDelay {
			nextTryTime = time.Now().Add(q.postInitDelay)
		}

		q.Log.Debugf("will try to deliver (msg ID = %s) in %v (%v)", id, time.Until(nextTryTime), nextTryTime)
		q.wheel.Add(nextTryTime, queueSlot{
			ID: id,
		})
		loadedCount++
	}

	if loadedCount != 0 {
		q.Log.Printf("loaded %d saved queue entries", loadedCount)
	}

	return nil
}

func (q *Queue) storeNewMessage(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) (buffer.Buffer, error) {
	id := meta.MsgMeta.ID

	headerPath := filepath.Join(q.location, id+".header")
	headerFile, err := os.Create(headerPath)
	if err != nil {
		return nil, err
	}
	defer headerFile.Close()

	if err := textproto.WriteHeader(headerFile, header); err != nil {
		q.tryRemoveDanglingFile(id + ".header")
		return nil, err
	}

	bodyReader, err := body.Open()
	if err != nil {
		q.tryRemoveDanglingFile(id + ".header")
		return nil, err
	}
	defer bodyReader.Close()

	bodyPath := filepath.Join(q.location, id+".body")
	bodyFile, err := os.Create(bodyPath)
	if err != nil {
		return nil, err
	}
	defer bodyFile.Close()

	if _, err := io.Copy(bodyFile, bodyReader); err != nil {
		q.tryRemoveDanglingFile(id + ".body")
		q.tryRemoveDanglingFile(id + ".header")
		return nil, err
	}

	if err := q.updateMetadataOnDisk(meta); err != nil {
		q.tryRemoveDanglingFile(id + ".body")
		q.tryRemoveDanglingFile(id + ".header")
		return nil, err
	}

	if err := headerFile.Sync(); err != nil {
		return nil, err
	}

	if err := bodyFile.Sync(); err != nil {
		return nil, err
	}

	return buffer.FileBuffer{Path: bodyPath, LenHint: body.Len()}, nil
}

func (q *Queue) updateMetadataOnDisk(meta *QueueMetadata) error {
	metaPath := filepath.Join(q.location, meta.MsgMeta.ID+".meta")

	var file *os.File
	var err error
	if runtime.GOOS == "windows" {
		file, err = os.Create(metaPath)
		if err != nil {
			return err
		}
	} else {
		file, err = os.Create(metaPath + ".new")
		if err != nil {
			return err
		}
	}
	defer file.Close()

	metaCopy := *meta
	metaCopy.MsgMeta = meta.MsgMeta.DeepCopy()
	metaCopy.MsgMeta.Conn = nil

	if err := json.NewEncoder(file).Encode(metaCopy); err != nil {
		return err
	}

	if err := file.Sync(); err != nil {
		return err
	}

	if runtime.GOOS != "windows" {
		if err := os.Rename(metaPath+".new", metaPath); err != nil {
			return err
		}
	}

	return nil
}
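
// Illustrative sketch, not part of the original file: roughly what a
// <msg ID>.meta file contains after updateMetadataOnDisk. Field names follow
// QueueMetadata (assuming no custom JSON tags); MsgMeta is abbreviated and all
// values are made up.
//
//	{
//	  "MsgMeta": {"ID": "..."},
//	  "From": "sender@example.org",
//	  "To": ["rcpt@example.org"],
//	  "FailedRcpts": null,
//	  "TemporaryFailedRcpts": null,
//	  "RcptErrs": {},
//	  "TriesCount": {"rcpt@example.org": 1},
//	  "FirstAttempt": "2020-01-01T00:00:00Z",
//	  "LastAttempt": "2020-01-01T00:15:00Z"
//	}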

func (q *Queue) readMessageMeta(id string) (*QueueMetadata, error) {
	metaPath := filepath.Join(q.location, id+".meta")
	file, err := os.Open(metaPath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	meta := &QueueMetadata{}

	meta.MsgMeta = &module.MsgMetadata{}

	// There are a couple of problems we have to solve before we can serialize
	// ConnState:
	// 1. future.Future can't be serialized.
	// 2. net.Addr can't be deserialized because we don't know the concrete type.

	if err := json.NewDecoder(file).Decode(meta); err != nil {
		return nil, err
	}

	return meta, nil
}

type BufferedReadCloser struct {
	*bufio.Reader
	io.Closer
}

func (q *Queue) tryRemoveDanglingFile(name string) {
	if err := os.Remove(filepath.Join(q.location, name)); err != nil {
		q.Log.Error("dangling file remove failed", err)
		return
	}
	q.Log.Printf("removed dangling file %s", name)
}

func (q *Queue) openMessage(id string) (*QueueMetadata, textproto.Header, buffer.Buffer, error) {
	meta, err := q.readMessageMeta(id)
	if err != nil {
		return nil, textproto.Header{}, nil, err
	}

	bodyPath := filepath.Join(q.location, id+".body")
	_, err = os.Stat(bodyPath)
	if err != nil {
		if os.IsNotExist(err) {
			q.tryRemoveDanglingFile(id + ".meta")
		}
		return nil, textproto.Header{}, nil, err
	}
	body := buffer.FileBuffer{Path: bodyPath}

	headerPath := filepath.Join(q.location, id+".header")
	headerFile, err := os.Open(headerPath)
	if err != nil {
		if os.IsNotExist(err) {
			q.tryRemoveDanglingFile(id + ".meta")
			q.tryRemoveDanglingFile(id + ".body")
		}
		return nil, textproto.Header{}, nil, err
	}

	bufferedHeader := bufio.NewReader(headerFile)
	header, err := textproto.ReadHeader(bufferedHeader)
	if err != nil {
		return nil, textproto.Header{}, nil, err
	}

	return meta, header, body, nil
}

func (q *Queue) InstanceName() string {
	return q.name
}

func (q *Queue) Name() string {
	return "queue"
}

func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header, failedRcpts []string) {
	// If no DSN msgpipeline is configured, do nothing.
	if q.dsnPipeline == nil {
		return
	}

	// The message has a null return-path (as used by DSNs themselves), so no
	// DSN is generated for it.
	if meta.MsgMeta.OriginalFrom == "" {
		return
	}

	dsnID, err := module.GenerateMsgID()
	if err != nil {
		q.Log.Error("rand.Rand error", err)
		return
	}

	dsnEnvelope := dsn.Envelope{
		MsgID: "<" + dsnID + "@" + q.autogenMsgDomain + ">",
		From:  "MAILER-DAEMON@" + q.autogenMsgDomain,
		To:    meta.MsgMeta.OriginalFrom,
	}
	mtaInfo := dsn.ReportingMTAInfo{
		ReportingMTA:    q.hostname,
		XSender:         meta.From,
		XMessageID:      meta.MsgMeta.ID,
		ArrivalDate:     meta.FirstAttempt,
		LastAttemptDate: meta.LastAttempt,
	}
	if !meta.MsgMeta.DontTraceSender && meta.MsgMeta.Conn != nil {
		mtaInfo.ReceivedFromMTA = meta.MsgMeta.Conn.Hostname
	}

	rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs))
	for _, rcpt := range failedRcpts {
		rcptErr := meta.RcptErrs[rcpt]
		// rcptErr is stored in RcptErrs using the effective recipient address,
		// not the original one.

		originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt]
		if originalRcpt != "" {
			rcpt = originalRcpt
		}

		rcptInfo = append(rcptInfo, dsn.RecipientInfo{
			FinalRecipient: rcpt,
			Action:         dsn.ActionFailed,
			Status:         rcptErr.EnhancedCode,
			DiagnosticCode: rcptErr,
		})
	}

	var dsnBodyBlob bytes.Buffer
	dl := target.DeliveryLogger(q.Log, meta.MsgMeta)
	dsnHeader, err := dsn.GenerateDSN(meta.MsgMeta.SMTPOpts.UTF8, dsnEnvelope, mtaInfo, rcptInfo, header, &dsnBodyBlob)
	if err != nil {
		dl.Error("failed to generate fail DSN", err)
		return
	}
	dsnBody := buffer.MemoryBuffer{Slice: dsnBodyBlob.Bytes()}

	dsnMeta := &module.MsgMetadata{
		ID: dsnID,
		SMTPOpts: smtp.MailOptions{
			UTF8:       meta.MsgMeta.SMTPOpts.UTF8,
			RequireTLS: meta.MsgMeta.SMTPOpts.RequireTLS,
		},
	}
	dl.Msg("generated failed DSN", "dsn_id", dsnID)

	msgCtx, msgTask := trace.NewTask(context.Background(), "DSN Delivery")
	defer msgTask.End()

	mailCtx, mailTask := trace.NewTask(msgCtx, "MAIL FROM")
	dsnDelivery, err := q.dsnPipeline.Start(mailCtx, dsnMeta, "")
	mailTask.End()
	if err != nil {
		dl.Error("failed to enqueue DSN", err, "dsn_id", dsnID)
		return
	}

	defer func() {
		if err != nil {
			dl.Error("failed to enqueue DSN", err, "dsn_id", dsnID)
			if err := dsnDelivery.Abort(msgCtx); err != nil {
				dl.Error("failed to abort DSN delivery", err, "dsn_id", dsnID)
			}
		}
	}()

	rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")
	if err = dsnDelivery.AddRcpt(rcptCtx, meta.From, smtp.RcptOptions{}); err != nil {
		rcptTask.End()
		return
	}
	rcptTask.End()

	bodyCtx, bodyTask := trace.NewTask(msgCtx, "DATA")
	if err = dsnDelivery.Body(bodyCtx, dsnHeader, dsnBody); err != nil {
		bodyTask.End()
		return
	}
	if err = dsnDelivery.Commit(bodyCtx); err != nil {
		bodyTask.End()
		return
	}
	bodyTask.End()
}

func init() {
	module.Register("target.queue", NewQueue)
}