maddy

Fork of https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

 1package pubsub
 2
 3import (
 4	"context"
 5	"database/sql"
 6	"time"
 7
 8	"github.com/foxcpp/maddy/framework/log"
 9	"github.com/lib/pq"
10)
11
// Msg is a single notification exchanged through the pub/sub layer.
type Msg struct {
	// Key is the name of the channel the notification was delivered on.
	Key string
	// Payload is the string body that accompanied the notification.
	Payload string
}
16
17type PqPubSub struct {
18	Notify chan Msg
19
20	L      *pq.Listener
21	sender *sql.DB
22
23	Log log.Logger
24}
25
26func NewPQ(dsn string) (*PqPubSub, error) {
27	l := &PqPubSub{
28		Log:    log.Logger{Name: "pgpubsub"},
29		Notify: make(chan Msg),
30	}
31	l.L = pq.NewListener(dsn, 10*time.Second, time.Minute, l.eventHandler)
32	var err error
33	l.sender, err = sql.Open("postgres", dsn)
34	if err != nil {
35		return nil, err
36	}
37
38	go func() {
39		defer close(l.Notify)
40		for n := range l.L.Notify {
41			if n == nil {
42				continue
43			}
44
45			l.Notify <- Msg{Key: n.Channel, Payload: n.Extra}
46		}
47	}()
48
49	return l, nil
50}
51
52func (l *PqPubSub) Close() error {
53	l.sender.Close()
54	l.L.Close()
55	return nil
56}
57
58func (l *PqPubSub) eventHandler(ev pq.ListenerEventType, err error) {
59	switch ev {
60	case pq.ListenerEventConnected:
61		l.Log.DebugMsg("connected")
62	case pq.ListenerEventReconnected:
63		l.Log.Msg("connection reestablished")
64	case pq.ListenerEventConnectionAttemptFailed:
65		l.Log.Error("connection attempt failed", err)
66	case pq.ListenerEventDisconnected:
67		l.Log.Msg("connection closed", "err", err)
68	}
69}
70
71func (l *PqPubSub) Subscribe(_ context.Context, key string) error {
72	return l.L.Listen(key)
73}
74
75func (l *PqPubSub) Unsubscribe(_ context.Context, key string) error {
76	return l.L.Unlisten(key)
77}
78
79func (l *PqPubSub) Publish(key, payload string) error {
80	_, err := l.sender.Exec(`SELECT pg_notify($1, $2)`, key, payload)
81	return err
82}
83
84func (l *PqPubSub) Listener() chan Msg {
85	return l.Notify
86}