mlisting

Mailing list service

git clone git://git.lin.moe/go/mlisting.git

  1package serve
  2
  3import (
  4	"context"
  5	"net/http"
  6	"net/url"
  7	"os"
  8	"os/signal"
  9	"strings"
 10	"sync"
 11	"syscall"
 12	"time"
 13
 14	rcmd "git.lin.moe/go/mlisting/cmd"
 15	"git.lin.moe/go/mlisting/config"
 16	"git.lin.moe/go/mlisting/service"
 17	"git.lin.moe/go/mlisting/storage"
 18	humanize "github.com/dustin/go-humanize"
 19	"github.com/emersion/go-sasl"
 20	"github.com/emersion/go-smtp"
 21	"github.com/spf13/cobra"
 22)
 23
 24func init() {
 25	cmd := &cobra.Command{
 26		Use:   "serve",
 27		Short: "start lmtp service",
 28		Run:   execute,
 29	}
 30
 31	rcmd.Cmd.AddCommand(cmd)
 32}
 33
 34func execute(cmd *cobra.Command, args []string) {
 35	ctx, cancel := signal.NotifyContext(cmd.Context(), syscall.SIGTERM, os.Interrupt)
 36	defer cancel()
 37
 38	cfg := config.FromContext(ctx)
 39	l := cfg.NewLogger()
 40
 41	st, ok := storage.FromContext(ctx)
 42	if !ok {
 43		l.Error("unset storage")
 44		return
 45	}
 46
 47	// init lmtp server
 48	mta := service.SMTPClient{
 49		Address:    cfg.SMTP.Address,
 50		ConnType:   cfg.SMTP.ConnType,
 51		AuthClient: nil,
 52	}
 53	if cfg.SMTP.AuthUser != "" || cfg.SMTP.AuthPassword != "" {
 54		mta.AuthClient = sasl.NewPlainClient("", cfg.SMTP.AuthUser, cfg.SMTP.AuthPassword)
 55	}
 56
 57	be := service.NewLMTPBackend(ctx, st, &mta, l)
 58	be.ConnTimeout = cfg.LMTP.ConnTimeout
 59	if cfg.LMTP.MaxDataSize != "" {
 60		max_data_size, err := humanize.ParseBytes(cfg.LMTP.MaxDataSize)
 61		if err != nil {
 62			l.Error(err.Error())
 63			return
 64		}
 65
 66		be.SizeLimit = int64(max_data_size)
 67	}
 68
 69	lmtpuri, err := url.Parse(cfg.LMTP.Listen)
 70	if err != nil {
 71		l.Error(err.Error())
 72		return
 73	}
 74
 75	s := smtp.NewServer(be)
 76	s.Network = lmtpuri.Scheme
 77	s.Addr = strings.Join([]string{lmtpuri.Host, lmtpuri.Path}, "")
 78	s.LMTP = true
 79	s.Domain = "localhost"
 80	s.WriteTimeout = 10 * time.Second
 81	s.ReadTimeout = 10 * time.Second
 82	s.MaxMessageBytes = 1024 * 1024
 83	s.MaxRecipients = 50
 84	s.AllowInsecureAuth = true
 85
 86	go func() {
 87		l.Info("lmtp service start", "listen", lmtpuri)
 88		if err := s.ListenAndServe(); err != nil {
 89			l.Error(err.Error())
 90			cancel()
 91		}
 92	}()
 93
 94	// init http server
 95	h := http.Server{
 96		Handler: service.NewHTTPMux(cfg, st, l),
 97		Addr:    cfg.Http.Listen,
 98	}
 99
100	go func() {
101		l.Info("http service start", "listen", cfg.Http.Listen)
102		if err := h.ListenAndServe(); err != nil {
103			l.Error(err.Error())
104		}
105	}()
106
107	var metric_srv http.Server
108	if cfg.Http.MetricsListen != "" {
109		metric_srv = http.Server{
110			Handler: service.NewMetricMux(cfg, st, l),
111			Addr:    cfg.Http.MetricsListen,
112		}
113
114		go func() {
115			l.Info("metrics service start", "listen", cfg.Http.MetricsListen)
116			if err := metric_srv.ListenAndServe(); err != nil {
117				l.Error(err.Error())
118			}
119		}()
120	}
121
122	<-ctx.Done()
123
124	l.Info("stopping service")
125	ctx, cancel = context.WithTimeout(context.TODO(), time.Second*30)
126	defer cancel()
127
128	var exit_wg sync.WaitGroup
129	exit_wg.Add(1)
130	go func() {
131		if err := h.Shutdown(ctx); err != nil {
132			l.Error(err.Error())
133		}
134		l.Info("http service stopped")
135
136		exit_wg.Done()
137	}()
138	exit_wg.Add(1)
139	go func() {
140		if err := s.Shutdown(ctx); err != nil {
141			l.Error(err.Error())
142		}
143		l.Info("lmtp service stopped")
144		exit_wg.Done()
145	}()
146	if cfg.Http.MetricsListen != "" {
147		exit_wg.Add(1)
148		go func() {
149			if err := metric_srv.Shutdown(ctx); err != nil {
150				l.Error(err.Error())
151			}
152			l.Info("metrics service stopped")
153			exit_wg.Done()
154		}()
155
156	}
157	exit_wg.Wait()
158	l.Info("service stopped")
159}