Fix MpoolLocker
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
a467deede6
commit
a45febc065
@ -255,6 +255,7 @@ func Online() Option {
|
||||
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
||||
|
||||
Override(new(dtypes.Graphsync), modules.Graphsync),
|
||||
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
|
||||
|
||||
Override(RunHelloKey, modules.RunHello),
|
||||
Override(RunBlockSyncKey, modules.RunBlockSync),
|
||||
|
@ -2,7 +2,6 @@ package full
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.uber.org/fx"
|
||||
@ -13,6 +12,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
type MpoolAPI struct {
|
||||
@ -25,10 +25,7 @@ type MpoolAPI struct {
|
||||
|
||||
Mpool *messagepool.MessagePool
|
||||
|
||||
PushLocks struct {
|
||||
m map[address.Address]chan struct{}
|
||||
sync.Mutex
|
||||
} `name:"verymuchunique" optional:"true"`
|
||||
PushLocks *dtypes.MpoolLocker
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
|
||||
@ -118,27 +115,11 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting key address: %w", err)
|
||||
}
|
||||
|
||||
a.PushLocks.Lock()
|
||||
if a.PushLocks.m == nil {
|
||||
a.PushLocks.m = make(map[address.Address]chan struct{})
|
||||
done, err := a.PushLocks.TakeLock(ctx, fromA)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("taking lock: %w", err)
|
||||
}
|
||||
lk, ok := a.PushLocks.m[fromA]
|
||||
if !ok {
|
||||
lk = make(chan struct{}, 1)
|
||||
a.PushLocks.m[msg.From] = lk
|
||||
}
|
||||
a.PushLocks.Unlock()
|
||||
|
||||
select {
|
||||
case lk <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-lk
|
||||
}()
|
||||
defer done()
|
||||
}
|
||||
|
||||
if msg.Nonce != 0 {
|
||||
|
35
node/modules/dtypes/mpool.go
Normal file
35
node/modules/dtypes/mpool.go
Normal file
@ -0,0 +1,35 @@
|
||||
package dtypes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
)
|
||||
|
||||
type MpoolLocker struct {
|
||||
m map[address.Address]chan struct{}
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(), error) {
|
||||
ml.lk.Lock()
|
||||
if ml.m == nil {
|
||||
ml.m = make(map[address.Address]chan struct{})
|
||||
}
|
||||
lk, ok := ml.m[a]
|
||||
if !ok {
|
||||
lk = make(chan struct{}, 1)
|
||||
ml.m[a] = lk
|
||||
}
|
||||
ml.lk.Unlock()
|
||||
|
||||
select {
|
||||
case lk <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return func() {
|
||||
<-lk
|
||||
}, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user