more mpooland post sched fixes

This commit is contained in:
Łukasz Magiera 2019-11-24 17:35:50 +01:00
parent e7efb6099a
commit c7b34153fb
9 changed files with 85 additions and 48 deletions

View File

@ -73,7 +73,7 @@ CLEAN+=lotus
lotus-storage-miner: $(BUILD_DEPS) lotus-storage-miner: $(BUILD_DEPS)
rm -f lotus-storage-miner rm -f lotus-storage-miner
go build -o lotus-storage-miner ./cmd/lotus-storage-miner go build $(GOFLAGS) -o lotus-storage-miner ./cmd/lotus-storage-miner
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
.PHONY: lotus-storage-miner .PHONY: lotus-storage-miner

View File

@ -0,0 +1,22 @@
package actors
import (
"fmt"
"github.com/filecoin-project/lotus/build"
"gotest.tools/assert"
"testing"
)
func TestProvingPeriodEnd(t *testing.T) {
assertPPE := func(setPeriodEnd uint64, height uint64, expectEnd uint64) {
end, _ := ProvingPeriodEnd(setPeriodEnd, height)
assert.Equal(t, expectEnd, end)
assert.Equal(t, expectEnd, end)
fmt.Println(end)
fmt.Println(end - build.PoStChallangeTime)
}
// assumes proving dur of 40 epochs
assertPPE(185, 147, 185)
}

View File

@ -6,7 +6,9 @@ import (
"sync" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -302,9 +304,10 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
e.lk.Lock() e.lk.Lock()
defer e.lk.Unlock() defer e.lk.Unlock()
done, more, err := check(e.tsc.best()) ts := e.tsc.best()
done, more, err := check(ts)
if err != nil { if err != nil {
return err return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
} }
if done { if done {
timeout = NoTimeout timeout = NoTimeout
@ -335,6 +338,6 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
return nil return nil
} }
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg *types.Message) error { func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg store.ChainMsg) error {
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg)) return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage()))
} }

View File

@ -96,7 +96,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
span.End() span.End()
if err != nil { if err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err) log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err)
} }
} }
return nil return nil

View File

@ -5,28 +5,31 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (e *calledEvents) CheckMsg(ctx context.Context, msg *types.Message, hnd CalledHandler) CheckFunc { func (e *calledEvents) CheckMsg(ctx context.Context, smsg store.ChainMsg, hnd CalledHandler) CheckFunc {
msg := smsg.VMMessage()
return func(ts *types.TipSet) (done bool, more bool, err error) { return func(ts *types.TipSet) (done bool, more bool, err error) {
fa, err := e.cs.StateGetActor(ctx, msg.From, ts) fa, err := e.cs.StateGetActor(ctx, msg.From, ts)
if err != nil { if err != nil {
return false, true, err return false, true, err
} }
// TODO: probably want to look at the chain to make sure it's if msg.Nonce > fa.Nonce {
// the right message, but this is probably good enough for now return false, true, nil
done = fa.Nonce >= msg.Nonce }
rec, err := e.cs.StateGetReceipt(ctx, msg.Cid(), ts) rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts)
if err != nil { if err != nil {
return false, true, err return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
} }
more, err = hnd(msg, rec, ts, ts.Height()) more, err = hnd(msg, rec, ts, ts.Height())
return done, more, err return true, more, err
} }
} }

View File

@ -257,6 +257,11 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
return err return err
} }
if _, err := mp.sm.ChainStore().PutMessage(&m.Message); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err
}
mset, ok := mp.pending[m.Message.From] mset, ok := mp.pending[m.Message.From]
if !ok { if !ok {
mset = newMsgSet() mset = newMsgSet()
@ -290,7 +295,7 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
mset, ok := mp.pending[addr] mset, ok := mp.pending[addr]
if ok { if ok {
if stateNonce > mset.nextNonce { if stateNonce > mset.nextNonce {
log.Errorf("state nonce was larger than mset.nextNonce") log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce)
return stateNonce, nil return stateNonce, nil
} }
@ -341,7 +346,9 @@ func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*typ
if err := mp.addLocked(msg); err != nil { if err := mp.addLocked(msg); err != nil {
return nil, err return nil, err
} }
mp.addLocal(msg, msgb) if err := mp.addLocal(msg, msgb); err != nil {
log.Errorf("addLocal failed: %+v", err)
}
return msg, mp.ps.Publish(msgTopic, msgb) return msg, mp.ps.Publish(msgTopic, msgb)
} }
@ -375,6 +382,10 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
max = nonce max = nonce
} }
} }
if nonce > max {
max = nonce // we could have not seen the removed message before
}
mset.nextNonce = max + 1 mset.nextNonce = max + 1
} }
} }
@ -418,7 +429,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
} }
for _, msg := range smsgs { for _, msg := range smsgs {
if err := mp.Add(msg); err != nil { if err := mp.Add(msg); err != nil {
return err log.Error(err) // TODO: probably lots of spam in multi-block tsets
} }
} }
@ -426,7 +437,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
smsg := mp.RecoverSig(msg) smsg := mp.RecoverSig(msg)
if smsg != nil { if smsg != nil {
if err := mp.Add(smsg); err != nil { if err := mp.Add(smsg); err != nil {
return err log.Error(err) // TODO: probably lots of spam in multi-block tsets
} }
} else { } else {
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
@ -461,7 +472,7 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
} }
sig, ok := val.(types.Signature) sig, ok := val.(types.Signature)
if !ok { if !ok {
log.Warnf("value in signature cache was not a signature (got %T)", val) log.Errorf("value in signature cache was not a signature (got %T)", val)
return nil return nil
} }
@ -516,7 +527,7 @@ func (mp *MessagePool) loadLocal() error {
continue // todo: drop the message from local cache (if above certain confidence threshold) continue // todo: drop the message from local cache (if above certain confidence threshold)
} }
return xerrors.Errorf("adding local messgae: %w", err) return xerrors.Errorf("adding local message: %w", err)
} }
} }

View File

@ -536,6 +536,9 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) {
if err == nil { if err == nil {
return m, nil return m, nil
} }
if err != bstore.ErrNotFound {
log.Warn("GetCMessage: unexpected error getting unsigned message: %s", err)
}
return cs.GetSignedMessage(c) return cs.GetSignedMessage(c)
} }

View File

@ -377,7 +377,7 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
} }
if msg.Message.Nonce < inclNonces[from] { if msg.Message.Nonce < inclNonces[from] {
log.Warnf("message in mempool has already used nonce (%d < %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid()) log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid())
continue continue
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"time" "time"
"github.com/ipfs/go-cid"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -24,13 +23,13 @@ func (m *Miner) beginPosting(ctx context.Context) {
return return
} }
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
if err != nil { if err != nil {
log.Errorf("failed to get proving period end for miner: %s", err) log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err)
return return
} }
if ppe == 0 { if sppe == 0 {
log.Warn("Not proving yet") log.Warn("Not proving yet")
return return
} }
@ -44,12 +43,16 @@ func (m *Miner) beginPosting(ctx context.Context) {
// height needs to be +1, because otherwise we'd be trying to schedule PoSt // height needs to be +1, because otherwise we'd be trying to schedule PoSt
// at current block height // at current block height
ppe, _ = actors.ProvingPeriodEnd(ppe, ts.Height()+1) ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1)
m.schedPost = ppe m.schedPost = ppe
m.postLk.Unlock() m.postLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime) if build.PoStChallangeTime > ppe {
ppe = build.PoStChallangeTime
}
log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe)
err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
// TODO: Cancel post // TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run") log.Errorf("TODO: Cancel PoSt, re-run")
@ -115,8 +118,7 @@ type post struct {
proof []byte proof []byte
// commit // commit
msg *types.Message smsg *types.SignedMessage
smsg cid.Cid
} }
func (p *post) doPost(ctx context.Context) error { func (p *post) doPost(ctx context.Context) error {
@ -124,19 +126,19 @@ func (p *post) doPost(ctx context.Context) error {
defer span.End() defer span.End()
if err := p.preparePost(ctx); err != nil { if err := p.preparePost(ctx); err != nil {
return err return xerrors.Errorf("prepare: %w", err)
} }
if err := p.runPost(ctx); err != nil { if err := p.runPost(ctx); err != nil {
return err return xerrors.Errorf("run: %w", err)
} }
if err := p.commitPost(ctx); err != nil { if err := p.commitPost(ctx); err != nil {
return err return xerrors.Errorf("commit: %w", err)
} }
if err := p.waitCommit(ctx); err != nil { if err := p.waitCommit(ctx); err != nil {
return err return xerrors.Errorf("wait: %w", err)
} }
return nil return nil
@ -149,7 +151,7 @@ func (p *post) preparePost(ctx context.Context) error {
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts) sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get proving set for miner: %w", err) return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err)
} }
if len(sset) == 0 { if len(sset) == 0 {
log.Warn("empty proving set! (ts.H: %d)", p.ts.Height()) log.Warn("empty proving set! (ts.H: %d)", p.ts.Height())
@ -209,7 +211,7 @@ func (p *post) runPost(ctx context.Context) error {
return nil return nil
} }
func (p *post) commitPost(ctx context.Context) error { func (p *post) commitPost(ctx context.Context) (err error) {
ctx, span := trace.StartSpan(ctx, "storage.commitPost") ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End() defer span.End()
@ -223,7 +225,7 @@ func (p *post) commitPost(ctx context.Context) error {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
} }
p.msg = &types.Message{ msg := &types.Message{
To: p.m.maddr, To: p.m.maddr,
From: p.m.worker, From: p.m.worker,
Method: actors.MAMethods.SubmitPoSt, Method: actors.MAMethods.SubmitPoSt,
@ -235,11 +237,10 @@ func (p *post) commitPost(ctx context.Context) error {
log.Info("mpush") log.Info("mpush")
smsg, err := p.m.api.MpoolPushMessage(ctx, p.msg) p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err) return xerrors.Errorf("pushing message to mpool: %w", err)
} }
p.smsg = smsg.Cid()
return nil return nil
} }
@ -261,9 +262,9 @@ func (p *post) waitCommit(ctx context.Context) error {
}, func(ctx context.Context, ts *types.TipSet) error { }, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("post message reverted") log.Warn("post message reverted")
return nil return nil
}, 3, postMsgTimeout, p.msg) }, 3, postMsgTimeout, p.smsg)
if err != nil { if err != nil {
return err return xerrors.Errorf("waiting for post to appear on chain: %w", err)
} }
return nil return nil
@ -284,19 +285,13 @@ func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipS
ppe: ppe, ppe: ppe,
ts: ts, ts: ts,
}).doPost(ctx) }).doPost(ctx)
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
if err != nil { if err != nil {
return xerrors.Errorf("doPost: %w", err) return xerrors.Errorf("doPost: %w", err)
} }
m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
return nil return nil
} }
} }
func sectorIdList(si []*api.ChainSectorInfo) []uint64 {
out := make([]uint64, len(si))
for i, s := range si {
out[i] = s.SectorID
}
return out
}