diff --git a/api/api_storage.go b/api/api_storage.go index 8b2fe7c26..0ccfbd88f 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) // MODIFYING THE API INTERFACE @@ -93,12 +94,12 @@ type StorageMiner interface { SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin // SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit. // Returns null if message wasn't sent - SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin + SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin // SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin // SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit. // Returns null if message wasn't sent - SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin + SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) //perm:admin // SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index ff4af8043..4fa51a4af 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/specs-storage/storage" @@ -659,7 +660,7 @@ type StorageMinerStruct struct { SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` - SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` + SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"` SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` @@ -669,7 +670,7 @@ type StorageMinerStruct struct { SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` - SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` + SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"` SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` @@ -3125,12 +3126,12 @@ func (s *StorageMinerStub) SealingSchedDiag(p0 context.Context, p1 bool) (interf return nil, xerrors.New("method not supported") } -func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) { +func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) { return s.Internal.SectorCommitFlush(p0) } -func (s *StorageMinerStub) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) { - return nil, xerrors.New("method not supported") +func (s *StorageMinerStub) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) { + return *new([]sealiface.CommitBatchRes), xerrors.New("method not supported") } func (s *StorageMinerStruct) SectorCommitPending(p0 context.Context) ([]abi.SectorID, error) { @@ -3165,12 +3166,12 @@ func (s *StorageMinerStub) SectorMarkForUpgrade(p0 context.Context, p1 abi.Secto return xerrors.New("method not supported") } -func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) { +func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) { return s.Internal.SectorPreCommitFlush(p0) } -func (s *StorageMinerStub) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) { - return nil, xerrors.New("method not supported") +func (s *StorageMinerStub) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) { + return *new([]sealiface.PreCommitBatchRes), xerrors.New("method not supported") } func (s *StorageMinerStruct) SectorPreCommitPending(p0 context.Context) ([]abi.SectorID, error) { diff --git a/api/test/pledge.go b/api/test/pledge.go index 8752ad4ac..08548dc60 100644 --- a/api/test/pledge.go +++ b/api/test/pledge.go @@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } } @@ -178,7 +178,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe cb, err := miner.SectorCommitFlush(ctx) require.NoError(t, err) if cb != nil { - fmt.Printf("COMMIT BATCH: %s\n", *cb) + fmt.Printf("COMMIT BATCH: %+v\n", cb) } } @@ -319,13 +319,13 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } cb, err := miner.SectorCommitFlush(ctx) require.NoError(t, err) if cb != nil { - fmt.Printf("COMMIT BATCH: %s\n", *cb) + fmt.Printf("COMMIT BATCH: %+v\n", cb) } } diff --git a/api/test/window_post.go b/api/test/window_post.go index 6989129c4..6d317676e 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -543,7 +543,7 @@ func TestWindowPostDispute(t *testing.T, b APIBuilder, blocktime time.Duration) for { di, err = client.StateMinerProvingDeadline(ctx, evilMinerAddr, types.EmptyTSK) require.NoError(t, err) - if di.Index == evilSectorLoc.Deadline { + if di.Index == evilSectorLoc.Deadline && di.CurrentEpoch-di.PeriodStart > 1 { break } build.Clock.Sleep(blocktime) @@ -640,7 +640,7 @@ func TestWindowPostDispute(t *testing.T, b APIBuilder, blocktime time.Duration) for { di, err = client.StateMinerProvingDeadline(ctx, evilMinerAddr, types.EmptyTSK) require.NoError(t, err) - if di.Index == evilSectorLoc.Deadline { + if di.Index == evilSectorLoc.Deadline && di.CurrentEpoch-di.PeriodStart > 1 { break } build.Clock.Sleep(blocktime) diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index ee7c74e8f..055c13754 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 7dd32a88a..5d32e910d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 9ca0c9217..fa6f28aff 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/actors/policy/policy.go b/chain/actors/policy/policy.go index 4d115f783..bb35025ec 100644 --- a/chain/actors/policy/policy.go +++ b/chain/actors/policy/policy.go @@ -92,6 +92,13 @@ func AddSupportedProofTypes(types ...abi.RegisteredSealProof) { miner4.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{} miner5.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{} + wpp, err := t.RegisteredWindowPoStProof() + if err != nil { + // Fine to panic, this is a test-only method + panic(err) + } + + miner5.WindowPoStProofTypes[wpp] = struct{}{} } } diff --git a/chain/actors/policy/policy.go.template b/chain/actors/policy/policy.go.template index 9a8334374..6b8f80bd0 100644 --- a/chain/actors/policy/policy.go.template +++ b/chain/actors/policy/policy.go.template @@ -62,6 +62,13 @@ func AddSupportedProofTypes(types ...abi.RegisteredSealProof) { miner{{.}}.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{} {{else}} miner{{.}}.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{} + wpp, err := t.RegisteredWindowPoStProof() + if err != nil { + // Fine to panic, this is a test-only method + panic(err) + } + + miner{{.}}.WindowPoStProofTypes[wpp] = struct{}{} {{end}} {{end}} } diff --git a/chain/gen/gen.go b/chain/gen/gen.go index d65592eba..424ee6edc 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -592,6 +592,10 @@ func (mca mca) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipS return nil, xerrors.Errorf("loading tipset key: %w", err) } + if randEpoch > build.UpgradeHyperdriveHeight { + return mca.sm.ChainStore().GetChainRandomnessLookingForward(ctx, pts.Cids(), personalization, randEpoch, entropy) + } + return mca.sm.ChainStore().GetChainRandomnessLookingBack(ctx, pts.Cids(), personalization, randEpoch, entropy) } @@ -601,6 +605,10 @@ func (mca mca) ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSe return nil, xerrors.Errorf("loading tipset key: %w", err) } + if randEpoch > build.UpgradeHyperdriveHeight { + return mca.sm.ChainStore().GetBeaconRandomnessLookingForward(ctx, pts.Cids(), personalization, randEpoch, entropy) + } + return mca.sm.ChainStore().GetBeaconRandomnessLookingBack(ctx, pts.Cids(), personalization, randEpoch, entropy) } diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 93dce1df0..0180d1abf 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -126,10 +126,14 @@ type MessagePool struct { republished map[cid.Cid]struct{} + // do NOT access this map directly, use isLocal, setLocal, and forEachLocal respectively localAddrs map[address.Address]struct{} + // do NOT access this map directly, use getPendingMset, setPendingMset, deletePendingMset, forEachPending, and clearPending respectively pending map[address.Address]*msgSet + keyCache map[address.Address]address.Address + curTsLk sync.Mutex // DO NOT LOCK INSIDE lk curTs *types.TipSet @@ -329,6 +333,20 @@ func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt { return types.BigInt{Int: requiredFunds} } +func (ms *msgSet) toSlice() []*types.SignedMessage { + set := make([]*types.SignedMessage, 0, len(ms.msgs)) + + for _, m := range ms.msgs { + set = append(set, m) + } + + sort.Slice(set, func(i, j int) bool { + return set[i].Message.Nonce < set[j].Message.Nonce + }) + + return set +} + func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize) @@ -350,6 +368,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ repubTrigger: make(chan struct{}, 1), localAddrs: make(map[address.Address]struct{}), pending: make(map[address.Address]*msgSet), + keyCache: make(map[address.Address]address.Address), minGasPrice: types.NewInt(0), pruneTrigger: make(chan struct{}, 1), pruneCooldown: make(chan struct{}, 1), @@ -371,9 +390,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ // enable initial prunes mp.pruneCooldown <- struct{}{} + ctx, cancel := context.WithCancel(context.TODO()) + // load the current tipset and subscribe to head changes _before_ loading local messages mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { - err := mp.HeadChange(rev, app) + err := mp.HeadChange(ctx, rev, app) if err != nil { log.Errorf("mpool head notif handler error: %+v", err) } @@ -384,7 +405,8 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ mp.lk.Lock() go func() { - err := mp.loadLocal() + defer cancel() + err := mp.loadLocal(ctx) mp.lk.Unlock() mp.curTsLk.Unlock() @@ -395,12 +417,106 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ log.Info("mpool ready") - mp.runLoop() + mp.runLoop(ctx) }() return mp, nil } +func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) { + // check the cache + a, f := mp.keyCache[addr] + if f { + return a, nil + } + + // resolve the address + ka, err := mp.api.StateAccountKeyAtFinality(ctx, addr, mp.curTs) + if err != nil { + return address.Undef, err + } + + // place both entries in the cache (may both be key addresses, which is fine) + mp.keyCache[addr] = ka + mp.keyCache[ka] = ka + + return ka, nil +} + +func (mp *MessagePool) getPendingMset(ctx context.Context, addr address.Address) (*msgSet, bool, error) { + ra, err := mp.resolveToKey(ctx, addr) + if err != nil { + return nil, false, err + } + + ms, f := mp.pending[ra] + + return ms, f, nil +} + +func (mp *MessagePool) setPendingMset(ctx context.Context, addr address.Address, ms *msgSet) error { + ra, err := mp.resolveToKey(ctx, addr) + if err != nil { + return err + } + + mp.pending[ra] = ms + + return nil +} + +// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have +func (mp *MessagePool) forEachPending(f func(address.Address, *msgSet)) { + for la, ms := range mp.pending { + f(la, ms) + } +} + +func (mp *MessagePool) deletePendingMset(ctx context.Context, addr address.Address) error { + ra, err := mp.resolveToKey(ctx, addr) + if err != nil { + return err + } + + delete(mp.pending, ra) + + return nil +} + +// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have +func (mp *MessagePool) clearPending() { + mp.pending = make(map[address.Address]*msgSet) +} + +func (mp *MessagePool) isLocal(ctx context.Context, addr address.Address) (bool, error) { + ra, err := mp.resolveToKey(ctx, addr) + if err != nil { + return false, err + } + + _, f := mp.localAddrs[ra] + + return f, nil +} + +func (mp *MessagePool) setLocal(ctx context.Context, addr address.Address) error { + ra, err := mp.resolveToKey(ctx, addr) + if err != nil { + return err + } + + mp.localAddrs[ra] = struct{}{} + + return nil +} + +// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have +func (mp *MessagePool) forEachLocal(ctx context.Context, f func(context.Context, address.Address)) { + for la := range mp.localAddrs { + f(ctx, la) + } +} + func (mp *MessagePool) Close() error { close(mp.closer) return nil @@ -418,15 +534,15 @@ func (mp *MessagePool) Prune() { mp.pruneTrigger <- struct{}{} } -func (mp *MessagePool) runLoop() { +func (mp *MessagePool) runLoop(ctx context.Context) { for { select { case <-mp.repubTk.C: - if err := mp.republishPendingMessages(); err != nil { + if err := mp.republishPendingMessages(ctx); err != nil { log.Errorf("error while republishing messages: %s", err) } case <-mp.repubTrigger: - if err := mp.republishPendingMessages(); err != nil { + if err := mp.republishPendingMessages(ctx); err != nil { log.Errorf("error while republishing messages: %s", err) } @@ -442,8 +558,10 @@ func (mp *MessagePool) runLoop() { } } -func (mp *MessagePool) addLocal(m *types.SignedMessage) error { - mp.localAddrs[m.Message.From] = struct{}{} +func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) error { + if err := mp.setLocal(ctx, m.Message.From); err != nil { + return err + } msgb, err := m.Serialize() if err != nil { @@ -475,7 +593,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T return false, xerrors.Errorf("message will not be included in a block: %w", err) } - // this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks + // this checks if the GasFeeCap is sufficiently high for inclusion in the next 20 blocks // if the GasFeeCap is too low, we soft reject the message (Ignore in pubsub) and rely // on republish to push it through later, if the baseFee has fallen. // this is a defensive check that stops minimum baseFee spam attacks from overloading validation @@ -510,7 +628,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T return publish, nil } -func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { +func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) { err := mp.checkMessage(m) if err != nil { return cid.Undef, err @@ -523,7 +641,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { }() mp.curTsLk.Lock() - publish, err := mp.addTs(m, mp.curTs, true, false) + publish, err := mp.addTs(ctx, m, mp.curTs, true, false) if err != nil { mp.curTsLk.Unlock() return cid.Undef, err @@ -576,7 +694,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { return nil } -func (mp *MessagePool) Add(m *types.SignedMessage) error { +func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error { err := mp.checkMessage(m) if err != nil { return err @@ -591,7 +709,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() - _, err = mp.addTs(m, mp.curTs, false, false) + _, err = mp.addTs(ctx, m, mp.curTs, false, false) return err } @@ -631,7 +749,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error { return nil } -func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error { +func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error { balance, err := mp.getStateBalance(m.Message.From, curTs) if err != nil { return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure) @@ -645,7 +763,12 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) // add Value for soft failure check //requiredFunds = types.BigAdd(requiredFunds, m.Message.Value) - mset, ok := mp.pending[m.Message.From] + mset, ok, err := mp.getPendingMset(ctx, m.Message.From) + if err != nil { + log.Debugf("mpoolcheckbalance failed to get pending mset: %s", err) + return err + } + if ok { requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce)) } @@ -659,7 +782,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) return nil } -func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) { +func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) { snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) @@ -677,17 +800,17 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, return false, err } - if err := mp.checkBalance(m, curTs); err != nil { + if err := mp.checkBalance(ctx, m, curTs); err != nil { return false, err } - err = mp.addLocked(m, !local, untrusted) + err = mp.addLocked(ctx, m, !local, untrusted) if err != nil { return false, err } if local { - err = mp.addLocal(m) + err = mp.addLocal(ctx, m) if err != nil { return false, xerrors.Errorf("error persisting local message: %w", err) } @@ -696,7 +819,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, return publish, nil } -func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { +func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) error { err := mp.checkMessage(m) if err != nil { return err @@ -722,21 +845,21 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { return err } - if err := mp.checkBalance(m, curTs); err != nil { + if err := mp.checkBalance(ctx, m, curTs); err != nil { return err } - return mp.addLocked(m, false, false) + return mp.addLocked(ctx, m, false, false) } -func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { +func (mp *MessagePool) addSkipChecks(ctx context.Context, m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() - return mp.addLocked(m, false, false) + return mp.addLocked(ctx, m, false, false) } -func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error { +func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, strict, untrusted bool) error { log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) if m.Signature.Type == crypto.SigTypeBLS { mp.blsSigCache.Add(m.Cid(), m.Signature) @@ -752,7 +875,13 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) return err } - mset, ok := mp.pending[m.Message.From] + // Note: If performance becomes an issue, making this getOrCreatePendingMset will save some work + mset, ok, err := mp.getPendingMset(ctx, m.Message.From) + if err != nil { + log.Debug(err) + return err + } + if !ok { nonce, err := mp.getStateNonce(m.Message.From, mp.curTs) if err != nil { @@ -760,7 +889,9 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) } mset = newMsgSet(nonce) - mp.pending[m.Message.From] = mset + if err = mp.setPendingMset(ctx, m.Message.From, mset); err != nil { + return xerrors.Errorf("failed to set pending mset: %w", err) + } } incr, err := mset.add(m, mp, strict, untrusted) @@ -795,14 +926,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) return nil } -func (mp *MessagePool) GetNonce(_ context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) { +func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.getNonceLocked(addr, mp.curTs) + return mp.getNonceLocked(ctx, addr, mp.curTs) } // GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling @@ -812,13 +943,18 @@ func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types return mp.api.GetActorAfter(addr, mp.curTs) } -func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) { +func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) { stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check if err != nil { return 0, err } - mset, ok := mp.pending[addr] + mset, ok, err := mp.getPendingMset(ctx, addr) + if err != nil { + log.Debugf("mpoolgetnonce failed to get mset: %s", err) + return 0, err + } + if ok { if stateNonce > mset.nextNonce { log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce) @@ -855,7 +991,7 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) ( // - strict checks are enabled // - extra strict add checks are used when adding the messages to the msgSet // that means: no nonce gaps, at most 10 pending messages for the actor -func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { +func (mp *MessagePool) PushUntrusted(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) { err := mp.checkMessage(m) if err != nil { return cid.Undef, err @@ -868,7 +1004,7 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { }() mp.curTsLk.Lock() - publish, err := mp.addTs(m, mp.curTs, true, true) + publish, err := mp.addTs(ctx, m, mp.curTs, true, true) if err != nil { mp.curTsLk.Unlock() return cid.Undef, err @@ -890,15 +1026,20 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { return m.Cid(), nil } -func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { +func (mp *MessagePool) Remove(ctx context.Context, from address.Address, nonce uint64, applied bool) { mp.lk.Lock() defer mp.lk.Unlock() - mp.remove(from, nonce, applied) + mp.remove(ctx, from, nonce, applied) } -func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) { - mset, ok := mp.pending[from] +func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce uint64, applied bool) { + mset, ok, err := mp.getPendingMset(ctx, from) + if err != nil { + log.Debugf("mpoolremove failed to get mset: %s", err) + return + } + if !ok { return } @@ -923,58 +1064,57 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) mset.rm(nonce, applied) if len(mset.msgs) == 0 { - delete(mp.pending, from) + if err = mp.deletePendingMset(ctx, from); err != nil { + log.Debugf("mpoolremove failed to delete mset: %s", err) + return + } } } -func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { +func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.allPending() + return mp.allPending(ctx) } -func (mp *MessagePool) allPending() ([]*types.SignedMessage, *types.TipSet) { +func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) { out := make([]*types.SignedMessage, 0) - for a := range mp.pending { - out = append(out, mp.pendingFor(a)...) - } + + mp.forEachPending(func(a address.Address, mset *msgSet) { + out = append(out, mset.toSlice()...) + }) return out, mp.curTs } -func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) { +func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() mp.lk.Lock() defer mp.lk.Unlock() - return mp.pendingFor(a), mp.curTs + return mp.pendingFor(ctx, a), mp.curTs } -func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage { - mset := mp.pending[a] - if mset == nil || len(mset.msgs) == 0 { +func (mp *MessagePool) pendingFor(ctx context.Context, a address.Address) []*types.SignedMessage { + mset, ok, err := mp.getPendingMset(ctx, a) + if err != nil { + log.Debugf("mpoolpendingfor failed to get mset: %s", err) return nil } - set := make([]*types.SignedMessage, 0, len(mset.msgs)) - - for _, m := range mset.msgs { - set = append(set, m) + if mset == nil || !ok || len(mset.msgs) == 0 { + return nil } - sort.Slice(set, func(i, j int) bool { - return set[i].Message.Nonce < set[j].Message.Nonce - }) - - return set + return mset.toSlice() } -func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { +func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, apply []*types.TipSet) error { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -991,7 +1131,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) rm := func(from address.Address, nonce uint64) { s, ok := rmsgs[from] if !ok { - mp.Remove(from, nonce, true) + mp.Remove(ctx, from, nonce, true) return } @@ -1000,7 +1140,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return } - mp.Remove(from, nonce, true) + mp.Remove(ctx, from, nonce, true) } maybeRepub := func(cid cid.Cid) { @@ -1071,7 +1211,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) for _, s := range rmsgs { for _, msg := range s { - if err := mp.addSkipChecks(msg); err != nil { + if err := mp.addSkipChecks(ctx, msg); err != nil { log.Errorf("Failed to readd message from reorg to mpool: %s", err) } } @@ -1079,7 +1219,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) if len(revert) > 0 && futureDebug { mp.lk.Lock() - msgs, ts := mp.allPending() + msgs, ts := mp.allPending(ctx) mp.lk.Unlock() buckets := map[address.Address]*statBucket{} @@ -1286,7 +1426,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err return out, nil } -func (mp *MessagePool) loadLocal() error { +func (mp *MessagePool) loadLocal(ctx context.Context) error { res, err := mp.localMsgs.Query(query.Query{}) if err != nil { return xerrors.Errorf("query local messages: %w", err) @@ -1302,7 +1442,7 @@ func (mp *MessagePool) loadLocal() error { return xerrors.Errorf("unmarshaling local message: %w", err) } - if err := mp.addLoaded(&sm); err != nil { + if err := mp.addLoaded(ctx, &sm); err != nil { if xerrors.Is(err, ErrNonceTooLow) { continue // todo: drop the message from local cache (if above certain confidence threshold) } @@ -1310,47 +1450,61 @@ func (mp *MessagePool) loadLocal() error { log.Errorf("adding local message: %+v", err) } - mp.localAddrs[sm.Message.From] = struct{}{} + if err = mp.setLocal(ctx, sm.Message.From); err != nil { + log.Debugf("mpoolloadLocal errored: %s", err) + return err + } } return nil } -func (mp *MessagePool) Clear(local bool) { +func (mp *MessagePool) Clear(ctx context.Context, local bool) { mp.lk.Lock() defer mp.lk.Unlock() // remove everything if local is true, including removing local messages from // the datastore if local { - for a := range mp.localAddrs { - mset, ok := mp.pending[a] - if !ok { - continue + mp.forEachLocal(ctx, func(ctx context.Context, la address.Address) { + mset, ok, err := mp.getPendingMset(ctx, la) + if err != nil { + log.Warnf("errored while getting pending mset: %w", err) + return } - for _, m := range mset.msgs { - err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes()))) - if err != nil { - log.Warnf("error deleting local message: %s", err) + if ok { + for _, m := range mset.msgs { + err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes()))) + if err != nil { + log.Warnf("error deleting local message: %s", err) + } } } - } + }) - mp.pending = make(map[address.Address]*msgSet) + mp.clearPending() mp.republished = nil return } - // remove everything except the local messages - for a := range mp.pending { - _, isLocal := mp.localAddrs[a] - if isLocal { - continue + mp.forEachPending(func(a address.Address, ms *msgSet) { + isLocal, err := mp.isLocal(ctx, a) + if err != nil { + log.Warnf("errored while determining isLocal: %w", err) + return } - delete(mp.pending, a) - } + + if isLocal { + return + } + + if err = mp.deletePendingMset(ctx, a); err != nil { + log.Warnf("errored while deleting mset: %w", err) + return + } + }) } func getBaseFeeLowerBound(baseFee, factor types.BigInt) types.BigInt { diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 925ee438c..b48685069 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -153,7 +153,7 @@ func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) ( }, nil } -func (tma *testMpoolAPI) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { +func (tma *testMpoolAPI) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { if addr.Protocol() != address.BLS && addr.Protocol() != address.SECP256K1 { return address.Undef, fmt.Errorf("given address was not a key addr") } @@ -202,7 +202,7 @@ func (tma *testMpoolAPI) ChainComputeBaseFee(ctx context.Context, ts *types.TipS func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { t.Helper() - n, err := mp.GetNonce(context.Background(), addr, types.EmptyTSK) + n, err := mp.GetNonce(context.TODO(), addr, types.EmptyTSK) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64 func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) { t.Helper() - if err := mp.Add(msg); err != nil { + if err := mp.Add(context.TODO(), msg); err != nil { t.Fatal(err) } } @@ -296,9 +296,9 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) { tma.applyBlock(t, a) tsa := mock.TipSet(a) - _, _ = mp.Pending() + _, _ = mp.Pending(context.TODO()) - selm, _ := mp.SelectMessages(tsa, 1) + selm, _ := mp.SelectMessages(context.Background(), tsa, 1) if len(selm) == 0 { t.Fatal("should have returned the rest of the messages") } @@ -358,7 +358,7 @@ func TestRevertMessages(t *testing.T) { assertNonce(t, mp, sender, 4) - p, _ := mp.Pending() + p, _ := mp.Pending(context.TODO()) fmt.Printf("%+v\n", p) if len(p) != 3 { t.Fatal("expected three messages in mempool") @@ -399,14 +399,14 @@ func TestPruningSimple(t *testing.T) { for i := 0; i < 5; i++ { smsg := mock.MkMessage(sender, target, uint64(i), w) - if err := mp.Add(smsg); err != nil { + if err := mp.Add(context.TODO(), smsg); err != nil { t.Fatal(err) } } for i := 10; i < 50; i++ { smsg := mock.MkMessage(sender, target, uint64(i), w) - if err := mp.Add(smsg); err != nil { + if err := mp.Add(context.TODO(), smsg); err != nil { t.Fatal(err) } } @@ -416,7 +416,7 @@ func TestPruningSimple(t *testing.T) { mp.Prune() - msgs, _ := mp.Pending() + msgs, _ := mp.Pending(context.TODO()) if len(msgs) != 5 { t.Fatal("expected only 5 messages in pool, got: ", len(msgs)) } @@ -458,7 +458,7 @@ func TestLoadLocal(t *testing.T) { msgs := make(map[cid.Cid]struct{}) for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) - cid, err := mp.Push(m) + cid, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) } @@ -474,7 +474,7 @@ func TestLoadLocal(t *testing.T) { t.Fatal(err) } - pmsgs, _ := mp.Pending() + pmsgs, _ := mp.Pending(context.TODO()) if len(msgs) != len(pmsgs) { t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs)) } @@ -529,7 +529,7 @@ func TestClearAll(t *testing.T) { gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}] for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) - _, err := mp.Push(m) + _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) } @@ -540,9 +540,9 @@ func TestClearAll(t *testing.T) { mustAdd(t, mp, m) } - mp.Clear(true) + mp.Clear(context.Background(), true) - pending, _ := mp.Pending() + pending, _ := mp.Pending(context.TODO()) if len(pending) > 0 { t.Fatalf("cleared the mpool, but got %d pending messages", len(pending)) } @@ -584,7 +584,7 @@ func TestClearNonLocal(t *testing.T) { gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}] for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) - _, err := mp.Push(m) + _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) } @@ -595,9 +595,9 @@ func TestClearNonLocal(t *testing.T) { mustAdd(t, mp, m) } - mp.Clear(false) + mp.Clear(context.Background(), false) - pending, _ := mp.Pending() + pending, _ := mp.Pending(context.TODO()) if len(pending) != 10 { t.Fatalf("expected 10 pending messages, but got %d instead", len(pending)) } @@ -654,7 +654,7 @@ func TestUpdates(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) - _, err := mp.Push(m) + _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) } diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 565691004..0f904c52c 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -26,7 +26,7 @@ type Provider interface { PutMessage(m types.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error) - StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) + StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) @@ -41,6 +41,8 @@ type mpoolProvider struct { lite messagesigner.MpoolNonceAPI } +var _ Provider = (*mpoolProvider)(nil) + func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { return &mpoolProvider{sm: sm, ps: ps} } @@ -97,8 +99,8 @@ func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) return st.GetActor(addr) } -func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { - return mpp.sm.ResolveToKeyAddress(ctx, addr, ts) +func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { + return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts) } func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index dc1c69417..c10239b8e 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -57,13 +57,18 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro mpCfg := mp.getConfig() // we never prune priority addresses for _, actor := range mpCfg.PriorityAddrs { - protected[actor] = struct{}{} + pk, err := mp.resolveToKey(ctx, actor) + if err != nil { + log.Debugf("pruneMessages failed to resolve priority address: %s", err) + } + + protected[pk] = struct{}{} } // we also never prune locally published messages - for actor := range mp.localAddrs { + mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) { protected[actor] = struct{}{} - } + }) // Collect all messages to track which ones to remove and create chains for block inclusion pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize) @@ -108,7 +113,7 @@ keepLoop: // and remove all messages that are still in pruneMsgs after processing the chains log.Infof("Pruning %d messages", len(pruneMsgs)) for _, m := range pruneMsgs { - mp.remove(m.Message.From, m.Message.Nonce, false) + mp.remove(ctx, m.Message.From, m.Message.Nonce, false) } return nil diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index 5fa68aa53..4323bdee1 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -18,7 +18,7 @@ const repubMsgLimit = 30 var RepublishBatchDelay = 100 * time.Millisecond -func (mp *MessagePool) republishPendingMessages() error { +func (mp *MessagePool) republishPendingMessages(ctx context.Context) error { mp.curTsLk.Lock() ts := mp.curTs @@ -32,13 +32,18 @@ func (mp *MessagePool) republishPendingMessages() error { pending := make(map[address.Address]map[uint64]*types.SignedMessage) mp.lk.Lock() mp.republished = nil // clear this to avoid races triggering an early republish - for actor := range mp.localAddrs { - mset, ok := mp.pending[actor] + mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) { + mset, ok, err := mp.getPendingMset(ctx, actor) + if err != nil { + log.Debugf("failed to get mset: %w", err) + return + } + if !ok { - continue + return } if len(mset.msgs) == 0 { - continue + return } // we need to copy this while holding the lock to avoid races with concurrent modification pend := make(map[uint64]*types.SignedMessage, len(mset.msgs)) @@ -46,7 +51,8 @@ func (mp *MessagePool) republishPendingMessages() error { pend[nonce] = m } pending[actor] = pend - } + }) + mp.lk.Unlock() mp.curTsLk.Unlock() diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index 8da64f974..580231f7a 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -56,7 +56,7 @@ func TestRepubMessages(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) - _, err := mp.Push(m) + _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) } diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index 05acc5667..611ab8e5f 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -38,7 +38,7 @@ type msgChain struct { prev *msgChain } -func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { +func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -49,9 +49,9 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ // than any other block, then we don't bother with optimal selection because the // first block will always have higher effective performance if tq > 0.84 { - msgs, err = mp.selectMessagesGreedy(mp.curTs, ts) + msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) } else { - msgs, err = mp.selectMessagesOptimal(mp.curTs, ts, tq) + msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) } if err != nil { @@ -65,7 +65,7 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ return msgs, nil } -func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -91,7 +91,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? if gasLimit < minGas { @@ -389,7 +389,7 @@ tailLoop: return result, nil } -func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -415,7 +415,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? if gasLimit < minGas { @@ -525,7 +525,7 @@ tailLoop: return result, nil } -func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { +func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { start := time.Now() defer func() { if dt := time.Since(start); dt > time.Millisecond { @@ -541,10 +541,16 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui var chains []*msgChain priority := mpCfg.PriorityAddrs for _, actor := range priority { - mset, ok := pending[actor] + pk, err := mp.resolveToKey(ctx, actor) + if err != nil { + log.Debugf("mpooladdlocal failed to resolve sender: %s", err) + return nil, gasLimit + } + + mset, ok := pending[pk] if ok { // remove actor from pending set as we are already processed these messages - delete(pending, actor) + delete(pending, pk) // create chains for the priority actor next := mp.createMessageChains(actor, mset, baseFee, ts) chains = append(chains, next...) @@ -646,8 +652,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. inSync = true } - // first add our current pending messages - for a, mset := range mp.pending { + mp.forEachPending(func(a address.Address, mset *msgSet) { if inSync { // no need to copy the map result[a] = mset.msgs @@ -660,7 +665,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. result[a] = msetCopy } - } + }) // we are in sync, that's the happy path if inSync { diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index e32d897c4..463473229 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -427,7 +427,7 @@ func TestBasicMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -464,7 +464,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.applyBlock(t, block2) // we should have no pending messages in the mpool - pend, _ := mp.Pending() + pend, _ := mp.Pending(context.TODO()) if len(pend) != 0 { t.Fatalf("expected no pending messages, but got %d", len(pend)) } @@ -495,7 +495,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.setStateNonce(a1, 10) tma.setStateNonce(a2, 10) - msgs, err = mp.SelectMessages(ts3, 1.0) + msgs, err = mp.SelectMessages(context.Background(), ts3, 1.0) if err != nil { t.Fatal(err) } @@ -569,7 +569,7 @@ func TestMessageSelectionTrimming(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -633,7 +633,7 @@ func TestPriorityMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -712,7 +712,7 @@ func TestPriorityMessageSelection2(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -782,7 +782,7 @@ func TestPriorityMessageSelection3(t *testing.T) { } // test greedy selection - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -805,7 +805,7 @@ func TestPriorityMessageSelection3(t *testing.T) { } // test optimal selection - msgs, err = mp.SelectMessages(ts, 0.1) + msgs, err = mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -872,7 +872,7 @@ func TestOptimalMessageSelection1(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 0.25) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.25) if err != nil { t.Fatal(err) } @@ -941,7 +941,7 @@ func TestOptimalMessageSelection2(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 0.1) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -1020,7 +1020,7 @@ func TestOptimalMessageSelection3(t *testing.T) { } } - msgs, err := mp.SelectMessages(ts, 0.1) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -1108,7 +1108,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu logging.SetLogLevel("messagepool", "error") // 1. greedy selection - greedyMsgs, err := mp.selectMessagesGreedy(ts, ts) + greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts) if err != nil { t.Fatal(err) } @@ -1137,7 +1137,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu var bestMsgs []*types.SignedMessage for j := 0; j < nMiners; j++ { tq := rng.Float64() - msgs, err := mp.SelectMessages(ts, tq) + msgs, err := mp.SelectMessages(context.Background(), ts, tq) if err != nil { t.Fatal(err) } @@ -1396,7 +1396,7 @@ readLoop: minGasLimit := int64(0.9 * float64(build.BlockGasLimit)) // greedy first - selected, err := mp.SelectMessages(ts, 1.0) + selected, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -1410,7 +1410,7 @@ readLoop: } // high quality ticket - selected, err = mp.SelectMessages(ts, .8) + selected, err = mp.SelectMessages(context.Background(), ts, .8) if err != nil { t.Fatal(err) } @@ -1424,7 +1424,7 @@ readLoop: } // mid quality ticket - selected, err = mp.SelectMessages(ts, .4) + selected, err = mp.SelectMessages(context.Background(), ts, .4) if err != nil { t.Fatal(err) } @@ -1438,7 +1438,7 @@ readLoop: } // low quality ticket - selected, err = mp.SelectMessages(ts, .1) + selected, err = mp.SelectMessages(context.Background(), ts, .1) if err != nil { t.Fatal(err) } @@ -1452,7 +1452,7 @@ readLoop: } // very low quality ticket - selected, err = mp.SelectMessages(ts, .01) + selected, err = mp.SelectMessages(context.Background(), ts, .01) if err != nil { t.Fatal(err) } diff --git a/chain/messagesigner/messagesigner_test.go b/chain/messagesigner/messagesigner_test.go index 90d16b7ff..20d9af38b 100644 --- a/chain/messagesigner/messagesigner_test.go +++ b/chain/messagesigner/messagesigner_test.go @@ -24,6 +24,8 @@ type mockMpool struct { nonces map[address.Address]uint64 } +var _ MpoolNonceAPI = (*mockMpool)(nil) + func newMockMpool() *mockMpool { return &mockMpool{nonces: make(map[address.Address]uint64)} } diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 961bebd9c..cfbf60a95 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -155,11 +155,6 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri return nil, xerrors.Errorf("computing tipset state: %w", err) } - state, err = sm.handleStateForks(ctx, state, ts.Height(), nil, ts) - if err != nil { - return nil, fmt.Errorf("failed to handle fork: %w", err) - } - r := store.NewChainRand(sm.cs, ts.Cids()) if span.IsRecordingEvents() { @@ -172,7 +167,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri vmopt := &vm.VMOpts{ StateBase: state, - Epoch: ts.Height() + 1, + Epoch: ts.Height(), Rand: r, Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index cbe658f96..6690a4ad3 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -7,6 +7,8 @@ import ( "sync" "sync/atomic" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" @@ -90,6 +92,7 @@ type StateManager struct { expensiveUpgrades map[abi.ChainEpoch]struct{} stCache map[string][]cid.Cid + tCache treeCache compWait map[string]chan struct{} stlk sync.Mutex genesisMsigLk sync.Mutex @@ -102,6 +105,12 @@ type StateManager struct { genesisMarketFunds abi.TokenAmount } +// Caches a single state tree +type treeCache struct { + root cid.Cid + tree *state.StateTree +} + func NewStateManager(cs *store.ChainStore) *StateManager { sm, err := NewStateManagerWithUpgradeSchedule(cs, DefaultUpgradeSchedule()) if err != nil { @@ -154,7 +163,11 @@ func NewStateManagerWithUpgradeSchedule(cs *store.ChainStore, us UpgradeSchedule newVM: vm.NewVM, cs: cs, stCache: make(map[string][]cid.Cid), - compWait: make(map[string]chan struct{}), + tCache: treeCache{ + root: cid.Undef, + tree: nil, + }, + compWait: make(map[string]chan struct{}), }, nil } @@ -563,6 +576,52 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad return vm.ResolveToKeyAddr(tree, cst, addr) } +// ResolveToKeyAddressAtFinality is similar to stmgr.ResolveToKeyAddress but fails if the ID address being resolved isn't reorg-stable yet. +// It should not be used for consensus-critical subsystems. +func (sm *StateManager) ResolveToKeyAddressAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { + switch addr.Protocol() { + case address.BLS, address.SECP256K1: + return addr, nil + case address.Actor: + return address.Undef, xerrors.New("cannot resolve actor address to key address") + default: + } + + if ts == nil { + ts = sm.cs.GetHeaviestTipSet() + } + + var err error + if ts.Height() > policy.ChainFinality { + ts, err = sm.ChainStore().GetTipsetByHeight(ctx, ts.Height()-policy.ChainFinality, ts, true) + if err != nil { + return address.Undef, xerrors.Errorf("failed to load lookback tipset: %w", err) + } + } + + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) + tree := sm.tCache.tree + + if tree == nil || sm.tCache.root != ts.ParentState() { + tree, err = state.LoadStateTree(cst, ts.ParentState()) + if err != nil { + return address.Undef, xerrors.Errorf("failed to load parent state tree: %w", err) + } + + sm.tCache = treeCache{ + root: ts.ParentState(), + tree: tree, + } + } + + resolved, err := vm.ResolveToKeyAddr(tree, cst, addr) + if err == nil { + return resolved, nil + } + + return address.Undef, xerrors.New("ID address not found in lookback state") +} + func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Address, ts *types.TipSet) (pubk []byte, err error) { kaddr, err := sm.ResolveToKeyAddress(ctx, addr, ts) if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index 26c18c6ff..71fa0397a 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -12,6 +12,8 @@ import ( "strings" "sync" + "github.com/filecoin-project/lotus/chain/state" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/go-state-types/crypto" @@ -1129,17 +1131,33 @@ type BlockMessages struct { func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, error) { applied := make(map[address.Address]uint64) + cst := cbor.NewCborStore(cs.stateBlockstore) + st, err := state.LoadStateTree(cst, ts.Blocks()[0].ParentStateRoot) + if err != nil { + return nil, xerrors.Errorf("failed to load state tree") + } + selectMsg := func(m *types.Message) (bool, error) { - // The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise - if _, ok := applied[m.From]; !ok { - applied[m.From] = m.Nonce + var sender address.Address + if ts.Height() >= build.UpgradeHyperdriveHeight { + sender, err = st.LookupID(m.From) + if err != nil { + return false, err + } + } else { + sender = m.From } - if applied[m.From] != m.Nonce { + // The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise + if _, ok := applied[sender]; !ok { + applied[sender] = m.Nonce + } + + if applied[sender] != m.Nonce { return false, nil } - applied[m.From]++ + applied[sender]++ return true, nil } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index d1c6414a1..e262fe271 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -516,7 +516,7 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject } - if err := mv.mpool.Add(m); err != nil { + if err := mv.mpool.Add(ctx, m); err != nil { log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) ctx, _ = tag.New( ctx, diff --git a/chain/sync.go b/chain/sync.go index 6f594024d..167856927 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1074,9 +1074,19 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock // Phase 2: (Partial) semantic validation: // the sender exists and is an account actor, and the nonces make sense - if _, ok := nonces[m.From]; !ok { + var sender address.Address + if syncer.sm.GetNtwkVersion(ctx, b.Header.Height) >= network.Version13 { + sender, err = st.LookupID(m.From) + if err != nil { + return err + } + } else { + sender = m.From + } + + if _, ok := nonces[sender]; !ok { // `GetActor` does not validate that this is an account actor. - act, err := st.GetActor(m.From) + act, err := st.GetActor(sender) if err != nil { return xerrors.Errorf("failed to get actor: %w", err) } @@ -1084,13 +1094,13 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock if !builtin.IsAccountActor(act.Code) { return xerrors.New("Sender must be an account actor") } - nonces[m.From] = act.Nonce + nonces[sender] = act.Nonce } - if nonces[m.From] != m.Nonce { - return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce) + if nonces[sender] != m.Nonce { + return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[sender], m.Nonce) } - nonces[m.From]++ + nonces[sender]++ return nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index 6f6d7babd..2289d6350 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -10,7 +10,6 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/ipfs/go-cid" @@ -108,6 +107,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil { } tu.addSourceNode(stmgr.DefaultUpgradeSchedule(), h) + //tu.checkHeight("source", source, h) // separate logs @@ -733,7 +733,6 @@ func TestBadNonce(t *testing.T) { // Produce a message from the banker with a bad nonce makeBadMsg := func() *types.SignedMessage { - msg := types.Message{ To: tu.g.Banker(), From: tu.g.Banker(), @@ -764,6 +763,114 @@ func TestBadNonce(t *testing.T) { tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0) } +// This test introduces a block that has 2 messages, with the same sender, and same nonce. +// One of the messages uses the sender's robust address, the other uses the ID address. +// Such a block is invalid and should not sync. +func TestMismatchedNoncesRobustID(t *testing.T) { + v5h := abi.ChainEpoch(4) + tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h) + + base := tu.g.CurTipset + + // Get the banker from computed tipset state, not the parent. + st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet()) + require.NoError(t, err) + ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st) + require.NoError(t, err) + + // Produce a message from the banker + makeMsg := func(id bool) *types.SignedMessage { + sender := tu.g.Banker() + if id { + s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key()) + require.NoError(t, err) + sender = s + } + + msg := types.Message{ + To: tu.g.Banker(), + From: sender, + + Nonce: ba.Nonce, + + Value: types.NewInt(1), + + Method: 0, + + GasLimit: 100_000_000, + GasFeeCap: types.NewInt(0), + GasPremium: types.NewInt(0), + } + + sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{}) + require.NoError(t, err) + + return &types.SignedMessage{ + Message: msg, + Signature: *sig, + } + } + + msgs := make([][]*types.SignedMessage, 1) + msgs[0] = []*types.SignedMessage{makeMsg(false), makeMsg(true)} + + tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0) +} + +// This test introduces a block that has 2 messages, with the same sender, and nonces N and N+1 (so both can be included in a block) +// One of the messages uses the sender's robust address, the other uses the ID address. +// Such a block is valid and should sync. +func TestMatchedNoncesRobustID(t *testing.T) { + v5h := abi.ChainEpoch(4) + tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h) + + base := tu.g.CurTipset + + // Get the banker from computed tipset state, not the parent. + st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet()) + require.NoError(t, err) + ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st) + require.NoError(t, err) + + // Produce a message from the banker with specified nonce + makeMsg := func(n uint64, id bool) *types.SignedMessage { + sender := tu.g.Banker() + if id { + s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key()) + require.NoError(t, err) + sender = s + } + + msg := types.Message{ + To: tu.g.Banker(), + From: sender, + + Nonce: n, + + Value: types.NewInt(1), + + Method: 0, + + GasLimit: 100_000_000, + GasFeeCap: types.NewInt(0), + GasPremium: types.NewInt(0), + } + + sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{}) + require.NoError(t, err) + + return &types.SignedMessage{ + Message: msg, + Signature: *sig, + } + } + + msgs := make([][]*types.SignedMessage, 1) + msgs[0] = []*types.SignedMessage{makeMsg(ba.Nonce, false), makeMsg(ba.Nonce+1, true)} + + tu.mineOnBlock(base, 0, []int{0}, true, false, msgs, 0) +} + func BenchmarkSyncBasic(b *testing.B) { for i := 0; i < b.N; i++ { runSyncBenchLength(b, 100) diff --git a/chain/vm/runtime.go b/chain/vm/runtime.go index 00c04ceb8..7c40fed62 100644 --- a/chain/vm/runtime.go +++ b/chain/vm/runtime.go @@ -214,7 +214,7 @@ func (rt *Runtime) GetActorCodeCID(addr address.Address) (ret cid.Cid, ok bool) func (rt *Runtime) GetRandomnessFromTickets(personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) abi.Randomness { var err error var res []byte - if rt.vm.GetNtwkVersion(rt.ctx, randEpoch) >= network.Version13 { + if randEpoch > build.UpgradeHyperdriveHeight { res, err = rt.vm.rand.GetChainRandomnessLookingForward(rt.ctx, personalization, randEpoch, entropy) } else { res, err = rt.vm.rand.GetChainRandomnessLookingBack(rt.ctx, personalization, randEpoch, entropy) diff --git a/chain/vm/syscalls.go b/chain/vm/syscalls.go index 568197bc8..bb93fce8d 100644 --- a/chain/vm/syscalls.go +++ b/chain/vm/syscalls.go @@ -287,7 +287,7 @@ func (ss *syscallShim) VerifyAggregateSeals(aggregate proof5.AggregateSealVerify return xerrors.Errorf("failed to verify aggregated PoRep: %w", err) } if !ok { - return fmt.Errorf("invalid aggredate proof") + return fmt.Errorf("invalid aggregate proof") } return nil diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 2bb44b4f4..2476c16e8 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -980,7 +980,7 @@ var sectorsBatching = &cli.Command{ } var sectorsBatchingPendingCommit = &cli.Command{ - Name: "pending-commit", + Name: "commit", Usage: "list sectors waiting in commit batch queue", Flags: []cli.Flag{ &cli.BoolFlag{ @@ -997,15 +997,30 @@ var sectorsBatchingPendingCommit = &cli.Command{ ctx := lcli.ReqContext(cctx) if cctx.Bool("publish-now") { - cid, err := api.SectorCommitFlush(ctx) + res, err := api.SectorCommitFlush(ctx) if err != nil { return xerrors.Errorf("flush: %w", err) } - if cid == nil { + if res == nil { return xerrors.Errorf("no sectors to publish") } - fmt.Println("sector batch published: ", cid) + for i, re := range res { + fmt.Printf("Batch %d:\n", i) + if re.Error != "" { + fmt.Printf("\tError: %s\n", re.Error) + } else { + fmt.Printf("\tMessage: %s\n", re.Msg) + } + fmt.Printf("\tSectors:\n") + for _, sector := range re.Sectors { + if e, found := re.FailedSectors[sector]; found { + fmt.Printf("\t\t%d\tERROR %s\n", sector, e) + } else { + fmt.Printf("\t\t%d\tOK\n", sector) + } + } + } return nil } @@ -1027,7 +1042,7 @@ var sectorsBatchingPendingCommit = &cli.Command{ } var sectorsBatchingPendingPreCommit = &cli.Command{ - Name: "pending-precommit", + Name: "precommit", Usage: "list sectors waiting in precommit batch queue", Flags: []cli.Flag{ &cli.BoolFlag{ @@ -1044,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{ ctx := lcli.ReqContext(cctx) if cctx.Bool("publish-now") { - cid, err := api.SectorPreCommitFlush(ctx) + res, err := api.SectorPreCommitFlush(ctx) if err != nil { return xerrors.Errorf("flush: %w", err) } - if cid == nil { + if res == nil { return xerrors.Errorf("no sectors to publish") } - fmt.Println("sector batch published: ", cid) + for i, re := range res { + fmt.Printf("Batch %d:\n", i) + if re.Error != "" { + fmt.Printf("\tError: %s\n", re.Error) + } else { + fmt.Printf("\tMessage: %s\n", re.Msg) + } + fmt.Printf("\tSectors:\n") + for _, sector := range re.Sectors { + fmt.Printf("\t\t%d\tOK\n", sector) + } + } return nil } diff --git a/documentation/misc/actors_version_checklist.md b/documentation/misc/actors_version_checklist.md index a88712878..1fae4bd8a 100644 --- a/documentation/misc/actors_version_checklist.md +++ b/documentation/misc/actors_version_checklist.md @@ -14,6 +14,6 @@ - [ ] Update `chain/stmgr/forks.go` - [ ] Schedule - [ ] Migration -- [ ] Update upgrade schedule in `api/test/test.go` +- [ ] Update upgrade schedule in `api/test/test.go` and `chain/sync_test.go` - [ ] Update `NewestNetworkVersion` in `build/params_shared_vals.go` - [ ] Register in init in `chain/stmgr/utils.go` diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 845400ccf..7d128fe76 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) const arp = abi.RegisteredAggregationProof_SnarkPackV1 @@ -30,6 +31,9 @@ type CommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) + + StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) + StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) } type AggregateInput struct { @@ -49,10 +53,10 @@ type CommitBatcher struct { deadlines map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]AggregateInput - waiting map[abi.SectorNumber][]chan cid.Cid + waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes notify, stop, stopped chan struct{} - force chan chan *cid.Cid + force chan chan []sealiface.CommitBatchRes lk sync.Mutex } @@ -68,10 +72,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat deadlines: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]AggregateInput{}, - waiting: map[abi.SectorNumber][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan *cid.Cid), + force: make(chan chan []sealiface.CommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -82,8 +86,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat } func (b *CommitBatcher) run() { - var forceRes chan *cid.Cid - var lastMsg *cid.Cid + var forceRes chan []sealiface.CommitBatchRes + var lastMsg []sealiface.CommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -111,7 +115,7 @@ func (b *CommitBatcher) run() { } var err error - lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } @@ -159,12 +163,9 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time return time.After(wait) } -func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { +func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() - params := miner5.ProveCommitAggregateParams{ - SectorNumbers: bitfield.New(), - } total := len(b.todo) if total == 0 { @@ -184,8 +185,53 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, nil } + var res []sealiface.CommitBatchRes + + if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { + res, err = b.processIndividually() + } else { + res, err = b.processBatch(cfg) + } + if err != nil && len(res) == 0 { + return nil, err + } + + for _, r := range res { + if err != nil { + r.Error = err.Error() + } + + for _, sn := range r.Sectors { + for _, ch := range b.waiting[sn] { + ch <- r // buffered + } + + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + } + + return res, nil +} + +func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { + tok, _, err := b.api.ChainHead(b.mctx) + if err != nil { + return nil, err + } + + total := len(b.todo) + + var res sealiface.CommitBatchRes + + params := miner5.ProveCommitAggregateParams{ + SectorNumbers: bitfield.New(), + } + proofs := make([][]byte, 0, total) infos := make([]proof5.AggregateSealVerifyInfo, 0, total) + collateral := big.Zero() for id, p := range b.todo { if len(infos) >= cfg.MaxCommitBatch { @@ -193,6 +239,15 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { break } + sc, err := b.getSectorCollateral(id, tok) + if err != nil { + res.FailedSectors[id] = err.Error() + continue + } + + collateral = big.Add(collateral, sc) + + res.Sectors = append(res.Sectors, id) params.SectorNumbers.Set(uint64(id)) infos = append(infos, p.info) } @@ -207,7 +262,7 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { mid, err := address.IDFromAddress(b.maddr) if err != nil { - return nil, xerrors.Errorf("getting miner id: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{ @@ -217,55 +272,107 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { Infos: infos, }, proofs) if err != nil { - return nil, xerrors.Errorf("aggregating proofs: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) } + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) + } + + goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, collateral) + + from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes()) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) + } + + res.Msg = &mcid + + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) + + return []sealiface.CommitBatchRes{res}, nil +} + +func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) { mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { return nil, xerrors.Errorf("couldn't get miner info: %w", err) } - from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee) + tok, _, err := b.api.ChainHead(b.mctx) if err != nil { - return nil, xerrors.Errorf("no good address found: %w", err) + return nil, err } - mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes()) - if err != nil { - return nil, xerrors.Errorf("sending message failed: %w", err) - } + var res []sealiface.CommitBatchRes - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) - - err = params.SectorNumbers.ForEach(func(us uint64) error { - sn := abi.SectorNumber(us) - - for _, ch := range b.waiting[sn] { - ch <- mcid // buffered + for sn, info := range b.todo { + r := sealiface.CommitBatchRes{ + Sectors: []abi.SectorNumber{sn}, } - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.deadlines, sn) - return nil - }) - if err != nil { - return nil, xerrors.Errorf("done sectors foreach: %w", err) + + mcid, err := b.processSingle(mi, sn, info, tok) + if err != nil { + log.Errorf("process single error: %+v", err) // todo: return to user + r.FailedSectors[sn] = err.Error() + } else { + r.Msg = &mcid + } + + res = append(res, r) } - return &mcid, nil + return res, nil +} + +func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) { + enc := new(bytes.Buffer) + params := &miner.ProveCommitSectorParams{ + SectorNumber: sn, + Proof: info.proof, + } + + if err := params.MarshalCBOR(enc); err != nil { + return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err) + } + + collateral, err := b.getSectorCollateral(sn, tok) + if err != nil { + return cid.Undef, err + } + + goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee) + + from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) + if err != nil { + return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes()) + if err != nil { + return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err) + } + + return mcid, nil } // register commit, wait for batch message, return message CID -func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) { +func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return cid.Undef, nil + return sealiface.CommitBatchRes{}, nil } sn := s.SectorNumber @@ -274,7 +381,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.todo[sn] = in - sent := make(chan cid.Cid, 1) + sent := make(chan sealiface.CommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -284,15 +391,15 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat b.lk.Unlock() select { - case c := <-sent: - return c, nil + case r := <-sent: + return r, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return sealiface.CommitBatchRes{}, ctx.Err() } } -func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { - resCh := make(chan *cid.Cid, 1) +func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { + resCh := make(chan []sealiface.CommitBatchRes, 1) select { case b.force <- resCh: select { @@ -364,3 +471,25 @@ func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time { return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second) } + +func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) { + pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, sn, tok) + if err != nil { + return big.Zero(), xerrors.Errorf("getting precommit info: %w", err) + } + if pci == nil { + return big.Zero(), xerrors.Errorf("precommit info not found on chain") + } + + collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tok) + if err != nil { + return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err) + } + + collateral = big.Sub(collateral, pci.PreCommitDeposit) + if collateral.LessThan(big.Zero()) { + collateral = big.Zero() + } + + return collateral, nil +} diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index bce8e21d5..dd674d331 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) type PreCommitBatcherApi interface { @@ -41,10 +42,10 @@ type PreCommitBatcher struct { deadlines map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]*preCommitEntry - waiting map[abi.SectorNumber][]chan cid.Cid + waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes notify, stop, stopped chan struct{} - force chan chan *cid.Cid + force chan chan []sealiface.PreCommitBatchRes lk sync.Mutex } @@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom deadlines: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]*preCommitEntry{}, - waiting: map[abi.SectorNumber][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan *cid.Cid), + force: make(chan chan []sealiface.PreCommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom } func (b *PreCommitBatcher) run() { - var forceRes chan *cid.Cid - var lastMsg *cid.Cid + var forceRes chan []sealiface.PreCommitBatchRes + var lastRes []sealiface.PreCommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() { for { if forceRes != nil { - forceRes <- lastMsg + forceRes <- lastRes forceRes = nil } - lastMsg = nil + lastRes = nil var sendAboveMax, sendAboveMin bool select { @@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() { } var err error - lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } @@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T return time.After(wait) } -func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { +func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() - params := miner5.PreCommitSectorBatchParams{} total := len(b.todo) if total == 0 { @@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, nil } + // todo support multiple batches + res, err := b.processBatch(cfg) + if err != nil && len(res) == 0 { + return nil, err + } + + for _, r := range res { + if err != nil { + r.Error = err.Error() + } + + for _, sn := range r.Sectors { + for _, ch := range b.waiting[sn] { + ch <- r // buffered + } + + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + } + + return res, nil +} + +func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) { + params := miner5.PreCommitSectorBatchParams{} deposit := big.Zero() + var res sealiface.PreCommitBatchRes for _, p := range b.todo { if len(params.Sectors) >= cfg.MaxPreCommitBatch { @@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { break } + res.Sectors = append(res.Sectors, p.pci.SectorNumber) params.Sectors = append(params.Sectors, *p.pci) deposit = big.Add(deposit, p.deposit) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { - return nil, xerrors.Errorf("couldn't get miner info: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee) from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { - return nil, xerrors.Errorf("no good address found: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes()) if err != nil { - return nil, xerrors.Errorf("sending message failed: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total) + res.Msg = &mcid - for _, sector := range params.Sectors { - sn := sector.SectorNumber + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo)) - for _, ch := range b.waiting[sn] { - ch <- mcid // buffered - } - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.deadlines, sn) - } - - return &mcid, nil + return []sealiface.PreCommitBatchRes{res}, nil } // register PreCommit, wait for batch message, return message CID -func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) { +func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return cid.Undef, nil + return sealiface.PreCommitBatchRes{}, err } sn := s.SectorNumber @@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos pci: in, } - sent := make(chan cid.Cid, 1) + sent := make(chan sealiface.PreCommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos case c := <-sent: return c, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return sealiface.PreCommitBatchRes{}, ctx.Err() } } -func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { - resCh := make(chan *cid.Cid, 1) +func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { + resCh := make(chan []sealiface.PreCommitBatchRes, 1) select { case b.force <- resCh: select { diff --git a/extern/storage-sealing/sealiface/batching.go b/extern/storage-sealing/sealiface/batching.go new file mode 100644 index 000000000..d0e6d4178 --- /dev/null +++ b/extern/storage-sealing/sealiface/batching.go @@ -0,0 +1,23 @@ +package sealiface + +import ( + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-state-types/abi" +) + +type CommitBatchRes struct { + Sectors []abi.SectorNumber + + FailedSectors map[abi.SectorNumber]string + + Msg *cid.Cid + Error string // if set, means that all sectors are failed, implies Msg==nil +} + +type PreCommitBatchRes struct { + Sectors []abi.SectorNumber + + Msg *cid.Cid + Error string // if set, means that all sectors are failed, implies Msg==nil +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index eb2d30f82..69746268f 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) const SectorStorePrefix = "/sectors" @@ -207,7 +208,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.precommiter.Flush(ctx) } @@ -215,7 +216,7 @@ func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, e return m.precommiter.Pending(ctx) } -func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Sealing) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return m.commiter.Flush(ctx) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 01c0bc644..5e8f5269b 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -355,20 +355,28 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error { if sector.CommD == nil || sector.CommR == nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) + return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")}) } params, deposit, _, err := m.preCommitParams(ctx, sector) if params == nil || err != nil { - return err + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)}) } - mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) + res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) } - return ctx.Send(SectorPreCommitBatchSent{mcid}) + if res.Error != "" { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)}) + } + + if res.Msg == nil { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")}) + } + + return ctx.Send(SectorPreCommitBatchSent{*res.Msg}) } func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error { @@ -581,7 +589,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) } - mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ + res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ info: proof.AggregateSealVerifyInfo{ Number: sector.SectorNumber, Randomness: sector.TicketValue, @@ -596,7 +604,19 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) } - return ctx.Send(SectorCommitAggregateSent{mcid}) + if res.Error != "" { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)}) + } + + if e, found := res.FailedSectors[sector.SectorNumber]; found { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)}) + } + + if res.Msg == nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")}) + } + + return ctx.Send(SectorCommitAggregateSent{*res.Msg}) } func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { diff --git a/node/config/def.go b/node/config/def.go index 6f1783ac2..c18f60a7a 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -271,7 +271,7 @@ func DefaultStorageMiner() *StorageMiner { PreCommitBatchSlack: Duration(3 * time.Hour), AggregateCommits: true, - MinCommitBatch: 1, // we must have at least one proof to aggregate + MinCommitBatch: miner5.MinAggregatedSectors, // we must have at least four proofs to aggregate MaxCommitBatch: miner5.MaxAggregatedSectors, // this is the maximum aggregation per FIP13 CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days CommitBatchSlack: Duration(1 * time.Hour), diff --git a/node/impl/full/gas.go b/node/impl/full/gas.go index 7b624d39b..edf53ff63 100644 --- a/node/impl/full/gas.go +++ b/node/impl/full/gas.go @@ -267,7 +267,7 @@ func gasEstimateGasLimit( return -1, xerrors.Errorf("getting key address: %w", err) } - pending, ts := mpool.PendingFor(fromA) + pending, ts := mpool.PendingFor(ctx, fromA) priorMsgs := make([]types.ChainMsg, 0, len(pending)) for _, m := range pending { if m.Message.Nonce == msg.Nonce { diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 63d3c7d58..5fe5fbec1 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -60,7 +60,7 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.Mpool.SelectMessages(ts, ticketQuality) + return a.Mpool.SelectMessages(ctx, ts, ticketQuality) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { @@ -68,7 +68,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty if err != nil { return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - pending, mpts := a.Mpool.Pending() + pending, mpts := a.Mpool.Pending(ctx) haveCids := map[cid.Cid]struct{}{} for _, m := range pending { @@ -122,16 +122,16 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty } func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error { - a.Mpool.Clear(local) + a.Mpool.Clear(ctx, local) return nil } func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { - return m.Mpool.Push(smsg) + return m.Mpool.Push(ctx, smsg) } func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { - return a.Mpool.PushUntrusted(smsg) + return a.Mpool.PushUntrusted(ctx, smsg) } func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { @@ -192,7 +192,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) { var messageCids []cid.Cid for _, smsg := range smsgs { - smsgCid, err := a.Mpool.Push(smsg) + smsgCid, err := a.Mpool.Push(ctx, smsg) if err != nil { return messageCids, err } @@ -204,7 +204,7 @@ func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMess func (a *MpoolAPI) MpoolBatchPushUntrusted(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) { var messageCids []cid.Cid for _, smsg := range smsgs { - smsgCid, err := a.Mpool.PushUntrusted(smsg) + smsgCid, err := a.Mpool.PushUntrusted(ctx, smsg) if err != nil { return messageCids, err } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a8bfe0884..61c69b2ba 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -32,6 +32,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/api" apitypes "github.com/filecoin-project/lotus/api/types" @@ -373,7 +374,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se return sm.Miner.TerminatePending(ctx) } -func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return sm.Miner.SectorPreCommitFlush(ctx) } @@ -385,7 +386,7 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect return sm.Miner.MarkForUpgrade(id) } -func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return sm.Miner.CommitFlush(ctx) } diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 854d0d368..1376ae5fb 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -147,15 +147,7 @@ func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr return cid.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) } - ts, err := s.delegate.ChainGetTipSet(ctx, tsk) - if err != nil { - return cid.Cid{}, err - } - - // using parent ts because the migration won't be run on the first nv13 - // tipset we apply StateCall to (because we don't run migrations in StateCall - // and just apply to parent state) - nv, err := s.delegate.StateNetworkVersion(ctx, ts.Parents()) + nv, err := s.delegate.StateNetworkVersion(ctx, tsk) if err != nil { return cid.Cid{}, err } diff --git a/storage/miner_sealing.go b/storage/miner_sealing.go index cd215f238..6a1195826 100644 --- a/storage/miner_sealing.go +++ b/storage/miner_sealing.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) // TODO: refactor this to be direct somehow @@ -59,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { return m.sealing.TerminatePending(ctx) } -func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.sealing.SectorPreCommitFlush(ctx) } @@ -67,7 +68,7 @@ func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, err return m.sealing.SectorPreCommitPending(ctx) } -func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return m.sealing.CommitFlush(ctx) } diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index b4c702197..d62b5e851 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -534,9 +534,14 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t return nil, xerrors.Errorf("getting partitions: %w", err) } + nv, err := s.api.StateNetworkVersion(ctx, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting network version: %w", err) + } + // Split partitions into batches, so as not to exceed the number of sectors // allowed in a single message - partitionBatches, err := s.batchPartitions(partitions) + partitionBatches, err := s.batchPartitions(partitions, nv) if err != nil { return nil, err } @@ -716,7 +721,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t return posts, nil } -func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]api.Partition, error) { +func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv network.Version) ([][]api.Partition, error) { // We don't want to exceed the number of sectors allowed in a message. // So given the number of sectors in a partition, work out the number of // partitions that can be in a message without exceeding sectors per @@ -732,6 +737,11 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]a return nil, xerrors.Errorf("getting sectors per partition: %w", err) } + // Also respect the AddressedPartitionsMax (which is the same as DeclarationsMax (which is all really just MaxPartitionsPerDeadline)) + if partitionsPerMsg > policy.GetDeclarationsMax(nv) { + partitionsPerMsg = policy.GetDeclarationsMax(nv) + } + // The number of messages will be: // ceiling(number of partitions / partitions per message) batchCount := len(partitions) / partitionsPerMsg diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index 98e4f87d3..b878ff97e 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -5,6 +5,9 @@ import ( "context" "testing" + builtin5 "github.com/filecoin-project/specs-actors/v5/actors/builtin" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" @@ -177,13 +180,16 @@ func TestWDPostDoPost(t *testing.T) { mockStgMinerAPI := newMockStorageMinerAPI() // Get the number of sectors allowed in a partition for this proof type - sectorsPerPartition, err := builtin2.PoStProofWindowPoStPartitionSectors(proofType) + sectorsPerPartition, err := builtin5.PoStProofWindowPoStPartitionSectors(proofType) require.NoError(t, err) // Work out the number of partitions that can be included in a message // without exceeding the message sector limit require.NoError(t, err) - partitionsPerMsg := int(miner2.AddressedSectorsMax / sectorsPerPartition) + partitionsPerMsg := int(miner5.AddressedSectorsMax / sectorsPerPartition) + if partitionsPerMsg > miner5.AddressedPartitionsMax { + partitionsPerMsg = miner5.AddressedPartitionsMax + } // Enough partitions to fill expectedMsgCount-1 messages partitionCount := (expectedMsgCount - 1) * partitionsPerMsg @@ -219,11 +225,11 @@ func TestWDPostDoPost(t *testing.T) { } di := &dline.Info{ - WPoStPeriodDeadlines: miner2.WPoStPeriodDeadlines, - WPoStProvingPeriod: miner2.WPoStProvingPeriod, - WPoStChallengeWindow: miner2.WPoStChallengeWindow, - WPoStChallengeLookback: miner2.WPoStChallengeLookback, - FaultDeclarationCutoff: miner2.FaultDeclarationCutoff, + WPoStPeriodDeadlines: miner5.WPoStPeriodDeadlines, + WPoStProvingPeriod: miner5.WPoStProvingPeriod, + WPoStChallengeWindow: miner5.WPoStChallengeWindow, + WPoStChallengeLookback: miner5.WPoStChallengeLookback, + FaultDeclarationCutoff: miner5.FaultDeclarationCutoff, } ts := mockTipSet(t)