More tweaking of context handling in the messagepool

This commit is contained in:
Aayush Rajasekaran 2021-05-30 15:20:47 -04:00
parent 183c12db25
commit 9ceee6028b
9 changed files with 53 additions and 46 deletions

View File

@ -347,7 +347,7 @@ func (ms *msgSet) toSlice() []*types.SignedMessage {
return set
}
func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
@ -390,6 +390,8 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes
// enable initial prunes
mp.pruneCooldown <- struct{}{}
ctx, cancel := context.WithCancel(context.Background())
// load the current tipset and subscribe to head changes _before_ loading local messages
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(ctx, rev, app)
@ -403,6 +405,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes
mp.lk.Lock()
go func() {
defer cancel()
err := mp.loadLocal(ctx)
mp.lk.Unlock()
@ -414,7 +417,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes
log.Info("mpool ready")
mp.runLoop(context.Background())
mp.runLoop(ctx)
}()
return mp, nil

View File

@ -226,7 +226,7 @@ func TestMessagePool(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -267,7 +267,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -295,7 +295,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) {
_, _ = mp.Pending(context.TODO())
selm, _ := mp.SelectMessages(tsa, 1)
selm, _ := mp.SelectMessages(context.Background(), tsa, 1)
if len(selm) == 0 {
t.Fatal("should have returned the rest of the messages")
}
@ -316,7 +316,7 @@ func TestRevertMessages(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -379,7 +379,7 @@ func TestPruningSimple(t *testing.T) {
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -423,7 +423,7 @@ func TestLoadLocal(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -466,7 +466,7 @@ func TestLoadLocal(t *testing.T) {
t.Fatal(err)
}
mp, err = New(context.TODO(), tma, ds, "mptest", nil)
mp, err = New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -495,7 +495,7 @@ func TestClearAll(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -549,7 +549,7 @@ func TestClearNonLocal(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}
@ -610,7 +610,7 @@ func TestUpdates(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}

View File

@ -57,7 +57,12 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
mpCfg := mp.getConfig()
// we never prune priority addresses
for _, actor := range mpCfg.PriorityAddrs {
protected[actor] = struct{}{}
pk, err := mp.api.StateAccountKey(ctx, actor, mp.curTs)
if err != nil {
log.Debugf("pruneMessages failed to resolve priority address: %s", err)
}
protected[pk] = struct{}{}
}
// we also never prune locally published messages

View File

@ -24,7 +24,7 @@ func TestRepubMessages(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "mptest", nil)
mp, err := New(tma, ds, "mptest", nil)
if err != nil {
t.Fatal(err)
}

View File

@ -40,7 +40,7 @@ type msgChain struct {
prev *msgChain
}
func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
@ -51,9 +51,9 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
// than any other block, then we don't bother with optimal selection because the
// first block will always have higher effective performance
if tq > 0.84 {
msgs, err = mp.selectMessagesGreedy(mp.curTs, ts)
msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
} else {
msgs, err = mp.selectMessagesOptimal(mp.curTs, ts, tq)
msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
}
if err != nil {
@ -67,7 +67,7 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ
return msgs, nil
}
func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
start := time.Now()
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
@ -93,7 +93,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
// have we filled the block?
if gasLimit < minGas {
@ -391,7 +391,7 @@ tailLoop:
return result, nil
}
func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
start := time.Now()
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
@ -417,7 +417,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
// have we filled the block?
if gasLimit < minGas {
@ -527,7 +527,7 @@ tailLoop:
return result, nil
}
func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
start := time.Now()
defer func() {
if dt := time.Since(start); dt > time.Millisecond {
@ -543,7 +543,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
var chains []*msgChain
priority := mpCfg.PriorityAddrs
for _, actor := range priority {
pk, err := mp.api.StateAccountKey(context.TODO(), actor, mp.curTs)
pk, err := mp.api.StateAccountKey(ctx, actor, mp.curTs)
if err != nil {
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
return nil, gasLimit

View File

@ -60,7 +60,7 @@ func makeTestMessage(w *wallet.LocalWallet, from, to address.Address, nonce uint
func makeTestMpool() (*MessagePool, *testMpoolAPI) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(context.TODO(), tma, ds, "test", nil)
mp, err := New(tma, ds, "test", nil)
if err != nil {
panic(err)
}
@ -427,7 +427,7 @@ func TestBasicMessageSelection(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -495,7 +495,7 @@ func TestBasicMessageSelection(t *testing.T) {
tma.setStateNonce(a1, 10)
tma.setStateNonce(a2, 10)
msgs, err = mp.SelectMessages(ts3, 1.0)
msgs, err = mp.SelectMessages(context.Background(), ts3, 1.0)
if err != nil {
t.Fatal(err)
}
@ -569,7 +569,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -633,7 +633,7 @@ func TestPriorityMessageSelection(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -712,7 +712,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -782,7 +782,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
}
// test greedy selection
msgs, err := mp.SelectMessages(ts, 1.0)
msgs, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -805,7 +805,7 @@ func TestPriorityMessageSelection3(t *testing.T) {
}
// test optimal selection
msgs, err = mp.SelectMessages(ts, 0.1)
msgs, err = mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}
@ -872,7 +872,7 @@ func TestOptimalMessageSelection1(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 0.25)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.25)
if err != nil {
t.Fatal(err)
}
@ -941,7 +941,7 @@ func TestOptimalMessageSelection2(t *testing.T) {
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts, 0.1)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}
@ -1020,7 +1020,7 @@ func TestOptimalMessageSelection3(t *testing.T) {
}
}
msgs, err := mp.SelectMessages(ts, 0.1)
msgs, err := mp.SelectMessages(context.Background(), ts, 0.1)
if err != nil {
t.Fatal(err)
}
@ -1108,7 +1108,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
logging.SetLogLevel("messagepool", "error")
// 1. greedy selection
greedyMsgs, err := mp.selectMessagesGreedy(ts, ts)
greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts)
if err != nil {
t.Fatal(err)
}
@ -1137,7 +1137,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
var bestMsgs []*types.SignedMessage
for j := 0; j < nMiners; j++ {
tq := rng.Float64()
msgs, err := mp.SelectMessages(ts, tq)
msgs, err := mp.SelectMessages(context.Background(), ts, tq)
if err != nil {
t.Fatal(err)
}
@ -1396,7 +1396,7 @@ readLoop:
minGasLimit := int64(0.9 * float64(build.BlockGasLimit))
// greedy first
selected, err := mp.SelectMessages(ts, 1.0)
selected, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
@ -1410,7 +1410,7 @@ readLoop:
}
// high quality ticket
selected, err = mp.SelectMessages(ts, .8)
selected, err = mp.SelectMessages(context.Background(), ts, .8)
if err != nil {
t.Fatal(err)
}
@ -1424,7 +1424,7 @@ readLoop:
}
// mid quality ticket
selected, err = mp.SelectMessages(ts, .4)
selected, err = mp.SelectMessages(context.Background(), ts, .4)
if err != nil {
t.Fatal(err)
}
@ -1438,7 +1438,7 @@ readLoop:
}
// low quality ticket
selected, err = mp.SelectMessages(ts, .1)
selected, err = mp.SelectMessages(context.Background(), ts, .1)
if err != nil {
t.Fatal(err)
}
@ -1452,7 +1452,7 @@ readLoop:
}
// very low quality ticket
selected, err = mp.SelectMessages(ts, .01)
selected, err = mp.SelectMessages(context.Background(), ts, .01)
if err != nil {
t.Fatal(err)
}

View File

@ -58,7 +58,7 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Mpool.SelectMessages(ts, ticketQuality)
return a.Mpool.SelectMessages(ctx, ts, ticketQuality)
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {

View File

@ -61,8 +61,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
mpp := messagepool.NewProvider(sm, ps)
// TODO: I still don't know how go works -- should this context be part of the builder?
mp, err := messagepool.New(context.TODO(), mpp, ds, nn, j)
mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}

View File

@ -27,14 +27,14 @@ func (a *MpoolNonceAPI) GetNonce(ctx context.Context, addr address.Address) (uin
ts := a.StateAPI.Chain.GetHeaviestTipSet()
// make sure we have a key address so we can compare with messages
keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(context.TODO(), addr, ts)
keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(ctx, addr, ts)
if err != nil {
return 0, err
}
// Load the last nonce from the state, if it exists.
highestNonce := uint64(0)
if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(context.TODO(), addr, ts.ParentState()); err != nil {
if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(ctx, addr, ts.ParentState()); err != nil {
if !xerrors.Is(err, types.ErrActorNotFound) {
return 0, err
}