fix(retrievalmarket): add mutex for subscribers
Add a mutex to protect access to the subscriber list for retrieval market
This commit is contained in:
parent
5b74a71dd3
commit
99309ec0dc
@ -30,9 +30,10 @@ type client struct {
|
||||
node retrievalmarket.RetrievalClientNode
|
||||
// The parameters should be replaced by RetrievalClientNode
|
||||
|
||||
nextDealLk sync.Mutex
|
||||
nextDealID retrievalmarket.DealID
|
||||
subscribers []retrievalmarket.ClientSubscriber
|
||||
nextDealLk sync.Mutex
|
||||
nextDealID retrievalmarket.DealID
|
||||
subscribersLk sync.RWMutex
|
||||
subscribers []retrievalmarket.ClientSubscriber
|
||||
}
|
||||
|
||||
// NewClient creates a new retrieval client
|
||||
@ -129,6 +130,8 @@ func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrieval
|
||||
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||
func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||
return func() {
|
||||
c.subscribersLk.Lock()
|
||||
defer c.subscribersLk.Unlock()
|
||||
curLen := len(c.subscribers)
|
||||
for i, el := range c.subscribers {
|
||||
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||
@ -141,13 +144,18 @@ func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalma
|
||||
}
|
||||
|
||||
func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) {
|
||||
c.subscribersLk.RLock()
|
||||
defer c.subscribersLk.RUnlock()
|
||||
for _, cb := range c.subscribers {
|
||||
cb(evt, ds)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||
c.subscribersLk.Lock()
|
||||
c.subscribers = append(c.subscribers, subscriber)
|
||||
c.subscribersLk.Unlock()
|
||||
|
||||
return c.unsubscribeAt(subscriber)
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -14,7 +15,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-cbor-util"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
@ -37,7 +38,8 @@ type provider struct {
|
||||
|
||||
pricePerByte retrievalmarket.BigInt
|
||||
|
||||
subscribers []retrievalmarket.ProviderSubscriber
|
||||
subscribersLk sync.RWMutex
|
||||
subscribers []retrievalmarket.ProviderSubscriber
|
||||
}
|
||||
|
||||
// NewProvider returns a new retrieval provider
|
||||
@ -74,6 +76,8 @@ func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalInc
|
||||
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||
func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||
return func() {
|
||||
p.subscribersLk.Lock()
|
||||
defer p.subscribersLk.Unlock()
|
||||
curLen := len(p.subscribers)
|
||||
for i, el := range p.subscribers {
|
||||
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||
@ -86,6 +90,8 @@ func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retriev
|
||||
}
|
||||
|
||||
func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retrievalmarket.ProviderDealState) {
|
||||
p.subscribersLk.RLock()
|
||||
defer p.subscribersLk.RUnlock()
|
||||
for _, cb := range p.subscribers {
|
||||
cb(evt, ds)
|
||||
}
|
||||
@ -94,7 +100,10 @@ func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retri
|
||||
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||
// TODO: Implement updates as part of https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||
func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||
p.subscribersLk.Lock()
|
||||
p.subscribers = append(p.subscribers, subscriber)
|
||||
p.subscribersLk.Unlock()
|
||||
|
||||
return p.unsubscribeAt(subscriber)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user