Merge remote-tracking branch 'origin/feat/nv13' into feat/nv13-1.11
commit dc642d0b7b
@@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

// MODIFYING THE API INTERFACE

@@ -93,12 +94,12 @@ type StorageMiner interface {
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
// SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
// Returns null if message wasn't sent
SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin
// SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
// SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit.
// Returns null if message wasn't sent
SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) //perm:admin
// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
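The interface change above replaces the single optional message CID with one result entry per batch message. A minimal sketch of how a caller might consume the new return values — only SectorPreCommitFlush and SectorCommitFlush come from the interface above, the wrapper function and printing are illustrative:

package example

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/api"
)

// flushBatches is a sketch, not part of the commit: it assumes a StorageMiner
// API client obtained elsewhere via the usual lotus client helpers.
func flushBatches(ctx context.Context, miner api.StorageMiner) error {
	pcb, err := miner.SectorPreCommitFlush(ctx)
	if err != nil {
		return err
	}
	for _, res := range pcb {
		// one entry per PreCommit batch message the batcher attempted to send
		fmt.Printf("precommit batch: %+v\n", res)
	}

	cb, err := miner.SectorCommitFlush(ctx)
	if err != nil {
		return err
	}
	for _, res := range cb {
		fmt.Printf("commit batch: %+v\n", res)
	}
	return nil
}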
@@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/specs-storage/storage"

@@ -659,7 +660,7 @@ type StorageMinerStruct struct {
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`

@@ -669,7 +670,7 @@ type StorageMinerStruct struct {
SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`

@@ -3125,12 +3126,12 @@ func (s *StorageMinerStub) SealingSchedDiag(p0 context.Context, p1 bool) (interf
return nil, xerrors.New("method not supported")
}

func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) {
func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) {
return s.Internal.SectorCommitFlush(p0)
}

func (s *StorageMinerStub) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) {
return nil, xerrors.New("method not supported")
func (s *StorageMinerStub) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) {
return *new([]sealiface.CommitBatchRes), xerrors.New("method not supported")
}

func (s *StorageMinerStruct) SectorCommitPending(p0 context.Context) ([]abi.SectorID, error) {

@@ -3165,12 +3166,12 @@ func (s *StorageMinerStub) SectorMarkForUpgrade(p0 context.Context, p1 abi.Secto
return xerrors.New("method not supported")
}

func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) {
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) {
return s.Internal.SectorPreCommitFlush(p0)
}

func (s *StorageMinerStub) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) {
return nil, xerrors.New("method not supported")
func (s *StorageMinerStub) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) {
return *new([]sealiface.PreCommitBatchRes), xerrors.New("method not supported")
}

func (s *StorageMinerStruct) SectorPreCommitPending(p0 context.Context) ([]abi.SectorID, error) {
@@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
pcb, err := miner.SectorPreCommitFlush(ctx)
require.NoError(t, err)
if pcb != nil {
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
}
}

@@ -178,7 +178,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err)
if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb)
fmt.Printf("COMMIT BATCH: %+v\n", cb)
}
}

@@ -319,13 +319,13 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod
pcb, err := miner.SectorPreCommitFlush(ctx)
require.NoError(t, err)
if pcb != nil {
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
}

cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err)
if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb)
fmt.Printf("COMMIT BATCH: %+v\n", cb)
}
}

@@ -543,7 +543,7 @@ func TestWindowPostDispute(t *testing.T, b APIBuilder, blocktime time.Duration)
for {
di, err = client.StateMinerProvingDeadline(ctx, evilMinerAddr, types.EmptyTSK)
require.NoError(t, err)
if di.Index == evilSectorLoc.Deadline {
if di.Index == evilSectorLoc.Deadline && di.CurrentEpoch-di.PeriodStart > 1 {
break
}
build.Clock.Sleep(blocktime)

@@ -640,7 +640,7 @@ func TestWindowPostDispute(t *testing.T, b APIBuilder, blocktime time.Duration)
for {
di, err = client.StateMinerProvingDeadline(ctx, evilMinerAddr, types.EmptyTSK)
require.NoError(t, err)
if di.Index == evilSectorLoc.Deadline {
if di.Index == evilSectorLoc.Deadline && di.CurrentEpoch-di.PeriodStart > 1 {
break
}
build.Clock.Sleep(blocktime)
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -92,6 +92,13 @@ func AddSupportedProofTypes(types ...abi.RegisteredSealProof) {
miner4.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{}

miner5.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{}
wpp, err := t.RegisteredWindowPoStProof()
if err != nil {
// Fine to panic, this is a test-only method
panic(err)
}

miner5.WindowPoStProofTypes[wpp] = struct{}{}
}
}
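Alongside the pre-commit seal proof, the v5 policy now also registers the matching Window PoSt proof type for each supported seal proof. A hedged sketch of how test setup code typically opts in a small proof type — only AddSupportedProofTypes is the real helper changed here, the surrounding package and init block are illustrative:

package example

import (
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/chain/actors/policy"
)

func init() {
	// Test-only: registers the 2KiB seal proof and, after this change, the
	// corresponding Window PoSt proof in the miner5 policy maps as well.
	policy.AddSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
}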
@@ -62,6 +62,13 @@ func AddSupportedProofTypes(types ...abi.RegisteredSealProof) {
miner{{.}}.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{}
{{else}}
miner{{.}}.PreCommitSealProofTypesV8[t+abi.RegisteredSealProof_StackedDrg2KiBV1_1] = struct{}{}
wpp, err := t.RegisteredWindowPoStProof()
if err != nil {
// Fine to panic, this is a test-only method
panic(err)
}

miner{{.}}.WindowPoStProofTypes[wpp] = struct{}{}
{{end}}
{{end}}
}
@@ -592,6 +592,10 @@ func (mca mca) ChainGetRandomnessFromTickets(ctx context.Context, tsk types.TipS
return nil, xerrors.Errorf("loading tipset key: %w", err)
}

if randEpoch > build.UpgradeHyperdriveHeight {
return mca.sm.ChainStore().GetChainRandomnessLookingForward(ctx, pts.Cids(), personalization, randEpoch, entropy)
}

return mca.sm.ChainStore().GetChainRandomnessLookingBack(ctx, pts.Cids(), personalization, randEpoch, entropy)
}

@@ -601,6 +605,10 @@ func (mca mca) ChainGetRandomnessFromBeacon(ctx context.Context, tsk types.TipSe
return nil, xerrors.Errorf("loading tipset key: %w", err)
}

if randEpoch > build.UpgradeHyperdriveHeight {
return mca.sm.ChainStore().GetBeaconRandomnessLookingForward(ctx, pts.Cids(), personalization, randEpoch, entropy)
}

return mca.sm.ChainStore().GetBeaconRandomnessLookingBack(ctx, pts.Cids(), personalization, randEpoch, entropy)
}
@@ -126,10 +126,14 @@ type MessagePool struct {
republished map[cid.Cid]struct{}

// do NOT access this map directly, use isLocal, setLocal, and forEachLocal respectively
localAddrs map[address.Address]struct{}

// do NOT access this map directly, use getPendingMset, setPendingMset, deletePendingMset, forEachPending, and clearPending respectively
pending map[address.Address]*msgSet

keyCache map[address.Address]address.Address

curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet

@@ -329,6 +333,20 @@ func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
return types.BigInt{Int: requiredFunds}
}

func (ms *msgSet) toSlice() []*types.SignedMessage {
set := make([]*types.SignedMessage, 0, len(ms.msgs))

for _, m := range ms.msgs {
set = append(set, m)
}

sort.Slice(set, func(i, j int) bool {
return set[i].Message.Nonce < set[j].Message.Nonce
})

return set
}

func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)

@@ -350,6 +368,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
repubTrigger: make(chan struct{}, 1),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
keyCache: make(map[address.Address]address.Address),
minGasPrice: types.NewInt(0),
pruneTrigger: make(chan struct{}, 1),
pruneCooldown: make(chan struct{}, 1),
@@ -371,9 +390,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
// enable initial prunes
mp.pruneCooldown <- struct{}{}

ctx, cancel := context.WithCancel(context.TODO())

// load the current tipset and subscribe to head changes _before_ loading local messages
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app)
err := mp.HeadChange(ctx, rev, app)
if err != nil {
log.Errorf("mpool head notif handler error: %+v", err)
}

@@ -384,7 +405,8 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
mp.lk.Lock()

go func() {
err := mp.loadLocal()
defer cancel()
err := mp.loadLocal(ctx)

mp.lk.Unlock()
mp.curTsLk.Unlock()

@@ -395,12 +417,106 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ

log.Info("mpool ready")

mp.runLoop()
mp.runLoop(ctx)
}()

return mp, nil
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, f := mp.keyCache[addr]
if f {
return a, nil
}

// resolve the address
ka, err := mp.api.StateAccountKeyAtFinality(ctx, addr, mp.curTs)
if err != nil {
return address.Undef, err
}

// place both entries in the cache (may both be key addresses, which is fine)
mp.keyCache[addr] = ka
mp.keyCache[ka] = ka

return ka, nil
}

func (mp *MessagePool) getPendingMset(ctx context.Context, addr address.Address) (*msgSet, bool, error) {
ra, err := mp.resolveToKey(ctx, addr)
if err != nil {
return nil, false, err
}

ms, f := mp.pending[ra]

return ms, f, nil
}

func (mp *MessagePool) setPendingMset(ctx context.Context, addr address.Address, ms *msgSet) error {
ra, err := mp.resolveToKey(ctx, addr)
if err != nil {
return err
}

mp.pending[ra] = ms

return nil
}

// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
func (mp *MessagePool) forEachPending(f func(address.Address, *msgSet)) {
for la, ms := range mp.pending {
f(la, ms)
}
}

func (mp *MessagePool) deletePendingMset(ctx context.Context, addr address.Address) error {
ra, err := mp.resolveToKey(ctx, addr)
if err != nil {
return err
}

delete(mp.pending, ra)

return nil
}

// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
func (mp *MessagePool) clearPending() {
mp.pending = make(map[address.Address]*msgSet)
}

func (mp *MessagePool) isLocal(ctx context.Context, addr address.Address) (bool, error) {
ra, err := mp.resolveToKey(ctx, addr)
if err != nil {
return false, err
}

_, f := mp.localAddrs[ra]

return f, nil
}

func (mp *MessagePool) setLocal(ctx context.Context, addr address.Address) error {
ra, err := mp.resolveToKey(ctx, addr)
if err != nil {
return err
}

mp.localAddrs[ra] = struct{}{}

return nil
}

// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
func (mp *MessagePool) forEachLocal(ctx context.Context, f func(context.Context, address.Address)) {
for la := range mp.localAddrs {
f(ctx, la)
}
}

func (mp *MessagePool) Close() error {
close(mp.closer)
return nil
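The new accessors make the pending and localAddrs maps keyed by key addresses only: every lookup goes through resolveToKey, which consults keyCache and falls back to StateAccountKeyAtFinality. A rough in-package illustration of the intended behaviour — the helper below is hypothetical and not part of the commit:

// Hypothetical sketch inside the messagepool package: an ID address and its
// key address should now resolve to the same pending msgSet.
func samePendingSet(ctx context.Context, mp *MessagePool, idAddr, keyAddr address.Address) (bool, error) {
	a, _, err := mp.getPendingMset(ctx, idAddr) // e.g. an f0… ID address
	if err != nil {
		return false, err
	}
	b, _, err := mp.getPendingMset(ctx, keyAddr) // the matching f1/f3… key address
	if err != nil {
		return false, err
	}
	return a == b, nil
}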
@@ -418,15 +534,15 @@ func (mp *MessagePool) Prune() {
mp.pruneTrigger <- struct{}{}
}

func (mp *MessagePool) runLoop() {
func (mp *MessagePool) runLoop(ctx context.Context) {
for {
select {
case <-mp.repubTk.C:
if err := mp.republishPendingMessages(); err != nil {
if err := mp.republishPendingMessages(ctx); err != nil {
log.Errorf("error while republishing messages: %s", err)
}
case <-mp.repubTrigger:
if err := mp.republishPendingMessages(); err != nil {
if err := mp.republishPendingMessages(ctx); err != nil {
log.Errorf("error while republishing messages: %s", err)
}

@@ -442,8 +558,10 @@ func (mp *MessagePool) runLoop() {
}
}

func (mp *MessagePool) addLocal(m *types.SignedMessage) error {
mp.localAddrs[m.Message.From] = struct{}{}
func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) error {
if err := mp.setLocal(ctx, m.Message.From); err != nil {
return err
}

msgb, err := m.Serialize()
if err != nil {

@@ -475,7 +593,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
return false, xerrors.Errorf("message will not be included in a block: %w", err)
}

// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
// this checks if the GasFeeCap is sufficiently high for inclusion in the next 20 blocks
// if the GasFeeCap is too low, we soft reject the message (Ignore in pubsub) and rely
// on republish to push it through later, if the baseFee has fallen.
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation

@@ -510,7 +628,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
return publish, nil
}

func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err

@@ -523,7 +641,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}()

mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, true, false)
publish, err := mp.addTs(ctx, m, mp.curTs, true, false)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err

@@ -576,7 +694,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
return nil
}

func (mp *MessagePool) Add(m *types.SignedMessage) error {
func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err

@@ -591,7 +709,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

_, err = mp.addTs(m, mp.curTs, false, false)
_, err = mp.addTs(ctx, m, mp.curTs, false, false)
return err
}

@@ -631,7 +749,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
return nil
}

func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error {
func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
balance, err := mp.getStateBalance(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)

@@ -645,7 +763,12 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
// add Value for soft failure check
//requiredFunds = types.BigAdd(requiredFunds, m.Message.Value)

mset, ok := mp.pending[m.Message.From]
mset, ok, err := mp.getPendingMset(ctx, m.Message.From)
if err != nil {
log.Debugf("mpoolcheckbalance failed to get pending mset: %s", err)
return err
}

if ok {
requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce))
}
@@ -659,7 +782,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil
}

func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)

@@ -677,17 +800,17 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local,
return false, err
}

if err := mp.checkBalance(m, curTs); err != nil {
if err := mp.checkBalance(ctx, m, curTs); err != nil {
return false, err
}

err = mp.addLocked(m, !local, untrusted)
err = mp.addLocked(ctx, m, !local, untrusted)
if err != nil {
return false, err
}

if local {
err = mp.addLocal(m)
err = mp.addLocal(ctx, m)
if err != nil {
return false, xerrors.Errorf("error persisting local message: %w", err)
}

@@ -696,7 +819,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local,
return publish, nil
}

func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err

@@ -722,21 +845,21 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
return err
}

if err := mp.checkBalance(m, curTs); err != nil {
if err := mp.checkBalance(ctx, m, curTs); err != nil {
return err
}

return mp.addLocked(m, false, false)
return mp.addLocked(ctx, m, false, false)
}

func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
func (mp *MessagePool) addSkipChecks(ctx context.Context, m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()

return mp.addLocked(m, false, false)
return mp.addLocked(ctx, m, false, false)
}

func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, strict, untrusted bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature)

@@ -752,7 +875,13 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
return err
}

mset, ok := mp.pending[m.Message.From]
// Note: If performance becomes an issue, making this getOrCreatePendingMset will save some work
mset, ok, err := mp.getPendingMset(ctx, m.Message.From)
if err != nil {
log.Debug(err)
return err
}

if !ok {
nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
if err != nil {

@@ -760,7 +889,9 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
}

mset = newMsgSet(nonce)
mp.pending[m.Message.From] = mset
if err = mp.setPendingMset(ctx, m.Message.From, mset); err != nil {
return xerrors.Errorf("failed to set pending mset: %w", err)
}
}

incr, err := mset.add(m, mp, strict, untrusted)

@@ -795,14 +926,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
return nil
}

func (mp *MessagePool) GetNonce(_ context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

mp.lk.Lock()
defer mp.lk.Unlock()

return mp.getNonceLocked(addr, mp.curTs)
return mp.getNonceLocked(ctx, addr, mp.curTs)
}

// GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
@@ -812,13 +943,18 @@ func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types
return mp.api.GetActorAfter(addr, mp.curTs)
}

func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
if err != nil {
return 0, err
}

mset, ok := mp.pending[addr]
mset, ok, err := mp.getPendingMset(ctx, addr)
if err != nil {
log.Debugf("mpoolgetnonce failed to get mset: %s", err)
return 0, err
}

if ok {
if stateNonce > mset.nextNonce {
log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce)

@@ -855,7 +991,7 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
// - strict checks are enabled
// - extra strict add checks are used when adding the messages to the msgSet
// that means: no nonce gaps, at most 10 pending messages for the actor
func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
func (mp *MessagePool) PushUntrusted(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err

@@ -868,7 +1004,7 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
}()

mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, true, true)
publish, err := mp.addTs(ctx, m, mp.curTs, true, true)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err

@@ -890,15 +1026,20 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
return m.Cid(), nil
}

func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
func (mp *MessagePool) Remove(ctx context.Context, from address.Address, nonce uint64, applied bool) {
mp.lk.Lock()
defer mp.lk.Unlock()

mp.remove(from, nonce, applied)
mp.remove(ctx, from, nonce, applied)
}

func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) {
mset, ok := mp.pending[from]
func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce uint64, applied bool) {
mset, ok, err := mp.getPendingMset(ctx, from)
if err != nil {
log.Debugf("mpoolremove failed to get mset: %s", err)
return
}

if !ok {
return
}

@@ -923,58 +1064,57 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
mset.rm(nonce, applied)

if len(mset.msgs) == 0 {
delete(mp.pending, from)
if err = mp.deletePendingMset(ctx, from); err != nil {
log.Debugf("mpoolremove failed to delete mset: %s", err)
return
}
}
}

func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

mp.lk.Lock()
defer mp.lk.Unlock()

return mp.allPending()
return mp.allPending(ctx)
}

func (mp *MessagePool) allPending() ([]*types.SignedMessage, *types.TipSet) {
func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
out := make([]*types.SignedMessage, 0)
for a := range mp.pending {
out = append(out, mp.pendingFor(a)...)
}

mp.forEachPending(func(a address.Address, mset *msgSet) {
out = append(out, mset.toSlice()...)
})

return out, mp.curTs
}

func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) {
func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

mp.lk.Lock()
defer mp.lk.Unlock()
return mp.pendingFor(a), mp.curTs
return mp.pendingFor(ctx, a), mp.curTs
}

func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
mset := mp.pending[a]
if mset == nil || len(mset.msgs) == 0 {
func (mp *MessagePool) pendingFor(ctx context.Context, a address.Address) []*types.SignedMessage {
mset, ok, err := mp.getPendingMset(ctx, a)
if err != nil {
log.Debugf("mpoolpendingfor failed to get mset: %s", err)
return nil
}

set := make([]*types.SignedMessage, 0, len(mset.msgs))

for _, m := range mset.msgs {
set = append(set, m)
if mset == nil || !ok || len(mset.msgs) == 0 {
return nil
}

sort.Slice(set, func(i, j int) bool {
return set[i].Message.Nonce < set[j].Message.Nonce
})

return set
return mset.toSlice()
}

func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, apply []*types.TipSet) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
@@ -991,7 +1131,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
rm := func(from address.Address, nonce uint64) {
s, ok := rmsgs[from]
if !ok {
mp.Remove(from, nonce, true)
mp.Remove(ctx, from, nonce, true)
return
}

@@ -1000,7 +1140,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return
}

mp.Remove(from, nonce, true)
mp.Remove(ctx, from, nonce, true)
}

maybeRepub := func(cid cid.Cid) {

@@ -1071,7 +1211,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)

for _, s := range rmsgs {
for _, msg := range s {
if err := mp.addSkipChecks(msg); err != nil {
if err := mp.addSkipChecks(ctx, msg); err != nil {
log.Errorf("Failed to readd message from reorg to mpool: %s", err)
}
}

@@ -1079,7 +1219,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)

if len(revert) > 0 && futureDebug {
mp.lk.Lock()
msgs, ts := mp.allPending()
msgs, ts := mp.allPending(ctx)
mp.lk.Unlock()

buckets := map[address.Address]*statBucket{}

@@ -1286,7 +1426,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
return out, nil
}

func (mp *MessagePool) loadLocal() error {
func (mp *MessagePool) loadLocal(ctx context.Context) error {
res, err := mp.localMsgs.Query(query.Query{})
if err != nil {
return xerrors.Errorf("query local messages: %w", err)

@@ -1302,7 +1442,7 @@ func (mp *MessagePool) loadLocal() error {
return xerrors.Errorf("unmarshaling local message: %w", err)
}

if err := mp.addLoaded(&sm); err != nil {
if err := mp.addLoaded(ctx, &sm); err != nil {
if xerrors.Is(err, ErrNonceTooLow) {
continue // todo: drop the message from local cache (if above certain confidence threshold)
}

@@ -1310,25 +1450,30 @@ func (mp *MessagePool) loadLocal() error {
log.Errorf("adding local message: %+v", err)
}

mp.localAddrs[sm.Message.From] = struct{}{}
if err = mp.setLocal(ctx, sm.Message.From); err != nil {
log.Debugf("mpoolloadLocal errored: %s", err)
return err
}
}

return nil
}

func (mp *MessagePool) Clear(local bool) {
func (mp *MessagePool) Clear(ctx context.Context, local bool) {
mp.lk.Lock()
defer mp.lk.Unlock()

// remove everything if local is true, including removing local messages from
// the datastore
if local {
for a := range mp.localAddrs {
mset, ok := mp.pending[a]
if !ok {
continue
mp.forEachLocal(ctx, func(ctx context.Context, la address.Address) {
mset, ok, err := mp.getPendingMset(ctx, la)
if err != nil {
log.Warnf("errored while getting pending mset: %w", err)
return
}

if ok {
for _, m := range mset.msgs {
err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes())))
if err != nil {

@@ -1336,21 +1481,30 @@ func (mp *MessagePool) Clear(local bool) {
}
}
}
})

mp.pending = make(map[address.Address]*msgSet)
mp.clearPending()
mp.republished = nil

return
}

// remove everything except the local messages
for a := range mp.pending {
_, isLocal := mp.localAddrs[a]
mp.forEachPending(func(a address.Address, ms *msgSet) {
isLocal, err := mp.isLocal(ctx, a)
if err != nil {
log.Warnf("errored while determining isLocal: %w", err)
return
}

if isLocal {
continue
return
}
delete(mp.pending, a)

if err = mp.deletePendingMset(ctx, a); err != nil {
log.Warnf("errored while deleting mset: %w", err)
return
}
})
}

func getBaseFeeLowerBound(baseFee, factor types.BigInt) types.BigInt {
@@ -153,7 +153,7 @@ func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (
}, nil
}

func (tma *testMpoolAPI) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
func (tma *testMpoolAPI) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
if addr.Protocol() != address.BLS && addr.Protocol() != address.SECP256K1 {
return address.Undef, fmt.Errorf("given address was not a key addr")
}

@@ -202,7 +202,7 @@ func (tma *testMpoolAPI) ChainComputeBaseFee(ctx context.Context, ts *types.TipS

func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
t.Helper()
n, err := mp.GetNonce(context.Background(), addr, types.EmptyTSK)
n, err := mp.GetNonce(context.TODO(), addr, types.EmptyTSK)
if err != nil {
t.Fatal(err)
}

@@ -214,7 +214,7 @@ func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64

func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
t.Helper()
if err := mp.Add(msg); err != nil {
if err := mp.Add(context.TODO(), msg); err != nil {
t.Fatal(err)
}
}

@@ -296,9 +296,9 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
tma.applyBlock(t, a)
tsa := mock.TipSet(a)

_, _ = mp.Pending()
_, _ = mp.Pending(context.TODO())

selm, _ := mp.SelectMessages(tsa, 1)
selm, _ := mp.SelectMessages(context.Background(), tsa, 1)
if len(selm) == 0 {
t.Fatal("should have returned the rest of the messages")
}

@@ -358,7 +358,7 @@ func TestRevertMessages(t *testing.T) {

assertNonce(t, mp, sender, 4)

p, _ := mp.Pending()
p, _ := mp.Pending(context.TODO())
fmt.Printf("%+v\n", p)
if len(p) != 3 {
t.Fatal("expected three messages in mempool")

@@ -399,14 +399,14 @@ func TestPruningSimple(t *testing.T) {

for i := 0; i < 5; i++ {
smsg := mock.MkMessage(sender, target, uint64(i), w)
if err := mp.Add(smsg); err != nil {
if err := mp.Add(context.TODO(), smsg); err != nil {
t.Fatal(err)
}
}

for i := 10; i < 50; i++ {
smsg := mock.MkMessage(sender, target, uint64(i), w)
if err := mp.Add(smsg); err != nil {
if err := mp.Add(context.TODO(), smsg); err != nil {
t.Fatal(err)
}
}

@@ -416,7 +416,7 @@ func TestPruningSimple(t *testing.T) {

mp.Prune()

msgs, _ := mp.Pending()
msgs, _ := mp.Pending(context.TODO())
if len(msgs) != 5 {
t.Fatal("expected only 5 messages in pool, got: ", len(msgs))
}

@@ -458,7 +458,7 @@ func TestLoadLocal(t *testing.T) {
msgs := make(map[cid.Cid]struct{})
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
cid, err := mp.Push(m)
cid, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)
}

@@ -474,7 +474,7 @@ func TestLoadLocal(t *testing.T) {
t.Fatal(err)
}

pmsgs, _ := mp.Pending()
pmsgs, _ := mp.Pending(context.TODO())
if len(msgs) != len(pmsgs) {
t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs))
}

@@ -529,7 +529,7 @@ func TestClearAll(t *testing.T) {
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
_, err := mp.Push(m)
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)
}

@@ -540,9 +540,9 @@ func TestClearAll(t *testing.T) {
mustAdd(t, mp, m)
}

mp.Clear(true)
mp.Clear(context.Background(), true)

pending, _ := mp.Pending()
pending, _ := mp.Pending(context.TODO())
if len(pending) > 0 {
t.Fatalf("cleared the mpool, but got %d pending messages", len(pending))
}

@@ -584,7 +584,7 @@ func TestClearNonLocal(t *testing.T) {
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
_, err := mp.Push(m)
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)
}

@@ -595,9 +595,9 @@ func TestClearNonLocal(t *testing.T) {
mustAdd(t, mp, m)
}

mp.Clear(false)
mp.Clear(context.Background(), false)

pending, _ := mp.Pending()
pending, _ := mp.Pending(context.TODO())
if len(pending) != 10 {
t.Fatalf("expected 10 pending messages, but got %d instead", len(pending))
}

@@ -654,7 +654,7 @@ func TestUpdates(t *testing.T) {

for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
_, err := mp.Push(m)
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)
}
@@ -26,7 +26,7 @@ type Provider interface {
PutMessage(m types.ChainMsg) (cid.Cid, error)
PubSubPublish(string, []byte) error
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)

@@ -41,6 +41,8 @@ type mpoolProvider struct {
lite messagesigner.MpoolNonceAPI
}

var _ Provider = (*mpoolProvider)(nil)

func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
return &mpoolProvider{sm: sm, ps: ps}
}

@@ -97,8 +99,8 @@ func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet)
return st.GetActor(addr)
}

func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts)
}

func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
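The Provider interface now exposes StateAccountKeyAtFinality instead of StateAccountKey, and the production provider backs it with ResolveToKeyAddressAtFinality. For lite-node or test providers, a minimal stub along the lines of the testMpoolAPI change shown earlier might look like the sketch below — stubProvider is a hypothetical type, not from the commit:

// Sketch of a stub Provider method for tests or lite mode; mirrors the
// testMpoolAPI change above rather than the production implementation.
func (p *stubProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
	if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
		return addr, nil // already a key address
	}
	return address.Undef, xerrors.New("cannot resolve ID address without state")
}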
@@ -57,13 +57,18 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
mpCfg := mp.getConfig()
// we never prune priority addresses
for _, actor := range mpCfg.PriorityAddrs {
protected[actor] = struct{}{}
pk, err := mp.resolveToKey(ctx, actor)
if err != nil {
log.Debugf("pruneMessages failed to resolve priority address: %s", err)
}

protected[pk] = struct{}{}
}

// we also never prune locally published messages
for actor := range mp.localAddrs {
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
protected[actor] = struct{}{}
}
})

// Collect all messages to track which ones to remove and create chains for block inclusion
pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)

@@ -108,7 +113,7 @@ keepLoop:
// and remove all messages that are still in pruneMsgs after processing the chains
log.Infof("Pruning %d messages", len(pruneMsgs))
for _, m := range pruneMsgs {
mp.remove(m.Message.From, m.Message.Nonce, false)
mp.remove(ctx, m.Message.From, m.Message.Nonce, false)
}

return nil
@@ -18,7 +18,7 @@ const repubMsgLimit = 30

var RepublishBatchDelay = 100 * time.Millisecond

func (mp *MessagePool) republishPendingMessages() error {
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTsLk.Lock()
ts := mp.curTs

@@ -32,13 +32,18 @@ func (mp *MessagePool) republishPendingMessages() error {
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
for actor := range mp.localAddrs {
mset, ok := mp.pending[actor]
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
mset, ok, err := mp.getPendingMset(ctx, actor)
if err != nil {
log.Debugf("failed to get mset: %w", err)
return
}

if !ok {
continue
return
}
if len(mset.msgs) == 0 {
continue
return
}
// we need to copy this while holding the lock to avoid races with concurrent modification
pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))

@@ -46,7 +51,8 @@ func (mp *MessagePool) republishPendingMessages() error {
pend[nonce] = m
}
pending[actor] = pend
}
})

mp.lk.Unlock()
mp.curTsLk.Unlock()
@@ -56,7 +56,7 @@ func TestRepubMessages(t *testing.T) {

for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
_, err := mp.Push(m)
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)
}
@@ -38,7 +38,7 @@ type msgChain struct {
prev *msgChain
}

func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

@@ -49,9 +49,9 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
// than any other block, then we don't bother with optimal selection because the
// first block will always have higher effective performance
if tq > 0.84 {
msgs, err = mp.selectMessagesGreedy(mp.curTs, ts)
msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
} else {
msgs, err = mp.selectMessagesOptimal(mp.curTs, ts, tq)
msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
}

if err != nil {

@@ -65,7 +65,7 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
return msgs, nil
}

func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
start := time.Now()

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)

@@ -91,7 +91,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64

// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)

// have we filled the block?
if gasLimit < minGas {

@@ -389,7 +389,7 @@ tailLoop:
return result, nil
}

func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
start := time.Now()

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)

@@ -415,7 +415,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S

// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)

// have we filled the block?
if gasLimit < minGas {

@@ -525,7 +525,7 @@ tailLoop:
return result, nil
}

func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
start := time.Now()
defer func() {
if dt := time.Since(start); dt > time.Millisecond {

@@ -541,10 +541,16 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
var chains []*msgChain
priority := mpCfg.PriorityAddrs
for _, actor := range priority {
mset, ok := pending[actor]
pk, err := mp.resolveToKey(ctx, actor)
if err != nil {
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
return nil, gasLimit
}

mset, ok := pending[pk]
if ok {
// remove actor from pending set as we are already processed these messages
delete(pending, actor)
delete(pending, pk)
// create chains for the priority actor
next := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, next...)

@@ -646,8 +652,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
inSync = true
}

// first add our current pending messages
for a, mset := range mp.pending {
mp.forEachPending(func(a address.Address, mset *msgSet) {
if inSync {
// no need to copy the map
result[a] = mset.msgs

@@ -660,7 +665,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
result[a] = msetCopy

}
}
})

// we are in sync, that's the happy path
if inSync {
@@ -427,7 +427,7 @@ func TestBasicMessageSelection(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -464,7 +464,7 @@ func TestBasicMessageSelection(t *testing.T) {
tma.applyBlock(t, block2)

// we should have no pending messages in the mpool
pend, _ := mp.Pending()
pend, _ := mp.Pending(context.TODO())
if len(pend) != 0 {
t.Fatalf("expected no pending messages, but got %d", len(pend))
}

@@ -495,7 +495,7 @@ func TestBasicMessageSelection(t *testing.T) {
tma.setStateNonce(a1, 10)
tma.setStateNonce(a2, 10)

msgs, err = mp.SelectMessages(ts3, 1.0)
msgs, err = mp.SelectMessages(context.Background(), ts3, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -569,7 +569,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -633,7 +633,7 @@ func TestPriorityMessageSelection(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -712,7 +712,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -782,7 +782,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
}

// test greedy selection
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -805,7 +805,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
}

// test optimal selection
msgs, err = mp.SelectMessages(ts, 0.1)
msgs, err = mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}

@@ -872,7 +872,7 @@ func TestOptimalMessageSelection1(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 0.25)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.25)
if err != nil {
t.Fatal(err)
}

@@ -941,7 +941,7 @@ func TestOptimalMessageSelection2(t *testing.T) {
mustAdd(t, mp, m)
}

msgs, err := mp.SelectMessages(ts, 0.1)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}

@@ -1020,7 +1020,7 @@ func TestOptimalMessageSelection3(t *testing.T) {
}
}

msgs, err := mp.SelectMessages(ts, 0.1)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}

@@ -1108,7 +1108,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
logging.SetLogLevel("messagepool", "error")

// 1. greedy selection
greedyMsgs, err := mp.selectMessagesGreedy(ts, ts)
greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
if err != nil {
t.Fatal(err)
}

@@ -1137,7 +1137,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
var bestMsgs []*types.SignedMessage
for j := 0; j < nMiners; j++ {
tq := rng.Float64()
msgs, err := mp.SelectMessages(ts, tq)
msgs, err := mp.SelectMessages(context.Background(), ts, tq)
if err != nil {
t.Fatal(err)
}

@@ -1396,7 +1396,7 @@ readLoop:
minGasLimit := int64(0.9 * float64(build.BlockGasLimit))

// greedy first
selected, err := mp.SelectMessages(ts, 1.0)
selected, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}

@@ -1410,7 +1410,7 @@ readLoop:
}

// high quality ticket
selected, err = mp.SelectMessages(ts, .8)
selected, err = mp.SelectMessages(context.Background(), ts, .8)
if err != nil {
t.Fatal(err)
}

@@ -1424,7 +1424,7 @@ readLoop:
}

// mid quality ticket
selected, err = mp.SelectMessages(ts, .4)
selected, err = mp.SelectMessages(context.Background(), ts, .4)
if err != nil {
t.Fatal(err)
}

@@ -1438,7 +1438,7 @@ readLoop:
}

// low quality ticket
selected, err = mp.SelectMessages(ts, .1)
selected, err = mp.SelectMessages(context.Background(), ts, .1)
if err != nil {
t.Fatal(err)
}

@@ -1452,7 +1452,7 @@ readLoop:
}

// very low quality ticket
selected, err = mp.SelectMessages(ts, .01)
selected, err = mp.SelectMessages(context.Background(), ts, .01)
if err != nil {
t.Fatal(err)
}
@@ -24,6 +24,8 @@ type mockMpool struct {
nonces map[address.Address]uint64
}

var _ MpoolNonceAPI = (*mockMpool)(nil)

func newMockMpool() *mockMpool {
return &mockMpool{nonces: make(map[address.Address]uint64)}
}
@@ -155,11 +155,6 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
return nil, xerrors.Errorf("computing tipset state: %w", err)
}

state, err = sm.handleStateForks(ctx, state, ts.Height(), nil, ts)
if err != nil {
return nil, fmt.Errorf("failed to handle fork: %w", err)
}

r := store.NewChainRand(sm.cs, ts.Cids())

if span.IsRecordingEvents() {

@@ -172,7 +167,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri

vmopt := &vm.VMOpts{
StateBase: state,
Epoch: ts.Height() + 1,
Epoch: ts.Height(),
Rand: r,
Bstore: sm.cs.StateBlockstore(),
Syscalls: sm.cs.VMSys(),
@ -7,6 +7,8 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -90,6 +92,7 @@ type StateManager struct {
|
||||
expensiveUpgrades map[abi.ChainEpoch]struct{}
|
||||
|
||||
stCache map[string][]cid.Cid
|
||||
tCache treeCache
|
||||
compWait map[string]chan struct{}
|
||||
stlk sync.Mutex
|
||||
genesisMsigLk sync.Mutex
|
||||
@ -102,6 +105,12 @@ type StateManager struct {
|
||||
genesisMarketFunds abi.TokenAmount
|
||||
}
|
||||
|
||||
// Caches a single state tree
|
||||
type treeCache struct {
|
||||
root cid.Cid
|
||||
tree *state.StateTree
|
||||
}
|
||||
|
||||
func NewStateManager(cs *store.ChainStore) *StateManager {
|
||||
sm, err := NewStateManagerWithUpgradeSchedule(cs, DefaultUpgradeSchedule())
|
||||
if err != nil {
|
||||
@ -154,6 +163,10 @@ func NewStateManagerWithUpgradeSchedule(cs *store.ChainStore, us UpgradeSchedule
|
||||
newVM: vm.NewVM,
|
||||
cs: cs,
|
||||
stCache: make(map[string][]cid.Cid),
|
||||
tCache: treeCache{
|
||||
root: cid.Undef,
|
||||
tree: nil,
|
||||
},
|
||||
compWait: make(map[string]chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
@ -563,6 +576,52 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
return vm.ResolveToKeyAddr(tree, cst, addr)
}

// ResolveToKeyAddressAtFinality is similar to stmgr.ResolveToKeyAddress but fails if the ID address being resolved isn't reorg-stable yet.
// It should not be used for consensus-critical subsystems.
func (sm *StateManager) ResolveToKeyAddressAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
switch addr.Protocol() {
case address.BLS, address.SECP256K1:
return addr, nil
case address.Actor:
return address.Undef, xerrors.New("cannot resolve actor address to key address")
default:
}

if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
}

var err error
if ts.Height() > policy.ChainFinality {
ts, err = sm.ChainStore().GetTipsetByHeight(ctx, ts.Height()-policy.ChainFinality, ts, true)
if err != nil {
return address.Undef, xerrors.Errorf("failed to load lookback tipset: %w", err)
}
}

cst := cbor.NewCborStore(sm.cs.StateBlockstore())
tree := sm.tCache.tree

if tree == nil || sm.tCache.root != ts.ParentState() {
tree, err = state.LoadStateTree(cst, ts.ParentState())
if err != nil {
return address.Undef, xerrors.Errorf("failed to load parent state tree: %w", err)
}

sm.tCache = treeCache{
root: ts.ParentState(),
tree: tree,
}
}

resolved, err := vm.ResolveToKeyAddr(tree, cst, addr)
if err == nil {
return resolved, nil
}

return address.Undef, xerrors.New("ID address not found in lookback state")
}
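A hypothetical caller (not part of this change) would use the lookback resolver for non-consensus features and treat the "ID address not found in lookback state" error as "not reorg-stable yet". Sketch only; idAddr and the fallback policy are assumptions for illustration:

key, err := sm.ResolveToKeyAddressAtFinality(ctx, idAddr, nil) // nil tipset => heaviest
if err != nil {
    // The ID address may simply be newer than the finality lookback window;
    // a caller that can tolerate reorgs could fall back to the regular resolver.
    key, err = sm.ResolveToKeyAddress(ctx, idAddr, nil)
}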
|
||||
func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Address, ts *types.TipSet) (pubk []byte, err error) {
|
||||
kaddr, err := sm.ResolveToKeyAddress(ctx, addr, ts)
|
||||
if err != nil {
|
||||
|
@ -12,6 +12,8 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/state"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
@ -1129,17 +1131,33 @@ type BlockMessages struct {
|
||||
func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, error) {
|
||||
applied := make(map[address.Address]uint64)
|
||||
|
||||
selectMsg := func(m *types.Message) (bool, error) {
|
||||
// The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise
|
||||
if _, ok := applied[m.From]; !ok {
|
||||
applied[m.From] = m.Nonce
|
||||
cst := cbor.NewCborStore(cs.stateBlockstore)
|
||||
st, err := state.LoadStateTree(cst, ts.Blocks()[0].ParentStateRoot)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load state tree")
|
||||
}
|
||||
|
||||
if applied[m.From] != m.Nonce {
|
||||
selectMsg := func(m *types.Message) (bool, error) {
|
||||
var sender address.Address
|
||||
if ts.Height() >= build.UpgradeHyperdriveHeight {
|
||||
sender, err = st.LookupID(m.From)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
} else {
|
||||
sender = m.From
|
||||
}
|
||||
|
||||
// The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise
|
||||
if _, ok := applied[sender]; !ok {
|
||||
applied[sender] = m.Nonce
|
||||
}
|
||||
|
||||
if applied[sender] != m.Nonce {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
applied[m.From]++
|
||||
applied[sender]++
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
@ -516,7 +516,7 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
||||
return pubsub.ValidationReject
|
||||
}
|
||||
|
||||
if err := mv.mpool.Add(m); err != nil {
|
||||
if err := mv.mpool.Add(ctx, m); err != nil {
|
||||
log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
|
||||
ctx, _ = tag.New(
|
||||
ctx,
|
||||
|
@ -1074,9 +1074,19 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
|
||||
|
||||
// Phase 2: (Partial) semantic validation:
|
||||
// the sender exists and is an account actor, and the nonces make sense
|
||||
if _, ok := nonces[m.From]; !ok {
|
||||
var sender address.Address
|
||||
if syncer.sm.GetNtwkVersion(ctx, b.Header.Height) >= network.Version13 {
|
||||
sender, err = st.LookupID(m.From)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
sender = m.From
|
||||
}
|
||||
|
||||
if _, ok := nonces[sender]; !ok {
|
||||
// `GetActor` does not validate that this is an account actor.
|
||||
act, err := st.GetActor(m.From)
|
||||
act, err := st.GetActor(sender)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get actor: %w", err)
|
||||
}
|
||||
@ -1084,13 +1094,13 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
|
||||
if !builtin.IsAccountActor(act.Code) {
|
||||
return xerrors.New("Sender must be an account actor")
|
||||
}
|
||||
nonces[m.From] = act.Nonce
|
||||
nonces[sender] = act.Nonce
|
||||
}
|
||||
|
||||
if nonces[m.From] != m.Nonce {
|
||||
return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
|
||||
if nonces[sender] != m.Nonce {
|
||||
return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[sender], m.Nonce)
|
||||
}
|
||||
nonces[m.From]++
|
||||
nonces[sender]++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -108,6 +107,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil {
|
||||
}
|
||||
|
||||
tu.addSourceNode(stmgr.DefaultUpgradeSchedule(), h)
|
||||
|
||||
//tu.checkHeight("source", source, h)
|
||||
|
||||
// separate logs
|
||||
@ -733,7 +733,6 @@ func TestBadNonce(t *testing.T) {
|
||||
|
||||
// Produce a message from the banker with a bad nonce
|
||||
makeBadMsg := func() *types.SignedMessage {
|
||||
|
||||
msg := types.Message{
|
||||
To: tu.g.Banker(),
|
||||
From: tu.g.Banker(),
|
||||
@ -764,6 +763,114 @@ func TestBadNonce(t *testing.T) {
|
||||
tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0)
|
||||
}
|
||||
|
||||
// This test introduces a block that has 2 messages, with the same sender, and same nonce.
|
||||
// One of the messages uses the sender's robust address, the other uses the ID address.
|
||||
// Such a block is invalid and should not sync.
|
||||
func TestMismatchedNoncesRobustID(t *testing.T) {
|
||||
v5h := abi.ChainEpoch(4)
|
||||
tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h)
|
||||
|
||||
base := tu.g.CurTipset
|
||||
|
||||
// Get the banker from computed tipset state, not the parent.
|
||||
st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet())
|
||||
require.NoError(t, err)
|
||||
ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Produce a message from the banker
|
||||
makeMsg := func(id bool) *types.SignedMessage {
|
||||
sender := tu.g.Banker()
|
||||
if id {
|
||||
s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key())
|
||||
require.NoError(t, err)
|
||||
sender = s
|
||||
}
|
||||
|
||||
msg := types.Message{
|
||||
To: tu.g.Banker(),
|
||||
From: sender,
|
||||
|
||||
Nonce: ba.Nonce,
|
||||
|
||||
Value: types.NewInt(1),
|
||||
|
||||
Method: 0,
|
||||
|
||||
GasLimit: 100_000_000,
|
||||
GasFeeCap: types.NewInt(0),
|
||||
GasPremium: types.NewInt(0),
|
||||
}
|
||||
|
||||
sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return &types.SignedMessage{
|
||||
Message: msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
}
|
||||
|
||||
msgs := make([][]*types.SignedMessage, 1)
|
||||
msgs[0] = []*types.SignedMessage{makeMsg(false), makeMsg(true)}
|
||||
|
||||
tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0)
|
||||
}
|
||||
|
||||
// This test introduces a block that has 2 messages, with the same sender, and nonces N and N+1 (so both can be included in a block)
|
||||
// One of the messages uses the sender's robust address, the other uses the ID address.
|
||||
// Such a block is valid and should sync.
|
||||
func TestMatchedNoncesRobustID(t *testing.T) {
|
||||
v5h := abi.ChainEpoch(4)
|
||||
tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h)
|
||||
|
||||
base := tu.g.CurTipset
|
||||
|
||||
// Get the banker from computed tipset state, not the parent.
|
||||
st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet())
|
||||
require.NoError(t, err)
|
||||
ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Produce a message from the banker with specified nonce
|
||||
makeMsg := func(n uint64, id bool) *types.SignedMessage {
|
||||
sender := tu.g.Banker()
|
||||
if id {
|
||||
s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key())
|
||||
require.NoError(t, err)
|
||||
sender = s
|
||||
}
|
||||
|
||||
msg := types.Message{
|
||||
To: tu.g.Banker(),
|
||||
From: sender,
|
||||
|
||||
Nonce: n,
|
||||
|
||||
Value: types.NewInt(1),
|
||||
|
||||
Method: 0,
|
||||
|
||||
GasLimit: 100_000_000,
|
||||
GasFeeCap: types.NewInt(0),
|
||||
GasPremium: types.NewInt(0),
|
||||
}
|
||||
|
||||
sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{})
|
||||
require.NoError(t, err)
|
||||
|
||||
return &types.SignedMessage{
|
||||
Message: msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
}
|
||||
|
||||
msgs := make([][]*types.SignedMessage, 1)
|
||||
msgs[0] = []*types.SignedMessage{makeMsg(ba.Nonce, false), makeMsg(ba.Nonce+1, true)}
|
||||
|
||||
tu.mineOnBlock(base, 0, []int{0}, true, false, msgs, 0)
|
||||
}
|
||||
|
||||
func BenchmarkSyncBasic(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
runSyncBenchLength(b, 100)
|
||||
|
@ -214,7 +214,7 @@ func (rt *Runtime) GetActorCodeCID(addr address.Address) (ret cid.Cid, ok bool)
|
||||
func (rt *Runtime) GetRandomnessFromTickets(personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) abi.Randomness {
|
||||
var err error
|
||||
var res []byte
|
||||
if rt.vm.GetNtwkVersion(rt.ctx, randEpoch) >= network.Version13 {
|
||||
if randEpoch > build.UpgradeHyperdriveHeight {
|
||||
res, err = rt.vm.rand.GetChainRandomnessLookingForward(rt.ctx, personalization, randEpoch, entropy)
|
||||
} else {
|
||||
res, err = rt.vm.rand.GetChainRandomnessLookingBack(rt.ctx, personalization, randEpoch, entropy)
|
||||
|
@ -287,7 +287,7 @@ func (ss *syscallShim) VerifyAggregateSeals(aggregate proof5.AggregateSealVerify
return xerrors.Errorf("failed to verify aggregated PoRep: %w", err)
}
if !ok {
return fmt.Errorf("invalid aggredate proof")
return fmt.Errorf("invalid aggregate proof")
}

return nil
|
@ -980,7 +980,7 @@ var sectorsBatching = &cli.Command{
|
||||
}
|
||||
|
||||
var sectorsBatchingPendingCommit = &cli.Command{
|
||||
Name: "pending-commit",
|
||||
Name: "commit",
|
||||
Usage: "list sectors waiting in commit batch queue",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
@ -997,15 +997,30 @@ var sectorsBatchingPendingCommit = &cli.Command{
ctx := lcli.ReqContext(cctx)

if cctx.Bool("publish-now") {
cid, err := api.SectorCommitFlush(ctx)
res, err := api.SectorCommitFlush(ctx)
if err != nil {
return xerrors.Errorf("flush: %w", err)
}
if cid == nil {
if res == nil {
return xerrors.Errorf("no sectors to publish")
}

fmt.Println("sector batch published: ", cid)
for i, re := range res {
fmt.Printf("Batch %d:\n", i)
if re.Error != "" {
fmt.Printf("\tError: %s\n", re.Error)
} else {
fmt.Printf("\tMessage: %s\n", re.Msg)
}
fmt.Printf("\tSectors:\n")
for _, sector := range re.Sectors {
if e, found := re.FailedSectors[sector]; found {
fmt.Printf("\t\t%d\tERROR %s\n", sector, e)
} else {
fmt.Printf("\t\t%d\tOK\n", sector)
}
}
}
return nil
}

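With the rename above, the flush path should be reachable roughly as shown below. The command path and the printed CID are illustrative assumptions; only the --publish-now flag and the per-batch layout come from the code above:

$ lotus-miner sectors batching commit --publish-now
Batch 0:
        Message: bafy2bzace...   (placeholder CID)
        Sectors:
                13      OK
                14      OK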
@ -1027,7 +1042,7 @@ var sectorsBatchingPendingCommit = &cli.Command{
|
||||
}
|
||||
|
||||
var sectorsBatchingPendingPreCommit = &cli.Command{
|
||||
Name: "pending-precommit",
|
||||
Name: "precommit",
|
||||
Usage: "list sectors waiting in precommit batch queue",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
@ -1044,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
if cctx.Bool("publish-now") {
|
||||
cid, err := api.SectorPreCommitFlush(ctx)
|
||||
res, err := api.SectorPreCommitFlush(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("flush: %w", err)
|
||||
}
|
||||
if cid == nil {
|
||||
if res == nil {
|
||||
return xerrors.Errorf("no sectors to publish")
|
||||
}
|
||||
|
||||
fmt.Println("sector batch published: ", cid)
|
||||
for i, re := range res {
|
||||
fmt.Printf("Batch %d:\n", i)
|
||||
if re.Error != "" {
|
||||
fmt.Printf("\tError: %s\n", re.Error)
|
||||
} else {
|
||||
fmt.Printf("\tMessage: %s\n", re.Msg)
|
||||
}
|
||||
fmt.Printf("\tSectors:\n")
|
||||
for _, sector := range re.Sectors {
|
||||
fmt.Printf("\t\t%d\tOK\n", sector)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,6 @@
- [ ] Update `chain/stmgr/forks.go`
- [ ] Schedule
- [ ] Migration
- [ ] Update upgrade schedule in `api/test/test.go`
- [ ] Update upgrade schedule in `api/test/test.go` and `chain/sync_test.go`
- [ ] Update `NewestNetworkVersion` in `build/params_shared_vals.go`
- [ ] Register in init in `chain/stmgr/utils.go`

extern/storage-sealing/commit_batch.go (vendored, 209 changed lines)
@ -22,6 +22,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
const arp = abi.RegisteredAggregationProof_SnarkPackV1
|
||||
@ -30,6 +31,9 @@ type CommitBatcherApi interface {
|
||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
|
||||
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
|
||||
|
||||
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
||||
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
|
||||
}
|
||||
|
||||
type AggregateInput struct {
|
||||
@ -49,10 +53,10 @@ type CommitBatcher struct {
|
||||
|
||||
deadlines map[abi.SectorNumber]time.Time
|
||||
todo map[abi.SectorNumber]AggregateInput
|
||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
||||
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
|
||||
|
||||
notify, stop, stopped chan struct{}
|
||||
force chan chan *cid.Cid
|
||||
force chan chan []sealiface.CommitBatchRes
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
@ -68,10 +72,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
|
||||
|
||||
deadlines: map[abi.SectorNumber]time.Time{},
|
||||
todo: map[abi.SectorNumber]AggregateInput{},
|
||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
||||
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
|
||||
|
||||
notify: make(chan struct{}, 1),
|
||||
force: make(chan chan *cid.Cid),
|
||||
force: make(chan chan []sealiface.CommitBatchRes),
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
@ -82,8 +86,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) run() {
|
||||
var forceRes chan *cid.Cid
|
||||
var lastMsg *cid.Cid
|
||||
var forceRes chan []sealiface.CommitBatchRes
|
||||
var lastMsg []sealiface.CommitBatchRes
|
||||
|
||||
cfg, err := b.getConfig()
|
||||
if err != nil {
|
||||
@ -111,7 +115,7 @@ func (b *CommitBatcher) run() {
|
||||
}
|
||||
|
||||
var err error
|
||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
||||
lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
|
||||
if err != nil {
|
||||
log.Warnw("CommitBatcher processBatch error", "error", err)
|
||||
}
|
||||
@ -159,12 +163,9 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
||||
return time.After(wait)
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) {
|
||||
b.lk.Lock()
|
||||
defer b.lk.Unlock()
|
||||
params := miner5.ProveCommitAggregateParams{
|
||||
SectorNumbers: bitfield.New(),
|
||||
}
|
||||
|
||||
total := len(b.todo)
|
||||
if total == 0 {
|
||||
@ -184,8 +185,53 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var res []sealiface.CommitBatchRes
|
||||
|
||||
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
|
||||
res, err = b.processIndividually()
|
||||
} else {
|
||||
res, err = b.processBatch(cfg)
|
||||
}
|
||||
if err != nil && len(res) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, r := range res {
|
||||
if err != nil {
|
||||
r.Error = err.Error()
|
||||
}
|
||||
|
||||
for _, sn := range r.Sectors {
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- r // buffered
|
||||
}
|
||||
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
|
||||
tok, _, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
total := len(b.todo)
|
||||
|
||||
var res sealiface.CommitBatchRes
|
||||
|
||||
params := miner5.ProveCommitAggregateParams{
|
||||
SectorNumbers: bitfield.New(),
|
||||
}
|
||||
|
||||
proofs := make([][]byte, 0, total)
|
||||
infos := make([]proof5.AggregateSealVerifyInfo, 0, total)
|
||||
collateral := big.Zero()
|
||||
|
||||
for id, p := range b.todo {
|
||||
if len(infos) >= cfg.MaxCommitBatch {
|
||||
@ -193,6 +239,15 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
break
|
||||
}
|
||||
|
||||
sc, err := b.getSectorCollateral(id, tok)
|
||||
if err != nil {
|
||||
res.FailedSectors[id] = err.Error()
|
||||
continue
|
||||
}
|
||||
|
||||
collateral = big.Add(collateral, sc)
|
||||
|
||||
res.Sectors = append(res.Sectors, id)
|
||||
params.SectorNumbers.Set(uint64(id))
|
||||
infos = append(infos, p.info)
|
||||
}
|
||||
@ -207,7 +262,7 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
|
||||
mid, err := address.IDFromAddress(b.maddr)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting miner id: %w", err)
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
|
||||
}
|
||||
|
||||
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
|
||||
@ -217,55 +272,107 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
Infos: infos,
|
||||
}, proofs)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("aggregating proofs: %w", err)
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
|
||||
}
|
||||
|
||||
enc := new(bytes.Buffer)
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
|
||||
}
|
||||
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
}
|
||||
|
||||
goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, collateral)
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
|
||||
if err != nil {
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
|
||||
}
|
||||
|
||||
res.Msg = &mcid
|
||||
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
|
||||
|
||||
return []sealiface.CommitBatchRes{res}, nil
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) {
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
}
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee)
|
||||
tok, _, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||
var res []sealiface.CommitBatchRes
|
||||
|
||||
for sn, info := range b.todo {
|
||||
r := sealiface.CommitBatchRes{
|
||||
Sectors: []abi.SectorNumber{sn},
|
||||
}
|
||||
|
||||
mcid, err := b.processSingle(mi, sn, info, tok)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
||||
log.Errorf("process single error: %+v", err) // todo: return to user
|
||||
r.FailedSectors[sn] = err.Error()
|
||||
} else {
|
||||
r.Msg = &mcid
|
||||
}
|
||||
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
|
||||
|
||||
err = params.SectorNumbers.ForEach(func(us uint64) error {
|
||||
sn := abi.SectorNumber(us)
|
||||
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- mcid // buffered
|
||||
res = append(res, r)
|
||||
}
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
return nil
|
||||
})
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) {
|
||||
enc := new(bytes.Buffer)
|
||||
params := &miner.ProveCommitSectorParams{
|
||||
SectorNumber: sn,
|
||||
Proof: info.proof,
|
||||
}
|
||||
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
|
||||
}
|
||||
|
||||
collateral, err := b.getSectorCollateral(sn, tok)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("done sectors foreach: %w", err)
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
return &mcid, nil
|
||||
goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee)
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||
}
|
||||
|
||||
return mcid, nil
|
||||
}
|
||||
|
||||
// register commit, wait for batch message, return message CID
|
||||
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) {
|
||||
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
|
||||
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
log.Errorf("getting chain head: %s", err)
|
||||
return cid.Undef, nil
|
||||
return sealiface.CommitBatchRes{}, nil
|
||||
}
|
||||
|
||||
sn := s.SectorNumber
|
||||
@ -274,7 +381,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
|
||||
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
|
||||
b.todo[sn] = in
|
||||
|
||||
sent := make(chan cid.Cid, 1)
|
||||
sent := make(chan sealiface.CommitBatchRes, 1)
|
||||
b.waiting[sn] = append(b.waiting[sn], sent)
|
||||
|
||||
select {
|
||||
@ -284,15 +391,15 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
|
||||
b.lk.Unlock()
|
||||
|
||||
select {
|
||||
case c := <-sent:
|
||||
return c, nil
|
||||
case r := <-sent:
|
||||
return r, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
return sealiface.CommitBatchRes{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
||||
resCh := make(chan *cid.Cid, 1)
|
||||
func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||
resCh := make(chan []sealiface.CommitBatchRes, 1)
|
||||
select {
|
||||
case b.force <- resCh:
|
||||
select {
|
||||
@ -364,3 +471,25 @@ func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {

return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}

func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, sn, tok)
if err != nil {
return big.Zero(), xerrors.Errorf("getting precommit info: %w", err)
}
if pci == nil {
return big.Zero(), xerrors.Errorf("precommit info not found on chain")
}

collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tok)
if err != nil {
return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err)
}

collateral = big.Sub(collateral, pci.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}

return collateral, nil
}
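In effect the ProveCommit message only tops up the sector's initial pledge beyond what the PreCommit deposit already locked. A minimal restatement of that rule, as a sketch with an invented helper name:

import (
    "github.com/filecoin-project/go-state-types/abi"
    "github.com/filecoin-project/go-state-types/big"
)

// commitTopUp = max(initialPledge - preCommitDeposit, 0)
func commitTopUp(initialPledge, preCommitDeposit abi.TokenAmount) abi.TokenAmount {
    top := big.Sub(initialPledge, preCommitDeposit)
    if top.LessThan(big.Zero()) {
        return big.Zero()
    }
    return top
}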

extern/storage-sealing/precommit_batch.go (vendored, 86 changed lines)
@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
type PreCommitBatcherApi interface {
|
||||
@ -41,10 +42,10 @@ type PreCommitBatcher struct {
|
||||
|
||||
deadlines map[abi.SectorNumber]time.Time
|
||||
todo map[abi.SectorNumber]*preCommitEntry
|
||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
||||
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
|
||||
|
||||
notify, stop, stopped chan struct{}
|
||||
force chan chan *cid.Cid
|
||||
force chan chan []sealiface.PreCommitBatchRes
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
||||
|
||||
deadlines: map[abi.SectorNumber]time.Time{},
|
||||
todo: map[abi.SectorNumber]*preCommitEntry{},
|
||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
||||
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
|
||||
|
||||
notify: make(chan struct{}, 1),
|
||||
force: make(chan chan *cid.Cid),
|
||||
force: make(chan chan []sealiface.PreCommitBatchRes),
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) run() {
|
||||
var forceRes chan *cid.Cid
|
||||
var lastMsg *cid.Cid
|
||||
var forceRes chan []sealiface.PreCommitBatchRes
|
||||
var lastRes []sealiface.PreCommitBatchRes
|
||||
|
||||
cfg, err := b.getConfig()
|
||||
if err != nil {
|
||||
@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() {
|
||||
|
||||
for {
|
||||
if forceRes != nil {
|
||||
forceRes <- lastMsg
|
||||
forceRes <- lastRes
|
||||
forceRes = nil
|
||||
}
|
||||
lastMsg = nil
|
||||
lastRes = nil
|
||||
|
||||
var sendAboveMax, sendAboveMin bool
|
||||
select {
|
||||
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
|
||||
}
|
||||
|
||||
var err error
|
||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
||||
lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
|
||||
if err != nil {
|
||||
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
||||
}
|
||||
@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
||||
return time.After(wait)
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) {
|
||||
b.lk.Lock()
|
||||
defer b.lk.Unlock()
|
||||
params := miner5.PreCommitSectorBatchParams{}
|
||||
|
||||
total := len(b.todo)
|
||||
if total == 0 {
|
||||
@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// todo support multiple batches
|
||||
res, err := b.processBatch(cfg)
|
||||
if err != nil && len(res) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, r := range res {
|
||||
if err != nil {
|
||||
r.Error = err.Error()
|
||||
}
|
||||
|
||||
for _, sn := range r.Sectors {
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- r // buffered
|
||||
}
|
||||
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) {
|
||||
params := miner5.PreCommitSectorBatchParams{}
|
||||
deposit := big.Zero()
|
||||
var res sealiface.PreCommitBatchRes
|
||||
|
||||
for _, p := range b.todo {
|
||||
if len(params.Sectors) >= cfg.MaxPreCommitBatch {
|
||||
@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
break
|
||||
}
|
||||
|
||||
res.Sectors = append(res.Sectors, p.pci.SectorNumber)
|
||||
params.Sectors = append(params.Sectors, *p.pci)
|
||||
deposit = big.Add(deposit, p.deposit)
|
||||
}
|
||||
|
||||
enc := new(bytes.Buffer)
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
||||
}
|
||||
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
}
|
||||
|
||||
goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
|
||||
}
|
||||
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total)
|
||||
res.Msg = &mcid
|
||||
|
||||
for _, sector := range params.Sectors {
|
||||
sn := sector.SectorNumber
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
|
||||
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- mcid // buffered
|
||||
}
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
}
|
||||
|
||||
return &mcid, nil
|
||||
return []sealiface.PreCommitBatchRes{res}, nil
|
||||
}
|
||||
|
||||
// register PreCommit, wait for batch message, return message CID
|
||||
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) {
|
||||
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) {
|
||||
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
log.Errorf("getting chain head: %s", err)
|
||||
return cid.Undef, nil
|
||||
return sealiface.PreCommitBatchRes{}, err
|
||||
}
|
||||
|
||||
sn := s.SectorNumber
|
||||
@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
||||
pci: in,
|
||||
}
|
||||
|
||||
sent := make(chan cid.Cid, 1)
|
||||
sent := make(chan sealiface.PreCommitBatchRes, 1)
|
||||
b.waiting[sn] = append(b.waiting[sn], sent)
|
||||
|
||||
select {
|
||||
@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
||||
case c := <-sent:
|
||||
return c, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
return sealiface.PreCommitBatchRes{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
||||
resCh := make(chan *cid.Cid, 1)
|
||||
func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
resCh := make(chan []sealiface.PreCommitBatchRes, 1)
|
||||
select {
|
||||
case b.force <- resCh:
|
||||
select {
|
||||
|
extern/storage-sealing/sealiface/batching.go (new file, vendored, 23 lines)
@ -0,0 +1,23 @@
package sealiface

import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/go-state-types/abi"
)

type CommitBatchRes struct {
Sectors []abi.SectorNumber

FailedSectors map[abi.SectorNumber]string

Msg *cid.Cid
Error string // if set, means that all sectors are failed, implies Msg==nil
}

type PreCommitBatchRes struct {
Sectors []abi.SectorNumber

Msg *cid.Cid
Error string // if set, means that all sectors are failed, implies Msg==nil
}
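A caller of the new batch results can distinguish a batch-wide failure (Error set, no message sent) from per-sector failures (FailedSectors entries). A small illustrative consumer, not taken from this change:

package main

import (
    "fmt"

    "github.com/filecoin-project/go-state-types/abi"
    "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

func summarize(r sealiface.CommitBatchRes) {
    if r.Error != "" {
        fmt.Printf("batch failed, no message sent: %s\n", r.Error)
        return
    }
    for _, sn := range r.Sectors {
        if reason, ok := r.FailedSectors[sn]; ok {
            fmt.Printf("sector %d dropped from batch: %s\n", sn, reason)
            continue
        }
        fmt.Printf("sector %d included in batch message %v\n", sn, r.Msg)
    }
}

func main() {
    summarize(sealiface.CommitBatchRes{Sectors: []abi.SectorNumber{1, 2}})
}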
extern/storage-sealing/sealing.go (vendored, 5 changed lines)
@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

const SectorStorePrefix = "/sectors"
@ -207,7 +208,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx)
}

func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
return m.precommiter.Flush(ctx)
}

@ -215,7 +216,7 @@ func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, e
return m.precommiter.Pending(ctx)
}

func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) {
func (m *Sealing) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return m.commiter.Flush(ctx)
}

extern/storage-sealing/states_sealing.go (vendored, 34 changed lines)
@ -355,20 +355,28 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
||||
|
||||
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if sector.CommD == nil || sector.CommR == nil {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")})
|
||||
}
|
||||
|
||||
params, deposit, _, err := m.preCommitParams(ctx, sector)
|
||||
if params == nil || err != nil {
|
||||
return err
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)})
|
||||
}
|
||||
|
||||
mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
||||
res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPreCommitBatchSent{mcid})
|
||||
if res.Error != "" {
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)})
|
||||
}
|
||||
|
||||
if res.Msg == nil {
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPreCommitBatchSent{*res.Msg})
|
||||
}
|
||||
|
||||
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -581,7 +589,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
||||
}
|
||||
|
||||
mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
|
||||
res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
|
||||
info: proof.AggregateSealVerifyInfo{
|
||||
Number: sector.SectorNumber,
|
||||
Randomness: sector.TicketValue,
|
||||
@ -596,7 +604,19 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorCommitAggregateSent{mcid})
|
||||
if res.Error != "" {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
|
||||
}
|
||||
|
||||
if e, found := res.FailedSectors[sector.SectorNumber]; found {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)})
|
||||
}
|
||||
|
||||
if res.Msg == nil {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorCommitAggregateSent{*res.Msg})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||
|
@ -271,7 +271,7 @@ func DefaultStorageMiner() *StorageMiner {
PreCommitBatchSlack: Duration(3 * time.Hour),

AggregateCommits: true,
MinCommitBatch: 1, // we must have at least one proof to aggregate
MinCommitBatch: miner5.MinAggregatedSectors, // we must have at least four proofs to aggregate
MaxCommitBatch: miner5.MaxAggregatedSectors, // this is the maximum aggregation per FIP13
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
CommitBatchSlack: Duration(1 * time.Hour),

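These defaults line up with the batch-vs-individual decision in CommitBatcher.maybeStartBatch above: aggregation is only attempted once both the configured minimum and the protocol minimum of four sectors (miner5.MinAggregatedSectors) are met. A compressed restatement of that decision, as a sketch with an invented helper name:

// Returns true when pending commits should go out as one aggregated
// ProveCommitAggregate message rather than individual ProveCommitSector messages.
func useAggregation(pending int, cfg sealiface.Config) bool {
    return pending >= cfg.MinCommitBatch && pending >= miner5.MinAggregatedSectors
}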
@ -267,7 +267,7 @@ func gasEstimateGasLimit(
|
||||
return -1, xerrors.Errorf("getting key address: %w", err)
|
||||
}
|
||||
|
||||
pending, ts := mpool.PendingFor(fromA)
|
||||
pending, ts := mpool.PendingFor(ctx, fromA)
|
||||
priorMsgs := make([]types.ChainMsg, 0, len(pending))
|
||||
for _, m := range pending {
|
||||
if m.Message.Nonce == msg.Nonce {
|
||||
|
@ -60,7 +60,7 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ
|
||||
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
|
||||
}
|
||||
|
||||
return a.Mpool.SelectMessages(ts, ticketQuality)
|
||||
return a.Mpool.SelectMessages(ctx, ts, ticketQuality)
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
|
||||
@ -68,7 +68,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
|
||||
}
|
||||
pending, mpts := a.Mpool.Pending()
|
||||
pending, mpts := a.Mpool.Pending(ctx)
|
||||
|
||||
haveCids := map[cid.Cid]struct{}{}
|
||||
for _, m := range pending {
|
||||
@ -122,16 +122,16 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
|
||||
a.Mpool.Clear(local)
|
||||
a.Mpool.Clear(ctx, local)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||
return m.Mpool.Push(smsg)
|
||||
return m.Mpool.Push(ctx, smsg)
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||
return a.Mpool.PushUntrusted(smsg)
|
||||
return a.Mpool.PushUntrusted(ctx, smsg)
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||
@ -192,7 +192,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
||||
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
||||
var messageCids []cid.Cid
|
||||
for _, smsg := range smsgs {
|
||||
smsgCid, err := a.Mpool.Push(smsg)
|
||||
smsgCid, err := a.Mpool.Push(ctx, smsg)
|
||||
if err != nil {
|
||||
return messageCids, err
|
||||
}
|
||||
@ -204,7 +204,7 @@ func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMess
|
||||
func (a *MpoolAPI) MpoolBatchPushUntrusted(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
||||
var messageCids []cid.Cid
|
||||
for _, smsg := range smsgs {
|
||||
smsgCid, err := a.Mpool.PushUntrusted(smsg)
|
||||
smsgCid, err := a.Mpool.PushUntrusted(ctx, smsg)
|
||||
if err != nil {
|
||||
return messageCids, err
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
apitypes "github.com/filecoin-project/lotus/api/types"
|
||||
@ -373,7 +374,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se
|
||||
return sm.Miner.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return sm.Miner.SectorPreCommitFlush(ctx)
|
||||
}
|
||||
|
||||
@ -385,7 +386,7 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect
|
||||
return sm.Miner.MarkForUpgrade(id)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||
return sm.Miner.CommitFlush(ctx)
|
||||
}
|
||||
|
||||
|
@ -147,15 +147,7 @@ func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr
|
||||
return cid.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
|
||||
}
|
||||
|
||||
ts, err := s.delegate.ChainGetTipSet(ctx, tsk)
|
||||
if err != nil {
|
||||
return cid.Cid{}, err
|
||||
}
|
||||
|
||||
// using parent ts because the migration won't be run on the first nv13
|
||||
// tipset we apply StateCall to (because we don't run migrations in StateCall
|
||||
// and just apply to parent state)
|
||||
nv, err := s.delegate.StateNetworkVersion(ctx, ts.Parents())
|
||||
nv, err := s.delegate.StateNetworkVersion(ctx, tsk)
|
||||
if err != nil {
|
||||
return cid.Cid{}, err
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
// TODO: refactor this to be direct somehow
|
||||
@ -59,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return m.sealing.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return m.sealing.SectorPreCommitFlush(ctx)
|
||||
}
|
||||
|
||||
@ -67,7 +68,7 @@ func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, err
|
||||
return m.sealing.SectorPreCommitPending(ctx)
|
||||
}
|
||||
|
||||
func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||
return m.sealing.CommitFlush(ctx)
|
||||
}
|
||||
|
||||
|
@ -534,9 +534,14 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
|
||||
return nil, xerrors.Errorf("getting partitions: %w", err)
|
||||
}
|
||||
|
||||
nv, err := s.api.StateNetworkVersion(ctx, ts.Key())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting network version: %w", err)
|
||||
}
|
||||
|
||||
// Split partitions into batches, so as not to exceed the number of sectors
|
||||
// allowed in a single message
|
||||
partitionBatches, err := s.batchPartitions(partitions)
|
||||
partitionBatches, err := s.batchPartitions(partitions, nv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -716,7 +721,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
|
||||
return posts, nil
|
||||
}
|
||||
|
||||
func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]api.Partition, error) {
|
||||
func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition, nv network.Version) ([][]api.Partition, error) {
|
||||
// We don't want to exceed the number of sectors allowed in a message.
|
||||
// So given the number of sectors in a partition, work out the number of
|
||||
// partitions that can be in a message without exceeding sectors per
|
||||
@ -732,6 +737,11 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]a
|
||||
return nil, xerrors.Errorf("getting sectors per partition: %w", err)
|
||||
}
|
||||
|
||||
// Also respect the AddressedPartitionsMax (which is the same as DeclarationsMax (which is all really just MaxPartitionsPerDeadline))
|
||||
if partitionsPerMsg > policy.GetDeclarationsMax(nv) {
|
||||
partitionsPerMsg = policy.GetDeclarationsMax(nv)
|
||||
}
|
||||
|
||||
// The number of messages will be:
|
||||
// ceiling(number of partitions / partitions per message)
|
||||
batchCount := len(partitions) / partitionsPerMsg
|
||||
|
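The comments above describe the partition batching arithmetic; a self-contained sketch of it follows. The 2349 sectors-per-partition and 10000 AddressedSectorsMax figures are assumptions for a 32GiB-sector miner, and the helper name is invented:

package main

import "fmt"

func partitionsPerMessage(sectorsPerPartition, addressedSectorsMax, declarationsMax int) int {
    perMsg := addressedSectorsMax / sectorsPerPartition
    if perMsg > declarationsMax {
        perMsg = declarationsMax
    }
    return perMsg
}

func main() {
    perMsg := partitionsPerMessage(2349, 10000, 10) // == 4
    partitions := 9
    batches := (partitions + perMsg - 1) / perMsg // ceiling division => 3 messages
    fmt.Println(perMsg, batches)
}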
@ -5,6 +5,9 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
builtin5 "github.com/filecoin-project/specs-actors/v5/actors/builtin"
|
||||
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -177,13 +180,16 @@ func TestWDPostDoPost(t *testing.T) {
|
||||
mockStgMinerAPI := newMockStorageMinerAPI()
|
||||
|
||||
// Get the number of sectors allowed in a partition for this proof type
|
||||
sectorsPerPartition, err := builtin2.PoStProofWindowPoStPartitionSectors(proofType)
|
||||
sectorsPerPartition, err := builtin5.PoStProofWindowPoStPartitionSectors(proofType)
|
||||
require.NoError(t, err)
|
||||
// Work out the number of partitions that can be included in a message
|
||||
// without exceeding the message sector limit
|
||||
|
||||
require.NoError(t, err)
|
||||
partitionsPerMsg := int(miner2.AddressedSectorsMax / sectorsPerPartition)
|
||||
partitionsPerMsg := int(miner5.AddressedSectorsMax / sectorsPerPartition)
|
||||
if partitionsPerMsg > miner5.AddressedPartitionsMax {
|
||||
partitionsPerMsg = miner5.AddressedPartitionsMax
|
||||
}
|
||||
|
||||
// Enough partitions to fill expectedMsgCount-1 messages
|
||||
partitionCount := (expectedMsgCount - 1) * partitionsPerMsg
|
||||
@ -219,11 +225,11 @@ func TestWDPostDoPost(t *testing.T) {
|
||||
}
|
||||
|
||||
di := &dline.Info{
|
||||
WPoStPeriodDeadlines: miner2.WPoStPeriodDeadlines,
|
||||
WPoStProvingPeriod: miner2.WPoStProvingPeriod,
|
||||
WPoStChallengeWindow: miner2.WPoStChallengeWindow,
|
||||
WPoStChallengeLookback: miner2.WPoStChallengeLookback,
|
||||
FaultDeclarationCutoff: miner2.FaultDeclarationCutoff,
|
||||
WPoStPeriodDeadlines: miner5.WPoStPeriodDeadlines,
|
||||
WPoStProvingPeriod: miner5.WPoStProvingPeriod,
|
||||
WPoStChallengeWindow: miner5.WPoStChallengeWindow,
|
||||
WPoStChallengeLookback: miner5.WPoStChallengeLookback,
|
||||
FaultDeclarationCutoff: miner5.FaultDeclarationCutoff,
|
||||
}
|
||||
ts := mockTipSet(t)
|
||||
|
||||