From c7b34153fb0c17865c7b060e6d02be6a4da14ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 24 Nov 2019 17:35:50 +0100 Subject: [PATCH] more mpool and post sched fixes --- Makefile | 2 +- chain/actors/actor_miner_test.go | 22 ++++++++++++++ chain/events/events_called.go | 11 ++++--- chain/events/events_height.go | 2 +- chain/events/utils.go | 17 ++++++----- chain/messagepool.go | 23 ++++++++++---- chain/store/store.go | 3 ++ miner/miner.go | 2 +- storage/post.go | 51 ++++++++++++++------------------ 9 files changed, 85 insertions(+), 48 deletions(-) create mode 100644 chain/actors/actor_miner_test.go diff --git a/Makefile b/Makefile index 6411786e5..45d918b70 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ CLEAN+=lotus lotus-storage-miner: $(BUILD_DEPS) rm -f lotus-storage-miner - go build -o lotus-storage-miner ./cmd/lotus-storage-miner + go build $(GOFLAGS) -o lotus-storage-miner ./cmd/lotus-storage-miner go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build .PHONY: lotus-storage-miner diff --git a/chain/actors/actor_miner_test.go b/chain/actors/actor_miner_test.go new file mode 100644 index 000000000..309db1acb --- /dev/null +++ b/chain/actors/actor_miner_test.go @@ -0,0 +1,22 @@ +package actors + +import ( + "fmt" + "github.com/filecoin-project/lotus/build" + "gotest.tools/assert" + "testing" +) + +func TestProvingPeriodEnd(t *testing.T) { + assertPPE := func(setPeriodEnd uint64, height uint64, expectEnd uint64) { + end, _ := ProvingPeriodEnd(setPeriodEnd, height) + assert.Equal(t, expectEnd, end) + assert.Equal(t, expectEnd, end) + + fmt.Println(end) + fmt.Println(end - build.PoStChallangeTime) + } + + // assumes proving dur of 40 epochs + assertPPE(185, 147, 185) +} diff --git a/chain/events/events_called.go b/chain/events/events_called.go index e38b01035..00b86bfee 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -6,7 +6,9 @@ import ( "sync" 
"github.com/ipfs/go-cid" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -302,9 +304,10 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand e.lk.Lock() defer e.lk.Unlock() - done, more, err := check(e.tsc.best()) + ts := e.tsc.best() + done, more, err := check(ts) if err != nil { - return err + return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) } if done { timeout = NoTimeout @@ -335,6 +338,6 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand return nil } -func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg *types.Message) error { - return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg)) +func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg store.ChainMsg) error { + return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage())) } diff --git a/chain/events/events_height.go b/chain/events/events_height.go index a05a3e1be..e7a81d735 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -96,7 +96,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { span.End() if err != nil { - log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err) + log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err) } } return nil diff --git a/chain/events/utils.go b/chain/events/utils.go index 630158b13..eebd8fd1d 100644 --- a/chain/events/utils.go +++ b/chain/events/utils.go @@ -5,28 +5,31 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) -func (e *calledEvents) CheckMsg(ctx context.Context, msg *types.Message, 
hnd CalledHandler) CheckFunc { +func (e *calledEvents) CheckMsg(ctx context.Context, smsg store.ChainMsg, hnd CalledHandler) CheckFunc { + msg := smsg.VMMessage() + return func(ts *types.TipSet) (done bool, more bool, err error) { fa, err := e.cs.StateGetActor(ctx, msg.From, ts) if err != nil { return false, true, err } - // TODO: probably want to look at the chain to make sure it's - // the right message, but this is probably good enough for now - done = fa.Nonce >= msg.Nonce + if msg.Nonce > fa.Nonce { + return false, true, nil + } - rec, err := e.cs.StateGetReceipt(ctx, msg.Cid(), ts) + rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts) if err != nil { - return false, true, err + return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err) } more, err = hnd(msg, rec, ts, ts.Height()) - return done, more, err + return true, more, err } } diff --git a/chain/messagepool.go b/chain/messagepool.go index a30db5b32..90c237422 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -257,6 +257,11 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { return err } + if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil { + log.Warnf("mpooladd cs.PutMessage failed: %s", err) + return err + } + mset, ok := mp.pending[m.Message.From] if !ok { mset = newMsgSet() @@ -290,7 +295,7 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { mset, ok := mp.pending[addr] if ok { if stateNonce > mset.nextNonce { - log.Errorf("state nonce was larger than mset.nextNonce") + log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce) return stateNonce, nil } @@ -341,7 +346,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ if err := mp.addLocked(msg); err != nil { return nil, err } - mp.addLocal(msg, msgb) + if err := mp.addLocal(msg, msgb); err != nil { + log.Errorf("addLocal failed: %+v", err) + } return msg, 
mp.ps.Publish(msgTopic, msgb) } @@ -375,6 +382,10 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { max = nonce } } + if nonce > max { + max = nonce // we could have not seen the removed message before + } + mset.nextNonce = max + 1 } } @@ -418,7 +429,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } for _, msg := range smsgs { if err := mp.Add(msg); err != nil { - return err + log.Error(err) // TODO: probably lots of spam in multi-block tsets } } @@ -426,7 +437,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) smsg := mp.RecoverSig(msg) if smsg != nil { if err := mp.Add(smsg); err != nil { - return err + log.Error(err) // TODO: probably lots of spam in multi-block tsets } } else { log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) @@ -461,7 +472,7 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { } sig, ok := val.(types.Signature) if !ok { - log.Warnf("value in signature cache was not a signature (got %T)", val) + log.Errorf("value in signature cache was not a signature (got %T)", val) return nil } @@ -516,7 +527,7 @@ func (mp *MessagePool) loadLocal() error { continue // todo: drop the message from local cache (if above certain confidence threshold) } - return xerrors.Errorf("adding local messgae: %w", err) + return xerrors.Errorf("adding local message: %w", err) } } diff --git a/chain/store/store.go b/chain/store/store.go index 6a7349033..9f78f0baa 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -536,6 +536,9 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) { if err == nil { return m, nil } + if err != bstore.ErrNotFound { + log.Warnf("GetCMessage: unexpected error getting unsigned message: %s", err) + } return cs.GetSignedMessage(c) } diff --git a/miner/miner.go b/miner/miner.go index c26bb9909..65a61f377 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -377,7 
+377,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs } if msg.Message.Nonce < inclNonces[from] { - log.Warnf("message in mempool has already used nonce (%d < %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid()) + log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid()) continue } diff --git a/storage/post.go b/storage/post.go index d44fc1b63..44c330927 100644 --- a/storage/post.go +++ b/storage/post.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/ipfs/go-cid" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -24,13 +23,13 @@ func (m *Miner) beginPosting(ctx context.Context) { return } - ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) + sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) if err != nil { - log.Errorf("failed to get proving period end for miner: %s", err) + log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err) return } - if ppe == 0 { + if sppe == 0 { log.Warn("Not proving yet") return } @@ -44,12 +43,16 @@ func (m *Miner) beginPosting(ctx context.Context) { // height needs to be +1, because otherwise we'd be trying to schedule PoSt // at current block height - ppe, _ = actors.ProvingPeriodEnd(ppe, ts.Height()+1) + ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1) m.schedPost = ppe m.postLk.Unlock() - log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime) + if build.PoStChallangeTime > ppe { + ppe = build.PoStChallangeTime + } + + log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe) err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert // TODO: Cancel post log.Errorf("TODO: Cancel PoSt, re-run") @@ -115,8 +118,7 @@ type post struct { proof []byte // commit - msg *types.Message - smsg 
cid.Cid + smsg *types.SignedMessage } func (p *post) doPost(ctx context.Context) error { @@ -124,19 +126,19 @@ func (p *post) doPost(ctx context.Context) error { defer span.End() if err := p.preparePost(ctx); err != nil { - return err + return xerrors.Errorf("prepare: %w", err) } if err := p.runPost(ctx); err != nil { - return err + return xerrors.Errorf("run: %w", err) } if err := p.commitPost(ctx); err != nil { - return err + return xerrors.Errorf("commit: %w", err) } if err := p.waitCommit(ctx); err != nil { - return err + return xerrors.Errorf("wait: %w", err) } return nil @@ -149,7 +151,7 @@ func (p *post) preparePost(ctx context.Context) error { sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts) if err != nil { - return xerrors.Errorf("failed to get proving set for miner: %w", err) + return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err) } if len(sset) == 0 { log.Warn("empty proving set! (ts.H: %d)", p.ts.Height()) @@ -209,7 +211,7 @@ func (p *post) runPost(ctx context.Context) error { return nil } -func (p *post) commitPost(ctx context.Context) error { +func (p *post) commitPost(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "storage.commitPost") defer span.End() @@ -223,7 +225,7 @@ func (p *post) commitPost(ctx context.Context) error { return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) } - p.msg = &types.Message{ + msg := &types.Message{ To: p.m.maddr, From: p.m.worker, Method: actors.MAMethods.SubmitPoSt, @@ -235,11 +237,10 @@ func (p *post) commitPost(ctx context.Context) error { log.Info("mpush") - smsg, err := p.m.api.MpoolPushMessage(ctx, p.msg) + p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg) if err != nil { return xerrors.Errorf("pushing message to mpool: %w", err) } - p.smsg = smsg.Cid() return nil } @@ -261,9 +262,9 @@ func (p *post) waitCommit(ctx context.Context) error { }, func(ctx context.Context, ts *types.TipSet) error { log.Warn("post 
message reverted") return nil - }, 3, postMsgTimeout, p.msg) + }, 3, postMsgTimeout, p.smsg) if err != nil { - return err + return xerrors.Errorf("waiting for post to appear on chain: %w", err) } return nil @@ -284,19 +285,13 @@ func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipS ppe: ppe, ts: ts, }).doPost(ctx) + + m.scheduleNextPost(ppe + build.ProvingPeriodDuration) + if err != nil { return xerrors.Errorf("doPost: %w", err) } - m.scheduleNextPost(ppe + build.ProvingPeriodDuration) return nil } } - -func sectorIdList(si []*api.ChainSectorInfo) []uint64 { - out := make([]uint64, len(si)) - for i, s := range si { - out[i] = s.SectorID - } - return out -}