maddy

Fork https://github.com/foxcpp/maddy

git clone git://git.lin.moe/go/maddy.git

  1package updatepipe
  2
  3import (
  4	"context"
  5	"fmt"
  6	"os"
  7	"strconv"
  8
  9	mess "github.com/foxcpp/go-imap-mess"
 10	"github.com/foxcpp/maddy/framework/log"
 11	"github.com/foxcpp/maddy/internal/updatepipe/pubsub"
 12)
 13
 14type PubSubPipe struct {
 15	PubSub pubsub.PubSub
 16	Log    log.Logger
 17}
 18
 19func (p *PubSubPipe) Listen(upds chan<- mess.Update) error {
 20	go func() {
 21		for m := range p.PubSub.Listener() {
 22			id, upd, err := parseUpdate(m.Payload)
 23			if err != nil {
 24				p.Log.Error("failed to parse update", err)
 25				continue
 26			}
 27			if id == p.myID() {
 28				continue
 29			}
 30			upds <- *upd
 31		}
 32	}()
 33	return nil
 34}
 35
 36func (p *PubSubPipe) InitPush() error {
 37	return nil
 38}
 39
 40func (p *PubSubPipe) myID() string {
 41	return fmt.Sprintf("%d-%p", os.Getpid(), p)
 42}
 43
 44func (p *PubSubPipe) channel(key interface{}) (string, error) {
 45	var psKey string
 46	switch k := key.(type) {
 47	case string:
 48		psKey = k
 49	case uint64:
 50		psKey = "__uint64_" + strconv.FormatUint(k, 10)
 51	default:
 52		return "", fmt.Errorf("updatepipe: key type must be either string or uint64")
 53	}
 54	return psKey, nil
 55}
 56
 57func (p *PubSubPipe) Subscribe(key interface{}) {
 58	psKey, err := p.channel(key)
 59	if err != nil {
 60		p.Log.Error("invalid key passed to Subscribe", err)
 61		return
 62	}
 63
 64	if err := p.PubSub.Subscribe(context.TODO(), psKey); err != nil {
 65		p.Log.Error("pubsub subscribe failed", err)
 66	} else {
 67		p.Log.DebugMsg("subscribed to pubsub", "channel", psKey)
 68	}
 69}
 70
 71func (p *PubSubPipe) Unsubscribe(key interface{}) {
 72	psKey, err := p.channel(key)
 73	if err != nil {
 74		p.Log.Error("invalid key passed to Unsubscribe", err)
 75		return
 76	}
 77
 78	if err := p.PubSub.Unsubscribe(context.TODO(), psKey); err != nil {
 79		p.Log.Error("pubsub unsubscribe failed", err)
 80	} else {
 81		p.Log.DebugMsg("unsubscribed from pubsub", "channel", psKey)
 82	}
 83}
 84
 85func (p *PubSubPipe) Push(upd mess.Update) error {
 86	psKey, err := p.channel(upd.Key)
 87	if err != nil {
 88		return err
 89	}
 90
 91	updBlob, err := formatUpdate(p.myID(), upd)
 92	if err != nil {
 93		return err
 94	}
 95
 96	return p.PubSub.Publish(psKey, updBlob)
 97}
 98
 99func (p *PubSubPipe) Close() error {
100	return p.PubSub.Close()
101}