mpool: Make MpoolPending more atomic
This commit is contained in:
parent
9c6e9212a2
commit
96c04fc0a6
@ -472,9 +472,9 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted
|
|||||||
|
|
||||||
type ProofInput struct {
|
type ProofInput struct {
|
||||||
sectors sectorbuilder.SortedPublicSectorInfo
|
sectors sectorbuilder.SortedPublicSectorInfo
|
||||||
hvrf []byte
|
hvrf []byte
|
||||||
winners []sectorbuilder.EPostCandidate
|
winners []sectorbuilder.EPostCandidate
|
||||||
vrfout []byte
|
vrfout []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) {
|
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *ProofInput, error) {
|
||||||
|
@ -62,7 +62,7 @@ type MessagePool struct {
|
|||||||
pending map[address.Address]*msgSet
|
pending map[address.Address]*msgSet
|
||||||
pendingCount int
|
pendingCount int
|
||||||
|
|
||||||
curTsLk sync.Mutex
|
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
|
||||||
curTs *types.TipSet
|
curTs *types.TipSet
|
||||||
|
|
||||||
api Provider
|
api Provider
|
||||||
@ -106,7 +106,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Provider interface {
|
type Provider interface {
|
||||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error)
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||||
PutMessage(m store.ChainMsg) (cid.Cid, error)
|
PutMessage(m store.ChainMsg) (cid.Cid, error)
|
||||||
PubSubPublish(string, []byte) error
|
PubSubPublish(string, []byte) error
|
||||||
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
@ -124,8 +124,9 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
|||||||
return &mpoolProvider{sm, ps}
|
return &mpoolProvider{sm, ps}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
||||||
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
||||||
|
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
|
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
|
||||||
@ -173,7 +174,7 @@ func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {
|
|||||||
|
|
||||||
go mp.repubLocal()
|
go mp.repubLocal()
|
||||||
|
|
||||||
api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
||||||
err := mp.HeadChange(rev, app)
|
err := mp.HeadChange(rev, app)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("mpool head notif handler error: %+v", err)
|
log.Errorf("mpool head notif handler error: %+v", err)
|
||||||
@ -257,6 +258,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
return mp.addTs(m, mp.curTs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
||||||
// big messages are bad, anti DOS
|
// big messages are bad, anti DOS
|
||||||
if m.Size() > 32*1024 {
|
if m.Size() > 32*1024 {
|
||||||
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
||||||
@ -275,7 +282,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
snonce, err := mp.getStateNonce(m.Message.From)
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to look up actor state nonce: %w", err)
|
return xerrors.Errorf("failed to look up actor state nonce: %w", err)
|
||||||
}
|
}
|
||||||
@ -333,14 +340,17 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
|
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Lock()
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.getNonceLocked(addr)
|
return mp.getNonceLocked(addr, mp.curTs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
|
||||||
stateNonce, err := mp.getStateNonce(addr) // sanity check
|
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -359,22 +369,9 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
|||||||
return stateNonce, nil
|
return stateNonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) setCurTipset(ts *types.TipSet) {
|
func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
|
||||||
mp.curTsLk.Lock()
|
|
||||||
defer mp.curTsLk.Unlock()
|
|
||||||
mp.curTs = ts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *MessagePool) getCurTipset() *types.TipSet {
|
|
||||||
mp.curTsLk.Lock()
|
|
||||||
defer mp.curTsLk.Unlock()
|
|
||||||
return mp.curTs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
|
|
||||||
// TODO: this method probably should be cached
|
// TODO: this method probably should be cached
|
||||||
|
|
||||||
curTs := mp.getCurTipset()
|
|
||||||
act, err := mp.api.StateGetActor(addr, curTs)
|
act, err := mp.api.StateGetActor(addr, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -417,13 +414,16 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Lock()
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
if addr.Protocol() == address.ID {
|
if addr.Protocol() == address.ID {
|
||||||
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
|
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce, err := mp.getNonceLocked(addr)
|
nonce, err := mp.getNonceLocked(addr, mp.curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -485,15 +485,19 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Pending() []*types.SignedMessage {
|
func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
out := make([]*types.SignedMessage, 0)
|
out := make([]*types.SignedMessage, 0)
|
||||||
for a := range mp.pending {
|
for a := range mp.pending {
|
||||||
out = append(out, mp.pendingFor(a)...)
|
out = append(out, mp.pendingFor(a)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return out
|
return out, mp.curTs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
||||||
@ -516,6 +520,8 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
|
||||||
|
mp.curTsLk.Lock()
|
||||||
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
for _, ts := range revert {
|
for _, ts := range revert {
|
||||||
pts, err := mp.api.LoadTipSet(ts.Parents())
|
pts, err := mp.api.LoadTipSet(ts.Parents())
|
||||||
@ -523,27 +529,14 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.setCurTipset(pts)
|
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
||||||
for _, b := range ts.Blocks() {
|
if err != nil {
|
||||||
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
return err
|
||||||
if err != nil {
|
}
|
||||||
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
|
|
||||||
}
|
|
||||||
for _, msg := range smsgs {
|
|
||||||
if err := mp.Add(msg); err != nil {
|
|
||||||
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, msg := range bmsgs {
|
for _, msg := range msgs {
|
||||||
smsg := mp.RecoverSig(msg)
|
if err := mp.addTs(msg, pts); err != nil {
|
||||||
if smsg != nil {
|
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
||||||
if err := mp.Add(smsg); err != nil {
|
|
||||||
log.Error(err) // TODO: probably lots of spam in multi-block tsets
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -562,12 +555,38 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
|||||||
mp.Remove(msg.From, msg.Nonce)
|
mp.Remove(msg.From, msg.Nonce)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mp.setCurTipset(ts)
|
|
||||||
|
mp.curTs = ts
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
|
||||||
|
out := make([]*types.SignedMessage, 0)
|
||||||
|
|
||||||
|
for _, b := range blks {
|
||||||
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
|
}
|
||||||
|
for _, msg := range smsgs {
|
||||||
|
out = append(out, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range bmsgs {
|
||||||
|
smsg := mp.RecoverSig(msg)
|
||||||
|
if smsg != nil {
|
||||||
|
out = append(out, smsg)
|
||||||
|
} else {
|
||||||
|
log.Warnf("could not recover signature for bls message %s", msg.Cid())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
||||||
val, ok := mp.blsSigCache.Get(msg.Cid())
|
val, ok := mp.blsSigCache.Get(msg.Cid())
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -3,12 +3,14 @@ package full
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -17,13 +19,63 @@ type MpoolAPI struct {
|
|||||||
|
|
||||||
WalletAPI
|
WalletAPI
|
||||||
|
|
||||||
|
Chain *store.ChainStore
|
||||||
|
|
||||||
Mpool *messagepool.MessagePool
|
Mpool *messagepool.MessagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||||
// TODO: need to make sure we don't return messages that were already included in the referenced chain
|
pending, mpts := a.Mpool.Pending()
|
||||||
// also need to accept ts == nil just fine, assume nil == chain.Head()
|
|
||||||
return a.Mpool.Pending(), nil
|
haveCids := map[cid.Cid]struct{}{}
|
||||||
|
for _, m := range pending {
|
||||||
|
haveCids[m.Cid()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if mpts.Height() > ts.Height() {
|
||||||
|
return pending, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if mpts.Height() == ts.Height() {
|
||||||
|
if mpts.Equals(ts) {
|
||||||
|
return pending, nil
|
||||||
|
}
|
||||||
|
// different blocks in tipsets
|
||||||
|
|
||||||
|
have, err := a.Mpool.MessagesForBlocks(ts.Blocks())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range have {
|
||||||
|
haveCids[m.Cid()] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf(": %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range msgs {
|
||||||
|
if _, ok := haveCids[m.Cid()]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
haveCids[m.Cid()] = struct{}{}
|
||||||
|
pending = append(pending, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mpts.Height() >= ts.Height() {
|
||||||
|
return pending, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err = a.Chain.LoadTipSet(ts.Parents())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("loading parent tipset: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {
|
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user