feat(lotus-sim): make walk parallel
parent fbaffe86da · commit 1df5445ed2
@@ -3,7 +3,9 @@ package simulation
 import (
 	"context"
 	"encoding/json"
+	"runtime"
 
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
@@ -293,49 +295,144 @@ func (sim *Simulation) Walk(
 	messages []*AppliedMessage) error,
 ) error {
 	store := sim.Chainstore.ActorStore(ctx)
-	ts := sim.head
-	stCid, recCid, err := sim.StateManager.TipSetState(ctx, ts)
-	if err != nil {
-		return err
-	}
 	minEpoch := abi.ChainEpoch(0)
 	if maxLookback != 0 {
-		minEpoch = ts.Height() - abi.ChainEpoch(maxLookback)
+		minEpoch = sim.head.Height() - abi.ChainEpoch(maxLookback)
 	}
 
-	for !ts.Equals(sim.start) && ctx.Err() == nil && ts.Height() > minEpoch {
-		msgs, err := sim.Chainstore.MessagesForTipset(ts)
-		if err != nil {
-			return err
-		}
-
-		recs, err := blockadt.AsArray(store, recCid)
-		if err != nil {
-			return xerrors.Errorf("amt load: %w", err)
-		}
-		applied := make([]*AppliedMessage, len(msgs))
-		var rec types.MessageReceipt
-		err = recs.ForEach(&rec, func(i int64) error {
-			applied[i] = &AppliedMessage{
-				Message:        *msgs[i].VMMessage(),
-				MessageReceipt: rec,
-			}
-			return nil
-		})
-		if err != nil {
-			return err
-		}
-
-		if err := cb(sim.StateManager, ts, stCid, applied); err != nil {
-			return err
-		}
-
-		stCid = ts.MinTicketBlock().ParentStateRoot
-		recCid = ts.MinTicketBlock().ParentMessageReceipts
-		ts, err = sim.Chainstore.LoadTipSet(ts.Parents())
-		if err != nil {
-			return xerrors.Errorf("loading parent: %w", err)
-		}
-	}
-	return ctx.Err()
+	// Given that loading messages and receipts can be a little bit slow, we do this in parallel.
+	//
+	// 1. We spin up some number of workers.
+	// 2. We hand tipsets to workers in round-robin order.
+	// 3. We pull "resolved" tipsets in the same round-robin order.
+	// 4. We serially call the callback in reverse-chain order.
+	//
+	// We have a buffer of size 1 for both resolved tipsets and unresolved tipsets. This should
+	// ensure that we never block unnecessarily.
+
+	type work struct {
+		ts     *types.TipSet
+		stCid  cid.Cid
+		recCid cid.Cid
+	}
+	type result struct {
+		ts       *types.TipSet
+		stCid    cid.Cid
+		messages []*AppliedMessage
+	}
+
+	// This is more disk bound than CPU bound, but eh...
+	workerCount := runtime.NumCPU() * 2
+
+	workQs := make([]chan *work, workerCount)
+	resultQs := make([]chan *result, workerCount)
+
+	for i := range workQs {
+		workQs[i] = make(chan *work, 1)
+	}
+
+	for i := range resultQs {
+		resultQs[i] = make(chan *result, 1)
+	}
+
+	grp, ctx := errgroup.WithContext(ctx)
+
+	// Walk the chain and fire off work items.
+	grp.Go(func() error {
+		ts := sim.head
+		stCid, recCid, err := sim.StateManager.TipSetState(ctx, ts)
+		if err != nil {
+			return err
+		}
+		i := 0
+		for !ts.Equals(sim.start) && ctx.Err() == nil && ts.Height() > minEpoch {
+			select {
+			case workQs[i] <- &work{ts, stCid, recCid}:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			stCid = ts.MinTicketBlock().ParentStateRoot
+			recCid = ts.MinTicketBlock().ParentMessageReceipts
+			ts, err = sim.Chainstore.LoadTipSet(ts.Parents())
+			if err != nil {
+				return xerrors.Errorf("loading parent: %w", err)
+			}
+			i = (i + 1) % workerCount
+		}
+		for _, q := range workQs {
+			close(q)
+		}
+		return nil
+	})
+
+	// Spin up one worker per queue pair.
+	for i := 0; i < workerCount; i++ {
+		workQ := workQs[i]
+		resultQ := resultQs[i]
+		grp.Go(func() error {
+			for job := range workQ {
+				msgs, err := sim.Chainstore.MessagesForTipset(job.ts)
+				if err != nil {
+					return err
+				}
+
+				recs, err := blockadt.AsArray(store, job.recCid)
+				if err != nil {
+					return xerrors.Errorf("amt load: %w", err)
+				}
+				applied := make([]*AppliedMessage, len(msgs))
+				var rec types.MessageReceipt
+				err = recs.ForEach(&rec, func(i int64) error {
+					applied[i] = &AppliedMessage{
+						Message:        *msgs[i].VMMessage(),
+						MessageReceipt: rec,
+					}
+					return nil
+				})
+				if err != nil {
+					return err
+				}
+				select {
+				case resultQ <- &result{
+					ts:       job.ts,
+					stCid:    job.stCid,
+					messages: applied,
+				}:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+			close(resultQ)
+			return nil
+		})
+	}
+
+	// Process results in the same order we enqueued them.
+	grp.Go(func() error {
+		qs := resultQs
+		for len(qs) > 0 {
+			newQs := qs[:0]
+			for _, q := range qs {
+				select {
+				case r, ok := <-q:
+					if !ok {
+						continue
+					}
+					err := cb(sim.StateManager, r.ts, r.stCid, r.messages)
+					if err != nil {
+						return err
+					}
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				newQs = append(newQs, q)
+			}
+			qs = newQs
+		}
+		return nil
+	})
+
+	// Wait for everything to finish.
+	return grp.Wait()
 }
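The heart of the change is the round-robin fan-out/fan-in. Because the producer hands tipsets to the per-worker queues in the same order the consumer later polls them, the callback still fires in reverse-chain order, with no sequence numbers or reordering buffer needed. Below is a minimal, self-contained sketch of the same pattern, with toy integer payloads standing in for tipsets (all names here are invented for illustration):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	workerCount := runtime.NumCPU()

	// One work queue and one result queue per worker, each buffered to 1,
	// mirroring the workQs/resultQs pairs in Walk.
	workQs := make([]chan int, workerCount)
	resultQs := make([]chan int, workerCount)
	for i := range workQs {
		workQs[i] = make(chan int, 1)
		resultQs[i] = make(chan int, 1)
	}

	// Workers: each drains its own work queue (squaring stands in for the
	// slow message/receipt loading) and closes its result queue when done.
	for i := 0; i < workerCount; i++ {
		workQ, resultQ := workQs[i], resultQs[i]
		go func() {
			for n := range workQ {
				resultQ <- n * n
			}
			close(resultQ)
		}()
	}

	// Producer: hand out items round-robin, then close every work queue.
	go func() {
		for n := 0; n < 10; n++ {
			workQs[n%workerCount] <- n
		}
		for _, q := range workQs {
			close(q)
		}
	}()

	// Consumer: poll the result queues in the same round-robin order,
	// dropping each queue once it closes. Prints 0 1 4 9 ... in submission
	// order even though the work itself ran in parallel.
	qs := resultQs
	for len(qs) > 0 {
		var open []chan int
		for _, q := range qs {
			n, ok := <-q
			if !ok {
				continue
			}
			fmt.Println(n)
			open = append(open, q)
		}
		qs = open
	}
}

The buffer of 1 on each queue matches the commit's comment: the producer can run one item ahead of every worker, and each worker one result ahead of the consumer, so nothing blocks unless the pipeline is genuinely full.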
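The new golang.org/x/sync/errgroup import is what keeps shutdown clean: the first goroutine to return an error cancels the context returned by errgroup.WithContext, every select on <-ctx.Done() in Walk fires, and grp.Wait() reports that first error. A minimal sketch of that behaviour (the error text is invented):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, ctx := errgroup.WithContext(context.Background())

	// The first goroutine to fail cancels the derived context.
	grp.Go(func() error {
		return errors.New("amt load failed")
	})

	// Other goroutines observe the cancellation and bail out, just as the
	// select-on-ctx.Done() branches do in Walk.
	grp.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// Wait blocks until every goroutine returns and yields the first error.
	fmt.Println(grp.Wait()) // amt load failed
}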
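Finally, note that cb is invoked only from the single result-processing goroutine, so callers may mutate plain variables without locks even though the loading runs in parallel. A hypothetical call site: the full Walk signature is elided from this diff, so the parameter list below is inferred from the visible callback fragment and the cb(...) call sites and may not match the real declaration:

// Count the messages applied over the last 1000 epochs. The callback runs
// serially, in reverse-chain order, so `total` needs no synchronization.
// (Fragment only; assumes the surrounding lotus-sim types are in scope.)
var total int
err := sim.Walk(ctx, 1000, func(
	sm *stmgr.StateManager, ts *types.TipSet,
	stCid cid.Cid, messages []*AppliedMessage,
) error {
	total += len(messages)
	return nil
})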