maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1/*
  2Maddy Mail Server - Composable all-in-one email server.
  3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors
  4
  5This program is free software: you can redistribute it and/or modify
  6it under the terms of the GNU General Public License as published by
  7the Free Software Foundation, either version 3 of the License, or
  8(at your option) any later version.
  9
 10This program is distributed in the hope that it will be useful,
 11but WITHOUT ANY WARRANTY; without even the implied warranty of
 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13GNU General Public License for more details.
 14
 15You should have received a copy of the GNU General Public License
 16along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17*/
 18
 19package msgpipeline
 20
 21import (
 22	"context"
 23	"runtime/debug"
 24	"sync"
 25
 26	"github.com/emersion/go-message/textproto"
 27	"github.com/emersion/go-msgauth/authres"
 28	"github.com/foxcpp/maddy/framework/buffer"
 29	"github.com/foxcpp/maddy/framework/dns"
 30	"github.com/foxcpp/maddy/framework/exterrors"
 31	"github.com/foxcpp/maddy/framework/log"
 32	"github.com/foxcpp/maddy/framework/module"
 33	"github.com/foxcpp/maddy/internal/dmarc"
 34)
 35
 36// checkRunner runs groups of checks, collects and merges results.
 37// It also makes sure that each check gets only one state object created.
 38type checkRunner struct {
 39	msgMeta          *module.MsgMetadata
 40	mailFrom         string
 41	mailFromReceived bool
 42
 43	checkedRcpts         []string
 44	checkedRcptsPerCheck map[module.CheckState]map[string]struct{}
 45	checkedRcptsLock     sync.Mutex
 46
 47	resolver      dns.Resolver
 48	doDMARC       bool
 49	didDMARCFetch bool
 50	dmarcVerify   *dmarc.Verifier
 51
 52	log log.Logger
 53
 54	states map[module.Check]module.CheckState
 55
 56	mergedRes module.CheckResult
 57}
 58
 59func newCheckRunner(msgMeta *module.MsgMetadata, log log.Logger, r dns.Resolver) *checkRunner {
 60	return &checkRunner{
 61		msgMeta:              msgMeta,
 62		checkedRcptsPerCheck: map[module.CheckState]map[string]struct{}{},
 63		log:                  log,
 64		resolver:             r,
 65		dmarcVerify:          dmarc.NewVerifier(r),
 66		states:               make(map[module.Check]module.CheckState),
 67	}
 68}
 69
 70func (cr *checkRunner) checkStates(ctx context.Context, checks []module.Check) ([]module.CheckState, error) {
 71	states := make([]module.CheckState, 0, len(checks))
 72	newStates := make([]module.CheckState, 0, len(checks))
 73	newStatesMap := make(map[module.Check]module.CheckState, len(checks))
 74	closeStates := func() {
 75		for _, state := range states {
 76			state.Close()
 77		}
 78	}
 79
 80	for _, check := range checks {
 81		state, ok := cr.states[check]
 82		if ok {
 83			states = append(states, state)
 84			continue
 85		}
 86
 87		cr.log.Debugf("initializing state for %v (%p)", objectName(check), check)
 88		state, err := check.CheckStateForMsg(ctx, cr.msgMeta)
 89		if err != nil {
 90			closeStates()
 91			return nil, err
 92		}
 93		states = append(states, state)
 94		newStates = append(newStates, state)
 95		newStatesMap[check] = state
 96	}
 97
 98	if len(newStates) == 0 {
 99		return states, nil
100	}
101
102	// Here we replay previous CheckConnection/CheckSender/CheckRcpt calls
103	// for any newly initialized checks so they all get change to see all these things.
104	//
105	// Done outside of check loop above to make sure we can run these for multiple
106	// checks in parallel.
107	if cr.mailFromReceived {
108		err := cr.runAndMergeResults(newStates, func(s module.CheckState) module.CheckResult {
109			res := s.CheckConnection(ctx)
110			return res
111		})
112		if err != nil {
113			closeStates()
114			return nil, err
115		}
116		err = cr.runAndMergeResults(newStates, func(s module.CheckState) module.CheckResult {
117			res := s.CheckSender(ctx, cr.mailFrom)
118			return res
119		})
120		if err != nil {
121			closeStates()
122			return nil, err
123		}
124	}
125
126	if len(cr.checkedRcpts) != 0 {
127		for _, rcpt := range cr.checkedRcpts {
128			err := cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {
129				// Avoid calling CheckRcpt for the same recipient for the same check
130				// multiple times, even if requested.
131				cr.checkedRcptsLock.Lock()
132				if _, ok := cr.checkedRcptsPerCheck[s][rcpt]; ok {
133					cr.checkedRcptsLock.Unlock()
134					return module.CheckResult{}
135				}
136				if cr.checkedRcptsPerCheck[s] == nil {
137					cr.checkedRcptsPerCheck[s] = make(map[string]struct{})
138				}
139				cr.checkedRcptsPerCheck[s][rcpt] = struct{}{}
140				cr.checkedRcptsLock.Unlock()
141
142				res := s.CheckRcpt(ctx, rcpt)
143				return res
144			})
145			if err != nil {
146				closeStates()
147				return nil, err
148			}
149		}
150	}
151
152	// This is done after all actions that can fail so we will not have to remove
153	// state objects from main map.
154	for check, state := range newStatesMap {
155		cr.states[check] = state
156	}
157
158	return states, nil
159}
160
161func (cr *checkRunner) runAndMergeResults(states []module.CheckState, runner func(module.CheckState) module.CheckResult) error {
162	data := struct {
163		authResLock sync.Mutex
164		headerLock  sync.Mutex
165
166		quarantineErr    error
167		quarantineCheck  string
168		setQuarantineErr sync.Once
169
170		rejectErr    error
171		rejectCheck  string
172		setRejectErr sync.Once
173
174		wg sync.WaitGroup
175	}{}
176
177	for _, state := range states {
178		data.wg.Add(1)
179		go func() {
180			defer func() {
181				data.wg.Done()
182				if err := recover(); err != nil {
183					stack := debug.Stack()
184					log.Printf("panic during check execution: %v\n%s", err, stack)
185				}
186			}()
187
188			subCheckRes := runner(state)
189
190			// We check the length because we don't want to take locks
191			// when it is not necessary.
192			if len(subCheckRes.AuthResult) != 0 {
193				data.authResLock.Lock()
194				cr.mergedRes.AuthResult = append(cr.mergedRes.AuthResult, subCheckRes.AuthResult...)
195				data.authResLock.Unlock()
196			}
197			if subCheckRes.Header.Len() != 0 {
198				data.headerLock.Lock()
199				for field := subCheckRes.Header.Fields(); field.Next(); {
200					formatted, err := field.Raw()
201					if err != nil {
202						cr.log.Error("malformed header field added by check", err)
203					}
204					cr.mergedRes.Header.AddRaw(formatted)
205				}
206				data.headerLock.Unlock()
207			}
208
209			if subCheckRes.Quarantine {
210				data.setQuarantineErr.Do(func() {
211					data.quarantineErr = subCheckRes.Reason
212				})
213			} else if subCheckRes.Reject {
214				data.setRejectErr.Do(func() {
215					data.rejectErr = subCheckRes.Reason
216				})
217			} else if subCheckRes.Reason != nil {
218				// 'action ignore' case. There is Reason, but action.Apply set
219				// both Reject and Quarantine to false. Log the reason for
220				// purposes of deployment testing.
221				cr.log.Error("no check action", subCheckRes.Reason)
222			}
223		}()
224	}
225
226	data.wg.Wait()
227	if data.rejectErr != nil {
228		return data.rejectErr
229	}
230
231	if data.quarantineErr != nil {
232		cr.log.Error("quarantined", data.quarantineErr)
233		cr.mergedRes.Quarantine = true
234	}
235
236	return nil
237}
238
239func (cr *checkRunner) checkConnSender(ctx context.Context, checks []module.Check, mailFrom string) error {
240	cr.mailFrom = mailFrom
241	cr.mailFromReceived = true
242
243	// checkStates will run CheckConnection and CheckSender.
244	_, err := cr.checkStates(ctx, checks)
245	return err
246}
247
248func (cr *checkRunner) checkRcpt(ctx context.Context, checks []module.Check, rcptTo string) error {
249	states, err := cr.checkStates(ctx, checks)
250	if err != nil {
251		return err
252	}
253
254	err = cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {
255		cr.checkedRcptsLock.Lock()
256		if _, ok := cr.checkedRcptsPerCheck[s][rcptTo]; ok {
257			cr.checkedRcptsLock.Unlock()
258			return module.CheckResult{}
259		}
260		if cr.checkedRcptsPerCheck[s] == nil {
261			cr.checkedRcptsPerCheck[s] = make(map[string]struct{})
262		}
263		cr.checkedRcptsPerCheck[s][rcptTo] = struct{}{}
264		cr.checkedRcptsLock.Unlock()
265
266		res := s.CheckRcpt(ctx, rcptTo)
267		return res
268	})
269
270	cr.checkedRcpts = append(cr.checkedRcpts, rcptTo)
271	return err
272}
273
274func (cr *checkRunner) checkBody(ctx context.Context, checks []module.Check, header textproto.Header, body buffer.Buffer) error {
275	states, err := cr.checkStates(ctx, checks)
276	if err != nil {
277		return err
278	}
279
280	if cr.doDMARC && !cr.didDMARCFetch {
281		cr.dmarcVerify.FetchRecord(ctx, header)
282		cr.didDMARCFetch = true
283	}
284
285	return cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {
286		res := s.CheckBody(ctx, header, body)
287		return res
288	})
289}
290
291func (cr *checkRunner) applyResults(hostname string, header *textproto.Header) error {
292	if cr.mergedRes.Quarantine {
293		cr.msgMeta.Quarantine = true
294	}
295
296	if cr.doDMARC {
297		dmarcRes, policy := cr.dmarcVerify.Apply(cr.mergedRes.AuthResult)
298		cr.mergedRes.AuthResult = append(cr.mergedRes.AuthResult, &dmarcRes.Authres)
299		switch policy {
300		case dmarc.PolicyReject:
301			code := 550
302			enchCode := exterrors.EnhancedCode{5, 7, 1}
303			if dmarcRes.Authres.Value == authres.ResultTempError {
304				code = 450
305				enchCode[0] = 4
306			}
307			return &exterrors.SMTPError{
308				Code:         code,
309				EnhancedCode: enchCode,
310				Message:      "DMARC check failed",
311				CheckName:    "dmarc",
312				Misc: map[string]interface{}{
313					"reason":      dmarcRes.Authres.Reason,
314					"dkim_res":    dmarcRes.DKIMResult.Value,
315					"dkim_domain": dmarcRes.DKIMResult.Domain,
316					"spf_res":     dmarcRes.SPFResult.Value,
317					"spf_from":    dmarcRes.SPFResult.From,
318				},
319			}
320		case dmarc.PolicyQuarantine:
321			cr.msgMeta.Quarantine = true
322
323			// Mimick the message structure for regular checks.
324			cr.log.Msg("quarantined", "reason", dmarcRes.Authres.Reason, "check", "dmarc")
325		}
326	}
327
328	// After results for all checks are checked, authRes will be populated with values
329	// we should put into Authentication-Results header.
330	if len(cr.mergedRes.AuthResult) != 0 {
331		header.Add("Authentication-Results", authres.Format(hostname, cr.mergedRes.AuthResult))
332	}
333
334	for field := cr.mergedRes.Header.Fields(); field.Next(); {
335		formatted, err := field.Raw()
336		if err != nil {
337			cr.log.Error("malformed header field added by check", err)
338		}
339		header.AddRaw(formatted)
340	}
341	return nil
342}
343
344func (cr *checkRunner) close() {
345	cr.dmarcVerify.Close()
346	for _, state := range cr.states {
347		state.Close()
348	}
349}