From 96c04fc0a60be9f0072dfe27902cc02bf8d5a256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 20:33:29 +0100 Subject: [PATCH] mpool: Make MpoolPending more atomic --- chain/gen/gen.go | 4 +- chain/messagepool/messagepool.go | 111 ++++++++++++++++++------------- node/impl/full/mpool.go | 58 +++++++++++++++- 3 files changed, 122 insertions(+), 51 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index a823c1250..db97dae32 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -472,9 +472,9 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted type ProofInput struct { sectors sectorbuilder.SortedPublicSectorInfo - hvrf []byte + hvrf []byte winners []sectorbuilder.EPostCandidate - vrfout []byte + vrfout []byte } func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) { diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 38b7e1fc4..38456a143 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -62,7 +62,7 @@ type MessagePool struct { pending map[address.Address]*msgSet pendingCount int - curTsLk sync.Mutex + curTsLk sync.Mutex // DO NOT LOCK INSIDE lk curTs *types.TipSet api Provider @@ -106,7 +106,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error { } type Provider interface { - SubscribeHeadChanges(func(rev, app []*types.TipSet) error) + SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(m store.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) @@ -124,8 +124,9 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { return &mpoolProvider{sm, ps} } -func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) { +func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { mpp.sm.ChainStore().SubscribeHeadChanges(cb) + return mpp.sm.ChainStore().GetHeaviestTipSet() } func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) { @@ -173,7 +174,7 @@ func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { go mp.repubLocal() - api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { err := mp.HeadChange(rev, app) if err != nil { log.Errorf("mpool head notif handler error: %+v", err) @@ -257,6 +258,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error { } func (mp *MessagePool) Add(m *types.SignedMessage) error { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + return mp.addTs(m, mp.curTs) +} + +func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error { // big messages are bad, anti DOS if m.Size() > 32*1024 { return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) @@ -275,7 +282,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { return err } - snonce, err := mp.getStateNonce(m.Message.From) + snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { return xerrors.Errorf("failed to look up actor state nonce: %w", err) } @@ -333,14 +340,17 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { } func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { + mp.curTsLk.Lock() + defer mp.curTsLk.Lock() + mp.lk.Lock() defer mp.lk.Unlock() - return mp.getNonceLocked(addr) + return mp.getNonceLocked(addr, mp.curTs) } -func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { - stateNonce, err := mp.getStateNonce(addr) // sanity check +func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) { + stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check if err != nil { return 0, err } @@ -359,22 +369,9 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return stateNonce, nil } -func (mp *MessagePool) setCurTipset(ts *types.TipSet) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() - mp.curTs = ts -} - -func (mp *MessagePool) getCurTipset() *types.TipSet { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() - return mp.curTs -} - -func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { +func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { // TODO: this method probably should be cached - curTs := mp.getCurTipset() act, err := mp.api.StateGetActor(addr, curTs) if err != nil { return 0, err @@ -417,13 +414,16 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro } func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { + mp.curTsLk.Lock() + defer mp.curTsLk.Lock() + mp.lk.Lock() defer mp.lk.Unlock() if addr.Protocol() == address.ID { log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr) } - nonce, err := mp.getNonceLocked(addr) + nonce, err := mp.getNonceLocked(addr, mp.curTs) if err != nil { return nil, err } @@ -485,15 +485,19 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { } } -func (mp *MessagePool) Pending() []*types.SignedMessage { +func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + mp.lk.Lock() defer mp.lk.Unlock() + out := make([]*types.SignedMessage, 0) for a := range mp.pending { out = append(out, mp.pendingFor(a)...) } - return out + return out, mp.curTs } func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { @@ -516,6 +520,8 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { } func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() for _, ts := range revert { pts, err := mp.api.LoadTipSet(ts.Parents()) @@ -523,27 +529,14 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return err } - mp.setCurTipset(pts) - for _, b := range ts.Blocks() { - bmsgs, smsgs, err := mp.api.MessagesForBlock(b) - if err != nil { - return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err) - } - for _, msg := range smsgs { - if err := mp.Add(msg); err != nil { - log.Error(err) // TODO: probably lots of spam in multi-block tsets - } - } + msgs, err := mp.MessagesForBlocks(ts.Blocks()) + if err != nil { + return err + } - for _, msg := range bmsgs { - smsg := mp.RecoverSig(msg) - if smsg != nil { - if err := mp.Add(smsg); err != nil { - log.Error(err) // TODO: probably lots of spam in multi-block tsets - } - } else { - log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) - } + for _, msg := range msgs { + if err := mp.addTs(msg, pts); err != nil { + log.Error(err) // TODO: probably lots of spam in multi-block tsets } } } @@ -562,12 +555,38 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) mp.Remove(msg.From, msg.Nonce) } } - mp.setCurTipset(ts) + + mp.curTs = ts } return nil } +func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) { + out := make([]*types.SignedMessage, 0) + + for _, b := range blks { + bmsgs, smsgs, err := mp.api.MessagesForBlock(b) + if err != nil { + return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) + } + for _, msg := range smsgs { + out = append(out, msg) + } + + for _, msg := range bmsgs { + smsg := mp.RecoverSig(msg) + if smsg != nil { + out = append(out, smsg) + } else { + log.Warnf("could not recover signature for bls message %s", msg.Cid()) + } + } + } + + return out, nil +} + func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { val, ok := mp.blsSigCache.Get(msg.Cid()) if !ok { diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 9ea34a7d1..249c62948 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -3,12 +3,14 @@ package full import ( "context" + "github.com/ipfs/go-cid" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/messagepool" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -17,13 +19,63 @@ type MpoolAPI struct { WalletAPI + Chain *store.ChainStore + Mpool *messagepool.MessagePool } func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { - // TODO: need to make sure we don't return messages that were already included in the referenced chain - // also need to accept ts == nil just fine, assume nil == chain.Head() - return a.Mpool.Pending(), nil + pending, mpts := a.Mpool.Pending() + + haveCids := map[cid.Cid]struct{}{} + for _, m := range pending { + haveCids[m.Cid()] = struct{}{} + } + + if mpts.Height() > ts.Height() { + return pending, nil + } + + for { + if mpts.Height() == ts.Height() { + if mpts.Equals(ts) { + return pending, nil + } + // different blocks in tipsets + + have, err := a.Mpool.MessagesForBlocks(ts.Blocks()) + if err != nil { + return nil, xerrors.Errorf("getting messages for base ts: %w", err) + } + + for _, m := range have { + haveCids[m.Cid()] = struct{}{} + } + } + + msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks()) + if err != nil { + return nil, xerrors.Errorf(": %w", err) + } + + for _, m := range msgs { + if _, ok := haveCids[m.Cid()]; ok { + continue + } + + haveCids[m.Cid()] = struct{}{} + pending = append(pending, m) + } + + if mpts.Height() >= ts.Height() { + return pending, nil + } + + ts, err = a.Chain.LoadTipSet(ts.Parents()) + if err != nil { + return nil, xerrors.Errorf("loading parent tipset: %w", err) + } + } } func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {