1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819package msgpipeline2021import (22 "context"23 "runtime/debug"24 "sync"2526 "github.com/emersion/go-message/textproto"27 "github.com/emersion/go-msgauth/authres"28 "github.com/foxcpp/maddy/framework/buffer"29 "github.com/foxcpp/maddy/framework/dns"30 "github.com/foxcpp/maddy/framework/exterrors"31 "github.com/foxcpp/maddy/framework/log"32 "github.com/foxcpp/maddy/framework/module"33 "github.com/foxcpp/maddy/internal/dmarc"34)3536// checkRunner runs groups of checks, collects and merges results.37// It also makes sure that each check gets only one state object created.38type checkRunner struct {39 msgMeta *module.MsgMetadata40 mailFrom string41 mailFromReceived bool4243 checkedRcpts []string44 checkedRcptsPerCheck map[module.CheckState]map[string]struct{}45 checkedRcptsLock sync.Mutex4647 resolver dns.Resolver48 doDMARC bool49 didDMARCFetch bool50 dmarcVerify *dmarc.Verifier5152 log log.Logger5354 states map[module.Check]module.CheckState5556 mergedRes module.CheckResult57}5859func newCheckRunner(msgMeta *module.MsgMetadata, log log.Logger, r dns.Resolver) *checkRunner {60 return &checkRunner{61 msgMeta: msgMeta,62 checkedRcptsPerCheck: map[module.CheckState]map[string]struct{}{},63 log: log,64 resolver: r,65 dmarcVerify: dmarc.NewVerifier(r),66 states: make(map[module.Check]module.CheckState),67 }68}6970func (cr *checkRunner) checkStates(ctx context.Context, checks []module.Check) ([]module.CheckState, error) {71 states := make([]module.CheckState, 0, len(checks))72 newStates := make([]module.CheckState, 0, len(checks))73 newStatesMap := make(map[module.Check]module.CheckState, len(checks))74 closeStates := func() {75 for _, state := range states {76 state.Close()77 }78 }7980 for _, check := range checks {81 state, ok := cr.states[check]82 if ok {83 states = append(states, state)84 continue85 }8687 cr.log.Debugf("initializing state for %v (%p)", objectName(check), check)88 state, err := check.CheckStateForMsg(ctx, cr.msgMeta)89 if err != nil {90 closeStates()91 return nil, err92 }93 states = append(states, state)94 newStates = append(newStates, state)95 newStatesMap[check] = state96 }9798 if len(newStates) == 0 {99 return states, nil100 }101102 // Here we replay previous CheckConnection/CheckSender/CheckRcpt calls103 // for any newly initialized checks so they all get change to see all these things.104 //105 // Done outside of check loop above to make sure we can run these for multiple106 // checks in parallel.107 if cr.mailFromReceived {108 err := cr.runAndMergeResults(newStates, func(s module.CheckState) module.CheckResult {109 res := s.CheckConnection(ctx)110 return res111 })112 if err != nil {113 closeStates()114 return nil, err115 }116 err = cr.runAndMergeResults(newStates, func(s module.CheckState) module.CheckResult {117 res := s.CheckSender(ctx, cr.mailFrom)118 return res119 })120 if err != nil {121 closeStates()122 return nil, err123 }124 }125126 if len(cr.checkedRcpts) != 0 {127 for _, rcpt := range cr.checkedRcpts {128 err := cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {129 // Avoid calling CheckRcpt for the same recipient for the same check130 // multiple times, even if requested.131 cr.checkedRcptsLock.Lock()132 if _, ok := cr.checkedRcptsPerCheck[s][rcpt]; ok {133 cr.checkedRcptsLock.Unlock()134 return module.CheckResult{}135 }136 if cr.checkedRcptsPerCheck[s] == nil {137 cr.checkedRcptsPerCheck[s] = make(map[string]struct{})138 }139 cr.checkedRcptsPerCheck[s][rcpt] = struct{}{}140 cr.checkedRcptsLock.Unlock()141142 res := s.CheckRcpt(ctx, rcpt)143 return res144 })145 if err != nil {146 closeStates()147 return nil, err148 }149 }150 }151152 // This is done after all actions that can fail so we will not have to remove153 // state objects from main map.154 for check, state := range newStatesMap {155 cr.states[check] = state156 }157158 return states, nil159}160161func (cr *checkRunner) runAndMergeResults(states []module.CheckState, runner func(module.CheckState) module.CheckResult) error {162 data := struct {163 authResLock sync.Mutex164 headerLock sync.Mutex165166 quarantineErr error167 quarantineCheck string168 setQuarantineErr sync.Once169170 rejectErr error171 rejectCheck string172 setRejectErr sync.Once173174 wg sync.WaitGroup175 }{}176177 for _, state := range states {178 data.wg.Add(1)179 go func() {180 defer func() {181 data.wg.Done()182 if err := recover(); err != nil {183 stack := debug.Stack()184 log.Printf("panic during check execution: %v\n%s", err, stack)185 }186 }()187188 subCheckRes := runner(state)189190 // We check the length because we don't want to take locks191 // when it is not necessary.192 if len(subCheckRes.AuthResult) != 0 {193 data.authResLock.Lock()194 cr.mergedRes.AuthResult = append(cr.mergedRes.AuthResult, subCheckRes.AuthResult...)195 data.authResLock.Unlock()196 }197 if subCheckRes.Header.Len() != 0 {198 data.headerLock.Lock()199 for field := subCheckRes.Header.Fields(); field.Next(); {200 formatted, err := field.Raw()201 if err != nil {202 cr.log.Error("malformed header field added by check", err)203 }204 cr.mergedRes.Header.AddRaw(formatted)205 }206 data.headerLock.Unlock()207 }208209 if subCheckRes.Quarantine {210 data.setQuarantineErr.Do(func() {211 data.quarantineErr = subCheckRes.Reason212 })213 } else if subCheckRes.Reject {214 data.setRejectErr.Do(func() {215 data.rejectErr = subCheckRes.Reason216 })217 } else if subCheckRes.Reason != nil {218 // 'action ignore' case. There is Reason, but action.Apply set219 // both Reject and Quarantine to false. Log the reason for220 // purposes of deployment testing.221 cr.log.Error("no check action", subCheckRes.Reason)222 }223 }()224 }225226 data.wg.Wait()227 if data.rejectErr != nil {228 return data.rejectErr229 }230231 if data.quarantineErr != nil {232 cr.log.Error("quarantined", data.quarantineErr)233 cr.mergedRes.Quarantine = true234 }235236 return nil237}238239func (cr *checkRunner) checkConnSender(ctx context.Context, checks []module.Check, mailFrom string) error {240 cr.mailFrom = mailFrom241 cr.mailFromReceived = true242243 // checkStates will run CheckConnection and CheckSender.244 _, err := cr.checkStates(ctx, checks)245 return err246}247248func (cr *checkRunner) checkRcpt(ctx context.Context, checks []module.Check, rcptTo string) error {249 states, err := cr.checkStates(ctx, checks)250 if err != nil {251 return err252 }253254 err = cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {255 cr.checkedRcptsLock.Lock()256 if _, ok := cr.checkedRcptsPerCheck[s][rcptTo]; ok {257 cr.checkedRcptsLock.Unlock()258 return module.CheckResult{}259 }260 if cr.checkedRcptsPerCheck[s] == nil {261 cr.checkedRcptsPerCheck[s] = make(map[string]struct{})262 }263 cr.checkedRcptsPerCheck[s][rcptTo] = struct{}{}264 cr.checkedRcptsLock.Unlock()265266 res := s.CheckRcpt(ctx, rcptTo)267 return res268 })269270 cr.checkedRcpts = append(cr.checkedRcpts, rcptTo)271 return err272}273274func (cr *checkRunner) checkBody(ctx context.Context, checks []module.Check, header textproto.Header, body buffer.Buffer) error {275 states, err := cr.checkStates(ctx, checks)276 if err != nil {277 return err278 }279280 if cr.doDMARC && !cr.didDMARCFetch {281 cr.dmarcVerify.FetchRecord(ctx, header)282 cr.didDMARCFetch = true283 }284285 return cr.runAndMergeResults(states, func(s module.CheckState) module.CheckResult {286 res := s.CheckBody(ctx, header, body)287 return res288 })289}290291func (cr *checkRunner) applyResults(hostname string, header *textproto.Header) error {292 if cr.mergedRes.Quarantine {293 cr.msgMeta.Quarantine = true294 }295296 if cr.doDMARC {297 dmarcRes, policy := cr.dmarcVerify.Apply(cr.mergedRes.AuthResult)298 cr.mergedRes.AuthResult = append(cr.mergedRes.AuthResult, &dmarcRes.Authres)299 switch policy {300 case dmarc.PolicyReject:301 code := 550302 enchCode := exterrors.EnhancedCode{5, 7, 1}303 if dmarcRes.Authres.Value == authres.ResultTempError {304 code = 450305 enchCode[0] = 4306 }307 return &exterrors.SMTPError{308 Code: code,309 EnhancedCode: enchCode,310 Message: "DMARC check failed",311 CheckName: "dmarc",312 Misc: map[string]interface{}{313 "reason": dmarcRes.Authres.Reason,314 "dkim_res": dmarcRes.DKIMResult.Value,315 "dkim_domain": dmarcRes.DKIMResult.Domain,316 "spf_res": dmarcRes.SPFResult.Value,317 "spf_from": dmarcRes.SPFResult.From,318 },319 }320 case dmarc.PolicyQuarantine:321 cr.msgMeta.Quarantine = true322323 // Mimick the message structure for regular checks.324 cr.log.Msg("quarantined", "reason", dmarcRes.Authres.Reason, "check", "dmarc")325 }326 }327328 // After results for all checks are checked, authRes will be populated with values329 // we should put into Authentication-Results header.330 if len(cr.mergedRes.AuthResult) != 0 {331 header.Add("Authentication-Results", authres.Format(hostname, cr.mergedRes.AuthResult))332 }333334 for field := cr.mergedRes.Header.Fields(); field.Next(); {335 formatted, err := field.Raw()336 if err != nil {337 cr.log.Error("malformed header field added by check", err)338 }339 header.AddRaw(formatted)340 }341 return nil342}343344func (cr *checkRunner) close() {345 cr.dmarcVerify.Close()346 for _, state := range cr.states {347 state.Close()348 }349}