diff --git a/node/builder.go b/node/builder.go index 671170e53..aa0a18626 100644 --- a/node/builder.go +++ b/node/builder.go @@ -255,6 +255,7 @@ func Online() Option { Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(new(dtypes.Graphsync), modules.Graphsync), + Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 20ca271c0..b1d9a58cb 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -2,7 +2,6 @@ package full import ( "context" - "sync" "github.com/ipfs/go-cid" "go.uber.org/fx" @@ -13,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) type MpoolAPI struct { @@ -25,10 +25,7 @@ type MpoolAPI struct { Mpool *messagepool.MessagePool - PushLocks struct { - m map[address.Address]chan struct{} - sync.Mutex - } `name:"verymuchunique" optional:"true"` + PushLocks *dtypes.MpoolLocker } func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) { @@ -118,27 +115,11 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t if err != nil { return nil, xerrors.Errorf("getting key address: %w", err) } - - a.PushLocks.Lock() - if a.PushLocks.m == nil { - a.PushLocks.m = make(map[address.Address]chan struct{}) + done, err := a.PushLocks.TakeLock(ctx, fromA) + if err != nil { + return nil, xerrors.Errorf("taking lock: %w", err) } - lk, ok := a.PushLocks.m[fromA] - if !ok { - lk = make(chan struct{}, 1) - a.PushLocks.m[msg.From] = lk - } - a.PushLocks.Unlock() - - select { - case lk <- struct{}{}: - case <-ctx.Done(): - return nil, ctx.Err() - } - - defer func() { - <-lk - }() + defer done() } if msg.Nonce != 0 { diff --git a/node/modules/dtypes/mpool.go b/node/modules/dtypes/mpool.go new file mode 100644 index 000000000..1c64449f8 --- /dev/null +++ b/node/modules/dtypes/mpool.go @@ -0,0 +1,35 @@ +package dtypes + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-address" +) + +type MpoolLocker struct { + m map[address.Address]chan struct{} + lk sync.Mutex +} + +func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(), error) { + ml.lk.Lock() + if ml.m == nil { + ml.m = make(map[address.Address]chan struct{}) + } + lk, ok := ml.m[a] + if !ok { + lk = make(chan struct{}, 1) + ml.m[a] = lk + } + ml.lk.Unlock() + + select { + case lk <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + return func() { + <-lk + }, nil +}