1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819/*20Package queue implements module which keeps messages on disk and tries delivery21to the configured target (usually remote) multiple times until all recipients22are succeeded.2324Interfaces implemented:25- module.DeliveryTarget2627Implementation summary follows.2829All scheduled deliveries are attempted to the configured DeliveryTarget.30All metadata is preserved on disk.3132Failure status is determined on per-recipient basis:33 - Delivery.Start fail handled as a failure for all recipients.34 - Delivery.AddRcpt fail handled as a failure for the corresponding recipient.35 - Delivery.Body fail handled as a failure for all recipients.36 - If Delivery implements PartialDelivery, then37 PartialDelivery.BodyNonAtomic is used instead. Failures are determined based38 on StatusCollector.SetStatus calls done by target in this case.3940For each failure check is done to see if it is a permanent failure41or a temporary one. This is done using exterrors.IsTemporaryOrUnspec.42That is, errors are assumed to be temporary by default.43All errors are converted to SMTPError then due to a storage limitations.4445If there are any *temporary* failed recipients, delivery will be retried46after delay *only for these* recipients.4748Last error for each recipient is saved for reporting in NDN. A NDN is generated49if there are any failed recipients left after50last attempt to deliver the message.5152Amount of attempts for each message is limited to a certain configured number.53After last attempt, all recipients that are still temporary failing are assumed54to be permanently failed.55*/56package queue5758import (59 "bufio"60 "bytes"61 "context"62 "encoding/json"63 "errors"64 "fmt"65 "io"66 "math"67 "os"68 "path/filepath"69 "runtime"70 "runtime/debug"71 "runtime/trace"72 "strconv"73 "strings"74 "sync"75 "time"7677 "github.com/emersion/go-message/textproto"78 "github.com/emersion/go-smtp"79 "github.com/foxcpp/maddy/framework/buffer"80 "github.com/foxcpp/maddy/framework/config"81 modconfig "github.com/foxcpp/maddy/framework/config/module"82 "github.com/foxcpp/maddy/framework/exterrors"83 "github.com/foxcpp/maddy/framework/log"84 "github.com/foxcpp/maddy/framework/module"85 "github.com/foxcpp/maddy/internal/dsn"86 "github.com/foxcpp/maddy/internal/msgpipeline"87 "github.com/foxcpp/maddy/internal/target"88)8990// partialError describes state of partially successful message delivery.91type partialError struct {9293 // Underlying error objects for each recipient.94 Errs map[string]error9596 // Fields can be accessed without holding this lock, but only after97 // target.BodyNonAtomic/Body returns.98 statusLock *sync.Mutex99}100101// SetStatus implements module.StatusCollector so partialError can be102// passed directly to PartialDelivery.BodyNonAtomic.103func (pe *partialError) SetStatus(rcptTo string, err error) {104 log.Debugf("PartialError.SetStatus(%s, %v)", rcptTo, err)105 if err == nil {106 return107 }108 pe.statusLock.Lock()109 defer pe.statusLock.Unlock()110 pe.Errs[rcptTo] = err111}112113func (pe partialError) Error() string {114 return fmt.Sprintf("delivery failed for some recipients: %v", pe.Errs)115}116117// dontRecover controls the behavior of panic handlers, if it is set to true -118// they are disabled and so tests will panic to avoid masking bugs.119var dontRecover = false120121type Queue struct {122 name string123 location string124 hostname string125 autogenMsgDomain string126 wheel *TimeWheel127128 dsnPipeline module.DeliveryTarget129130 // Retry delay is calculated using the following formula:131 // initialRetryTime * retryTimeScale ^ (TriesCount - 1)132133 initialRetryTime time.Duration134 retryTimeScale float64135 maxTries int136137 // If any delivery is scheduled in less than postInitDelay138 // after Init, its delay will be increased by postInitDelay.139 //140 // Say, if postInitDelay is 10 secs.141 // Then if some message is scheduled to delivered 5 seconds142 // after init, it will be actually delivered 15 seconds143 // after start-up.144 //145 // This delay is added to make that if maddy is killed shortly146 // after start-up for whatever reason it will not affect the queue.147 postInitDelay time.Duration148149 Log log.Logger150 Target module.DeliveryTarget151152 deliveryWg sync.WaitGroup153 // Buffered channel used to restrict count of deliveries attempted154 // in parallel.155 deliverySemaphore chan struct{}156}157158type QueueMetadata struct {159 MsgMeta *module.MsgMetadata160 From string161162 // Recipients that should be tried next.163 // May or may not be equal to partialError.TemporaryFailed.164 To []string165166 // Information about previous failures.167 // Preserved to be included in a bounce message.168 FailedRcpts []string169 TemporaryFailedRcpts []string170 // All errors are converted to SMTPError we can serialize and171 // also it is directly usable for bounce messages.172 RcptErrs map[string]*smtp.SMTPError173174 // Amount of times delivery *already tried*.175 TriesCount map[string]int176177 FirstAttempt time.Time178 LastAttempt time.Time179}180181type queueSlot struct {182 ID string183184 // If nil - Hdr and Body are invalid, all values should be read from185 // disk.186 Meta *QueueMetadata187 Hdr *textproto.Header188 Body buffer.Buffer189}190191func NewQueue(_, instName string, _, inlineArgs []string) (module.Module, error) {192 q := &Queue{193 name: instName,194 initialRetryTime: 15 * time.Minute,195 retryTimeScale: 1.25,196 postInitDelay: 10 * time.Second,197 Log: log.Logger{Name: "queue"},198 }199 switch len(inlineArgs) {200 case 0:201 // Not inline definition.202 case 1:203 q.location = inlineArgs[0]204 default:205 return nil, errors.New("queue: wrong amount of inline arguments")206 }207 return q, nil208}209210func (q *Queue) Init(cfg *config.Map) error {211 var maxParallelism int212 cfg.Bool("debug", true, false, &q.Log.Debug)213 cfg.Int("max_tries", false, false, 20, &q.maxTries)214 cfg.Int("max_parallelism", false, false, 16, &maxParallelism)215 cfg.String("location", false, false, q.location, &q.location)216 cfg.Custom("target", false, true, nil, modconfig.DeliveryDirective, &q.Target)217 cfg.String("hostname", true, true, "", &q.hostname)218 cfg.String("autogenerated_msg_domain", true, false, "", &q.autogenMsgDomain)219 cfg.Custom("bounce", false, false, nil, func(m *config.Map, node config.Node) (interface{}, error) {220 return msgpipeline.New(m.Globals, node.Children)221 }, &q.dsnPipeline)222 if _, err := cfg.Process(); err != nil {223 return err224 }225226 if q.dsnPipeline != nil {227 if q.autogenMsgDomain == "" {228 return errors.New("queue: autogenerated_msg_domain is required if bounce {} is specified")229 }230231 q.dsnPipeline.(*msgpipeline.MsgPipeline).Hostname = q.hostname232 q.dsnPipeline.(*msgpipeline.MsgPipeline).Log = log.Logger{Name: "queue/pipeline", Debug: q.Log.Debug}233 }234 if q.location == "" && q.name == "" {235 return errors.New("queue: need explicit location directive or inline argument if defined inline")236 }237 if q.location == "" {238 q.location = filepath.Join(config.StateDirectory, q.name)239 }240241 // TODO: Check location write permissions.242 if err := os.MkdirAll(q.location, os.ModePerm); err != nil {243 return err244 }245246 return q.start(maxParallelism)247}248249func (q *Queue) start(maxParallelism int) error {250 q.wheel = NewTimeWheel(q.dispatch)251 q.deliverySemaphore = make(chan struct{}, maxParallelism)252253 if err := q.readDiskQueue(); err != nil {254 return err255 }256257 q.Log.Debugf("delivery target: %T", q.Target)258259 return nil260}261262func (q *Queue) Close() error {263 q.wheel.Close()264 q.deliveryWg.Wait()265266 return nil267}268269// discardBroken changes the name of metadata file to have .meta_broken270// extension.271//272// Further attempts to deliver (due to a timewheel) it will fail due to273// non-existent meta-data file.274//275// No error handling is done since this function is called from panic handler.276func (q *Queue) discardBroken(id string) {277 err := os.Rename(filepath.Join(q.location, id+".meta"), filepath.Join(q.location, id+".meta_broken"))278 if err != nil {279 // Note: Global logger is used in case there is something wrong with Queue.Log.280 log.Printf("can't mark the queue message as broken: %v", err)281 }282}283284func (q *Queue) dispatch(value TimeSlot) {285 slot := value.Value.(queueSlot)286287 q.Log.Debugln("starting delivery for", slot.ID)288289 q.deliveryWg.Add(1)290 go func() {291 q.Log.Debugln("waiting on delivery semaphore for", slot.ID)292 q.deliverySemaphore <- struct{}{}293 defer func() {294 <-q.deliverySemaphore295 q.deliveryWg.Done()296297 if dontRecover {298 return299 }300301 if err := recover(); err != nil {302 stack := debug.Stack()303 log.Printf("panic during queue dispatch %s: %v\n%s", slot.ID, err, stack)304 q.discardBroken(slot.ID)305 }306 }()307308 q.Log.Debugln("delivery semaphore acquired for", slot.ID)309 var (310 meta *QueueMetadata311 hdr textproto.Header312 body buffer.Buffer313 )314 if slot.Meta == nil {315 var err error316 meta, hdr, body, err = q.openMessage(slot.ID)317 if err != nil {318 q.Log.Error("read message", err, slot.ID)319 return320 }321 if meta == nil {322 panic("wtf")323 }324 } else {325 meta = slot.Meta326 hdr = *slot.Hdr327 body = slot.Body328 }329330 q.tryDelivery(meta, hdr, body)331 }()332}333334func toSMTPErr(err error) *smtp.SMTPError {335 if err == nil {336 return nil337 }338339 res := &smtp.SMTPError{340 Code: 554,341 EnhancedCode: smtp.EnhancedCode{5, 0, 0},342 Message: "Internal server error",343 }344345 if exterrors.IsTemporaryOrUnspec(err) {346 res.Code = 451347 res.EnhancedCode = smtp.EnhancedCode{4, 0, 0}348 }349350 ctxInfo := exterrors.Fields(err)351 ctxCode, ok := ctxInfo["smtp_code"].(int)352 if ok {353 res.Code = ctxCode354 }355 ctxEnchCode, ok := ctxInfo["smtp_enchcode"].(smtp.EnhancedCode)356 if ok {357 res.EnhancedCode = ctxEnchCode358 }359 ctxMsg, ok := ctxInfo["smtp_msg"].(string)360 if ok {361 res.Message = ctxMsg362 }363364 if smtpErr, ok := err.(*smtp.SMTPError); ok {365 log.Printf("plain SMTP error returned, this is deprecated")366 res.Code = smtpErr.Code367 res.EnhancedCode = smtpErr.EnhancedCode368 res.Message = smtpErr.Message369 }370371 return res372}373374func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) {375 dl := target.DeliveryLogger(q.Log, meta.MsgMeta)376377 partialErr := q.deliver(meta, header, body)378 dl.Debugf("errors: %v", partialErr.Errs)379380 // While iterating the list of recipients we also pick the smallest tries count381 // and use it to calculate the delay for the next attempt.382 smallestTriesCount := 999999383384 if meta.TriesCount == nil {385 meta.TriesCount = make(map[string]int)386 }387388 // Check attempted recipients and corresponding errors.389 // Split list into two parts: recipients that should be retried (newRcpts)390 // and recipients DSN will be generated for.391 newRcpts := make([]string, 0, len(partialErr.Errs))392 failedRcpts := make([]string, 0, len(partialErr.Errs))393 for _, rcpt := range meta.To {394 rcptErr, ok := partialErr.Errs[rcpt]395 if !ok {396 dl.Msg("delivered", "rcpt", rcpt, "attempt", meta.TriesCount[rcpt]+1)397 continue398 }399400 // Save last error (either temporary or permanent) for reporting in the DSN.401 dl.Error("delivery attempt failed", rcptErr, "rcpt", rcpt)402 meta.RcptErrs[rcpt] = toSMTPErr(rcptErr)403404 temporary := exterrors.IsTemporaryOrUnspec(rcptErr)405 if !temporary || meta.TriesCount[rcpt]+1 >= q.maxTries {406 delete(meta.TriesCount, rcpt)407 dl.Msg("not delivered, permanent error", "rcpt", rcpt)408 failedRcpts = append(failedRcpts, rcpt)409 continue410 }411412 // Temporary error, increase tries counter and requeue.413 meta.TriesCount[rcpt]++414 newRcpts = append(newRcpts, rcpt)415416 // See smallestTriesCount comment.417 if count := meta.TriesCount[rcpt]; count < smallestTriesCount {418 smallestTriesCount = count419 }420 }421422 // Generate DSN for recipients that failed permanently this time.423 if len(failedRcpts) != 0 {424 q.emitDSN(meta, header, failedRcpts)425 }426 // No recipients to try, either all failed or all succeeded.427 if len(newRcpts) == 0 {428 q.removeFromDisk(meta.MsgMeta)429 return430 }431432 meta.To = newRcpts433 meta.LastAttempt = time.Now()434435 if err := q.updateMetadataOnDisk(meta); err != nil {436 dl.Error("meta-data update", err)437 }438439 nextTryTime := time.Now()440 // Delay between retries grows exponentally, the formula is:441 // initialRetryTime * retryTimeScale ^ (smallestTriesCount - 1)442 dl.Debugf("delay: %v * %v ^ (%v - 1)", q.initialRetryTime, q.retryTimeScale, smallestTriesCount)443 scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))444 nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)445 dl.Msg("will retry",446 "attempts_count", meta.TriesCount,447 "next_try_delay", time.Until(nextTryTime),448 "rcpts", meta.To)449450 q.wheel.Add(nextTryTime, queueSlot{451 ID: meta.MsgMeta.ID,452453 // Do not keep (meta-)data in memory to reduce usage. At this point,454 // it is safe on disk and next try will reread it.455 Meta: nil,456 Hdr: nil,457 Body: nil,458 })459}460461func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) partialError {462 dl := target.DeliveryLogger(q.Log, meta.MsgMeta)463 perr := partialError{464 Errs: map[string]error{},465 statusLock: new(sync.Mutex),466 }467468 msgMeta := meta.MsgMeta.DeepCopy()469 msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16)470 dl.Debugf("using message ID = %s", msgMeta.ID)471472 msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery")473 defer msgTask.End()474475 mailCtx, mailTask := trace.NewTask(msgCtx, "MAIL FROM")476 delivery, err := q.Target.Start(mailCtx, msgMeta, meta.From)477 mailTask.End()478 if err != nil {479 dl.Debugf("target.Start failed: %v", err)480 for _, rcpt := range meta.To {481 perr.Errs[rcpt] = err482 }483 return perr484 }485 dl.Debugf("target.Start OK")486487 var acceptedRcpts []string488 for _, rcpt := range meta.To {489 rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")490 if err := delivery.AddRcpt(rcptCtx, rcpt, smtp.RcptOptions{} /* TODO: DSN support */); err != nil {491 dl.Debugf("delivery.AddRcpt %s failed: %v", rcpt, err)492 perr.Errs[rcpt] = err493 } else {494 dl.Debugf("delivery.AddRcpt %s OK", rcpt)495 acceptedRcpts = append(acceptedRcpts, rcpt)496 }497 rcptTask.End()498 }499500 if len(acceptedRcpts) == 0 {501 dl.Debugf("delivery.Abort (no accepted recipients)")502 if err := delivery.Abort(msgCtx); err != nil {503 dl.Error("delivery.Abort failed", err)504 }505 return perr506 }507508 expandToPartialErr := func(err error) {509 for _, rcpt := range acceptedRcpts {510 perr.Errs[rcpt] = err511 }512 }513514 bodyCtx, bodyTask := trace.NewTask(msgCtx, "DATA")515 defer bodyTask.End()516517 partDelivery, ok := delivery.(module.PartialDelivery)518 if ok {519 dl.Debugf("using delivery.BodyNonAtomic")520 partDelivery.BodyNonAtomic(bodyCtx, &perr, header, body)521 } else {522 if err := delivery.Body(bodyCtx, header, body); err != nil {523 dl.Debugf("delivery.Body failed: %v", err)524 expandToPartialErr(err)525 }526 dl.Debugf("delivery.Body OK")527 }528529 allFailed := true530 for _, rcpt := range acceptedRcpts {531 if perr.Errs[rcpt] == nil {532 allFailed = false533 }534 }535 if allFailed {536 // No recipients succeeded.537 dl.Debugf("delivery.Abort (all recipients failed)")538 if err := delivery.Abort(bodyCtx); err != nil {539 dl.Msg("delivery.Abort failed", err)540 }541 return perr542 }543544 if err := delivery.Commit(bodyCtx); err != nil {545 dl.Debugf("delivery.Commit failed: %v", err)546 expandToPartialErr(err)547 }548 dl.Debugf("delivery.Commit OK")549550 return perr551}552553type queueDelivery struct {554 q *Queue555 meta *QueueMetadata556557 header textproto.Header558 body buffer.Buffer559}560561func (qd *queueDelivery) AddRcpt(ctx context.Context, rcptTo string, _ smtp.RcptOptions) error {562 qd.meta.To = append(qd.meta.To, rcptTo)563 return nil564}565566func (qd *queueDelivery) Body(ctx context.Context, header textproto.Header, body buffer.Buffer) error {567 defer trace.StartRegion(ctx, "queue/Body").End()568569 // Body buffer initially passed to us may not be valid after "delivery" to queue completes.570 // storeNewMessage returns a new buffer object created from message blob stored on disk.571 storedBody, err := qd.q.storeNewMessage(qd.meta, header, body)572 if err != nil {573 return err574 }575576 qd.body = storedBody577 qd.header = header578 return nil579}580581func (qd *queueDelivery) Abort(ctx context.Context) error {582 defer trace.StartRegion(ctx, "queue/Abort").End()583584 if qd.body != nil {585 qd.q.removeFromDisk(qd.meta.MsgMeta)586 }587 return nil588}589590func (qd *queueDelivery) Commit(ctx context.Context) error {591 defer trace.StartRegion(ctx, "queue/Commit").End()592593 if qd.meta == nil {594 panic("queue: double Commit")595 }596597 qd.q.wheel.Add(time.Time{}, queueSlot{598 ID: qd.meta.MsgMeta.ID,599 Meta: qd.meta,600 Hdr: &qd.header,601 Body: qd.body,602 })603 qd.meta = nil604 qd.body = nil605 return nil606}607608func (q *Queue) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {609 meta := &QueueMetadata{610 MsgMeta: msgMeta,611 From: mailFrom,612 RcptErrs: map[string]*smtp.SMTPError{},613 FirstAttempt: time.Now(),614 LastAttempt: time.Now(),615 }616 return &queueDelivery{q: q, meta: meta}, nil617}618619func (q *Queue) removeFromDisk(msgMeta *module.MsgMetadata) {620 id := msgMeta.ID621 dl := target.DeliveryLogger(q.Log, msgMeta)622623 // Order is important.624 // If we remove header and body but can't remove meta now - readDiskQueue625 // will detect and report it.626 headerPath := filepath.Join(q.location, id+".header")627 if err := os.Remove(headerPath); err != nil {628 dl.Error("failed to remove header from disk", err)629 }630 bodyPath := filepath.Join(q.location, id+".body")631 if err := os.Remove(bodyPath); err != nil {632 dl.Error("failed to remove body from disk", err)633 }634 metaPath := filepath.Join(q.location, id+".meta")635 if err := os.Remove(metaPath); err != nil {636 dl.Error("failed to remove meta-data from disk", err)637 }638 dl.Debugf("removed message from disk")639}640641func (q *Queue) readDiskQueue() error {642 dirInfo, err := os.ReadDir(q.location)643 if err != nil {644 return err645 }646647 // TODO(GH #209): Rewrite this function to pass all sub-tests in TestQueueDelivery_DeserializationCleanUp/NoMeta.648649 loadedCount := 0650 for _, entry := range dirInfo {651 // We start loading from meta-data files and then check whether ID.header and ID.body exist.652 // This allows us to properly detect dangling body files.653 if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".meta") {654 continue655 }656 id := entry.Name()[:len(entry.Name())-5]657658 meta, err := q.readMessageMeta(id)659 if err != nil {660 q.Log.Printf("failed to read meta-data, skipping: %v (msg ID = %s)", err, id)661 continue662 }663664 // Check header file existence.665 if _, err := os.Stat(filepath.Join(q.location, id+".header")); err != nil {666 if os.IsNotExist(err) {667 q.Log.Printf("header file doesn't exist for msg ID = %s", id)668 q.tryRemoveDanglingFile(id + ".meta")669 q.tryRemoveDanglingFile(id + ".body")670 } else {671 q.Log.Printf("skipping nonstat'able header file: %v (msg ID = %s)", err, id)672 }673 continue674 }675676 // Check body file existence.677 if _, err := os.Stat(filepath.Join(q.location, id+".body")); err != nil {678 if os.IsNotExist(err) {679 q.Log.Printf("body file doesn't exist for msg ID = %s", id)680 q.tryRemoveDanglingFile(id + ".meta")681 q.tryRemoveDanglingFile(id + ".header")682 } else {683 q.Log.Printf("skipping nonstat'able body file: %v (msg ID = %s)", err, id)684 }685 continue686 }687688 smallestTriesCount := 999999689 for _, count := range meta.TriesCount {690 if smallestTriesCount > count {691 smallestTriesCount = count692 }693 }694 nextTryTime := meta.LastAttempt695 scaleFactor := time.Duration(math.Pow(q.retryTimeScale, float64(smallestTriesCount-1)))696 nextTryTime = nextTryTime.Add(q.initialRetryTime * scaleFactor)697698 if time.Until(nextTryTime) < q.postInitDelay {699 nextTryTime = time.Now().Add(q.postInitDelay)700 }701702 q.Log.Debugf("will try to deliver (msg ID = %s) in %v (%v)", id, time.Until(nextTryTime), nextTryTime)703 q.wheel.Add(nextTryTime, queueSlot{704 ID: id,705 })706 loadedCount++707 }708709 if loadedCount != 0 {710 q.Log.Printf("loaded %d saved queue entries", loadedCount)711 }712713 return nil714}715716func (q *Queue) storeNewMessage(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) (buffer.Buffer, error) {717 id := meta.MsgMeta.ID718719 headerPath := filepath.Join(q.location, id+".header")720 headerFile, err := os.Create(headerPath)721 if err != nil {722 return nil, err723 }724 defer headerFile.Close()725726 if err := textproto.WriteHeader(headerFile, header); err != nil {727 q.tryRemoveDanglingFile(id + ".header")728 return nil, err729 }730731 bodyReader, err := body.Open()732 if err != nil {733 q.tryRemoveDanglingFile(id + ".header")734 return nil, err735 }736 defer bodyReader.Close()737738 bodyPath := filepath.Join(q.location, id+".body")739 bodyFile, err := os.Create(bodyPath)740 if err != nil {741 return nil, err742 }743 defer bodyFile.Close()744745 if _, err := io.Copy(bodyFile, bodyReader); err != nil {746 q.tryRemoveDanglingFile(id + ".body")747 q.tryRemoveDanglingFile(id + ".header")748 return nil, err749 }750751 if err := q.updateMetadataOnDisk(meta); err != nil {752 q.tryRemoveDanglingFile(id + ".body")753 q.tryRemoveDanglingFile(id + ".header")754 return nil, err755 }756757 if err := headerFile.Sync(); err != nil {758 return nil, err759 }760761 if err := bodyFile.Sync(); err != nil {762 return nil, err763 }764765 return buffer.FileBuffer{Path: bodyPath, LenHint: body.Len()}, nil766}767768func (q *Queue) updateMetadataOnDisk(meta *QueueMetadata) error {769 metaPath := filepath.Join(q.location, meta.MsgMeta.ID+".meta")770771 var file *os.File772 var err error773 if runtime.GOOS == "windows" {774 file, err = os.Create(metaPath)775 if err != nil {776 return err777 }778 } else {779 file, err = os.Create(metaPath + ".new")780 if err != nil {781 return err782 }783 }784 defer file.Close()785786 metaCopy := *meta787 metaCopy.MsgMeta = meta.MsgMeta.DeepCopy()788 metaCopy.MsgMeta.Conn = nil789790 if err := json.NewEncoder(file).Encode(metaCopy); err != nil {791 return err792 }793794 if err := file.Sync(); err != nil {795 return err796 }797798 if runtime.GOOS != "windows" {799 if err := os.Rename(metaPath+".new", metaPath); err != nil {800 return err801 }802 }803804 return nil805}806807func (q *Queue) readMessageMeta(id string) (*QueueMetadata, error) {808 metaPath := filepath.Join(q.location, id+".meta")809 file, err := os.Open(metaPath)810 if err != nil {811 return nil, err812 }813 defer file.Close()814815 meta := &QueueMetadata{}816817 meta.MsgMeta = &module.MsgMetadata{}818819 // There is a couple of problems we have to solve before we would be able to820 // serialize ConnState.821 // 1. future.Future can't be serialized.822 // 2. net.Addr can't be deserialized because we don't know the concrete type.823824 if err := json.NewDecoder(file).Decode(meta); err != nil {825 return nil, err826 }827828 return meta, nil829}830831type BufferedReadCloser struct {832 *bufio.Reader833 io.Closer834}835836func (q *Queue) tryRemoveDanglingFile(name string) {837 if err := os.Remove(filepath.Join(q.location, name)); err != nil {838 q.Log.Error("dangling file remove failed", err)839 return840 }841 q.Log.Printf("removed dangling file %s", name)842}843844func (q *Queue) openMessage(id string) (*QueueMetadata, textproto.Header, buffer.Buffer, error) {845 meta, err := q.readMessageMeta(id)846 if err != nil {847 return nil, textproto.Header{}, nil, err848 }849850 bodyPath := filepath.Join(q.location, id+".body")851 _, err = os.Stat(bodyPath)852 if err != nil {853 if os.IsNotExist(err) {854 q.tryRemoveDanglingFile(id + ".meta")855 }856 return nil, textproto.Header{}, nil, err857 }858 body := buffer.FileBuffer{Path: bodyPath}859860 headerPath := filepath.Join(q.location, id+".header")861 headerFile, err := os.Open(headerPath)862 if err != nil {863 if os.IsNotExist(err) {864 q.tryRemoveDanglingFile(id + ".meta")865 q.tryRemoveDanglingFile(id + ".body")866 }867 return nil, textproto.Header{}, nil, err868 }869870 bufferedHeader := bufio.NewReader(headerFile)871 header, err := textproto.ReadHeader(bufferedHeader)872 if err != nil {873 return nil, textproto.Header{}, nil, err874 }875876 return meta, header, body, nil877}878879func (q *Queue) InstanceName() string {880 return q.name881}882883func (q *Queue) Name() string {884 return "queue"885}886887func (q *Queue) emitDSN(meta *QueueMetadata, header textproto.Header, failedRcpts []string) {888 // If, apparently, we have no DSN msgpipeline configured - do nothing.889 if q.dsnPipeline == nil {890 return891 }892893 // Null return-path, used in DSNs.894 if meta.MsgMeta.OriginalFrom == "" {895 return896 }897898 dsnID, err := module.GenerateMsgID()899 if err != nil {900 q.Log.Error("rand.Rand error", err)901 return902 }903904 dsnEnvelope := dsn.Envelope{905 MsgID: "<" + dsnID + "@" + q.autogenMsgDomain + ">",906 From: "MAILER-DAEMON@" + q.autogenMsgDomain,907 To: meta.MsgMeta.OriginalFrom,908 }909 mtaInfo := dsn.ReportingMTAInfo{910 ReportingMTA: q.hostname,911 XSender: meta.From,912 XMessageID: meta.MsgMeta.ID,913 ArrivalDate: meta.FirstAttempt,914 LastAttemptDate: meta.LastAttempt,915 }916 if !meta.MsgMeta.DontTraceSender && meta.MsgMeta.Conn != nil {917 mtaInfo.ReceivedFromMTA = meta.MsgMeta.Conn.Hostname918 }919920 rcptInfo := make([]dsn.RecipientInfo, 0, len(meta.RcptErrs))921 for _, rcpt := range failedRcpts {922 rcptErr := meta.RcptErrs[rcpt]923 // rcptErr is stored in RcptErrs using the effective recipient address,924 // not the original one.925926 originalRcpt := meta.MsgMeta.OriginalRcpts[rcpt]927 if originalRcpt != "" {928 rcpt = originalRcpt929 }930931 rcptInfo = append(rcptInfo, dsn.RecipientInfo{932 FinalRecipient: rcpt,933 Action: dsn.ActionFailed,934 Status: rcptErr.EnhancedCode,935 DiagnosticCode: rcptErr,936 })937 }938939 var dsnBodyBlob bytes.Buffer940 dl := target.DeliveryLogger(q.Log, meta.MsgMeta)941 dsnHeader, err := dsn.GenerateDSN(meta.MsgMeta.SMTPOpts.UTF8, dsnEnvelope, mtaInfo, rcptInfo, header, &dsnBodyBlob)942 if err != nil {943 dl.Error("failed to generate fail DSN", err)944 return945 }946 dsnBody := buffer.MemoryBuffer{Slice: dsnBodyBlob.Bytes()}947948 dsnMeta := &module.MsgMetadata{949 ID: dsnID,950 SMTPOpts: smtp.MailOptions{951 UTF8: meta.MsgMeta.SMTPOpts.UTF8,952 RequireTLS: meta.MsgMeta.SMTPOpts.RequireTLS,953 },954 }955 dl.Msg("generated failed DSN", "dsn_id", dsnID)956957 msgCtx, msgTask := trace.NewTask(context.Background(), "DSN Delivery")958 defer msgTask.End()959960 mailCtx, mailTask := trace.NewTask(msgCtx, "MAIL FROM")961 dsnDelivery, err := q.dsnPipeline.Start(mailCtx, dsnMeta, "")962 mailTask.End()963 if err != nil {964 dl.Error("failed to enqueue DSN", err, "dsn_id", dsnID)965 return966 }967968 defer func() {969 if err != nil {970 dl.Error("failed to enqueue DSN", err, "dsn_id", dsnID)971 if err := dsnDelivery.Abort(msgCtx); err != nil {972 dl.Error("failed to abort DSN delivery", err, "dsn_id", dsnID)973 }974 }975 }()976977 rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")978 if err = dsnDelivery.AddRcpt(rcptCtx, meta.From, smtp.RcptOptions{}); err != nil {979 rcptTask.End()980 return981 }982 rcptTask.End()983984 bodyCtx, bodyTask := trace.NewTask(msgCtx, "DATA")985 if err = dsnDelivery.Body(bodyCtx, dsnHeader, dsnBody); err != nil {986 bodyTask.End()987 return988 }989 if err = dsnDelivery.Commit(bodyCtx); err != nil {990 bodyTask.End()991 return992 }993 bodyTask.End()994}995996func init() {997 module.Register("target.queue", NewQueue)998}