Merge pull request #6364 from filecoin-project/asr/resolve-messages

Improve address resolution for messages

Commit 5a9e6c8142
@@ -126,10 +126,14 @@ type MessagePool struct {
     republished map[cid.Cid]struct{}
 
+    // do NOT access this map directly, use isLocal, setLocal, and forEachLocal respectively
     localAddrs map[address.Address]struct{}
 
+    // do NOT access this map directly, use getPendingMset, setPendingMset, deletePendingMset, forEachPending, and clearPending respectively
     pending map[address.Address]*msgSet
 
+    keyCache map[address.Address]address.Address
+
     curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
     curTs   *types.TipSet
 
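Note (illustrative, not part of this patch): keyCache memoizes the mapping from any form of a sender address to its public-key address, so the ID (f0) and key (f1/f3) forms of one actor end up sharing a single entry in pending and localAddrs. A minimal sketch of the memoization pattern the new field supports, using a hypothetical helper and assuming the go-address package:

    // Illustrative only: a memoized resolver (hypothetical, not in this diff).
    func resolveCached(cache map[address.Address]address.Address,
        resolve func(address.Address) (address.Address, error),
        addr address.Address) (address.Address, error) {
        if ka, ok := cache[addr]; ok {
            return ka, nil // repeat senders skip the state lookup
        }
        ka, err := resolve(addr)
        if err != nil {
            return address.Undef, err
        }
        cache[addr] = ka // cache the queried form...
        cache[ka] = ka   // ...and the key form itself
        return ka, nil
    }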
@@ -329,6 +333,20 @@ func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
     return types.BigInt{Int: requiredFunds}
 }
 
+func (ms *msgSet) toSlice() []*types.SignedMessage {
+    set := make([]*types.SignedMessage, 0, len(ms.msgs))
+
+    for _, m := range ms.msgs {
+        set = append(set, m)
+    }
+
+    sort.Slice(set, func(i, j int) bool {
+        return set[i].Message.Nonce < set[j].Message.Nonce
+    })
+
+    return set
+}
+
 func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
     cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
     verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
@@ -350,6 +368,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
         repubTrigger:  make(chan struct{}, 1),
         localAddrs:    make(map[address.Address]struct{}),
         pending:       make(map[address.Address]*msgSet),
+        keyCache:      make(map[address.Address]address.Address),
         minGasPrice:   types.NewInt(0),
         pruneTrigger:  make(chan struct{}, 1),
         pruneCooldown: make(chan struct{}, 1),
@@ -371,9 +390,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
     // enable initial prunes
     mp.pruneCooldown <- struct{}{}
 
+    ctx, cancel := context.WithCancel(context.TODO())
+
     // load the current tipset and subscribe to head changes _before_ loading local messages
     mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
-        err := mp.HeadChange(rev, app)
+        err := mp.HeadChange(ctx, rev, app)
         if err != nil {
             log.Errorf("mpool head notif handler error: %+v", err)
         }
@@ -384,7 +405,8 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
     mp.lk.Lock()
 
     go func() {
-        err := mp.loadLocal()
+        defer cancel()
+        err := mp.loadLocal(ctx)
 
         mp.lk.Unlock()
         mp.curTsLk.Unlock()
@@ -395,12 +417,106 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
 
         log.Info("mpool ready")
 
-        mp.runLoop()
+        mp.runLoop(ctx)
     }()
 
     return mp, nil
 }
 
+func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
+    // check the cache
+    a, f := mp.keyCache[addr]
+    if f {
+        return a, nil
+    }
+
+    // resolve the address
+    ka, err := mp.api.StateAccountKeyAtFinality(ctx, addr, mp.curTs)
+    if err != nil {
+        return address.Undef, err
+    }
+
+    // place both entries in the cache (may both be key addresses, which is fine)
+    mp.keyCache[addr] = ka
+    mp.keyCache[ka] = ka
+
+    return ka, nil
+}
+
+func (mp *MessagePool) getPendingMset(ctx context.Context, addr address.Address) (*msgSet, bool, error) {
+    ra, err := mp.resolveToKey(ctx, addr)
+    if err != nil {
+        return nil, false, err
+    }
+
+    ms, f := mp.pending[ra]
+
+    return ms, f, nil
+}
+
+func (mp *MessagePool) setPendingMset(ctx context.Context, addr address.Address, ms *msgSet) error {
+    ra, err := mp.resolveToKey(ctx, addr)
+    if err != nil {
+        return err
+    }
+
+    mp.pending[ra] = ms
+
+    return nil
+}
+
+// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
+func (mp *MessagePool) forEachPending(f func(address.Address, *msgSet)) {
+    for la, ms := range mp.pending {
+        f(la, ms)
+    }
+}
+
+func (mp *MessagePool) deletePendingMset(ctx context.Context, addr address.Address) error {
+    ra, err := mp.resolveToKey(ctx, addr)
+    if err != nil {
+        return err
+    }
+
+    delete(mp.pending, ra)
+
+    return nil
+}
+
+// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
+func (mp *MessagePool) clearPending() {
+    mp.pending = make(map[address.Address]*msgSet)
+}
+
+func (mp *MessagePool) isLocal(ctx context.Context, addr address.Address) (bool, error) {
+    ra, err := mp.resolveToKey(ctx, addr)
+    if err != nil {
+        return false, err
+    }
+
+    _, f := mp.localAddrs[ra]
+
+    return f, nil
+}
+
+func (mp *MessagePool) setLocal(ctx context.Context, addr address.Address) error {
+    ra, err := mp.resolveToKey(ctx, addr)
+    if err != nil {
+        return err
+    }
+
+    mp.localAddrs[ra] = struct{}{}
+
+    return nil
+}
+
+// This method isn't strictly necessary, since it doesn't resolve any addresses, but it's safer to have
+func (mp *MessagePool) forEachLocal(ctx context.Context, f func(context.Context, address.Address)) {
+    for la := range mp.localAddrs {
+        f(ctx, la)
+    }
+}
+
 func (mp *MessagePool) Close() error {
     close(mp.closer)
     return nil
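Note (illustrative usage, not part of this patch): with the accessors above, nothing else in the pool should index mp.pending or mp.localAddrs with a raw sender address; every lookup goes through resolveToKey first, so the ID and key forms of one actor land on the same entry. A hedged sketch of a caller, using a hypothetical helper name:

    // hasPending is hypothetical; it only shows the intended call pattern for getPendingMset.
    func (mp *MessagePool) hasPending(ctx context.Context, from address.Address) (bool, error) {
        mset, ok, err := mp.getPendingMset(ctx, from) // resolves `from` to its key address first
        if err != nil {
            return false, err
        }
        return ok && len(mset.msgs) > 0, nil
    }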
@@ -418,15 +534,15 @@ func (mp *MessagePool) Prune() {
     mp.pruneTrigger <- struct{}{}
 }
 
-func (mp *MessagePool) runLoop() {
+func (mp *MessagePool) runLoop(ctx context.Context) {
     for {
         select {
         case <-mp.repubTk.C:
-            if err := mp.republishPendingMessages(); err != nil {
+            if err := mp.republishPendingMessages(ctx); err != nil {
                 log.Errorf("error while republishing messages: %s", err)
             }
         case <-mp.repubTrigger:
-            if err := mp.republishPendingMessages(); err != nil {
+            if err := mp.republishPendingMessages(ctx); err != nil {
                 log.Errorf("error while republishing messages: %s", err)
             }
 
@@ -442,8 +558,10 @@ func (mp *MessagePool) runLoop() {
     }
 }
 
-func (mp *MessagePool) addLocal(m *types.SignedMessage) error {
-    mp.localAddrs[m.Message.From] = struct{}{}
+func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) error {
+    if err := mp.setLocal(ctx, m.Message.From); err != nil {
+        return err
+    }
 
     msgb, err := m.Serialize()
     if err != nil {
@@ -475,7 +593,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
         return false, xerrors.Errorf("message will not be included in a block: %w", err)
     }
 
-    // this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
+    // this checks if the GasFeeCap is sufficiently high for inclusion in the next 20 blocks
     // if the GasFeeCap is too low, we soft reject the message (Ignore in pubsub) and rely
     // on republish to push it through later, if the baseFee has fallen.
     // this is a defensive check that stops minimum baseFee spam attacks from overloading validation
@@ -510,7 +628,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
     return publish, nil
 }
 
-func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
+func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
     err := mp.checkMessage(m)
     if err != nil {
         return cid.Undef, err
@@ -523,7 +641,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
     }()
 
     mp.curTsLk.Lock()
-    publish, err := mp.addTs(m, mp.curTs, true, false)
+    publish, err := mp.addTs(ctx, m, mp.curTs, true, false)
     if err != nil {
         mp.curTsLk.Unlock()
         return cid.Undef, err
@@ -576,7 +694,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
     return nil
 }
 
-func (mp *MessagePool) Add(m *types.SignedMessage) error {
+func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
     err := mp.checkMessage(m)
     if err != nil {
         return err
@@ -591,7 +709,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
-    _, err = mp.addTs(m, mp.curTs, false, false)
+    _, err = mp.addTs(ctx, m, mp.curTs, false, false)
     return err
 }
 
@@ -631,7 +749,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
     return nil
 }
 
-func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error {
+func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
     balance, err := mp.getStateBalance(m.Message.From, curTs)
     if err != nil {
         return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
@@ -645,7 +763,12 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
     // add Value for soft failure check
     //requiredFunds = types.BigAdd(requiredFunds, m.Message.Value)
 
-    mset, ok := mp.pending[m.Message.From]
+    mset, ok, err := mp.getPendingMset(ctx, m.Message.From)
+    if err != nil {
+        log.Debugf("mpoolcheckbalance failed to get pending mset: %s", err)
+        return err
+    }
+
     if ok {
         requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce))
     }
@@ -659,7 +782,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
     return nil
 }
 
-func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
+func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
     snonce, err := mp.getStateNonce(m.Message.From, curTs)
     if err != nil {
         return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
@@ -677,17 +800,17 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local,
         return false, err
     }
 
-    if err := mp.checkBalance(m, curTs); err != nil {
+    if err := mp.checkBalance(ctx, m, curTs); err != nil {
         return false, err
     }
 
-    err = mp.addLocked(m, !local, untrusted)
+    err = mp.addLocked(ctx, m, !local, untrusted)
     if err != nil {
         return false, err
     }
 
     if local {
-        err = mp.addLocal(m)
+        err = mp.addLocal(ctx, m)
         if err != nil {
             return false, xerrors.Errorf("error persisting local message: %w", err)
         }
@@ -696,7 +819,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local,
     return publish, nil
 }
 
-func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
+func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) error {
     err := mp.checkMessage(m)
     if err != nil {
         return err
@@ -722,21 +845,21 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
         return err
     }
 
-    if err := mp.checkBalance(m, curTs); err != nil {
+    if err := mp.checkBalance(ctx, m, curTs); err != nil {
         return err
     }
 
-    return mp.addLocked(m, false, false)
+    return mp.addLocked(ctx, m, false, false)
 }
 
-func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
+func (mp *MessagePool) addSkipChecks(ctx context.Context, m *types.SignedMessage) error {
     mp.lk.Lock()
     defer mp.lk.Unlock()
 
-    return mp.addLocked(m, false, false)
+    return mp.addLocked(ctx, m, false, false)
 }
 
-func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
+func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, strict, untrusted bool) error {
     log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
     if m.Signature.Type == crypto.SigTypeBLS {
         mp.blsSigCache.Add(m.Cid(), m.Signature)
@@ -752,7 +875,13 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
         return err
     }
 
-    mset, ok := mp.pending[m.Message.From]
+    // Note: If performance becomes an issue, making this getOrCreatePendingMset will save some work
+    mset, ok, err := mp.getPendingMset(ctx, m.Message.From)
+    if err != nil {
+        log.Debug(err)
+        return err
+    }
+
     if !ok {
         nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
         if err != nil {
@@ -760,7 +889,9 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
         }
 
         mset = newMsgSet(nonce)
-        mp.pending[m.Message.From] = mset
+        if err = mp.setPendingMset(ctx, m.Message.From, mset); err != nil {
+            return xerrors.Errorf("failed to set pending mset: %w", err)
+        }
     }
 
     incr, err := mset.add(m, mp, strict, untrusted)
@@ -795,23 +926,28 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool)
     return nil
 }
 
-func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
+func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address) (uint64, error) {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
     mp.lk.Lock()
     defer mp.lk.Unlock()
 
-    return mp.getNonceLocked(addr, mp.curTs)
+    return mp.getNonceLocked(ctx, addr, mp.curTs)
 }
 
-func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
+func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
     stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
     if err != nil {
         return 0, err
     }
 
-    mset, ok := mp.pending[addr]
+    mset, ok, err := mp.getPendingMset(ctx, addr)
+    if err != nil {
+        log.Debugf("mpoolgetnonce failed to get mset: %s", err)
+        return 0, err
+    }
+
     if ok {
         if stateNonce > mset.nextNonce {
             log.Errorf("state nonce was larger than mset.nextNonce (%d > %d)", stateNonce, mset.nextNonce)
@@ -848,7 +984,7 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
 // - strict checks are enabled
 // - extra strict add checks are used when adding the messages to the msgSet
 //   that means: no nonce gaps, at most 10 pending messages for the actor
-func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
+func (mp *MessagePool) PushUntrusted(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
     err := mp.checkMessage(m)
     if err != nil {
         return cid.Undef, err
@@ -861,7 +997,7 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
     }()
 
     mp.curTsLk.Lock()
-    publish, err := mp.addTs(m, mp.curTs, true, true)
+    publish, err := mp.addTs(ctx, m, mp.curTs, true, true)
     if err != nil {
         mp.curTsLk.Unlock()
         return cid.Undef, err
@@ -883,15 +1019,20 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
     return m.Cid(), nil
 }
 
-func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
+func (mp *MessagePool) Remove(ctx context.Context, from address.Address, nonce uint64, applied bool) {
     mp.lk.Lock()
     defer mp.lk.Unlock()
 
-    mp.remove(from, nonce, applied)
-}
-
-func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) {
-    mset, ok := mp.pending[from]
+    mp.remove(ctx, from, nonce, applied)
+}
+
+func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce uint64, applied bool) {
+    mset, ok, err := mp.getPendingMset(ctx, from)
+    if err != nil {
+        log.Debugf("mpoolremove failed to get mset: %s", err)
+        return
+    }
+
     if !ok {
         return
     }
@@ -916,58 +1057,57 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
         mset.rm(nonce, applied)
 
         if len(mset.msgs) == 0 {
-            delete(mp.pending, from)
+            if err = mp.deletePendingMset(ctx, from); err != nil {
+                log.Debugf("mpoolremove failed to delete mset: %s", err)
+                return
+            }
         }
     }
 }
 
-func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
+func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
     mp.lk.Lock()
     defer mp.lk.Unlock()
 
-    return mp.allPending()
+    return mp.allPending(ctx)
 }
 
-func (mp *MessagePool) allPending() ([]*types.SignedMessage, *types.TipSet) {
+func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
     out := make([]*types.SignedMessage, 0)
-    for a := range mp.pending {
-        out = append(out, mp.pendingFor(a)...)
-    }
+
+    mp.forEachPending(func(a address.Address, mset *msgSet) {
+        out = append(out, mset.toSlice()...)
+    })
 
     return out, mp.curTs
 }
 
-func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) {
+func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
     mp.lk.Lock()
     defer mp.lk.Unlock()
-    return mp.pendingFor(a), mp.curTs
+    return mp.pendingFor(ctx, a), mp.curTs
 }
 
-func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
-    mset := mp.pending[a]
-    if mset == nil || len(mset.msgs) == 0 {
+func (mp *MessagePool) pendingFor(ctx context.Context, a address.Address) []*types.SignedMessage {
+    mset, ok, err := mp.getPendingMset(ctx, a)
+    if err != nil {
+        log.Debugf("mpoolpendingfor failed to get mset: %s", err)
         return nil
     }
 
-    set := make([]*types.SignedMessage, 0, len(mset.msgs))
-
-    for _, m := range mset.msgs {
-        set = append(set, m)
-    }
-
-    sort.Slice(set, func(i, j int) bool {
-        return set[i].Message.Nonce < set[j].Message.Nonce
-    })
-
-    return set
+    if mset == nil || !ok || len(mset.msgs) == 0 {
+        return nil
+    }
+
+    return mset.toSlice()
 }
 
-func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
+func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, apply []*types.TipSet) error {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
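Note (illustrative, not part of this patch): Pending, PendingFor and allPending are now context-aware and return nonce-sorted slices via msgSet.toSlice. A sketch of a call site, assuming an existing *MessagePool, a context, and a sender address, with the fmt import available:

    // illustrative only
    msgs, ts := mp.PendingFor(ctx, sender)
    for _, sm := range msgs {
        // messages arrive in ascending nonce order thanks to msgSet.toSlice
        fmt.Println(ts.Height(), sm.Message.Nonce, sm.Cid())
    }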
@@ -984,7 +1124,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
     rm := func(from address.Address, nonce uint64) {
         s, ok := rmsgs[from]
         if !ok {
-            mp.Remove(from, nonce, true)
+            mp.Remove(ctx, from, nonce, true)
             return
         }
 
@@ -993,7 +1133,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
             return
         }
 
-        mp.Remove(from, nonce, true)
+        mp.Remove(ctx, from, nonce, true)
     }
 
     maybeRepub := func(cid cid.Cid) {
@@ -1064,7 +1204,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
 
     for _, s := range rmsgs {
         for _, msg := range s {
-            if err := mp.addSkipChecks(msg); err != nil {
+            if err := mp.addSkipChecks(ctx, msg); err != nil {
                 log.Errorf("Failed to readd message from reorg to mpool: %s", err)
             }
         }
@@ -1072,7 +1212,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
 
     if len(revert) > 0 && futureDebug {
         mp.lk.Lock()
-        msgs, ts := mp.allPending()
+        msgs, ts := mp.allPending(ctx)
         mp.lk.Unlock()
 
         buckets := map[address.Address]*statBucket{}
@@ -1279,7 +1419,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
     return out, nil
 }
 
-func (mp *MessagePool) loadLocal() error {
+func (mp *MessagePool) loadLocal(ctx context.Context) error {
     res, err := mp.localMsgs.Query(query.Query{})
     if err != nil {
         return xerrors.Errorf("query local messages: %w", err)
@@ -1295,7 +1435,7 @@ func (mp *MessagePool) loadLocal() error {
             return xerrors.Errorf("unmarshaling local message: %w", err)
         }
 
-        if err := mp.addLoaded(&sm); err != nil {
+        if err := mp.addLoaded(ctx, &sm); err != nil {
             if xerrors.Is(err, ErrNonceTooLow) {
                 continue // todo: drop the message from local cache (if above certain confidence threshold)
             }
@@ -1303,25 +1443,30 @@ func (mp *MessagePool) loadLocal() error {
             log.Errorf("adding local message: %+v", err)
         }
 
-        mp.localAddrs[sm.Message.From] = struct{}{}
+        if err = mp.setLocal(ctx, sm.Message.From); err != nil {
+            log.Debugf("mpoolloadLocal errored: %s", err)
+            return err
+        }
     }
 
     return nil
 }
 
-func (mp *MessagePool) Clear(local bool) {
+func (mp *MessagePool) Clear(ctx context.Context, local bool) {
     mp.lk.Lock()
     defer mp.lk.Unlock()
 
     // remove everything if local is true, including removing local messages from
     // the datastore
     if local {
-        for a := range mp.localAddrs {
-            mset, ok := mp.pending[a]
-            if !ok {
-                continue
+        mp.forEachLocal(ctx, func(ctx context.Context, la address.Address) {
+            mset, ok, err := mp.getPendingMset(ctx, la)
+            if err != nil {
+                log.Warnf("errored while getting pending mset: %w", err)
+                return
             }
 
+            if ok {
                 for _, m := range mset.msgs {
                     err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes())))
                     if err != nil {
@@ -1329,21 +1474,30 @@ func (mp *MessagePool) Clear(local bool) {
                 }
             }
-        }
+        })
 
-        mp.pending = make(map[address.Address]*msgSet)
+        mp.clearPending()
         mp.republished = nil
 
         return
     }
 
-    // remove everything except the local messages
-    for a := range mp.pending {
-        _, isLocal := mp.localAddrs[a]
+    mp.forEachPending(func(a address.Address, ms *msgSet) {
+        isLocal, err := mp.isLocal(ctx, a)
+        if err != nil {
+            log.Warnf("errored while determining isLocal: %w", err)
+            return
+        }
+
         if isLocal {
-            continue
+            return
         }
-        delete(mp.pending, a)
-    }
+
+        if err = mp.deletePendingMset(ctx, a); err != nil {
+            log.Warnf("errored while deleting mset: %w", err)
+            return
+        }
+    })
 }
 
 func getBaseFeeLowerBound(baseFee, factor types.BigInt) types.BigInt {
@ -150,7 +150,7 @@ func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tma *testMpoolAPI) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
func (tma *testMpoolAPI) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
||||||
if addr.Protocol() != address.BLS && addr.Protocol() != address.SECP256K1 {
|
if addr.Protocol() != address.BLS && addr.Protocol() != address.SECP256K1 {
|
||||||
return address.Undef, fmt.Errorf("given address was not a key addr")
|
return address.Undef, fmt.Errorf("given address was not a key addr")
|
||||||
}
|
}
|
||||||
@ -199,7 +199,7 @@ func (tma *testMpoolAPI) ChainComputeBaseFee(ctx context.Context, ts *types.TipS
|
|||||||
|
|
||||||
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
|
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
n, err := mp.GetNonce(addr)
|
n, err := mp.GetNonce(context.TODO(), addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -211,7 +211,7 @@ func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64
|
|||||||
|
|
||||||
func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
|
func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if err := mp.Add(msg); err != nil {
|
if err := mp.Add(context.TODO(), msg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -293,9 +293,9 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
|
|||||||
tma.applyBlock(t, a)
|
tma.applyBlock(t, a)
|
||||||
tsa := mock.TipSet(a)
|
tsa := mock.TipSet(a)
|
||||||
|
|
||||||
_, _ = mp.Pending()
|
_, _ = mp.Pending(context.TODO())
|
||||||
|
|
||||||
selm, _ := mp.SelectMessages(tsa, 1)
|
selm, _ := mp.SelectMessages(context.Background(), tsa, 1)
|
||||||
if len(selm) == 0 {
|
if len(selm) == 0 {
|
||||||
t.Fatal("should have returned the rest of the messages")
|
t.Fatal("should have returned the rest of the messages")
|
||||||
}
|
}
|
||||||
@ -355,7 +355,7 @@ func TestRevertMessages(t *testing.T) {
|
|||||||
|
|
||||||
assertNonce(t, mp, sender, 4)
|
assertNonce(t, mp, sender, 4)
|
||||||
|
|
||||||
p, _ := mp.Pending()
|
p, _ := mp.Pending(context.TODO())
|
||||||
fmt.Printf("%+v\n", p)
|
fmt.Printf("%+v\n", p)
|
||||||
if len(p) != 3 {
|
if len(p) != 3 {
|
||||||
t.Fatal("expected three messages in mempool")
|
t.Fatal("expected three messages in mempool")
|
||||||
@ -396,14 +396,14 @@ func TestPruningSimple(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
smsg := mock.MkMessage(sender, target, uint64(i), w)
|
smsg := mock.MkMessage(sender, target, uint64(i), w)
|
||||||
if err := mp.Add(smsg); err != nil {
|
if err := mp.Add(context.TODO(), smsg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 10; i < 50; i++ {
|
for i := 10; i < 50; i++ {
|
||||||
smsg := mock.MkMessage(sender, target, uint64(i), w)
|
smsg := mock.MkMessage(sender, target, uint64(i), w)
|
||||||
if err := mp.Add(smsg); err != nil {
|
if err := mp.Add(context.TODO(), smsg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -413,7 +413,7 @@ func TestPruningSimple(t *testing.T) {
|
|||||||
|
|
||||||
mp.Prune()
|
mp.Prune()
|
||||||
|
|
||||||
msgs, _ := mp.Pending()
|
msgs, _ := mp.Pending(context.TODO())
|
||||||
if len(msgs) != 5 {
|
if len(msgs) != 5 {
|
||||||
t.Fatal("expected only 5 messages in pool, got: ", len(msgs))
|
t.Fatal("expected only 5 messages in pool, got: ", len(msgs))
|
||||||
}
|
}
|
||||||
@ -455,7 +455,7 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
msgs := make(map[cid.Cid]struct{})
|
msgs := make(map[cid.Cid]struct{})
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
cid, err := mp.Push(m)
|
cid, err := mp.Push(context.TODO(), m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -471,7 +471,7 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pmsgs, _ := mp.Pending()
|
pmsgs, _ := mp.Pending(context.TODO())
|
||||||
if len(msgs) != len(pmsgs) {
|
if len(msgs) != len(pmsgs) {
|
||||||
t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs))
|
t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs))
|
||||||
}
|
}
|
||||||
@ -526,7 +526,7 @@ func TestClearAll(t *testing.T) {
|
|||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
_, err := mp.Push(m)
|
_, err := mp.Push(context.TODO(), m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -537,9 +537,9 @@ func TestClearAll(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.Clear(true)
|
mp.Clear(context.Background(), true)
|
||||||
|
|
||||||
pending, _ := mp.Pending()
|
pending, _ := mp.Pending(context.TODO())
|
||||||
if len(pending) > 0 {
|
if len(pending) > 0 {
|
||||||
t.Fatalf("cleared the mpool, but got %d pending messages", len(pending))
|
t.Fatalf("cleared the mpool, but got %d pending messages", len(pending))
|
||||||
}
|
}
|
||||||
@ -581,7 +581,7 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin2.StorageMarketActorCodeID, M: 2}]
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
_, err := mp.Push(m)
|
_, err := mp.Push(context.TODO(), m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -592,9 +592,9 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.Clear(false)
|
mp.Clear(context.Background(), false)
|
||||||
|
|
||||||
pending, _ := mp.Pending()
|
pending, _ := mp.Pending(context.TODO())
|
||||||
if len(pending) != 10 {
|
if len(pending) != 10 {
|
||||||
t.Fatalf("expected 10 pending messages, but got %d instead", len(pending))
|
t.Fatalf("expected 10 pending messages, but got %d instead", len(pending))
|
||||||
}
|
}
|
||||||
@ -651,7 +651,7 @@ func TestUpdates(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
_, err := mp.Push(m)
|
_, err := mp.Push(context.TODO(), m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -25,7 +25,7 @@ type Provider interface {
     PutMessage(m types.ChainMsg) (cid.Cid, error)
     PubSubPublish(string, []byte) error
     GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
-    StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
+    StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
     MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
     MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
     LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
@@ -37,6 +37,8 @@ type mpoolProvider struct {
     ps *pubsub.PubSub
 }
 
+var _ Provider = (*mpoolProvider)(nil)
+
 func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
     return &mpoolProvider{sm: sm, ps: ps}
 }
@@ -72,8 +74,8 @@ func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet)
     return st.GetActor(addr)
 }
 
-func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
-    return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
+func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
+    return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts)
 }
 
 func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
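Note (illustrative, not part of this patch): the provider now resolves account keys at finality (ResolveToKeyAddressAtFinality) rather than at the current head; a resolution made against a finalized tipset should not change under reorgs, which is what lets the message pool keep its keyCache without invalidating it. A sketch of a test double satisfying the renamed method, with hypothetical type and field names:

    // illustrative only: stubProvider and its keys map are hypothetical
    type stubProvider struct{ keys map[address.Address]address.Address }

    func (sp *stubProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
        // pretend addr was resolved against a finalized tipset
        return sp.keys[addr], nil
    }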
@@ -57,13 +57,18 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
     mpCfg := mp.getConfig()
     // we never prune priority addresses
     for _, actor := range mpCfg.PriorityAddrs {
-        protected[actor] = struct{}{}
+        pk, err := mp.resolveToKey(ctx, actor)
+        if err != nil {
+            log.Debugf("pruneMessages failed to resolve priority address: %s", err)
+        }
+
+        protected[pk] = struct{}{}
     }
 
     // we also never prune locally published messages
-    for actor := range mp.localAddrs {
+    mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
         protected[actor] = struct{}{}
-    }
+    })
 
     // Collect all messages to track which ones to remove and create chains for block inclusion
     pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)

@@ -108,7 +113,7 @@
     // and remove all messages that are still in pruneMsgs after processing the chains
     log.Infof("Pruning %d messages", len(pruneMsgs))
     for _, m := range pruneMsgs {
-        mp.remove(m.Message.From, m.Message.Nonce, false)
+        mp.remove(ctx, m.Message.From, m.Message.Nonce, false)
     }
 
     return nil
|
@@ -18,7 +18,7 @@ const repubMsgLimit = 30
 
 var RepublishBatchDelay = 100 * time.Millisecond
 
-func (mp *MessagePool) republishPendingMessages() error {
+func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
     mp.curTsLk.Lock()
     ts := mp.curTs
 
@@ -32,13 +32,18 @@ func (mp *MessagePool) republishPendingMessages() error {
     pending := make(map[address.Address]map[uint64]*types.SignedMessage)
     mp.lk.Lock()
     mp.republished = nil // clear this to avoid races triggering an early republish
-    for actor := range mp.localAddrs {
-        mset, ok := mp.pending[actor]
+    mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
+        mset, ok, err := mp.getPendingMset(ctx, actor)
+        if err != nil {
+            log.Debugf("failed to get mset: %w", err)
+            return
+        }
+
         if !ok {
-            continue
+            return
         }
         if len(mset.msgs) == 0 {
-            continue
+            return
         }
         // we need to copy this while holding the lock to avoid races with concurrent modification
         pend := make(map[uint64]*types.SignedMessage, len(mset.msgs))
@@ -46,7 +51,8 @@ func (mp *MessagePool) republishPendingMessages() error {
             pend[nonce] = m
         }
         pending[actor] = pend
-    }
+    })
+
     mp.lk.Unlock()
     mp.curTsLk.Unlock()
 
|
@ -56,7 +56,7 @@ func TestRepubMessages(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
_, err := mp.Push(m)
|
_, err := mp.Push(context.TODO(), m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@@ -40,7 +40,7 @@ type msgChain struct {
     prev      *msgChain
 }
 
-func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
+func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
     mp.curTsLk.Lock()
     defer mp.curTsLk.Unlock()
 
@@ -51,9 +51,9 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
     // than any other block, then we don't bother with optimal selection because the
     // first block will always have higher effective performance
     if tq > 0.84 {
-        msgs, err = mp.selectMessagesGreedy(mp.curTs, ts)
+        msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
     } else {
-        msgs, err = mp.selectMessagesOptimal(mp.curTs, ts, tq)
+        msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
     }
 
     if err != nil {
@@ -67,7 +67,7 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
     return msgs, nil
 }
 
-func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
+func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
     start := time.Now()
 
     baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
@@ -93,7 +93,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
 
     // 0b. Select all priority messages that fit in the block
     minGas := int64(gasguess.MinGas)
-    result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
+    result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
 
     // have we filled the block?
     if gasLimit < minGas {
@@ -391,7 +391,7 @@ tailLoop:
     return result, nil
 }
 
-func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
+func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
     start := time.Now()
 
     baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
@@ -417,7 +417,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
 
     // 0b. Select all priority messages that fit in the block
     minGas := int64(gasguess.MinGas)
-    result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
+    result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
 
     // have we filled the block?
     if gasLimit < minGas {
@@ -527,7 +527,7 @@ tailLoop:
     return result, nil
 }
 
-func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
+func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
     start := time.Now()
     defer func() {
         if dt := time.Since(start); dt > time.Millisecond {
@@ -543,10 +543,16 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
     var chains []*msgChain
     priority := mpCfg.PriorityAddrs
     for _, actor := range priority {
-        mset, ok := pending[actor]
+        pk, err := mp.resolveToKey(ctx, actor)
+        if err != nil {
+            log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
+            return nil, gasLimit
+        }
+
+        mset, ok := pending[pk]
         if ok {
             // remove actor from pending set as we are already processed these messages
-            delete(pending, actor)
+            delete(pending, pk)
             // create chains for the priority actor
             next := mp.createMessageChains(actor, mset, baseFee, ts)
             chains = append(chains, next...)
@@ -648,8 +654,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
         inSync = true
     }
 
-    // first add our current pending messages
-    for a, mset := range mp.pending {
+    mp.forEachPending(func(a address.Address, mset *msgSet) {
         if inSync {
             // no need to copy the map
             result[a] = mset.msgs
@@ -662,7 +667,7 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
             result[a] = msetCopy
 
         }
-    }
+    })
 
     // we are in sync, that's the happy path
     if inSync {
|
@ -427,7 +427,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts, 1.0)
|
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -464,7 +464,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
tma.applyBlock(t, block2)
|
tma.applyBlock(t, block2)
|
||||||
|
|
||||||
// we should have no pending messages in the mpool
|
// we should have no pending messages in the mpool
|
||||||
pend, _ := mp.Pending()
|
pend, _ := mp.Pending(context.TODO())
|
||||||
if len(pend) != 0 {
|
if len(pend) != 0 {
|
||||||
t.Fatalf("expected no pending messages, but got %d", len(pend))
|
t.Fatalf("expected no pending messages, but got %d", len(pend))
|
||||||
}
|
}
|
||||||
@ -495,7 +495,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
tma.setStateNonce(a1, 10)
|
tma.setStateNonce(a1, 10)
|
||||||
     tma.setStateNonce(a2, 10)
 
-    msgs, err = mp.SelectMessages(ts3, 1.0)
+    msgs, err = mp.SelectMessages(context.Background(), ts3, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -569,7 +569,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
         mustAdd(t, mp, m)
     }
 
-    msgs, err := mp.SelectMessages(ts, 1.0)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -633,7 +633,7 @@ func TestPriorityMessageSelection(t *testing.T) {
         mustAdd(t, mp, m)
     }
 
-    msgs, err := mp.SelectMessages(ts, 1.0)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -712,7 +712,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
         mustAdd(t, mp, m)
     }
 
-    msgs, err := mp.SelectMessages(ts, 1.0)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -782,7 +782,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
     }
 
     // test greedy selection
-    msgs, err := mp.SelectMessages(ts, 1.0)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -805,7 +805,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
     }
 
     // test optimal selection
-    msgs, err = mp.SelectMessages(ts, 0.1)
+    msgs, err = mp.SelectMessages(context.Background(), ts, 0.1)
     if err != nil {
         t.Fatal(err)
     }
@@ -872,7 +872,7 @@ func TestOptimalMessageSelection1(t *testing.T) {
         mustAdd(t, mp, m)
     }
 
-    msgs, err := mp.SelectMessages(ts, 0.25)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 0.25)
     if err != nil {
         t.Fatal(err)
     }
@@ -941,7 +941,7 @@ func TestOptimalMessageSelection2(t *testing.T) {
         mustAdd(t, mp, m)
     }
 
-    msgs, err := mp.SelectMessages(ts, 0.1)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
     if err != nil {
         t.Fatal(err)
     }
@@ -1020,7 +1020,7 @@ func TestOptimalMessageSelection3(t *testing.T) {
         }
     }
 
-    msgs, err := mp.SelectMessages(ts, 0.1)
+    msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
     if err != nil {
         t.Fatal(err)
     }
@@ -1108,7 +1108,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
     logging.SetLogLevel("messagepool", "error")
 
     // 1. greedy selection
-    greedyMsgs, err := mp.selectMessagesGreedy(ts, ts)
+    greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
     if err != nil {
         t.Fatal(err)
     }
@@ -1137,7 +1137,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
     var bestMsgs []*types.SignedMessage
     for j := 0; j < nMiners; j++ {
         tq := rng.Float64()
-        msgs, err := mp.SelectMessages(ts, tq)
+        msgs, err := mp.SelectMessages(context.Background(), ts, tq)
         if err != nil {
             t.Fatal(err)
         }
@@ -1396,7 +1396,7 @@ readLoop:
     minGasLimit := int64(0.9 * float64(build.BlockGasLimit))
 
     // greedy first
-    selected, err := mp.SelectMessages(ts, 1.0)
+    selected, err := mp.SelectMessages(context.Background(), ts, 1.0)
     if err != nil {
         t.Fatal(err)
     }
@@ -1410,7 +1410,7 @@ readLoop:
     }
 
     // high quality ticket
-    selected, err = mp.SelectMessages(ts, .8)
+    selected, err = mp.SelectMessages(context.Background(), ts, .8)
     if err != nil {
         t.Fatal(err)
     }
@@ -1424,7 +1424,7 @@ readLoop:
     }
 
     // mid quality ticket
-    selected, err = mp.SelectMessages(ts, .4)
+    selected, err = mp.SelectMessages(context.Background(), ts, .4)
     if err != nil {
         t.Fatal(err)
     }
@@ -1438,7 +1438,7 @@ readLoop:
     }
 
     // low quality ticket
-    selected, err = mp.SelectMessages(ts, .1)
+    selected, err = mp.SelectMessages(context.Background(), ts, .1)
     if err != nil {
         t.Fatal(err)
     }
@@ -1452,7 +1452,7 @@ readLoop:
     }
 
     // very low quality ticket
-    selected, err = mp.SelectMessages(ts, .01)
+    selected, err = mp.SelectMessages(context.Background(), ts, .01)
    if err != nil {
         t.Fatal(err)
     }

@@ -23,7 +23,7 @@ const dsKeyActorNonce = "ActorNextNonce"
 var log = logging.Logger("messagesigner")
 
 type MpoolNonceAPI interface {
-    GetNonce(address.Address) (uint64, error)
+    GetNonce(context.Context, address.Address) (uint64, error)
 }
 
 // MessageSigner keeps track of nonces per address, and increments the nonce
@@ -51,7 +51,7 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
     defer ms.lk.Unlock()
 
     // Get the next message nonce
-    nonce, err := ms.nextNonce(msg.From)
+    nonce, err := ms.nextNonce(ctx, msg.From)
     if err != nil {
         return nil, xerrors.Errorf("failed to create nonce: %w", err)
     }
@@ -92,12 +92,12 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
 
 // nextNonce gets the next nonce for the given address.
 // If there is no nonce in the datastore, gets the nonce from the message pool.
-func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) {
+func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
     // Nonces used to be created by the mempool and we need to support nodes
     // that have mempool nonces, so first check the mempool for a nonce for
     // this address. Note that the mempool returns the actor state's nonce
     // by default.
-    nonce, err := ms.mpool.GetNonce(addr)
+    nonce, err := ms.mpool.GetNonce(ctx, addr)
     if err != nil {
         return 0, xerrors.Errorf("failed to get nonce from mempool: %w", err)
     }

@@ -24,6 +24,8 @@ type mockMpool struct {
     nonces map[address.Address]uint64
 }
 
+var _ MpoolNonceAPI = (*mockMpool)(nil)
+
 func newMockMpool() *mockMpool {
     return &mockMpool{nonces: make(map[address.Address]uint64)}
 }
@@ -35,7 +37,7 @@ func (mp *mockMpool) setNonce(addr address.Address, nonce uint64) {
     mp.nonces[addr] = nonce
 }
 
-func (mp *mockMpool) GetNonce(addr address.Address) (uint64, error) {
+func (mp *mockMpool) GetNonce(ctx context.Context, addr address.Address) (uint64, error) {
     mp.lk.RLock()
     defer mp.lk.RUnlock()
 
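The `var _ MpoolNonceAPI = (*mockMpool)(nil)` line added above is a compile-time assertion: the build now breaks if the mock ever stops satisfying the interface, which is exactly the kind of drift the new context parameter on GetNonce could cause. As a rough, self-contained illustration of the idiom (the Greeter and englishGreeter names are invented for this sketch, not part of Lotus):

package main

import "fmt"

type Greeter interface {
    Greet(name string) string
}

type englishGreeter struct{}

func (englishGreeter) Greet(name string) string { return "hello, " + name }

// Compile-time assertion: this line fails to build the moment englishGreeter
// stops implementing Greeter, e.g. if the interface later gains a context
// parameter and the implementation is not updated to match.
var _ Greeter = (*englishGreeter)(nil)

func main() {
    fmt.Println(englishGreeter{}.Greet("world"))
}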

@@ -6,6 +6,8 @@ import (
     "fmt"
     "sync"
 
+    "github.com/filecoin-project/lotus/chain/actors/policy"
+
     "github.com/ipfs/go-cid"
     cbor "github.com/ipfs/go-ipld-cbor"
     logging "github.com/ipfs/go-log/v2"
@@ -87,6 +89,7 @@ type StateManager struct {
     expensiveUpgrades map[abi.ChainEpoch]struct{}
 
     stCache  map[string][]cid.Cid
+    tCache   treeCache
     compWait map[string]chan struct{}
     stlk     sync.Mutex
     genesisMsigLk sync.Mutex
@@ -99,6 +102,12 @@ type StateManager struct {
     genesisMarketFunds abi.TokenAmount
 }
 
+// Caches a single state tree
+type treeCache struct {
+    root cid.Cid
+    tree *state.StateTree
+}
+
 func NewStateManager(cs *store.ChainStore) *StateManager {
     sm, err := NewStateManagerWithUpgradeSchedule(cs, DefaultUpgradeSchedule())
     if err != nil {
@@ -151,6 +160,10 @@ func NewStateManagerWithUpgradeSchedule(cs *store.ChainStore, us UpgradeSchedule
         newVM:    vm.NewVM,
         cs:       cs,
         stCache:  make(map[string][]cid.Cid),
+        tCache: treeCache{
+            root: cid.Undef,
+            tree: nil,
+        },
         compWait: make(map[string]chan struct{}),
     }, nil
 }
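The new tCache field gives the StateManager a single-entry cache: it remembers the last state tree loaded together with the root it was loaded from, and only reloads when a different root is requested. A minimal, self-contained sketch of that check-then-refresh pattern follows; the stateTree type, string roots, and load callback are stand-ins for the sketch, not the Lotus API.

package main

import "fmt"

// stateTree stands in for the real state tree type (in Lotus, *state.StateTree).
type stateTree struct{ root string }

// treeCache keeps exactly one tree, keyed by the root it was loaded from.
type treeCache struct {
    root string
    tree *stateTree
}

// lookup returns the cached tree when the requested root matches the cached
// root, and otherwise loads a fresh tree and replaces the single cache entry.
func (c *treeCache) lookup(root string, load func(string) (*stateTree, error)) (*stateTree, error) {
    if c.tree != nil && c.root == root {
        return c.tree, nil // hit: same root as the previous request
    }
    t, err := load(root)
    if err != nil {
        return nil, err
    }
    *c = treeCache{root: root, tree: t} // refresh the single entry
    return t, nil
}

func main() {
    loads := 0
    load := func(root string) (*stateTree, error) {
        loads++
        return &stateTree{root: root}, nil
    }

    var c treeCache
    c.lookup("bafy-a", load) // miss: first load
    c.lookup("bafy-a", load) // hit: served from the cache
    c.lookup("bafy-b", load) // miss: different root, reload and replace
    fmt.Println("loads:", loads) // prints: loads: 2
}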

@@ -542,6 +555,52 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
     return vm.ResolveToKeyAddr(tree, cst, addr)
 }
 
+// ResolveToKeyAddressAtFinality is similar to stmgr.ResolveToKeyAddress but fails if the ID address being resolved isn't reorg-stable yet.
+// It should not be used for consensus-critical subsystems.
+func (sm *StateManager) ResolveToKeyAddressAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
+    switch addr.Protocol() {
+    case address.BLS, address.SECP256K1:
+        return addr, nil
+    case address.Actor:
+        return address.Undef, xerrors.New("cannot resolve actor address to key address")
+    default:
+    }
+
+    if ts == nil {
+        ts = sm.cs.GetHeaviestTipSet()
+    }
+
+    var err error
+    if ts.Height() > policy.ChainFinality {
+        ts, err = sm.ChainStore().GetTipsetByHeight(ctx, ts.Height()-policy.ChainFinality, ts, true)
+        if err != nil {
+            return address.Undef, xerrors.Errorf("failed to load lookback tipset: %w", err)
+        }
+    }
+
+    cst := cbor.NewCborStore(sm.cs.StateBlockstore())
+    tree := sm.tCache.tree
+
+    if tree == nil || sm.tCache.root != ts.ParentState() {
+        tree, err = state.LoadStateTree(cst, ts.ParentState())
+        if err != nil {
+            return address.Undef, xerrors.Errorf("failed to load parent state tree: %w", err)
+        }
+
+        sm.tCache = treeCache{
+            root: ts.ParentState(),
+            tree: tree,
+        }
+    }
+
+    resolved, err := vm.ResolveToKeyAddr(tree, cst, addr)
+    if err == nil {
+        return resolved, nil
+    }
+
+    return address.Undef, xerrors.New("ID address not found in lookback state")
+}
+
 func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Address, ts *types.TipSet) (pubk []byte, err error) {
     kaddr, err := sm.ResolveToKeyAddress(ctx, addr, ts)
     if err != nil {
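ResolveToKeyAddressAtFinality resolves ID addresses against state that is at least policy.ChainFinality epochs behind the supplied tipset, so the mapping it returns cannot be undone by a reorg; only a chain still shorter than finality falls back to the head state. A small self-contained sketch of just that lookback rule (the 900-epoch constant mirrors mainnet finality, and the function name is illustrative rather than the Lotus API):

package main

import "fmt"

// chainFinality stands in for policy.ChainFinality (900 epochs on mainnet).
const chainFinality = 900

// lookbackEpoch picks the epoch whose state should be used for a reorg-stable
// address resolution: head minus finality, unless the chain is still shorter
// than finality, in which case the head itself is used.
func lookbackEpoch(head int64) int64 {
    if head > chainFinality {
        return head - chainFinality
    }
    return head
}

func main() {
    fmt.Println(lookbackEpoch(100))   // 100: young chain, resolve against the head state
    fmt.Println(lookbackEpoch(10000)) // 9100: resolve against state that is already final
}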

@@ -12,6 +12,8 @@ import (
     "strings"
     "sync"
 
+    "github.com/filecoin-project/lotus/chain/state"
+
     "golang.org/x/sync/errgroup"
 
     "github.com/filecoin-project/go-state-types/crypto"
@@ -1008,17 +1010,33 @@ type BlockMessages struct {
 func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, error) {
     applied := make(map[address.Address]uint64)
 
-    selectMsg := func(m *types.Message) (bool, error) {
-        // The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise
-        if _, ok := applied[m.From]; !ok {
-            applied[m.From] = m.Nonce
-        }
+    cst := cbor.NewCborStore(cs.stateBlockstore)
+    st, err := state.LoadStateTree(cst, ts.Blocks()[0].ParentStateRoot)
+    if err != nil {
+        return nil, xerrors.Errorf("failed to load state tree")
+    }
 
-        if applied[m.From] != m.Nonce {
+    selectMsg := func(m *types.Message) (bool, error) {
+        var sender address.Address
+        if ts.Height() >= build.UpgradeHyperdriveHeight {
+            sender, err = st.LookupID(m.From)
+            if err != nil {
+                return false, err
+            }
+        } else {
+            sender = m.From
+        }
+
+        // The first match for a sender is guaranteed to have correct nonce -- the block isn't valid otherwise
+        if _, ok := applied[sender]; !ok {
+            applied[sender] = m.Nonce
+        }
+
+        if applied[sender] != m.Nonce {
             return false, nil
         }
 
-        applied[m.From]++
+        applied[sender]++
 
         return true, nil
     }
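With this change, BlockMsgsForTipset keys its per-sender nonce bookkeeping on the resolved ID address (from the Hyperdrive upgrade epoch onward), so a message sent from a robust address and one sent from the matching ID address are charged against the same nonce sequence instead of both slipping past the duplicate check. A self-contained sketch of that selection rule, with plain strings standing in for addresses and a toy resolve function in place of the state tree's LookupID:

package main

import "fmt"

type msg struct {
    from  string // may be a robust address or an ID address
    nonce uint64
}

// selectMsgs keeps, per canonical sender, only the messages whose nonces
// increase one by one from the first nonce seen for that sender. resolve maps
// every address form to one canonical ID, so two spellings of the same actor
// share a single nonce sequence.
func selectMsgs(msgs []msg, resolve func(string) string) []msg {
    applied := map[string]uint64{}
    var out []msg
    for _, m := range msgs {
        sender := resolve(m.from)
        if _, ok := applied[sender]; !ok {
            applied[sender] = m.nonce
        }
        if applied[sender] != m.nonce {
            continue // duplicate or out-of-order nonce for this sender: skip it
        }
        applied[sender]++
        out = append(out, m)
    }
    return out
}

func main() {
    // Hypothetical addresses: "f1robust" and "f0101" refer to the same actor.
    resolve := func(a string) string {
        if a == "f1robust" {
            return "f0101"
        }
        return a
    }
    msgs := []msg{{"f1robust", 5}, {"f0101", 5}, {"f0101", 6}}
    fmt.Println(len(selectMsgs(msgs, resolve))) // prints 2: the second nonce-5 message is dropped
}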

@@ -516,7 +516,7 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
         return pubsub.ValidationReject
     }
 
-    if err := mv.mpool.Add(m); err != nil {
+    if err := mv.mpool.Add(ctx, m); err != nil {
         log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
         ctx, _ = tag.New(
             ctx,

@@ -1084,9 +1084,19 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
 
         // Phase 2: (Partial) semantic validation:
         // the sender exists and is an account actor, and the nonces make sense
-        if _, ok := nonces[m.From]; !ok {
+        var sender address.Address
+        if syncer.sm.GetNtwkVersion(ctx, b.Header.Height) >= network.Version13 {
+            sender, err = st.LookupID(m.From)
+            if err != nil {
+                return err
+            }
+        } else {
+            sender = m.From
+        }
+
+        if _, ok := nonces[sender]; !ok {
             // `GetActor` does not validate that this is an account actor.
-            act, err := st.GetActor(m.From)
+            act, err := st.GetActor(sender)
             if err != nil {
                 return xerrors.Errorf("failed to get actor: %w", err)
             }
@@ -1094,13 +1104,13 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
             if !builtin.IsAccountActor(act.Code) {
                 return xerrors.New("Sender must be an account actor")
             }
-            nonces[m.From] = act.Nonce
+            nonces[sender] = act.Nonce
         }
 
-        if nonces[m.From] != m.Nonce {
-            return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
+        if nonces[sender] != m.Nonce {
+            return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[sender], m.Nonce)
         }
-        nonces[m.From]++
+        nonces[sender]++
 
         return nil
     }
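checkBlockMessages applies the same sender resolution during sync, but gates it on the network version so that blocks sealed before version 13 keep validating exactly as they always did. A minimal sketch of that gating decision; senderKey and the toy lookup function are illustrative names for this sketch, not the Lotus API.

package main

import "fmt"

const version13 = 13 // network version that switches nonce accounting to ID addresses

// senderKey picks the key used for per-sender nonce accounting: from network
// version 13 onward the sender is resolved to its ID address, while older
// epochs keep the raw From address so historical blocks validate unchanged.
func senderKey(ntwkVersion int, from string, lookupID func(string) (string, error)) (string, error) {
    if ntwkVersion >= version13 {
        return lookupID(from)
    }
    return from, nil
}

func main() {
    // Hypothetical resolver: every robust address maps to the ID address f0101.
    lookup := func(string) (string, error) { return "f0101", nil }

    pre, _ := senderKey(12, "f1robust", lookup)
    post, _ := senderKey(13, "f1robust", lookup)
    fmt.Println(pre, post) // prints: f1robust f0101
}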

@@ -10,7 +10,6 @@ import (
     "github.com/filecoin-project/go-state-types/crypto"
 
     "github.com/filecoin-project/go-state-types/network"
 
     "github.com/filecoin-project/lotus/chain/stmgr"
 
     "github.com/ipfs/go-cid"
@@ -108,6 +107,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil {
     }
 
     tu.addSourceNode(stmgr.DefaultUpgradeSchedule(), h)
 
     //tu.checkHeight("source", source, h)
 
     // separate logs
@@ -730,7 +730,6 @@ func TestBadNonce(t *testing.T) {
 
     // Produce a message from the banker with a bad nonce
     makeBadMsg := func() *types.SignedMessage {
 
         msg := types.Message{
             To:   tu.g.Banker(),
             From: tu.g.Banker(),
@@ -761,6 +760,114 @@ func TestBadNonce(t *testing.T) {
     tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0)
 }
 
+// This test introduces a block that has 2 messages, with the same sender, and same nonce.
+// One of the messages uses the sender's robust address, the other uses the ID address.
+// Such a block is invalid and should not sync.
+func TestMismatchedNoncesRobustID(t *testing.T) {
+    v5h := abi.ChainEpoch(4)
+    tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h)
+
+    base := tu.g.CurTipset
+
+    // Get the banker from computed tipset state, not the parent.
+    st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet())
+    require.NoError(t, err)
+    ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st)
+    require.NoError(t, err)
+
+    // Produce a message from the banker
+    makeMsg := func(id bool) *types.SignedMessage {
+        sender := tu.g.Banker()
+        if id {
+            s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key())
+            require.NoError(t, err)
+            sender = s
+        }
+
+        msg := types.Message{
+            To:   tu.g.Banker(),
+            From: sender,
+
+            Nonce: ba.Nonce,
+
+            Value: types.NewInt(1),
+
+            Method: 0,
+
+            GasLimit:   100_000_000,
+            GasFeeCap:  types.NewInt(0),
+            GasPremium: types.NewInt(0),
+        }
+
+        sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{})
+        require.NoError(t, err)
+
+        return &types.SignedMessage{
+            Message:   msg,
+            Signature: *sig,
+        }
+    }
+
+    msgs := make([][]*types.SignedMessage, 1)
+    msgs[0] = []*types.SignedMessage{makeMsg(false), makeMsg(true)}
+
+    tu.mineOnBlock(base, 0, []int{0}, true, true, msgs, 0)
+}
+
+// This test introduces a block that has 2 messages, with the same sender, and nonces N and N+1 (so both can be included in a block)
+// One of the messages uses the sender's robust address, the other uses the ID address.
+// Such a block is valid and should sync.
+func TestMatchedNoncesRobustID(t *testing.T) {
+    v5h := abi.ChainEpoch(4)
+    tu := prepSyncTestWithV5Height(t, int(v5h+5), v5h)
+
+    base := tu.g.CurTipset
+
+    // Get the banker from computed tipset state, not the parent.
+    st, _, err := tu.g.StateManager().TipSetState(context.TODO(), base.TipSet())
+    require.NoError(t, err)
+    ba, err := tu.g.StateManager().LoadActorRaw(context.TODO(), tu.g.Banker(), st)
+    require.NoError(t, err)
+
+    // Produce a message from the banker with specified nonce
+    makeMsg := func(n uint64, id bool) *types.SignedMessage {
+        sender := tu.g.Banker()
+        if id {
+            s, err := tu.nds[0].StateLookupID(context.TODO(), sender, base.TipSet().Key())
+            require.NoError(t, err)
+            sender = s
+        }
+
+        msg := types.Message{
+            To:   tu.g.Banker(),
+            From: sender,
+
+            Nonce: n,
+
+            Value: types.NewInt(1),
+
+            Method: 0,
+
+            GasLimit:   100_000_000,
+            GasFeeCap:  types.NewInt(0),
+            GasPremium: types.NewInt(0),
+        }
+
+        sig, err := tu.g.Wallet().WalletSign(context.TODO(), tu.g.Banker(), msg.Cid().Bytes(), api.MsgMeta{})
+        require.NoError(t, err)
+
+        return &types.SignedMessage{
+            Message:   msg,
+            Signature: *sig,
+        }
+    }
+
+    msgs := make([][]*types.SignedMessage, 1)
+    msgs[0] = []*types.SignedMessage{makeMsg(ba.Nonce, false), makeMsg(ba.Nonce+1, true)}
+
+    tu.mineOnBlock(base, 0, []int{0}, true, false, msgs, 0)
+}
+
 func BenchmarkSyncBasic(b *testing.B) {
     for i := 0; i < b.N; i++ {
         runSyncBenchLength(b, 100)

@@ -12,6 +12,6 @@
 - [ ] Update `chain/stmgr/forks.go`
   - [ ] Schedule
   - [ ] Migration
-- [ ] Update upgrade schedule in `api/test/test.go`
+- [ ] Update upgrade schedule in `api/test/test.go` and `chain/sync_test.go`
 - [ ] Update `NewestNetworkVersion` in `build/params_shared_vals.go`
 - [ ] Register in init in `chain/stmgr/utils.go`

@@ -265,7 +265,7 @@ func gasEstimateGasLimit(
         return -1, xerrors.Errorf("getting key address: %w", err)
     }
 
-    pending, ts := mpool.PendingFor(fromA)
+    pending, ts := mpool.PendingFor(ctx, fromA)
     priorMsgs := make([]types.ChainMsg, 0, len(pending))
     for _, m := range pending {
         if m.Message.Nonce == msg.Nonce {

@@ -58,7 +58,7 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ
         return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
     }
 
-    return a.Mpool.SelectMessages(ts, ticketQuality)
+    return a.Mpool.SelectMessages(ctx, ts, ticketQuality)
 }
 
 func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
@@ -66,7 +66,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
     if err != nil {
         return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
     }
-    pending, mpts := a.Mpool.Pending()
+    pending, mpts := a.Mpool.Pending(ctx)
 
     haveCids := map[cid.Cid]struct{}{}
     for _, m := range pending {
@@ -120,16 +120,16 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
 }
 
 func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
-    a.Mpool.Clear(local)
+    a.Mpool.Clear(ctx, local)
     return nil
 }
 
 func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
-    return m.Mpool.Push(smsg)
+    return m.Mpool.Push(ctx, smsg)
 }
 
 func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
-    return a.Mpool.PushUntrusted(smsg)
+    return a.Mpool.PushUntrusted(ctx, smsg)
 }
 
 func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
@@ -190,7 +190,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
 func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
     var messageCids []cid.Cid
     for _, smsg := range smsgs {
-        smsgCid, err := a.Mpool.Push(smsg)
+        smsgCid, err := a.Mpool.Push(ctx, smsg)
         if err != nil {
             return messageCids, err
         }
@@ -202,7 +202,7 @@ func (a *MpoolAPI) MpoolBatchPushUntrusted(ctx context.Context, smsgs []*types.SignedMess
 func (a *MpoolAPI) MpoolBatchPushUntrusted(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
     var messageCids []cid.Cid
     for _, smsg := range smsgs {
-        smsgCid, err := a.Mpool.PushUntrusted(smsg)
+        smsgCid, err := a.Mpool.PushUntrusted(ctx, smsg)
         if err != nil {
             return messageCids, err
         }
@@ -224,7 +224,7 @@ func (a *MpoolAPI) MpoolBatchPushMessage(ctx context.Context, msgs []*types.Mess
 }
 
 func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
-    return a.Mpool.GetNonce(addr)
+    return a.Mpool.GetNonce(ctx, addr)
 }
 
 func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {

@@ -23,18 +23,18 @@ type MpoolNonceAPI struct {
 }
 
 // GetNonce gets the nonce from current chain head.
-func (a *MpoolNonceAPI) GetNonce(addr address.Address) (uint64, error) {
+func (a *MpoolNonceAPI) GetNonce(ctx context.Context, addr address.Address) (uint64, error) {
     ts := a.StateAPI.Chain.GetHeaviestTipSet()
 
     // make sure we have a key address so we can compare with messages
-    keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(context.TODO(), addr, ts)
+    keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(ctx, addr, ts)
     if err != nil {
         return 0, err
     }
 
     // Load the last nonce from the state, if it exists.
     highestNonce := uint64(0)
-    if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(context.TODO(), addr, ts.ParentState()); err != nil {
+    if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(ctx, addr, ts.ParentState()); err != nil {
         if !xerrors.Is(err, types.ErrActorNotFound) {
             return 0, err
         }