1package pubsub23import (4 "context"5 "database/sql"6 "time"78 "github.com/foxcpp/maddy/framework/log"9 "github.com/lib/pq"10)1112type Msg struct {13 Key string14 Payload string15}1617type PqPubSub struct {18 Notify chan Msg1920 L *pq.Listener21 sender *sql.DB2223 Log log.Logger24}2526func NewPQ(dsn string) (*PqPubSub, error) {27 l := &PqPubSub{28 Log: log.Logger{Name: "pgpubsub"},29 Notify: make(chan Msg),30 }31 l.L = pq.NewListener(dsn, 10*time.Second, time.Minute, l.eventHandler)32 var err error33 l.sender, err = sql.Open("postgres", dsn)34 if err != nil {35 return nil, err36 }3738 go func() {39 defer close(l.Notify)40 for n := range l.L.Notify {41 if n == nil {42 continue43 }4445 l.Notify <- Msg{Key: n.Channel, Payload: n.Extra}46 }47 }()4849 return l, nil50}5152func (l *PqPubSub) Close() error {53 l.sender.Close()54 l.L.Close()55 return nil56}5758func (l *PqPubSub) eventHandler(ev pq.ListenerEventType, err error) {59 switch ev {60 case pq.ListenerEventConnected:61 l.Log.DebugMsg("connected")62 case pq.ListenerEventReconnected:63 l.Log.Msg("connection reestablished")64 case pq.ListenerEventConnectionAttemptFailed:65 l.Log.Error("connection attempt failed", err)66 case pq.ListenerEventDisconnected:67 l.Log.Msg("connection closed", "err", err)68 }69}7071func (l *PqPubSub) Subscribe(_ context.Context, key string) error {72 return l.L.Listen(key)73}7475func (l *PqPubSub) Unsubscribe(_ context.Context, key string) error {76 return l.L.Unlisten(key)77}7879func (l *PqPubSub) Publish(key, payload string) error {80 _, err := l.sender.Exec(`SELECT pg_notify($1, $2)`, key, payload)81 return err82}8384func (l *PqPubSub) Listener() chan Msg {85 return l.Notify86}