mlisting

Mailing list service

git clone git://git.lin.moe/go/mlisting.git

  1package sqlite
  2
  3import (
  4	"bytes"
  5	"context"
  6	"database/sql"
  7	"encoding/json"
  8	"fmt"
  9	gomail "net/mail"
 10	"slices"
 11	"strings"
 12	"time"
 13
 14	"git.lin.moe/go/mlisting/storage"
 15	ql "github.com/Masterminds/squirrel"
 16	gomsg "github.com/emersion/go-message"
 17	"github.com/emersion/go-message/mail"
 18	"github.com/google/uuid"
 19)
 20
// List is the SQLite-backed view of one mailing list row. The descriptive
// fields mirror the columns read by the queries in this file; default_perm is
// the permission applied to members whose own permission column is NULL, and
// db is the handle every method of the list runs its queries against.
type List struct {
	name, address, description string
	create_at                  time.Time
	default_perm               uint

	db *sql.DB
}
 30
 31func (l *List) Address() string {
 32	return l.address
 33}
 34func (l *List) Name() string {
 35	return l.name
 36}
 37func (l *List) Description() string {
 38	return l.description
 39}
 40func (l *List) DefaultPerm() uint8 {
 41	return uint8(l.default_perm)
 42}
 43
 44func (l *List) Members(ctx context.Context) ([]storage.AddressInfo, error) {
 45	const _sql = `SELECT address, name, create_at, permission FROM member WHERE list=?`
 46	var infos = []storage.AddressInfo{}
 47	rows, err := l.db.QueryContext(ctx, _sql, l.address)
 48	if err != nil {
 49		return nil, err
 50	}
 51	defer rows.Close()
 52	for rows.Next() {
 53		info := new(AddressInfo)
 54		var perm sql.NullInt64
 55		if err := rows.Scan(&info.address, &info.name, &info.join_at, &perm); err != nil {
 56			return nil, err
 57		}
 58		if perm.Valid {
 59			info.perm = uint(perm.Int64)
 60		} else {
 61			info.perm = uint(l.DefaultPerm())
 62		}
 63		info.db = l.db
 64		infos = append(infos, info)
 65	}
 66	return infos, nil
 67}
 68
 69func (l *List) NewMember(ctx context.Context, addr string) (storage.AddressInfo, error) {
 70	maddr, err := mail.ParseAddress(addr)
 71	if err != nil {
 72		return nil, err
 73	}
 74	const _sql = `INSERT INTO member (address, name, list) VALUES (?, ?, ?) RETURNING address, name, create_at`
 75	row := l.db.QueryRowContext(ctx, _sql, maddr.Address, maddr.Name, l.address)
 76
 77	addrinfo := new(AddressInfo)
 78	addrinfo.db = l.db
 79	addrinfo.perm = l.default_perm
 80
 81	if err := row.Scan(&addrinfo.address, &addrinfo.name, &addrinfo.join_at); err != nil {
 82		return nil, err
 83	}
 84
 85	return addrinfo, nil
 86}
 87
 88func (l *List) DelMember(ctx context.Context, addr string) error {
 89	maddr, err := mail.ParseAddress(addr)
 90	if err != nil {
 91		return err
 92	}
 93	const _sql = `DELETE FROM member WHERE list=? AND address = ? `
 94	_, err = l.db.ExecContext(ctx, _sql, l.address, maddr.Address)
 95	return err
 96}
 97
 98func (l *List) UpdateMember(ctx context.Context, addr string, perm uint8) (storage.AddressInfo, error) {
 99	maddr, err := mail.ParseAddress(addr)
100	if err != nil {
101		return nil, err
102	}
103
104	const _sql = `UPDATE member SET permission=? WHERE list=? AND address=? RETURNING address, name, create_at, permission`
105	info := new(AddressInfo)
106	row := l.db.QueryRowContext(ctx, _sql, perm, l.address, maddr.Address)
107	if err := row.Scan(&info.address, &info.name, &info.join_at, &info.perm); err != nil {
108		return nil, err
109	}
110	info.db = l.db
111	return info, nil
112}
113
114func (l *List) GetMember(ctx context.Context, addr string) (storage.AddressInfo, error) {
115	maddr, err := mail.ParseAddress(addr)
116	if err != nil {
117		return nil, err
118	}
119
120	const _sql = `SELECT address, name, create_at, permission FROM member WHERE list=? AND address=?`
121	info := new(AddressInfo)
122	row := l.db.QueryRowContext(ctx, _sql, l.address, maddr.Address)
123
124	var perm sql.NullInt64
125	if err := row.Scan(&info.address, &info.name, &info.join_at, &perm); err != nil {
126		return nil, err
127	}
128	if perm.Valid {
129		info.perm = uint(perm.Int64)
130	} else {
131		info.perm = uint(l.DefaultPerm())
132	}
133	info.db = l.db
134
135	return info, nil
136}
137
138func (l *List) AddMessage(ctx context.Context, header gomail.Header, body []byte) (msg storage.Message, err error) {
139	var (
140		h       = mail.HeaderFromMap(header)
141		replyto *string
142		msgid   string
143	)
144	entity, err := gomsg.New(gomsg.HeaderFromMap(header), bytes.NewReader(body))
145	if err != nil {
146		return nil, err
147	}
148
149	msgid, err = h.MessageID()
150	if err != nil {
151		return
152	}
153
154	inreplyto, err := h.MsgIDList("In-Reply-To")
155	if err != nil {
156		return
157	} else if len(inreplyto) != 0 {
158		replyto = &inreplyto[0]
159	} else {
160		replyto = nil
161	}
162	if replyto == nil {
163		// if no In-Reply-To header, use last id in References as In-Reply-To value
164		references, err := h.MsgIDList("References")
165		if err == nil && len(references) != 0 {
166			replyto = &(references[len(references)-1])
167		}
168	}
169
170	rheader, err := json.Marshal(h.Map())
171	if err != nil {
172		return nil, err
173	}
174	subject, err := h.Text("Subject")
175	if err != nil {
176		subject = h.Get("Subject")
177	}
178	subject = strings.TrimSpace(subject)
179
180	text := storage.MailToText(entity)
181
182	parent, err := l.closestMessage(ctx, header)
183	if err != nil {
184		return
185	}
186	var mpath string
187	if parent != nil {
188		mpath = fmt.Sprintf("%s%d/", parent.(*Message).mpath, parent.(*Message).id)
189	} else {
190		mpath = "/"
191	}
192	create_at, err := h.Date()
193	if err != nil {
194		create_at = time.Now()
195	}
196
197	var result sql.Result
198	const _sql = `INSERT INTO message(message_id, list, header,
199                                                 body, text, mpath,
200                                                 subject, in_reply_to, create_at) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`
201	result, err = l.db.ExecContext(ctx, _sql, msgid, l.address, rheader,
202		body, text, mpath,
203		subject, replyto, create_at)
204	if err != nil {
205		return
206	}
207
208	id, err := result.LastInsertId()
209	if err != nil {
210		return
211	}
212
213	msg, err = l.messageById(ctx, id)
214	if err != nil {
215		return
216	}
217
218	if parent != nil {
219		err = l.reparent(ctx, &(parent.(*Message).messageID), msg.(*Message))
220	} else {
221		err = l.reparent(ctx, nil, msg.(*Message))
222	}
223
224	if err != nil {
225		return nil, err
226	}
227
228	return
229}
230
231func (l *List) reparent(ctx context.Context, parentMsgID *string, msg *Message) error {
232	// 1. update messages direct reply to this
233	tx, err := l.db.BeginTx(ctx, nil)
234	thisMPath := fmt.Sprintf("%s%d/", msg.mpath, msg.id)
235
236	oldMPathMap := make(map[int64]string)
237	rows, err := tx.QueryContext(ctx, `SELECT id, mpath FROM message WHERE list=? AND in_reply_to=? `,
238		l.address, msg.messageID)
239	if err != nil {
240		tx.Rollback()
241		return err
242	}
243	for rows.Next() {
244		var (
245			id    int64
246			mpath string
247		)
248		if err := rows.Scan(&id, &mpath); err != nil {
249			tx.Rollback()
250			return err
251		}
252		oldMPathMap[id] = fmt.Sprintf("%s%d/", mpath, id)
253	}
254
255	if len(oldMPathMap) != 0 {
256		if rows, err = tx.QueryContext(ctx, `UPDATE message set mpath=? WHERE list=? AND in_reply_to=? RETURNING id`,
257			thisMPath,
258			l.address, msg.messageID); err != nil {
259			tx.Rollback()
260			return err
261		}
262
263		updatedSublings := []int64{}
264		for rows.Next() {
265			var (
266				id int64
267			)
268			if err := rows.Scan(&id); err != nil {
269				tx.Rollback()
270				return err
271			}
272			updatedSublings = append(updatedSublings, id)
273		}
274		for _, id := range updatedSublings {
275			if _, err := tx.ExecContext(ctx, `UPDATE message SET mpath = REPLACE(mpath, ?, ?) WHERE list=? AND mpath LIKE ? || '%'`,
276				oldMPathMap[id], fmt.Sprintf("%s%d/", thisMPath, id),
277				l.address, oldMPathMap[id]); err != nil {
278				tx.Rollback()
279				return err
280			}
281		}
282
283	}
284	if err := tx.Commit(); err != nil {
285		tx.Rollback()
286		return err
287	}
288
289	// 2. update message closer to this than before
290	if parentMsgID != nil {
291		// get sublings that in_reply_to parent indirectly (mpath same to current, but not reply to parent)
292		rows, err = l.db.QueryContext(ctx, `SELECT id, header, mpath FROM message WHERE list=? AND mpath=? AND in_reply_to IS NOT NULL AND in_reply_to != ?`,
293			l.address, msg.mpath, parentMsgID)
294	} else {
295		// msg is header of a thread
296		// get root message with not nil in_reply_to (mpath='/' and in_reply_to IS NOT NULL)
297		rows, err = l.db.QueryContext(ctx, `SELECT id, header, mpath FROM message WHERE list=? AND mpath=? AND in_reply_to IS NOT NULL`,
298			l.address, msg.mpath)
299	}
300	if err != nil {
301		return err
302	}
303	toCheckCloser := make(map[int64]struct {
304		header []byte
305		mpath  string
306	})
307	for rows.Next() {
308		var (
309			id       int64
310			rheader  []byte
311			oldmpath string
312		)
313		if err := rows.Scan(&id, &rheader, &oldmpath); err != nil {
314			return err
315		}
316		if id == msg.id {
317			continue
318		}
319		toCheckCloser[id] = struct {
320			header []byte
321			mpath  string
322		}{rheader, oldmpath}
323	}
324
325	for id, value := range toCheckCloser {
326		rheader := value.header
327		oldmpath := value.mpath
328
329		hmap := make(map[string][]string)
330		if err := json.Unmarshal(rheader, &hmap); err != nil {
331			continue
332		}
333
334		closerMsg, err := l.closestMessage(ctx, hmap)
335		if err != nil || closerMsg == nil {
336			continue
337		} else if closerMsg.(*Message).id != msg.id {
338			continue
339		}
340		closerMpath := fmt.Sprintf("%s%d/", msg.mpath, msg.id)
341
342		// TODO: should have a lock before get closestMessage ?
343		tx, err := l.db.BeginTx(ctx, nil)
344		if err != nil {
345			return err
346		}
347
348		if _, err = tx.ExecContext(ctx, `update message set mpath=? WHERE id=?`,
349			closerMpath, id,
350		); err != nil {
351			tx.Rollback()
352			return err
353		}
354		/*
355		   oldmpath -> closerMpath (closerMsg.mpath/closeMsg.id)
356		*/
357
358		// update submessages
359		closerMpath = fmt.Sprintf("%s%d/", closerMpath, id)
360		oldmpath = fmt.Sprintf("%s%d/", oldmpath, id)
361
362		if _, err := tx.ExecContext(ctx, `UPDATE message SET mpath = REPLACE(mpath, ?, ?) WHERE list=? AND mpath LIKE ? || '%'`,
363			oldmpath, closerMpath,
364			l.address, oldmpath); err != nil {
365			tx.Rollback()
366			return err
367		}
368		if err = tx.Commit(); err != nil {
369			tx.Rollback()
370			return err
371		}
372	}
373
374	return nil
375}
376
377func (l *List) messageById(ctx context.Context, id int64) (storage.Message, error) {
378	var rheader []byte
379	const _sql = `SELECT id, message_id, create_at, subject, header, body, text, mpath FROM message WHERE id = ? AND list = ?`
380	msg := new(Message)
381	msg.db = l.db
382	if err := l.db.QueryRowContext(ctx, _sql, id, l.address).Scan(
383		&msg.id, &msg.messageID,
384		&msg.createAt, &msg.subject,
385		&rheader, &msg.body, &msg.text,
386		&msg.mpath); err != nil {
387		return nil, err
388	}
389	hmap := make(map[string][]string)
390	if err := json.Unmarshal(rheader, &hmap); err != nil {
391		return nil, err
392	}
393	msg.header = mail.HeaderFromMap(hmap)
394
395	return msg, nil
396}
397func (l *List) Message(ctx context.Context, msgID string) (storage.Message, error) {
398	var rheader []byte
399	const _sql = `SELECT id, message_id, create_at, subject, header, body, text, mpath FROM message WHERE list=? AND message_id=?`
400	msg := new(Message)
401	msg.db = l.db
402	if err := l.db.QueryRowContext(ctx, _sql,
403		l.address, msgID).Scan(
404		&msg.id, &msg.messageID,
405		&msg.createAt, &msg.subject,
406		&rheader, &msg.body, &msg.text,
407		&msg.mpath); err != nil {
408		return nil, err
409	}
410	hmap := make(map[string][]string)
411	if err := json.Unmarshal(rheader, &hmap); err != nil {
412		return nil, err
413	}
414	msg.header = mail.HeaderFromMap(hmap)
415
416	return msg, nil
417}
418
419func (l *List) Messages(ctx context.Context, isThreadHead bool, search string, offset, limit uint) ([]storage.Message, int64, error) {
420
421	whereQ := "list=?"
422	var args = []any{l.address}
423	if isThreadHead {
424		whereQ += " AND mpath='/'"
425	}
426	if search != "" {
427		// sqlite3 double-quoted search string
428		search = strings.ReplaceAll(search, "\"", "\"\"")
429		search = "\"" + search + "\""
430
431		whereQ += " AND id in (select rowid from message_fts(?))"
432		args = append(args, search)
433	}
434	whereQ += " ORDER BY create_at DESC, id DESC"
435	var count int64
436	err := l.db.QueryRowContext(ctx, fmt.Sprintf("SELECT count(1) FROM message WHERE %s", whereQ), args...).Scan(&count)
437	if err != nil {
438		return nil, 0, err
439	}
440
441	args = append(args, limit, offset)
442	rows, err := l.db.QueryContext(ctx, fmt.Sprintf(
443		"SELECT id, message_id, create_at, subject, header, body, text, mpath FROM message WHERE %s LIMIT ? OFFSET ?",
444		whereQ,
445	), args...)
446	if err != nil {
447		return nil, 0, err
448	}
449	defer rows.Close()
450
451	var result = []storage.Message{}
452	for rows.Next() {
453		var msg = new(Message)
454		var rheader []byte
455		if err := rows.Scan(&msg.id, &msg.messageID,
456			&msg.createAt, &msg.subject,
457			&rheader, &msg.body, &msg.text,
458			&msg.mpath); err != nil {
459			return nil, 0, err
460		}
461
462		hmap := make(map[string][]string)
463		if err := json.Unmarshal(rheader, &hmap); err != nil {
464			return nil, 0, err
465		}
466		msg.header = mail.HeaderFromMap(hmap)
467		msg.db = l.db
468		result = append(result, msg)
469	}
470	return result, count, nil
471}
472
473func (l *List) NewRequest(ctx context.Context, from string, rtype storage.RequestType, expireAt time.Time) (string, error) {
474	maddr, err := mail.ParseAddress(from)
475	if err != nil {
476		return "", err
477	}
478
479	token := uuid.NewString()
480	const _sql = `INSERT INTO request(token, list, owner_address, request_type, expire_at) VALUES(?, ?, ?, ?, ?)`
481	_, err = l.db.ExecContext(ctx, _sql, token, l.address, maddr.Address, rtype, expireAt)
482	if err != nil {
483		return "", err
484	}
485	return token, nil
486}
487
488func (l *List) CompleteReqest(ctx context.Context, from, token string) (storage.RequestType, error) {
489	var (
490		expireAt time.Time
491		rtype    storage.RequestType
492		now      = time.Now()
493	)
494	maddr, err := mail.ParseAddress(from)
495	if err != nil {
496		return storage.REQUEST_INVALID, fmt.Errorf("invalid from address: %v", err)
497	}
498
499	if err := l.db.QueryRowContext(ctx,
500		`SELECT request_type, expire_at FROM request where list=? AND owner_address = ? AND token = ? AND confirmed_at IS NULL`,
501		l.address, maddr.Address, token).
502		Scan(&rtype, &expireAt); err != nil {
503		return storage.REQUEST_INVALID, err
504	}
505
506	if expireAt.Before(now) {
507		// TODO: should return denied message
508		return storage.REQUEST_INVALID, fmt.Errorf("token expired")
509	}
510
511	l.db.ExecContext(ctx, `UPDATE request SET confirmed_at=? where token=?`, now, token)
512
513	return rtype, nil
514}
515
516func (l *List) IsNewThread(ctx context.Context, header gomail.Header) (bool, error) {
517	ref, err := l.closestMessage(ctx, header)
518	if err != nil {
519		return false, err
520	}
521	return ref == nil, nil
522}
523
524func (l *List) DelMessagesRecursive(ctx context.Context, delmsg storage.Message) (msgs []storage.Message, err error) {
525	var rheader []byte
526
527	msg := new(Message)
528	msg.db = l.db
529	tx, err := l.db.BeginTx(ctx, nil)
530	if err != nil {
531		return nil, err
532	}
533
534	if err := tx.QueryRowContext(ctx, `DELETE FROM message WHERE list=? AND id=? RETURNING id, message_id, create_at, subject, header, body, text, mpath`,
535		l.address,
536		delmsg.(*Message).id).Scan(
537		&msg.id, &msg.messageID,
538		&msg.createAt, &msg.subject,
539		&rheader, &msg.body, &msg.text,
540		&msg.mpath); err != nil {
541		tx.Rollback()
542		return nil, err
543	}
544	hmap := make(map[string][]string)
545	if err := json.Unmarshal(rheader, &hmap); err != nil {
546		return nil, err
547	}
548	msg.header = mail.HeaderFromMap(hmap)
549	msgs = append(msgs, msg)
550
551	rows, err := tx.QueryContext(ctx, `DELETE FROM message WHERE list=? AND mpath LIKE ? || '%' RETURNING id, message_id, create_at, subject, header, body, text, mpath`,
552		l.address,
553		fmt.Sprintf("%s%d", msg.mpath, msg.id))
554
555	if err != nil {
556		tx.Rollback()
557		return nil, err
558	}
559
560	for rows.Next() {
561		var rheader []byte
562
563		submsg := new(Message)
564		submsg.db = l.db
565		if err := rows.Scan(
566			&submsg.id, &submsg.messageID,
567			&submsg.createAt, &submsg.subject,
568			&rheader, &submsg.body, &submsg.text,
569			&submsg.mpath,
570		); err != nil {
571			return nil, err
572		}
573		hmap := make(map[string][]string)
574		if err := json.Unmarshal(rheader, &hmap); err != nil {
575			return nil, err
576		}
577		submsg.header = mail.HeaderFromMap(hmap)
578		msgs = append(msgs, submsg)
579	}
580
581	if err := tx.Commit(); err != nil {
582		return nil, err
583	}
584
585	return
586}
587
588func (l *List) msgsIn(ctx context.Context, msgIDs ...string) (msgMap map[string]storage.Message, err error) {
589	if len(msgIDs) == 0 {
590		return nil, nil
591	}
592
593	msgMap = make(map[string]storage.Message)
594	q := ql.Select("id", "message_id", "create_at", "subject",
595		"header", "body", "text", "mpath").From("message")
596	idClause := ql.Or{}
597	for _, id := range msgIDs {
598		idClause = append(idClause, ql.Eq{"message_id": id})
599	}
600	rows, err := q.Where(ql.Eq{"list": l.address}).Where(idClause).RunWith(l.db).QueryContext(ctx)
601	if err != nil {
602		return nil, err
603	}
604	for rows.Next() {
605		var msg = new(Message)
606		var rheader []byte
607		rows.Scan(&msg.id, &msg.messageID,
608			&msg.createAt, &msg.subject,
609			&rheader, &msg.body, &msg.text,
610			&msg.mpath)
611		hmap := make(map[string][]string)
612		if err := json.Unmarshal(rheader, &hmap); err != nil {
613			return nil, err
614		}
615		msg.header = mail.HeaderFromMap(hmap)
616		msgMap[msg.messageID] = msg
617	}
618	return
619}
620
621func (l *List) msgHasSubject(ctx context.Context, subject string) (storage.Message, error) {
622	subject = strings.TrimSpace(subject)
623	var rheader []byte
624	const _sql = `SELECT id, message_id, create_at, subject, header, body, text, mpath FROM message WHERE subject = ? AND list = ?`
625	msg := new(Message)
626	msg.db = l.db
627
628	if err := l.db.QueryRowContext(ctx, _sql, subject, l.address).Scan(
629		&msg.id, &msg.messageID,
630		&msg.createAt, &msg.subject,
631		&rheader, &msg.body, &msg.text,
632		&msg.mpath); err != nil {
633		return nil, err
634	}
635	hmap := make(map[string][]string)
636	if err := json.Unmarshal(rheader, &hmap); err != nil {
637		return nil, err
638	}
639	msg.header = mail.HeaderFromMap(hmap)
640
641	return msg, nil
642
643}
644
645func (l *List) closestMessage(ctx context.Context, header gomail.Header) (storage.Message, error) {
646	h := mail.HeaderFromMap(header)
647	replyto, err := h.MsgIDList("In-Reply-To")
648	if err != nil {
649		return nil, err
650	}
651	if len(replyto) != 0 {
652		msg, err := l.Message(ctx, replyto[0])
653		if err != nil && err != sql.ErrNoRows {
654			return nil, err
655		} else if err == nil && msg != nil {
656			return msg, nil
657		}
658	}
659	references, err := h.MsgIDList("References")
660	if err != nil {
661		return nil, err
662	}
663	msgs, err := l.msgsIn(ctx, references...)
664	if err != nil {
665		return nil, err
666	}
667	slices.Reverse(references)
668	for _, id := range references {
669		if msg, ok := msgs[id]; ok {
670			return msg, nil
671		}
672	}
673
674	subject, err := h.Subject()
675	if err != nil {
676		return nil, err
677	}
678
679	if strings.HasPrefix(subject, "Re:") ||
680		strings.HasPrefix(subject, "RE:") ||
681		strings.HasPrefix(subject, "re:") {
682		msg, err := l.msgHasSubject(ctx, subject[3:])
683		if err != nil && err != sql.ErrNoRows {
684			return nil, err
685		} else if err == nil && msg != nil {
686			return msg, nil
687		}
688	}
689
690	return nil, nil
691}
692
// AddressInfo is the SQLite-backed view of one member row: the member's
// address and display name, the create_at value of the row (join_at), the
// effective permission, and the database handle used for follow-up queries.
type AddressInfo struct {
	address string
	name    string
	join_at time.Time
	perm    uint

	db *sql.DB
}
700
701func (a *AddressInfo) String() string {
702	return (&mail.Address{
703		Name:    a.name,
704		Address: a.address,
705	}).String()
706}
707
708func (a *AddressInfo) Name() string {
709	return a.name
710}
711
712func (a *AddressInfo) Address() string {
713	return a.address
714}
715
716func (a *AddressInfo) Perm() uint8 {
717	return uint8(a.perm)
718}
719
720func (a *AddressInfo) JoinedLists(ctx context.Context) ([]storage.List, error) {
721	const _sql = `SELECT list.address, list.create_at, list.description, list.default_perm
722                      FROM list INNER JOIN member ON list.address=member.list
723                      WHERE member.address=?`
724
725	rows, err := a.db.QueryContext(ctx, _sql, a.address)
726	if err != nil {
727		return nil, err
728	}
729	defer rows.Close()
730	var lists []storage.List
731	for rows.Next() {
732		list := new(List)
733		list.db = a.db
734		if err := rows.Scan(
735			&list.address,
736			&list.create_at,
737			&list.description,
738			&list.default_perm); err != nil {
739			return nil, err
740		}
741		lists = append(lists, list)
742	}
743	return lists, nil
744}