diff --git a/chain/messagepool/check.go b/chain/messagepool/check.go index a1097e7d1..07a278f6d 100644 --- a/chain/messagepool/check.go +++ b/chain/messagepool/check.go @@ -33,8 +33,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) { var msgs []*types.Message mp.lk.RLock() - mset, ok := mp.pending[from] + mset, ok, err := mp.getPendingMset(ctx, from) + if err != nil { + log.Warnf("errored while getting pending mset: %w", err) + return nil, err + } if ok { + msgs = make([]*types.Message, 0, len(mset.msgs)) for _, sm := range mset.msgs { msgs = append(msgs, &sm.Message) } @@ -64,7 +69,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type if !ok { mmap = make(map[uint64]*types.Message) msgMap[m.From] = mmap - mset, ok := mp.pending[m.From] + mset, ok, err := mp.getPendingMset(ctx, m.From) + if err != nil { + log.Warnf("errored while getting pending mset: %w", err) + return nil, err + } if ok { count += len(mset.msgs) for _, sm := range mset.msgs { @@ -144,7 +153,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, st, ok := state[m.From] if !ok { mp.lk.RLock() - mset, ok := mp.pending[m.From] + mset, ok, err := mp.getPendingMset(ctx, m.From) + if err != nil { + log.Warnf("errored while getting pending mset: %w", err) + return nil, err + } if ok && !interned { st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds} for _, m := range mset.msgs { diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 6c3e776c0..4dcb6eb9b 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -474,6 +474,15 @@ func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error { } func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) { + //if addr is not an ID addr, then it is already resolved to a key + if addr.Protocol() != address.ID { + return addr, nil + } + return mp.resolveToKeyFromID(ctx, addr) +} + +func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) { + // check the cache a, ok := mp.keyCache.Get(addr) if ok { @@ -488,8 +497,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) ( // place both entries in the cache (may both be key addresses, which is fine) mp.keyCache.Add(addr, ka) - mp.keyCache.Add(ka, ka) - return ka, nil } diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go index 704676439..a87d5e08a 100644 --- a/chain/messagepool/repub.go +++ b/chain/messagepool/repub.go @@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond func (mp *MessagePool) republishPendingMessages(ctx context.Context) error { mp.curTsLk.RLock() ts := mp.curTs + mp.curTsLk.RUnlock() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) - mp.curTsLk.RUnlock() if err != nil { return xerrors.Errorf("computing basefee: %w", err) } baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor) pending := make(map[address.Address]map[uint64]*types.SignedMessage) - mp.curTsLk.Lock() + mp.lk.Lock() mp.republished = nil // clear this to avoid races triggering an early republish + mp.lk.Unlock() + + mp.lk.RLock() mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) { mset, ok, err := mp.getPendingMset(ctx, actor) if err != nil { @@ -54,9 +57,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error { } pending[actor] = pend }) - - mp.lk.Unlock() - mp.curTsLk.Unlock() + mp.lk.RUnlock() if len(pending) == 0 { return nil @@ -177,8 +178,8 @@ loop: republished[m.Cid()] = struct{}{} } - mp.lk.Lock() // update the republished set so that we can trigger early republish from head changes + mp.lk.Lock() mp.republished = republished mp.lk.Unlock() diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index e42cc7812..163bd76f9 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -40,12 +40,11 @@ type msgChain struct { } func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() - //TODO confirm if we can switch to RLock here for performance - mp.lk.Lock() - defer mp.lk.Unlock() + mp.lk.RLock() + defer mp.lk.RUnlock() // See if we need to prune before selection; excessive buildup can lead to slow selection, // so prune if we have too many messages (ignoring the cooldown).