diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 12e8f24c2..405e22320 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -347,7 +347,7 @@ func (ms *msgSet) toSlice() []*types.SignedMessage { return set } -func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) { +func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) { cache, _ := lru.New2Q(build.BlsSignatureCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize) @@ -390,6 +390,8 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes // enable initial prunes mp.pruneCooldown <- struct{}{} + ctx, cancel := context.WithCancel(context.Background()) + // load the current tipset and subscribe to head changes _before_ loading local messages mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error { err := mp.HeadChange(ctx, rev, app) @@ -403,6 +405,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes mp.lk.Lock() go func() { + defer cancel() err := mp.loadLocal(ctx) mp.lk.Unlock() @@ -414,7 +417,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, netName dtypes log.Info("mpool ready") - mp.runLoop(context.Background()) + mp.runLoop(ctx) }() return mp, nil diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index aa3331c11..307ec20a0 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -226,7 +226,7 @@ func TestMessagePool(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -267,7 +267,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -295,7 +295,7 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) { _, _ = mp.Pending(context.TODO()) - selm, _ := mp.SelectMessages(tsa, 1) + selm, _ := mp.SelectMessages(context.Background(), tsa, 1) if len(selm) == 0 { t.Fatal("should have returned the rest of the messages") } @@ -316,7 +316,7 @@ func TestRevertMessages(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -379,7 +379,7 @@ func TestPruningSimple(t *testing.T) { ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -423,7 +423,7 @@ func TestLoadLocal(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -466,7 +466,7 @@ func TestLoadLocal(t *testing.T) { t.Fatal(err) } - mp, err = New(context.TODO(), tma, ds, "mptest", nil) + mp, err = New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -495,7 +495,7 @@ func TestClearAll(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -549,7 +549,7 @@ func TestClearNonLocal(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } @@ -610,7 +610,7 @@ func TestUpdates(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } diff --git a/chain/messagepool/pruning.go b/chain/messagepool/pruning.go index 6802e23f3..829258d25 100644 --- a/chain/messagepool/pruning.go +++ b/chain/messagepool/pruning.go @@ -57,7 +57,12 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro mpCfg := mp.getConfig() // we never prune priority addresses for _, actor := range mpCfg.PriorityAddrs { - protected[actor] = struct{}{} + pk, err := mp.api.StateAccountKey(ctx, actor, mp.curTs) + if err != nil { + log.Debugf("pruneMessages failed to resolve priority address: %s", err) + } + + protected[pk] = struct{}{} } // we also never prune locally published messages diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index 70e457aaa..580231f7a 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -24,7 +24,7 @@ func TestRepubMessages(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "mptest", nil) + mp, err := New(tma, ds, "mptest", nil) if err != nil { t.Fatal(err) } diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index dfed2b6b5..9379cb892 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -40,7 +40,7 @@ type msgChain struct { prev *msgChain } -func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { +func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) (msgs []*types.SignedMessage, err error) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -51,9 +51,9 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ // than any other block, then we don't bother with optimal selection because the // first block will always have higher effective performance if tq > 0.84 { - msgs, err = mp.selectMessagesGreedy(mp.curTs, ts) + msgs, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts) } else { - msgs, err = mp.selectMessagesOptimal(mp.curTs, ts, tq) + msgs, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq) } if err != nil { @@ -67,7 +67,7 @@ func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) (msgs []*typ return msgs, nil } -func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -93,7 +93,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64 // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? if gasLimit < minGas { @@ -391,7 +391,7 @@ tailLoop: return result, nil } -func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { +func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) ([]*types.SignedMessage, error) { start := time.Now() baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) @@ -417,7 +417,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S // 0b. Select all priority messages that fit in the block minGas := int64(gasguess.MinGas) - result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts) + result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? if gasLimit < minGas { @@ -527,7 +527,7 @@ tailLoop: return result, nil } -func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { +func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) { start := time.Now() defer func() { if dt := time.Since(start); dt > time.Millisecond { @@ -543,7 +543,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui var chains []*msgChain priority := mpCfg.PriorityAddrs for _, actor := range priority { - pk, err := mp.api.StateAccountKey(context.TODO(), actor, mp.curTs) + pk, err := mp.api.StateAccountKey(ctx, actor, mp.curTs) if err != nil { log.Debugf("mpooladdlocal failed to resolve sender: %s", err) return nil, gasLimit diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index f254c6706..463473229 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -60,7 +60,7 @@ func makeTestMessage(w *wallet.LocalWallet, from, to address.Address, nonce uint func makeTestMpool() (*MessagePool, *testMpoolAPI) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() - mp, err := New(context.TODO(), tma, ds, "test", nil) + mp, err := New(tma, ds, "test", nil) if err != nil { panic(err) } @@ -427,7 +427,7 @@ func TestBasicMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -495,7 +495,7 @@ func TestBasicMessageSelection(t *testing.T) { tma.setStateNonce(a1, 10) tma.setStateNonce(a2, 10) - msgs, err = mp.SelectMessages(ts3, 1.0) + msgs, err = mp.SelectMessages(context.Background(), ts3, 1.0) if err != nil { t.Fatal(err) } @@ -569,7 +569,7 @@ func TestMessageSelectionTrimming(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -633,7 +633,7 @@ func TestPriorityMessageSelection(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -712,7 +712,7 @@ func TestPriorityMessageSelection2(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -782,7 +782,7 @@ func TestPriorityMessageSelection3(t *testing.T) { } // test greedy selection - msgs, err := mp.SelectMessages(ts, 1.0) + msgs, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -805,7 +805,7 @@ func TestPriorityMessageSelection3(t *testing.T) { } // test optimal selection - msgs, err = mp.SelectMessages(ts, 0.1) + msgs, err = mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -872,7 +872,7 @@ func TestOptimalMessageSelection1(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 0.25) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.25) if err != nil { t.Fatal(err) } @@ -941,7 +941,7 @@ func TestOptimalMessageSelection2(t *testing.T) { mustAdd(t, mp, m) } - msgs, err := mp.SelectMessages(ts, 0.1) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -1020,7 +1020,7 @@ func TestOptimalMessageSelection3(t *testing.T) { } } - msgs, err := mp.SelectMessages(ts, 0.1) + msgs, err := mp.SelectMessages(context.Background(), ts, 0.1) if err != nil { t.Fatal(err) } @@ -1108,7 +1108,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu logging.SetLogLevel("messagepool", "error") // 1. greedy selection - greedyMsgs, err := mp.selectMessagesGreedy(ts, ts) + greedyMsgs, err := mp.selectMessagesGreedy(context.Background(), ts, ts) if err != nil { t.Fatal(err) } @@ -1137,7 +1137,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu var bestMsgs []*types.SignedMessage for j := 0; j < nMiners; j++ { tq := rng.Float64() - msgs, err := mp.SelectMessages(ts, tq) + msgs, err := mp.SelectMessages(context.Background(), ts, tq) if err != nil { t.Fatal(err) } @@ -1396,7 +1396,7 @@ readLoop: minGasLimit := int64(0.9 * float64(build.BlockGasLimit)) // greedy first - selected, err := mp.SelectMessages(ts, 1.0) + selected, err := mp.SelectMessages(context.Background(), ts, 1.0) if err != nil { t.Fatal(err) } @@ -1410,7 +1410,7 @@ readLoop: } // high quality ticket - selected, err = mp.SelectMessages(ts, .8) + selected, err = mp.SelectMessages(context.Background(), ts, .8) if err != nil { t.Fatal(err) } @@ -1424,7 +1424,7 @@ readLoop: } // mid quality ticket - selected, err = mp.SelectMessages(ts, .4) + selected, err = mp.SelectMessages(context.Background(), ts, .4) if err != nil { t.Fatal(err) } @@ -1438,7 +1438,7 @@ readLoop: } // low quality ticket - selected, err = mp.SelectMessages(ts, .1) + selected, err = mp.SelectMessages(context.Background(), ts, .1) if err != nil { t.Fatal(err) } @@ -1452,7 +1452,7 @@ readLoop: } // very low quality ticket - selected, err = mp.SelectMessages(ts, .01) + selected, err = mp.SelectMessages(context.Background(), ts, .01) if err != nil { t.Fatal(err) } diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index e91fc8b9e..d2552e1d5 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -58,7 +58,7 @@ func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey, ticketQ return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err) } - return a.Mpool.SelectMessages(ts, ticketQuality) + return a.Mpool.SelectMessages(ctx, ts, ticketQuality) } func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { diff --git a/node/modules/chain.go b/node/modules/chain.go index b0f0543c6..ffdf3aa3a 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -61,8 +61,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) - // TODO: I still don't know how go works -- should this context be part of the builder? - mp, err := messagepool.New(context.TODO(), mpp, ds, nn, j) + mp, err := messagepool.New(mpp, ds, nn, j) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } diff --git a/node/modules/mpoolnonceapi.go b/node/modules/mpoolnonceapi.go index 3d670611b..6d775f010 100644 --- a/node/modules/mpoolnonceapi.go +++ b/node/modules/mpoolnonceapi.go @@ -27,14 +27,14 @@ func (a *MpoolNonceAPI) GetNonce(ctx context.Context, addr address.Address) (uin ts := a.StateAPI.Chain.GetHeaviestTipSet() // make sure we have a key address so we can compare with messages - keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(context.TODO(), addr, ts) + keyAddr, err := a.StateAPI.StateManager.ResolveToKeyAddress(ctx, addr, ts) if err != nil { return 0, err } // Load the last nonce from the state, if it exists. highestNonce := uint64(0) - if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(context.TODO(), addr, ts.ParentState()); err != nil { + if baseActor, err := a.StateAPI.StateManager.LoadActorRaw(ctx, addr, ts.ParentState()); err != nil { if !xerrors.Is(err, types.ErrActorNotFound) { return 0, err }