Merge pull request #2962 from filecoin-project/refactor/rip-paych-pubsub

Paych refactor: replace custom pubsub impl with lib
This commit is contained in:
Łukasz Magiera 2020-08-11 18:32:05 +02:00 committed by GitHub
commit 51cf4a5749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 61 deletions

1
go.mod
View File

@ -44,6 +44,7 @@ require (
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d

View File

@ -1,61 +1,56 @@
package paychmgr package paychmgr
import ( import (
"sync" "golang.org/x/xerrors"
"github.com/hannahhoward/go-pubsub"
"github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
) )
type msgListener struct {
id string
cb func(c cid.Cid, err error)
}
type msgListeners struct { type msgListeners struct {
lk sync.Mutex ps *pubsub.PubSub
listeners []*msgListener
} }
func (ml *msgListeners) onMsg(mcid cid.Cid, cb func(error)) string { type msgCompleteEvt struct {
ml.lk.Lock() mcid cid.Cid
defer ml.lk.Unlock() err error
l := &msgListener{
id: uuid.New().String(),
cb: func(c cid.Cid, err error) {
if mcid.Equals(c) {
cb(err)
}
},
}
ml.listeners = append(ml.listeners, l)
return l.id
} }
func (ml *msgListeners) fireMsgComplete(mcid cid.Cid, err error) { type subscriberFn func(msgCompleteEvt)
ml.lk.Lock()
defer ml.lk.Unlock()
for _, l := range ml.listeners { func newMsgListeners() msgListeners {
l.cb(mcid, err) ps := pubsub.New(func(event pubsub.Event, subFn pubsub.SubscriberFn) error {
} evt, ok := event.(msgCompleteEvt)
if !ok {
return xerrors.Errorf("wrong type of event")
}
sub, ok := subFn.(subscriberFn)
if !ok {
return xerrors.Errorf("wrong type of subscriber")
}
sub(evt)
return nil
})
return msgListeners{ps: ps}
} }
func (ml *msgListeners) unsubscribe(sub string) { // onMsgComplete registers a callback for when the message with the given cid
ml.lk.Lock() // completes
defer ml.lk.Unlock() func (ml *msgListeners) onMsgComplete(mcid cid.Cid, cb func(error)) pubsub.Unsubscribe {
var fn subscriberFn = func(evt msgCompleteEvt) {
for i, l := range ml.listeners { if mcid.Equals(evt.mcid) {
if l.id == sub { cb(evt.err)
ml.removeListener(i)
return
} }
} }
return ml.ps.Subscribe(fn)
} }
func (ml *msgListeners) removeListener(i int) { // fireMsgComplete is called when a message completes
copy(ml.listeners[i:], ml.listeners[i+1:]) func (ml *msgListeners) fireMsgComplete(mcid cid.Cid, err error) {
ml.listeners[len(ml.listeners)-1] = nil e := ml.ps.Publish(msgCompleteEvt{mcid: mcid, err: err})
ml.listeners = ml.listeners[:len(ml.listeners)-1] if e != nil {
// In theory we shouldn't ever get an error here
log.Errorf("unexpected error publishing message complete: %s", e)
}
} }

View File

@ -17,12 +17,12 @@ func testCids() []cid.Cid {
} }
func TestMsgListener(t *testing.T) { func TestMsgListener(t *testing.T) {
var ml msgListeners ml := newMsgListeners()
done := false done := false
experr := xerrors.Errorf("some err") experr := xerrors.Errorf("some err")
cids := testCids() cids := testCids()
ml.onMsg(cids[0], func(err error) { ml.onMsgComplete(cids[0], func(err error) {
require.Equal(t, experr, err) require.Equal(t, experr, err)
done = true done = true
}) })
@ -35,11 +35,11 @@ func TestMsgListener(t *testing.T) {
} }
func TestMsgListenerNilErr(t *testing.T) { func TestMsgListenerNilErr(t *testing.T) {
var ml msgListeners ml := newMsgListeners()
done := false done := false
cids := testCids() cids := testCids()
ml.onMsg(cids[0], func(err error) { ml.onMsgComplete(cids[0], func(err error) {
require.Nil(t, err) require.Nil(t, err)
done = true done = true
}) })
@ -52,20 +52,20 @@ func TestMsgListenerNilErr(t *testing.T) {
} }
func TestMsgListenerUnsub(t *testing.T) { func TestMsgListenerUnsub(t *testing.T) {
var ml msgListeners ml := newMsgListeners()
done := false done := false
experr := xerrors.Errorf("some err") experr := xerrors.Errorf("some err")
cids := testCids() cids := testCids()
id1 := ml.onMsg(cids[0], func(err error) { unsub := ml.onMsgComplete(cids[0], func(err error) {
t.Fatal("should not call unsubscribed listener") t.Fatal("should not call unsubscribed listener")
}) })
ml.onMsg(cids[0], func(err error) { ml.onMsgComplete(cids[0], func(err error) {
require.Equal(t, experr, err) require.Equal(t, experr, err)
done = true done = true
}) })
ml.unsubscribe(id1) unsub()
ml.fireMsgComplete(cids[0], experr) ml.fireMsgComplete(cids[0], experr)
if !done { if !done {
@ -74,17 +74,17 @@ func TestMsgListenerUnsub(t *testing.T) {
} }
func TestMsgListenerMulti(t *testing.T) { func TestMsgListenerMulti(t *testing.T) {
var ml msgListeners ml := newMsgListeners()
count := 0 count := 0
cids := testCids() cids := testCids()
ml.onMsg(cids[0], func(err error) { ml.onMsgComplete(cids[0], func(err error) {
count++ count++
}) })
ml.onMsg(cids[0], func(err error) { ml.onMsgComplete(cids[0], func(err error) {
count++ count++
}) })
ml.onMsg(cids[1], func(err error) { ml.onMsgComplete(cids[1], func(err error) {
count++ count++
}) })

View File

@ -35,12 +35,13 @@ type channelAccessor struct {
func newChannelAccessor(pm *Manager) *channelAccessor { func newChannelAccessor(pm *Manager) *channelAccessor {
return &channelAccessor{ return &channelAccessor{
lk: &channelLock{globalLock: &pm.lk}, lk: &channelLock{globalLock: &pm.lk},
sm: pm.sm, sm: pm.sm,
sa: &stateAccessor{sm: pm.sm}, sa: &stateAccessor{sm: pm.sm},
api: pm.pchapi, api: pm.pchapi,
store: pm.store, store: pm.store,
waitCtx: pm.ctx, msgListeners: newMsgListeners(),
waitCtx: pm.ctx,
} }
} }

View File

@ -640,7 +640,7 @@ type onMsgRes struct {
func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan onMsgRes { func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan onMsgRes {
promise := make(chan onMsgRes) promise := make(chan onMsgRes)
triggerUnsub := make(chan struct{}) triggerUnsub := make(chan struct{})
sub := ca.msgListeners.onMsg(mcid, func(err error) { unsub := ca.msgListeners.onMsgComplete(mcid, func(err error) {
close(triggerUnsub) close(triggerUnsub)
// Use a go-routine so as not to block the event handler loop // Use a go-routine so as not to block the event handler loop
@ -671,7 +671,7 @@ func (ca *channelAccessor) msgPromise(ctx context.Context, mcid cid.Cid) chan on
case <-triggerUnsub: case <-triggerUnsub:
} }
ca.msgListeners.unsubscribe(sub) unsub()
}() }()
return promise return promise