1/*2Maddy Mail Server - Composable all-in-one email server.3Copyright © 2019-2020 Max Mazurov <fox.cpp@disroot.org>, Maddy Mail Server contributors45This program is free software: you can redistribute it and/or modify6it under the terms of the GNU General Public License as published by7the Free Software Foundation, either version 3 of the License, or8(at your option) any later version.910This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.1415You should have received a copy of the GNU General Public License16along with this program. If not, see <https://www.gnu.org/licenses/>.17*/1819// Package remote implements module which does outgoing20// message delivery using servers discovered using DNS MX records.21//22// Implemented interfaces:23// - module.DeliveryTarget24package remote2526import (27 "context"28 "crypto/tls"29 "errors"30 "fmt"31 "net"32 "runtime/trace"33 "strings"34 "sync"35 "time"3637 "github.com/emersion/go-message/textproto"38 "github.com/emersion/go-smtp"39 "github.com/foxcpp/maddy/framework/address"40 "github.com/foxcpp/maddy/framework/buffer"41 "github.com/foxcpp/maddy/framework/config"42 modconfig "github.com/foxcpp/maddy/framework/config/module"43 tls2 "github.com/foxcpp/maddy/framework/config/tls"44 "github.com/foxcpp/maddy/framework/dns"45 "github.com/foxcpp/maddy/framework/exterrors"46 "github.com/foxcpp/maddy/framework/log"47 "github.com/foxcpp/maddy/framework/module"48 "github.com/foxcpp/maddy/internal/limits"49 "github.com/foxcpp/maddy/internal/smtpconn/pool"50 "github.com/foxcpp/maddy/internal/target"51 "golang.org/x/net/idna"52)5354var smtpPort = "25"5556func moduleError(err error) error {57 return exterrors.WithFields(err, map[string]interface{}{58 "target": "remote",59 })60}6162type Target struct {63 name string64 hostname string65 localIP string66 ipv4 bool67 tlsConfig *tls.Config6869 resolver dns.Resolver70 dialer func(ctx context.Context, network, addr string) (net.Conn, error)71 extResolver *dns.ExtResolver7273 policies []module.MXAuthPolicy74 limits *limits.Group75 allowSecOverride bool76 relaxedREQUIRETLS bool7778 pool *pool.P79 connReuseLimit int8081 Log log.Logger8283 connectTimeout time.Duration84 commandTimeout time.Duration85 submissionTimeout time.Duration86}8788var _ module.DeliveryTarget = &Target{}8990func New(_, instName string, _, inlineArgs []string) (module.Module, error) {91 if len(inlineArgs) != 0 {92 return nil, errors.New("remote: inline arguments are not used")93 }94 // Keep this synchronized with testTarget.95 return &Target{96 name: instName,97 resolver: dns.DefaultResolver(),98 dialer: (&net.Dialer{}).DialContext,99 Log: log.Logger{Name: "remote"},100 }, nil101}102103func (rt *Target) Init(cfg *config.Map) error {104 var err error105 rt.extResolver, err = dns.NewExtResolver()106 if err != nil {107 rt.Log.Error("cannot initialize DNSSEC-aware resolver, DNSSEC and DANE are not available", err)108 }109110 cfg.String("hostname", true, true, "", &rt.hostname)111 cfg.String("local_ip", false, false, "", &rt.localIP)112 cfg.Bool("force_ipv4", false, false, &rt.ipv4)113 cfg.Bool("debug", true, false, &rt.Log.Debug)114 cfg.Custom("tls_client", true, false, func() (interface{}, error) {115 return &tls.Config{}, nil116 }, tls2.TLSClientBlock, &rt.tlsConfig)117 cfg.Custom("mx_auth", false, false, func() (interface{}, error) {118 // Default is "no policies" to follow the principles of explicit119 // configuration (if it is not requested - it is not done).120 return nil, nil121 }, func(cfg *config.Map, n config.Node) (interface{}, error) {122 // Module instance is &PolicyGroup.123 var p *PolicyGroup124 if err := modconfig.GroupFromNode("mx_auth", n.Args, n, cfg.Globals, &p); err != nil {125 return nil, err126 }127 return p.L, nil128 }, &rt.policies)129 cfg.Custom("limits", false, false, func() (interface{}, error) {130 return &limits.Group{}, nil131 }, func(cfg *config.Map, n config.Node) (interface{}, error) {132 var g *limits.Group133 if err := modconfig.GroupFromNode("limits", n.Args, n, cfg.Globals, &g); err != nil {134 return nil, err135 }136 return g, nil137 }, &rt.limits)138 cfg.Bool("requiretls_override", false, true, &rt.allowSecOverride)139 cfg.Bool("relaxed_requiretls", false, true, &rt.relaxedREQUIRETLS)140 cfg.Int("conn_reuse_limit", false, false, 10, &rt.connReuseLimit)141 cfg.Duration("connect_timeout", false, false, 5*time.Minute, &rt.connectTimeout)142 cfg.Duration("command_timeout", false, false, 5*time.Minute, &rt.commandTimeout)143 cfg.Duration("submission_timeout", false, false, 5*time.Minute, &rt.submissionTimeout)144145 poolCfg := pool.Config{146 MaxKeys: 5000,147 MaxConnsPerKey: 5, // basically, max. amount of idle connections in cache148 MaxConnLifetimeSec: 150, // 2.5 mins, half of recommended idle time from RFC 5321149 StaleKeyLifetimeSec: 60 * 5, // should be bigger than MaxConnLifetimeSec150 }151 cfg.Int("conn_max_idle_count", false, false, 5, &poolCfg.MaxConnsPerKey)152 cfg.Int64("conn_max_idle_time", false, false, 150, &poolCfg.MaxConnLifetimeSec)153154 if _, err := cfg.Process(); err != nil {155 return err156 }157 rt.pool = pool.New(poolCfg)158159 // INTERNATIONALIZATION: See RFC 6531 Section 3.7.1.160 rt.hostname, err = idna.ToASCII(rt.hostname)161 if err != nil {162 return fmt.Errorf("remote: cannot represent the hostname as an A-label name: %w", err)163 }164165 if rt.localIP != "" {166 addr, err := net.ResolveTCPAddr("tcp", rt.localIP+":0")167 if err != nil {168 return fmt.Errorf("remote: failed to parse local IP: %w", err)169 }170 rt.dialer = (&net.Dialer{171 LocalAddr: addr,172 }).DialContext173 }174 if rt.ipv4 {175 dial := rt.dialer176 rt.dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {177 if network == "tcp" {178 network = "tcp4"179 }180 return dial(ctx, network, addr)181 }182 }183184 return nil185}186187func (rt *Target) Close() error {188 rt.pool.Close()189190 return nil191}192193func (rt *Target) Name() string {194 return "remote"195}196197func (rt *Target) InstanceName() string {198 return rt.name199}200201type remoteDelivery struct {202 rt *Target203 mailFrom string204 msgMeta *module.MsgMetadata205 Log log.Logger206207 recipients []string208 connections map[string]*mxConn209210 policies []module.DeliveryMXAuthPolicy211}212213func (rt *Target) Start(ctx context.Context, msgMeta *module.MsgMetadata, mailFrom string) (module.Delivery, error) {214 policies := make([]module.DeliveryMXAuthPolicy, 0, len(rt.policies))215 if !(msgMeta.TLSRequireOverride && rt.allowSecOverride) {216 for _, p := range rt.policies {217 policies = append(policies, p.Start(msgMeta))218 }219 }220221 var (222 ratelimitDomain string223 err error224 )225 // This will leave ratelimitDomain = "" for null return path which is fine226 // for purposes of ratelimiting.227 if mailFrom != "" {228 _, ratelimitDomain, err = address.Split(mailFrom)229 if err != nil {230 return nil, &exterrors.SMTPError{231 Code: 501,232 EnhancedCode: exterrors.EnhancedCode{5, 1, 8},233 Message: "Malformed sender address",234 TargetName: "remote",235 Err: err,236 }237 }238 }239240 // Domain is already should be normalized by the message source (e.g.241 // endpoint/smtp).242 region := trace.StartRegion(ctx, "remote/limits.Take")243 addr := net.IPv4(127, 0, 0, 1)244 if msgMeta.Conn != nil && msgMeta.Conn.RemoteAddr != nil {245 tcpAddr, ok := msgMeta.Conn.RemoteAddr.(*net.TCPAddr)246 if ok {247 addr = tcpAddr.IP248 }249 }250 if err := rt.limits.TakeMsg(ctx, addr, ratelimitDomain); err != nil {251 region.End()252 return nil, &exterrors.SMTPError{253 Code: 451,254 EnhancedCode: exterrors.EnhancedCode{4, 4, 5},255 Message: "High load, try again later",256 Reason: "Global limit timeout",257 TargetName: "remote",258 Err: err,259 }260 }261 region.End()262263 return &remoteDelivery{264 rt: rt,265 mailFrom: mailFrom,266 msgMeta: msgMeta,267 Log: target.DeliveryLogger(rt.Log, msgMeta),268 connections: map[string]*mxConn{},269 policies: policies,270 }, nil271}272273func (rd *remoteDelivery) AddRcpt(ctx context.Context, to string, opts smtp.RcptOptions) error {274 defer trace.StartRegion(ctx, "remote/AddRcpt").End()275276 if rd.msgMeta.Quarantine {277 return &exterrors.SMTPError{278 Code: 550,279 EnhancedCode: exterrors.EnhancedCode{5, 7, 0},280 Message: "Refusing to deliver a quarantined message",281 TargetName: "remote",282 }283 }284285 _, domain, err := address.Split(to)286 if err != nil {287 return err288 }289290 // Special-case for <postmaster> address. If it is not handled by a rewrite rule before291 // - we should not attempt to do anything with it and reject it as invalid.292 if domain == "" {293 return &exterrors.SMTPError{294 Code: 550,295 EnhancedCode: exterrors.EnhancedCode{5, 1, 1},296 Message: "<postmaster> address it no supported",297 TargetName: "remote",298 }299 }300301 if strings.HasPrefix(domain, "[") {302 return &exterrors.SMTPError{303 Code: 550,304 EnhancedCode: exterrors.EnhancedCode{5, 1, 1},305 Message: "IP address literals are not supported",306 TargetName: "remote",307 }308 }309310 conn, err := rd.connectionForDomain(ctx, domain)311 if err != nil {312 return err313 }314315 if err := conn.Rcpt(ctx, to, opts); err != nil {316 return moduleError(err)317 }318 conn.lastUseAt = time.Now()319320 rd.recipients = append(rd.recipients, to)321 return nil322}323324type multipleErrs struct {325 errs map[string]error326 statusLck sync.Mutex327}328329func (m *multipleErrs) Error() string {330 m.statusLck.Lock()331 defer m.statusLck.Unlock()332 return fmt.Sprintf("Partial delivery failure, per-rcpt info: %+v", m.errs)333}334335func (m *multipleErrs) Fields() map[string]interface{} {336 m.statusLck.Lock()337 defer m.statusLck.Unlock()338339 // If there are any temporary errors - the sender should retry to make sure340 // all recipients will get the message. However, since we can't tell it341 // which recipients got the message, this will generate duplicates for342 // them.343 //344 // We favor delivery with duplicates over incomplete delivery here.345346 var (347 code = 550348 enchCode = exterrors.EnhancedCode{5, 0, 0}349 )350 for _, err := range m.errs {351 if exterrors.IsTemporary(err) {352 code = 451353 enchCode = exterrors.EnhancedCode{4, 0, 0}354 }355 }356357 return map[string]interface{}{358 "smtp_code": code,359 "smtp_enchcode": enchCode,360 "smtp_msg": "Partial delivery failure, additional attempts may result in duplicates",361 "target": "remote",362 "errs": m.errs,363 }364}365366func (m *multipleErrs) SetStatus(rcptTo string, err error) {367 m.statusLck.Lock()368 defer m.statusLck.Unlock()369 m.errs[rcptTo] = err370}371372func (rd *remoteDelivery) Body(ctx context.Context, header textproto.Header, buffer buffer.Buffer) error {373 defer trace.StartRegion(ctx, "remote/Body").End()374375 merr := multipleErrs{376 errs: make(map[string]error),377 }378 rd.BodyNonAtomic(ctx, &merr, header, buffer)379380 for _, v := range merr.errs {381 if v != nil {382 if len(merr.errs) == 1 {383 return v384 }385 return &merr386 }387 }388 return nil389}390391func (rd *remoteDelivery) BodyNonAtomic(ctx context.Context, c module.StatusCollector, header textproto.Header, b buffer.Buffer) {392 defer trace.StartRegion(ctx, "remote/BodyNonAtomic").End()393394 if rd.msgMeta.Quarantine {395 for _, rcpt := range rd.recipients {396 c.SetStatus(rcpt, &exterrors.SMTPError{397 Code: 550,398 EnhancedCode: exterrors.EnhancedCode{5, 7, 0},399 Message: "Refusing to deliver quarantined message",400 TargetName: "remote",401 })402 }403 return404 }405406 var wg sync.WaitGroup407408 for i, conn := range rd.connections {409 wg.Add(1)410 go func() {411 defer wg.Done()412413 bodyR, err := b.Open()414 if err != nil {415 for _, rcpt := range conn.Rcpts() {416 c.SetStatus(rcpt, err)417 }418 return419 }420 defer bodyR.Close()421422 err = conn.Data(ctx, header, bodyR)423 for _, rcpt := range conn.Rcpts() {424 c.SetStatus(rcpt, err)425 }426 rd.connections[i].errored = err != nil427 conn.lastUseAt = time.Now()428 }()429 }430431 wg.Wait()432}433434func (rd *remoteDelivery) Abort(ctx context.Context) error {435 return rd.Close()436}437438func (rd *remoteDelivery) Commit(ctx context.Context) error {439 // It is not possible to implement it atomically, so users of remoteDelivery have to440 // take care of partial failures.441 return rd.Close()442}443444func (rd *remoteDelivery) Close() error {445 for _, conn := range rd.connections {446 rd.rt.limits.ReleaseDest(conn.domain)447 conn.transactions++448449 if !conn.Usable() {450 rd.Log.Debugf("disconnected %v from %s (errored=%v,transactions=%v,disconnected before=%v)",451 conn.LocalAddr(), conn.ServerName(), conn.errored, conn.transactions, conn.C.Client() == nil)452 conn.Close()453 } else {454 rd.Log.Debugf("returning connection %v for %s to pool", conn.LocalAddr(), conn.ServerName())455 rd.rt.pool.Return(conn.domain, conn)456 }457 }458459 var (460 ratelimitDomain string461 err error462 )463 if rd.mailFrom != "" {464 _, ratelimitDomain, err = address.Split(rd.mailFrom)465 if err != nil {466 return err467 }468 }469470 addr := net.IPv4(127, 0, 0, 1)471 if rd.msgMeta.Conn != nil && rd.msgMeta.Conn.RemoteAddr != nil {472 tcpAddr, ok := rd.msgMeta.Conn.RemoteAddr.(*net.TCPAddr)473 if ok {474 addr = tcpAddr.IP475 }476 }477 rd.rt.limits.ReleaseMsg(addr, ratelimitDomain)478479 return nil480}481482func init() {483 module.Register("target.remote", New)484}