package stmgr

import (
	"context"
	"fmt"
	"sync/atomic"

	"github.com/ipfs/go-cid"
	cbg "github.com/whyrusleeping/cbor-gen"
	"go.opencensus.io/stats"
	"go.opencensus.io/trace"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"

	blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/actors"
	"github.com/filecoin-project/lotus/chain/actors/builtin"
	"github.com/filecoin-project/lotus/chain/actors/builtin/cron"
	"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/vm"
	"github.com/filecoin-project/lotus/metrics"
)

// ApplyBlocks executes the messages in bms on top of the state rooted at
// pstate and returns the flushed state root together with the root of the
// receipts array.
//
// The work is done in four timed phases:
//  1. For every epoch in [parentEpoch, epoch) it runs cron for the skipped
//     (null) rounds and calls handleStateForks, re-creating the VM whenever
//     the latter returns a different state root.
//  2. For every block in bms it applies the BLS and then the secp messages,
//     skipping any message whose VM-message CID was already applied for an
//     earlier block, and then applies an implicit AwardBlockReward message
//     carrying the accumulated miner tip and penalty.
//  3. It runs cron for epoch.
//  4. It stores the receipts in an array and flushes the VM.
//
// When em is non-nil, em.MessageApplied is called after every applied
// message, including the implicit cron and reward messages; an error from
// the monitor aborts execution.
//
// On any error both returned CIDs are cid.Undef.
func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, em ExecMonitor, baseFee abi.TokenAmount, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
	done := metrics.Timer(ctx, metrics.VMApplyBlocksTotal)
	defer done()

	// partDone is reassigned at every phase boundary; the deferred closure
	// reads the variable at return time so the timer of whichever phase is
	// current gets stopped, including on early error returns.
	partDone := metrics.Timer(ctx, metrics.VMApplyEarly)
	defer func() {
		partDone()
	}()

	makeVM := func(base cid.Cid) (*vm.VM, error) {
		vmopt := &vm.VMOpts{
			StateBase:      base,
			Epoch:          epoch,
			Rand:           r,
			Bstore:         sm.cs.StateBlockstore(),
			Syscalls:       sm.syscalls,
			CircSupplyCalc: sm.GetVMCirculatingSupply,
			NtwkVersion:    sm.GetNtwkVersion,
			BaseFee:        baseFee,
			LookbackState:  LookbackStateGetterForTipset(sm, ts),
		}

		return sm.newVM(ctx, vmopt)
	}

	vmi, err := makeVM(pstate)
	if err != nil {
		return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
	}

	// runCron applies the implicit cron EpochTick message on the current vmi
	// (captured by reference, so it follows VM re-creation below).
	runCron := func(cronEpoch abi.ChainEpoch) error {
		cronMsg := &types.Message{
			To:         cron.Address,
			From:       builtin.SystemActorAddr,
			Nonce:      uint64(cronEpoch),
			Value:      types.NewInt(0),
			GasFeeCap:  types.NewInt(0),
			GasPremium: types.NewInt(0),
			GasLimit:   build.BlockGasLimit * 10000, // Make super sure this is never too little
			Method:     cron.Methods.EpochTick,
			Params:     nil,
		}
		ret, err := vmi.ApplyImplicitMessage(ctx, cronMsg)
		if err != nil {
			return err
		}

		if em != nil {
			if err := em.MessageApplied(ctx, ts, cronMsg.Cid(), cronMsg, ret, true); err != nil {
				return xerrors.Errorf("callback failed on cron message: %w", err)
			}
		}
		if ret.ExitCode != 0 {
			return xerrors.Errorf("cron exit was non-zero: %d", ret.ExitCode)
		}

		return nil
	}

	for i := parentEpoch; i < epoch; i++ {
		if i > parentEpoch {
			// run cron for null rounds if any
			if err := runCron(i); err != nil {
				return cid.Undef, cid.Undef, err
			}

			pstate, err = vmi.Flush(ctx)
			if err != nil {
				return cid.Undef, cid.Undef, xerrors.Errorf("flushing vm: %w", err)
			}
		}

		// handle state forks
		// XXX: The state tree
		newState, err := sm.handleStateForks(ctx, pstate, i, em, ts)
		if err != nil {
			return cid.Undef, cid.Undef, xerrors.Errorf("error handling state forks: %w", err)
		}

		// A fork changed the state root: continue on a VM based on it.
		if pstate != newState {
			vmi, err = makeVM(newState)
			if err != nil {
				return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
			}
		}

		vmi.SetBlockHeight(i + 1)
		pstate = newState
	}

	partDone()
	partDone = metrics.Timer(ctx, metrics.VMApplyMessages)

	var receipts []cbg.CBORMarshaler
	processedMsgs := make(map[cid.Cid]struct{})
	for _, b := range bms {
		penalty := types.NewInt(0)
		gasReward := big.Zero()

		// Cap the BLS slice at its length so that append always copies
		// instead of writing into spare capacity of the caller's backing array.
		bls := b.BlsMessages
		msgs := append(bls[:len(bls):len(bls)], b.SecpkMessages...)

		for _, cm := range msgs {
			m := cm.VMMessage()
			mcid := m.Cid()
			if _, found := processedMsgs[mcid]; found {
				// Already applied for an earlier block of this tipset.
				continue
			}
			res, err := vmi.ApplyMessage(ctx, cm)
			if err != nil {
				return cid.Undef, cid.Undef, err
			}

			receipts = append(receipts, &res.MessageReceipt)
			gasReward = big.Add(gasReward, res.GasCosts.MinerTip)
			penalty = big.Add(penalty, res.GasCosts.MinerPenalty)

			if em != nil {
				if err := em.MessageApplied(ctx, ts, cm.Cid(), m, res, false); err != nil {
					return cid.Undef, cid.Undef, err
				}
			}
			processedMsgs[mcid] = struct{}{}
		}

		params, err := actors.SerializeParams(&reward.AwardBlockRewardParams{
			Miner:     b.Miner,
			Penalty:   penalty,
			GasReward: gasReward,
			WinCount:  b.WinCount,
		})
		if err != nil {
			return cid.Undef, cid.Undef, xerrors.Errorf("failed to serialize award params: %w", err)
		}

		rwMsg := &types.Message{
			From:       builtin.SystemActorAddr,
			To:         reward.Address,
			Nonce:      uint64(epoch),
			Value:      types.NewInt(0),
			GasFeeCap:  types.NewInt(0),
			GasPremium: types.NewInt(0),
			GasLimit:   1 << 30,
			Method:     reward.Methods.AwardBlockReward,
			Params:     params,
		}
		ret, actErr := vmi.ApplyImplicitMessage(ctx, rwMsg)
		if actErr != nil {
			return cid.Undef, cid.Undef, xerrors.Errorf("failed to apply reward message for miner %s: %w", b.Miner, actErr)
		}
		if em != nil {
			if err := em.MessageApplied(ctx, ts, rwMsg.Cid(), rwMsg, ret, true); err != nil {
				return cid.Undef, cid.Undef, xerrors.Errorf("callback failed on reward message: %w", err)
			}
		}

		if ret.ExitCode != 0 {
			return cid.Undef, cid.Undef, xerrors.Errorf("reward application message failed (exit %d): %s", ret.ExitCode, ret.ActorErr)
		}
	}

	partDone()
	partDone = metrics.Timer(ctx, metrics.VMApplyCron)

	if err := runCron(epoch); err != nil {
		return cid.Undef, cid.Undef, err
	}

	partDone()
	partDone = metrics.Timer(ctx, metrics.VMApplyFlush)

	rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx))
	for i, receipt := range receipts {
		if err := rectarr.Set(uint64(i), receipt); err != nil {
			return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err)
		}
	}
	rectroot, err := rectarr.Root()
	if err != nil {
		return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err)
	}

	st, err := vmi.Flush(ctx)
	if err != nil {
		return cid.Undef, cid.Undef, xerrors.Errorf("vm flush failed: %w", err)
	}

	stats.Record(ctx, metrics.VMSends.M(int64(atomic.LoadUint64(&vm.StatSends))),
		metrics.VMApplied.M(int64(atomic.LoadUint64(&vm.StatApplied))))

	return st, rectroot, nil
}

// TipSetState returns the state root and the receipts root obtained by
// executing ts.
//
// Results are served from sm.stCache when present. If another call is already
// computing the same tipset (an entry exists in sm.compWait), this call waits
// for it to finish, or for ctx to be cancelled, before consulting the cache.
// A successful result (st != cid.Undef) is stored in the cache by the deferred
// function, which also wakes any waiters.
//
// NOTE(review): a waiter that wakes up and finds no cached result (e.g. the
// earlier computation failed) registers its own channel without re-checking
// sm.compWait, so several woken waiters may end up computing concurrently —
// confirm whether that is acceptable.
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st cid.Cid, rec cid.Cid, err error) {
	ctx, span := trace.StartSpan(ctx, "tipSetState")
	defer span.End()
	if span.IsRecordingEvents() {
		span.AddAttributes(trace.StringAttribute("tipset", fmt.Sprint(ts.Cids())))
	}

	ck := cidsToKey(ts.Cids())
	sm.stlk.Lock()
	cw, cwok := sm.compWait[ck]
	if cwok {
		// Someone else is computing this tipset: release the lock while we
		// wait, and re-acquire it before touching the cache.
		sm.stlk.Unlock()
		span.AddAttributes(trace.BoolAttribute("waited", true))
		select {
		case <-cw:
			sm.stlk.Lock()
		case <-ctx.Done():
			return cid.Undef, cid.Undef, ctx.Err()
		}
	}
	cached, ok := sm.stCache[ck]
	if ok {
		sm.stlk.Unlock()
		span.AddAttributes(trace.BoolAttribute("cache", true))
		return cached[0], cached[1], nil
	}

	// Register ourselves as the in-flight computation for this key.
	ch := make(chan struct{})
	sm.compWait[ck] = ch

	defer func() {
		sm.stlk.Lock()
		delete(sm.compWait, ck)
		// Only successful results are cached; st is the named return value.
		if st != cid.Undef {
			sm.stCache[ck] = []cid.Cid{st, rec}
		}
		sm.stlk.Unlock()
		close(ch)
	}()

	sm.stlk.Unlock()

	if ts.Height() == 0 {
		// NB: This is here because the process that executes blocks requires that the
		// block miner reference a valid miner in the state tree. Unless we create some
		// magical genesis miner, this won't work properly, so we short circuit here
		// This avoids the question of 'who gets paid the genesis block reward'
		return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
	}

	st, rec, err = sm.computeTipSetState(ctx, ts, sm.tsExecMonitor)
	if err != nil {
		return cid.Undef, cid.Undef, err
	}

	return st, rec, nil
}

// ExecutionTraceWithMonitor executes ts via computeTipSetState, reporting
// every applied message to em, and returns the resulting state root. Unlike
// TipSetState it does not consult or populate the state cache.
func (sm *StateManager) ExecutionTraceWithMonitor(ctx context.Context, ts *types.TipSet, em ExecMonitor) (cid.Cid, error) {
	st, _, err := sm.computeTipSetState(ctx, ts, em)
	return st, err
}

// ExecutionTrace executes ts with an InvocationTracer as the execution
// monitor and returns the resulting state root together with the invocation
// results the tracer collected.
func (sm *StateManager) ExecutionTrace(ctx context.Context, ts *types.TipSet) (cid.Cid, []*api.InvocResult, error) {
	var invocTrace []*api.InvocResult
	st, err := sm.ExecutionTraceWithMonitor(ctx, ts, &InvocationTracer{trace: &invocTrace})
	if err != nil {
		return cid.Undef, nil, err
	}
	return st, invocTrace, nil
}

// computeTipSetState gathers the inputs needed to execute ts (parent epoch,
// parent state root, chain randomness, block messages and base fee) and hands
// them to ApplyBlocks. It returns the state root and receipts root produced
// by ApplyBlocks, and rejects tipsets in which two blocks share a miner.
func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet, em ExecMonitor) (cid.Cid, cid.Cid, error) {
	ctx, span := trace.StartSpan(ctx, "computeTipSetState")
	defer span.End()

	blks := ts.Blocks()

	// Pairwise check that no miner appears twice in the tipset.
	for i := 0; i < len(blks); i++ {
		for j := i + 1; j < len(blks); j++ {
			if blks[i].Miner == blks[j].Miner {
				return cid.Undef, cid.Undef,
					xerrors.Errorf("duplicate miner in a tipset (%s %s)",
						blks[i].Miner, blks[j].Miner)
			}
		}
	}

	// parentEpoch stays at its zero value for a height-0 tipset.
	var parentEpoch abi.ChainEpoch
	pstate := blks[0].ParentStateRoot
	if blks[0].Height > 0 {
		parent, err := sm.cs.GetBlock(blks[0].Parents[0])
		if err != nil {
			return cid.Undef, cid.Undef, xerrors.Errorf("getting parent block: %w", err)
		}

		parentEpoch = parent.Height
	}

	r := store.NewChainRand(sm.cs, ts.Cids())

	blkmsgs, err := sm.cs.BlockMsgsForTipset(ts)
	if err != nil {
		return cid.Undef, cid.Undef, xerrors.Errorf("getting block messages for tipset: %w", err)
	}

	baseFee := blks[0].ParentBaseFee

	return sm.ApplyBlocks(ctx, parentEpoch, pstate, blkmsgs, blks[0].Height, r, em, baseFee, ts)
}