mlisting

Mailing list service

git clone git://git.lin.moe/go/mlisting.git

  1package service
  2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"maps"
	gomail "net/mail"
	"strings"
	"time"

	"git.lin.moe/go/mlisting/config"
	"git.lin.moe/go/mlisting/storage"
	"github.com/emersion/go-message/mail"
	"github.com/emersion/go-message/textproto"
	"github.com/emersion/go-smtp"
	"github.com/prometheus/client_golang/prometheus"
)
 21
// Mailing-list commands. Rcpt takes the command from the action part of the
// recipient address and falls back to CMD_POST when the address carries no
// action; process dispatches on the selected command.
const (
	CMD_SUBSCRIBE   = "subscribe"
	CMD_UNSUBSCRIBE = "unsubscribe"
	CMD_CONFIRM     = "confirm"
	CMD_POST        = "post"
)
 28
const (
	// REQUEST_EXPIRE_DURATION is added to the current time to compute the
	// expiry passed to List.NewRequest for subscribe/unsubscribe requests.
	REQUEST_EXPIRE_DURATION = time.Hour * 24
)
 32
// LMTPBackend creates LMTP sessions (see NewSession). Every session shares
// the backend's storage, logger and outgoing MTA, and derives its context
// from the backend's base context.
type LMTPBackend struct {
	ctx     context.Context
	storage storage.Storage
	logger  config.Logger
	mta     MTA

	// ConnTimeout, when non-zero, is used by NewSession both as the timeout
	// of the session context and as the deadline set on the connection.
	ConnTimeout time.Duration
	// SizeLimit is copied into each session as its maximum message size;
	// zero disables the limit.
	SizeLimit   int64
}
 42
 43func NewLMTPBackend(ctx context.Context, st storage.Storage, mta MTA, l config.Logger) *LMTPBackend {
 44	return &LMTPBackend{
 45		ctx:     ctx,
 46		storage: st,
 47		logger:  l,
 48		mta:     mta,
 49	}
 50}
 51
 52func (l *LMTPBackend) NewSession(c *smtp.Conn) (smtp.Session, error) {
 53	var (
 54		ctx    context.Context
 55		cancel context.CancelFunc
 56	)
 57
 58	if l.ConnTimeout != 0 {
 59		ctx, cancel = context.WithTimeout(l.ctx, l.ConnTimeout)
 60		c.Conn().SetDeadline(time.Now().Add(l.ConnTimeout))
 61	} else {
 62		ctx, cancel = context.WithCancel(l.ctx)
 63	}
 64
 65	return &lmtpSession{
 66		ctx:     ctx,
 67		cancel:  cancel,
 68		storage: l.storage,
 69		logger:  l.logger,
 70		mta:     l.mta,
 71		maxSize: l.SizeLimit,
 72		List:    nil,
 73		Cmd:     "",
 74	}, nil
 75}
 76
// lmtpSession is the per-connection state handed to go-smtp by
// LMTPBackend.NewSession.
type lmtpSession struct {
	// List and Cmd are selected by Rcpt and cleared by Reset: List is the
	// mailing list the recipient address resolved to, Cmd one of the CMD_*
	// constants.
	List storage.List
	Cmd  string

	// from is the envelope sender recorded by Mail.
	from    string
	storage storage.Storage
	mta     MTA

	// ctx is the session context; cancel releases it and is called by Logout.
	ctx    context.Context
	cancel context.CancelFunc

	logger  config.Logger
	// maxSize limits the number of message bytes read by LMTPData; zero
	// means no limit.
	maxSize int64
}
 91
 92func (s *lmtpSession) Reset() {
 93	s.List = nil
 94	s.Cmd = ""
 95}
 96func (s *lmtpSession) Logout() error {
 97	s.cancel()
 98	return s.ctx.Err()
 99}
100
// AuthPlain rejects PLAIN authentication: it ignores the credentials and
// always returns smtp.ErrAuthUnsupported.
func (s *lmtpSession) AuthPlain(username, password string) error {
	return smtp.ErrAuthUnsupported
}
104
105func (s *lmtpSession) Mail(from string, opts *smtp.MailOptions) error {
106	s.from = from
107	s.logger.Info("mail", "from", from)
108	return nil
109}
110
111func (s *lmtpSession) Rcpt(to string, opts *smtp.RcptOptions) error {
112	if s.List != nil {
113		if s.List.Address() == to {
114			return nil
115		}
116		return fmt.Errorf("Send to multiple addresses is not allowed")
117	}
118	select {
119	case <-s.ctx.Done():
120		return s.ctx.Err()
121	default:
122	}
123
124	s.logger.Info("rcpt", "to", to)
125
126	addrTo := Address{Address: to}
127	list, err := s.storage.GetList(s.ctx, addrTo.Base())
128	if err != nil {
129		s.logger.Error("search rcpt list failed", "err", err, "to", addrTo.Base())
130		return &smtp.SMTPError{
131			Code:         550,
132			EnhancedCode: smtp.EnhancedCode{5, 1, 1},
133			Message:      "Rcpt target is not exists",
134		}
135	}
136	s.List = list
137	s.Cmd = addrTo.Action()
138	if s.Cmd == "" {
139		s.Cmd = CMD_POST
140	}
141	return nil
142}
143
// Data is the non-LMTP DATA handler. It never accepts a message and always
// returns smtp.ErrServerClosed; message bodies are handled by LMTPData.
// NOTE(review): presumably unreachable when the server runs in LMTP mode —
// confirm against the server setup.
func (s *lmtpSession) Data(r io.Reader) error {
	return smtp.ErrServerClosed
}
147
148func (s *lmtpSession) LMTPData(r io.Reader, status smtp.StatusCollector) error {
149	select {
150	case <-s.ctx.Done():
151		return s.ctx.Err()
152	default:
153	}
154
155	if s.maxSize != 0 {
156		r = io.LimitReader(r, s.maxSize)
157	}
158	msg, err := gomail.ReadMessage(r)
159	if err != nil {
160		return &smtp.SMTPError{
161			Code:    500,
162			Message: err.Error(),
163		}
164	}
165
166	body, err := io.ReadAll(msg.Body)
167	if err != nil {
168		return &smtp.SMTPError{
169			Code:    500,
170			Message: err.Error(),
171		}
172	}
173
174	lmtpMetrics.msgReceived.With(prometheus.Labels{"list": s.List.Address()}).Inc()
175
176	err = s.process(msg.Header, body)
177	if err != nil {
178		switch v := err.(type) {
179		case TemplateError:
180			sender, err := mail.ParseAddress(msg.Header.Get("From"))
181			if err != nil {
182				return &smtp.SMTPError{
183					Code:         500,
184					EnhancedCode: smtp.EnhancedCode{1, 0, 0},
185					Message:      fmt.Sprintf("parse From header: %v", err),
186				}
187			}
188
189			reply_msg, err := v.Message()
190			if err != nil {
191				s.logger.Error("generate listtext failed", "err", err)
192				return &smtp.SMTPError{
193					Code:    451,
194					Message: "auto-response message generate failed",
195				}
196			}
197
198			header := mail.HeaderFromMap(reply_msg.Header)
199			header.SetDate(time.Now())
200			header.GenerateMessageID()
201
202			body, err := io.ReadAll(reply_msg.Body)
203			if err != nil {
204				s.logger.Error("read listtext failed", "err", err)
205				return &smtp.SMTPError{
206					Code:    451,
207					Message: "auto-response message generate failed",
208				}
209
210			}
211
212			if err := s.mta.Send(
213				header.Header.Header,
214				body,
215				config.FromContext(s.ctx).SMTP.Sender,
216				[]string{sender.Address},
217			); err != nil {
218				s.logger.Error("send listtext message failed", "err", err)
219				return &smtp.SMTPError{
220					Code:         421,
221					EnhancedCode: smtp.EnhancedCode{4, 0, 0},
222					Message:      err.Error(),
223				}
224			}
225			return nil
226		default:
227			s.logger.Error("process email failed", "err", err)
228			return err
229		}
230	}
231
232	return nil
233}
234
235func (s *lmtpSession) prepare(header gomail.Header) error {
236	h := mail.HeaderFromMap(header)
237	msgid, err := h.MessageID()
238	if err != nil {
239		return &smtp.SMTPError{
240			Code:         500,
241			EnhancedCode: smtp.EnhancedCode{1, 0, 0},
242			Message:      fmt.Sprintf("Message-ID header: %v", err),
243		}
244	}
245	if msgid == "" {
246		if err := h.GenerateMessageID(); err != nil {
247			return &smtp.SMTPError{
248				Code:    500,
249				Message: fmt.Sprintf("generate Message-ID header: %v", err),
250			}
251		}
252	}
253
254	addr := &Address{Address: s.List.Address()}
255	listname, _, listhost := addr.Parts()
256	h.Set("List-Unsubscribe",
257		fmt.Sprintf("<mailto:%s?subject=unsubscribe>", addr.WithAction(CMD_UNSUBSCRIBE).String()))
258
259	h.Set("List-Subscribe",
260		fmt.Sprintf("<mailto:%s?subject=subscribe>", addr.WithAction(CMD_SUBSCRIBE).String()))
261	h.Set("List-Post",
262		fmt.Sprintf("<mailto:%s>", s.List.Address()),
263	)
264	h.Set("List-ID",
265		fmt.Sprintf("%s <%s.%s>", s.List.Name(), listname, listhost),
266	)
267	h.Set("Sender",
268		fmt.Sprintf("%s <%s>", s.List.Name(), s.List.Address()),
269	)
270	if s.List.DefaultPerm()&storage.PERM_BROWSE != 0 {
271		if cfg := config.FromContext(s.ctx); cfg != nil && cfg.Http.Home != "" {
272			h.Set("List-Archive", fmt.Sprintf("%s/list/%s/", cfg.Http.Home, s.List.Address()))
273			if msgid, err := h.MessageID(); err == nil {
274				h.Set("Archived-At",
275					fmt.Sprintf("%s/list/%s/%s/", cfg.Http.Home, s.List.Address(), msgid))
276			}
277		}
278	}
279
280	maps.Copy(header, h.Map())
281	return nil
282}
283
284func (s *lmtpSession) process(header gomail.Header, body []byte) error {
285	sender, err := mail.ParseAddress(header.Get("From"))
286	if err != nil {
287		return &smtp.SMTPError{
288			Code:         500,
289			EnhancedCode: smtp.EnhancedCode{1, 0, 0},
290			Message:      fmt.Sprintf("parse From header: %v", err),
291		}
292	}
293
294	switch s.Cmd {
295	case CMD_POST:
296		return s.forward(header, body)
297	case CMD_SUBSCRIBE:
298		if s.List.DefaultPerm()&storage.PERM_BROWSE == 0 {
299			return &TemplateDenied{
300				InHeader: textproto.HeaderFromMap(header),
301				List:     s.List,
302				Reason:   DENIED_SUBSCRIBE_NOPERM,
303			}
304		}
305		if _, err := s.List.GetMember(s.ctx, sender.String()); err == nil {
306			return &TemplateDenied{
307				InHeader: textproto.HeaderFromMap(header),
308				List:     s.List,
309				Reason:   DENIED_SUBSCRIBED_ALREADY,
310			}
311		}
312
313		token, err := s.List.NewRequest(s.ctx, sender.Address, storage.REQUEST_SUBSCRIBE, time.Now().Add(REQUEST_EXPIRE_DURATION))
314		if err != nil {
315			return &smtp.SMTPError{
316				Code:    500,
317				Message: fmt.Sprintf("request handle: %v", err),
318			}
319		}
320		return &TemplateNeedConfirm{List: s.List, Token: token,
321			InHeader:    textproto.HeaderFromMap(header),
322			RequestType: storage.REQUEST_SUBSCRIBE}
323	case CMD_UNSUBSCRIBE:
324		if _, err := s.List.GetMember(s.ctx, sender.String()); err != nil {
325			if err == sql.ErrNoRows {
326				return nil
327			}
328			return &smtp.SMTPError{
329				Code:    500,
330				Message: fmt.Sprintf("load address info: %v", err),
331			}
332		}
333
334		token, err := s.List.NewRequest(s.ctx,
335			sender.Address, storage.REQUEST_UNSUBSCRIBE,
336			time.Now().Add(REQUEST_EXPIRE_DURATION))
337		if err != nil {
338			return &smtp.SMTPError{
339				Code:    500,
340				Message: fmt.Sprintf("request handle: %v", err),
341			}
342		}
343
344		return &TemplateNeedConfirm{List: s.List, Token: token,
345			InHeader:    textproto.HeaderFromMap(header),
346			RequestType: storage.REQUEST_UNSUBSCRIBE}
347
348	case CMD_CONFIRM:
349		token := header.Get("Subject")
350		token = strings.TrimSpace(token)
351		if strings.HasPrefix(strings.ToLower(token), "re:") {
352			token = strings.TrimSpace(token[len("re:"):])
353		}
354		if strings.HasPrefix(strings.ToLower(token), "confirm") {
355			token = strings.TrimSpace(token[len("confirm"):])
356		}
357		rtype, err := s.List.CompleteReqest(s.ctx, sender.Address, token)
358		if err != nil {
359			// TODO: should this be error response?
360			return &smtp.SMTPError{
361				Code:    500,
362				Message: fmt.Sprintf("request handle: %v", err),
363			}
364		}
365		switch rtype {
366		case storage.REQUEST_SUBSCRIBE:
367			if _, err = s.List.NewMember(s.ctx, sender.String()); err != nil {
368				return &smtp.SMTPError{
369					Code:    500,
370					Message: err.Error(),
371				}
372			}
373		case storage.REQUEST_UNSUBSCRIBE:
374			if err := s.List.DelMember(s.ctx, sender.String()); err != nil {
375				return &smtp.SMTPError{
376					Code:    500,
377					Message: err.Error(),
378				}
379			}
380		default:
381			return &smtp.SMTPError{
382				Code:    501,
383				Message: "unknown mailing list action",
384			}
385		}
386		return &TemplatePostRequest{List: s.List, RequestType: rtype, InHeader: textproto.HeaderFromMap(header)}
387	default:
388		return &smtp.SMTPError{
389			Code:         550,
390			EnhancedCode: smtp.EnhancedCode{5, 1, 1},
391			Message:      "invalid operate command",
392		}
393
394	}
395}
396
397func (s *lmtpSession) forward(header gomail.Header, body []byte) error {
398	var (
399		err error
400		h   = mail.HeaderFromMap(header)
401	)
402
403	msgid, err := h.MessageID()
404	if err != nil {
405		return &smtp.SMTPError{
406			Code:         500,
407			EnhancedCode: smtp.EnhancedCode{1, 0, 0},
408			Message:      fmt.Sprintf("Message-ID header: %v", err),
409		}
410	}
411	sender, err := mail.ParseAddress(header.Get("From"))
412	if err != nil {
413		return &smtp.SMTPError{
414			Code:         500,
415			EnhancedCode: smtp.EnhancedCode{1, 0, 0},
416			Message:      fmt.Sprintf("parse From header: %v", err),
417		}
418	}
419
420	// check permission
421	var perm uint8
422	if sender, err := s.List.GetMember(s.ctx, sender.String()); err != nil {
423		perm = s.List.DefaultPerm()
424	} else {
425		perm = sender.Perm()
426	}
427
428	is_new_thread, err := s.List.IsNewThread(s.ctx, header)
429	if err != nil {
430		return &smtp.SMTPError{
431			Code:    500,
432			Message: err.Error(),
433		}
434	}
435	if is_new_thread && perm&storage.PERM_POST == 0 {
436		s.logger.Info("post permission check failed", "perm", storage.PermString(perm), "from", sender)
437		return &TemplateDenied{
438			InHeader: textproto.HeaderFromMap(header),
439			List:     s.List,
440			Reason:   DENIED_POST_NOPERM,
441		}
442	} else if perm&storage.PERM_REPLY == 0 {
443		s.logger.Info("reply permission check failed", "perm", storage.PermString(perm), "from", sender)
444		return &TemplateDenied{
445			InHeader: textproto.HeaderFromMap(header),
446			List:     s.List,
447			Reason:   DENIED_REPLY_NOPERM,
448		}
449	}
450	s.logger.Debug("ready to forword message", "perm", perm, "is_new_thread", is_new_thread)
451
452	_, err = s.List.Message(s.ctx, msgid)
453	if err == nil {
454		s.logger.Info("skip forward a existed message", "id", msgid)
455		return nil
456	}
457
458	if err = s.prepare(header); err != nil {
459		s.logger.Error("prepare email failed", "err", err)
460		return err
461	}
462
463	// filter receptor
464	rcpts := []*mail.Address{}
465	from, err := mail.ParseAddressList(h.Get("From"))
466	if err != nil {
467		return &smtp.SMTPError{
468			Code:         500,
469			EnhancedCode: smtp.EnhancedCode{1, 0, 0},
470			Message:      fmt.Sprintf("parse From header: %v", err),
471		}
472	}
473	rcpts = append(rcpts, from...)
474
475	to, err := mail.ParseAddressList(h.Get("To"))
476	if err != nil {
477		return &smtp.SMTPError{
478			Code:         500,
479			EnhancedCode: smtp.EnhancedCode{5, 1, 1},
480			Message:      fmt.Sprintf("parse To header: %v", err),
481		}
482	}
483
484	if h.Get("CC") != "" {
485		cc, err := mail.ParseAddressList(h.Get("CC"))
486		if err != nil {
487			return &smtp.SMTPError{
488				Code:         500,
489				EnhancedCode: smtp.EnhancedCode{5, 1, 0},
490				Message:      fmt.Sprintf("parse CC header: %v", err),
491			}
492		}
493		rcpts = append(rcpts, cc...)
494	}
495	rcpts = append(rcpts, to...)
496
497	mems, err := s.List.Members(s.ctx)
498	if err != nil {
499		return &smtp.SMTPError{
500			Code:    500,
501			Message: fmt.Sprintf("get list members: %v", err),
502		}
503	}
504	forwardTo := []string{}
505mem_loop:
506	for _, m := range mems {
507		for _, r := range rcpts {
508			if r.Address == m.Address() {
509				continue mem_loop
510			}
511		}
512		if m.Perm()&storage.PERM_BROWSE == 0 {
513			continue mem_loop
514		}
515		forwardTo = append(forwardTo, m.Address())
516	}
517
518	s.logger.Debug("archive message", "id", msgid)
519
520	lmtpMetrics.msgForwarded.With(prometheus.Labels{"list": s.List.Address()}).Inc()
521
522	if _, err = s.List.AddMessage(s.ctx, header, body); err != nil {
523		return &smtp.SMTPError{
524			Code:    520,
525			Message: fmt.Sprintf("create message archive: %v", err),
526		}
527	}
528
529	s.logger.Debug("forward mail", "to", forwardTo, "from", from)
530	if len(forwardTo) != 0 {
531		select {
532		case <-s.ctx.Done():
533			return s.ctx.Err()
534		default:
535			if err := s.mta.Send(textproto.HeaderFromMap(header), body,
536				config.FromContext(s.ctx).SMTP.Sender,
537				forwardTo); err != nil {
538				return &smtp.SMTPError{
539					Code:         421,
540					EnhancedCode: smtp.EnhancedCode{4, 0, 0},
541					Message:      err.Error(),
542				}
543			}
544		}
545
546		lmtpMetrics.msgSent.With(prometheus.Labels{"list": s.List.Address()}).Add(float64(len(forwardTo)))
547	}
548	return nil
549}
550
551// rfc5322 = "(?i)(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\])"
552//	validRfc5322Regexp = regexp.MustCompile(fmt.Sprintf("^%s*$", rfc5322))