1package updatepipe23import (4 "context"5 "fmt"6 "os"7 "strconv"89 mess "github.com/foxcpp/go-imap-mess"10 "github.com/foxcpp/maddy/framework/log"11 "github.com/foxcpp/maddy/internal/updatepipe/pubsub"12)1314type PubSubPipe struct {15 PubSub pubsub.PubSub16 Log log.Logger17}1819func (p *PubSubPipe) Listen(upds chan<- mess.Update) error {20 go func() {21 for m := range p.PubSub.Listener() {22 id, upd, err := parseUpdate(m.Payload)23 if err != nil {24 p.Log.Error("failed to parse update", err)25 continue26 }27 if id == p.myID() {28 continue29 }30 upds <- *upd31 }32 }()33 return nil34}3536func (p *PubSubPipe) InitPush() error {37 return nil38}3940func (p *PubSubPipe) myID() string {41 return fmt.Sprintf("%d-%p", os.Getpid(), p)42}4344func (p *PubSubPipe) channel(key interface{}) (string, error) {45 var psKey string46 switch k := key.(type) {47 case string:48 psKey = k49 case uint64:50 psKey = "__uint64_" + strconv.FormatUint(k, 10)51 default:52 return "", fmt.Errorf("updatepipe: key type must be either string or uint64")53 }54 return psKey, nil55}5657func (p *PubSubPipe) Subscribe(key interface{}) {58 psKey, err := p.channel(key)59 if err != nil {60 p.Log.Error("invalid key passed to Subscribe", err)61 return62 }6364 if err := p.PubSub.Subscribe(context.TODO(), psKey); err != nil {65 p.Log.Error("pubsub subscribe failed", err)66 } else {67 p.Log.DebugMsg("subscribed to pubsub", "channel", psKey)68 }69}7071func (p *PubSubPipe) Unsubscribe(key interface{}) {72 psKey, err := p.channel(key)73 if err != nil {74 p.Log.Error("invalid key passed to Unsubscribe", err)75 return76 }7778 if err := p.PubSub.Unsubscribe(context.TODO(), psKey); err != nil {79 p.Log.Error("pubsub unsubscribe failed", err)80 } else {81 p.Log.DebugMsg("unsubscribed from pubsub", "channel", psKey)82 }83}8485func (p *PubSubPipe) Push(upd mess.Update) error {86 psKey, err := p.channel(upd.Key)87 if err != nil {88 return err89 }9091 updBlob, err := formatUpdate(p.myID(), upd)92 if err != nil {93 return err94 }9596 return p.PubSub.Publish(psKey, updBlob)97}9899func (p *PubSubPipe) Close() error {100 return p.PubSub.Close()101}