From 5626c69ce5641d425e4e08a1a05b5debe0e19fdc Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 27 Sep 2022 09:31:48 +0200 Subject: [PATCH] wip: abstract common fns required for consensus --- chain/consensus/common.go | 466 +++++++++++++++++++++++++++++ chain/consensus/filcns/filecoin.go | 432 ++------------------------ chain/consensus/filcns/mine.go | 80 +---- chain/consensus/iface.go | 59 +++- chain/sub/incoming.go | 2 +- 5 files changed, 559 insertions(+), 480 deletions(-) create mode 100644 chain/consensus/common.go diff --git a/chain/consensus/common.go b/chain/consensus/common.go new file mode 100644 index 000000000..c3a1120aa --- /dev/null +++ b/chain/consensus/common.go @@ -0,0 +1,466 @@ +package consensus + +import ( + "context" + "fmt" + "strings" + + "go.opencensus.io/stats" + "golang.org/x/xerrors" + + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log/v2" + pubsub "github.com/libp2p/go-libp2p-pubsub" + cbg "github.com/whyrusleeping/cbor-gen" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/lotus/api" + bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/lib/async" + "github.com/filecoin-project/lotus/lib/sigs" + "github.com/filecoin-project/lotus/metrics" + blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" +) + +// Common operations shared by all consensus algorithm implementations. +var log = logging.Logger("consensus-common") + +// RunAsyncChecks accepts a list of checks to perform in parallel. +// +// Each consensus algorithm may choose to perform a set of different +// checks when a new blocks is received. +func RunAsyncChecks(ctx context.Context, await []async.ErrorFuture) error { + var merr error + for _, fut := range await { + if err := fut.AwaitContext(ctx); err != nil { + merr = multierror.Append(merr, err) + } + } + if merr != nil { + mulErr := merr.(*multierror.Error) + mulErr.ErrorFormat = func(es []error) string { + if len(es) == 1 { + return fmt.Sprintf("1 error occurred:\n\t* %+v\n\n", es[0]) + } + + points := make([]string, len(es)) + for i, err := range es { + points[i] = fmt.Sprintf("* %+v", err) + } + + return fmt.Sprintf( + "%d errors occurred:\n\t%s\n\n", + len(es), strings.Join(points, "\n\t")) + } + return mulErr + } + + return nil +} + +// CommonBlkChecks performed by all consensus implementations. +//TODO: Take stateManager and ChainStore in a common object abstracted by all consensus algorithms? +func CommonBlkChecks(ctx context.Context, sm *stmgr.StateManager, cs *store.ChainStore, + b *types.FullBlock, baseTs *types.TipSet) []async.ErrorFuture { + h := b.Header + msgsCheck := async.Err(func() error { + if b.Cid() == build.WhitelistedBlock { + return nil + } + + if err := checkBlockMessages(ctx, sm, cs, b, baseTs); err != nil { + return xerrors.Errorf("block had invalid messages: %w", err) + } + return nil + }) + + baseFeeCheck := async.Err(func() error { + baseFee, err := cs.ComputeBaseFee(ctx, baseTs) + if err != nil { + return xerrors.Errorf("computing base fee: %w", err) + } + if types.BigCmp(baseFee, b.Header.ParentBaseFee) != 0 { + return xerrors.Errorf("base fee doesn't match: %s (header) != %s (computed)", + b.Header.ParentBaseFee, baseFee) + } + return nil + }) + + stateRootCheck := async.Err(func() error { + stateroot, precp, err := sm.TipSetState(ctx, baseTs) + if err != nil { + return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err) + } + + if stateroot != h.ParentStateRoot { + msgs, err := cs.MessagesForTipset(ctx, baseTs) + if err != nil { + log.Error("failed to load messages for tipset during tipset state mismatch error: ", err) + } else { + log.Warn("Messages for tipset with mismatching state:") + for i, m := range msgs { + mm := m.VMMessage() + log.Warnf("Message[%d]: from=%s to=%s method=%d params=%x", i, mm.From, mm.To, mm.Method, mm.Params) + } + } + + return xerrors.Errorf("parent state root did not match computed state (%s != %s)", h.ParentStateRoot, stateroot) + } + + if precp != h.ParentMessageReceipts { + return xerrors.Errorf("parent receipts root did not match computed value (%s != %s)", precp, h.ParentMessageReceipts) + } + return nil + }) + + return []async.ErrorFuture{ + msgsCheck, + baseFeeCheck, + stateRootCheck, + } +} + +// Check the validity of the messages included in a block. +// TODO: We should extract this somewhere else and make the message pool and miner use the same logic +func checkBlockMessages(ctx context.Context, sm *stmgr.StateManager, cs *store.ChainStore, b *types.FullBlock, baseTs *types.TipSet) error { + { + var sigCids []cid.Cid // this is what we get for people not wanting the marshalcbor method on the cid type + var pubks [][]byte + + for _, m := range b.BlsMessages { + sigCids = append(sigCids, m.Cid()) + + pubk, err := sm.GetBlsPublicKey(ctx, m.From, baseTs) + if err != nil { + return xerrors.Errorf("failed to load bls public to validate block: %w", err) + } + + pubks = append(pubks, pubk) + } + + if err := VerifyBlsAggregate(ctx, b.Header.BLSAggregate, sigCids, pubks); err != nil { + return xerrors.Errorf("bls aggregate signature was invalid: %w", err) + } + } + + nonces := make(map[address.Address]uint64) + + stateroot, _, err := sm.TipSetState(ctx, baseTs) + if err != nil { + return xerrors.Errorf("failed to compute tipsettate for %s: %w", baseTs.Key(), err) + } + + st, err := state.LoadStateTree(cs.ActorStore(ctx), stateroot) + if err != nil { + return xerrors.Errorf("failed to load base state tree: %w", err) + } + + nv := sm.GetNetworkVersion(ctx, b.Header.Height) + pl := vm.PricelistByEpoch(b.Header.Height) + var sumGasLimit int64 + checkMsg := func(msg types.ChainMsg) error { + m := msg.VMMessage() + + // Phase 1: syntactic validation, as defined in the spec + minGas := pl.OnChainMessage(msg.ChainLength()) + if err := m.ValidForBlockInclusion(minGas.Total(), nv); err != nil { + return xerrors.Errorf("msg %s invalid for block inclusion: %w", m.Cid(), err) + } + + // ValidForBlockInclusion checks if any single message does not exceed BlockGasLimit + // So below is overflow safe + sumGasLimit += m.GasLimit + if sumGasLimit > build.BlockGasLimit { + return xerrors.Errorf("block gas limit exceeded") + } + + // Phase 2: (Partial) semantic validation: + // the sender exists and is an account actor, and the nonces make sense + var sender address.Address + if nv >= network.Version13 { + sender, err = st.LookupID(m.From) + if err != nil { + return xerrors.Errorf("failed to lookup sender %s: %w", m.From, err) + } + } else { + sender = m.From + } + + if _, ok := nonces[sender]; !ok { + // `GetActor` does not validate that this is an account actor. + act, err := st.GetActor(sender) + if err != nil { + return xerrors.Errorf("failed to get actor: %w", err) + } + + if !builtin.IsAccountActor(act.Code) { + return xerrors.New("Sender must be an account actor") + } + nonces[sender] = act.Nonce + } + + if nonces[sender] != m.Nonce { + return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[sender], m.Nonce) + } + nonces[sender]++ + + return nil + } + + // Validate message arrays in a temporary blockstore. + tmpbs := bstore.NewMemory() + tmpstore := blockadt.WrapStore(ctx, cbor.NewCborStore(tmpbs)) + + bmArr := blockadt.MakeEmptyArray(tmpstore) + for i, m := range b.BlsMessages { + if err := checkMsg(m); err != nil { + return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err) + } + + c, err := store.PutMessage(ctx, tmpbs, m) + if err != nil { + return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) + } + + k := cbg.CborCid(c) + if err := bmArr.Set(uint64(i), &k); err != nil { + return xerrors.Errorf("failed to put bls message at index %d: %w", i, err) + } + } + + smArr := blockadt.MakeEmptyArray(tmpstore) + for i, m := range b.SecpkMessages { + if sm.GetNetworkVersion(ctx, b.Header.Height) >= network.Version14 { + if m.Signature.Type != crypto.SigTypeSecp256k1 { + return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) + } + } + + if err := checkMsg(m); err != nil { + return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) + } + + // `From` being an account actor is only validated inside the `vm.ResolveToKeyAddr` call + // in `StateManager.ResolveToKeyAddress` here (and not in `checkMsg`). + kaddr, err := sm.ResolveToKeyAddress(ctx, m.Message.From, baseTs) + if err != nil { + return xerrors.Errorf("failed to resolve key addr: %w", err) + } + + if err := sigs.Verify(&m.Signature, kaddr, m.Message.Cid().Bytes()); err != nil { + return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err) + } + + c, err := store.PutMessage(ctx, tmpbs, m) + if err != nil { + return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) + } + k := cbg.CborCid(c) + if err := smArr.Set(uint64(i), &k); err != nil { + return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err) + } + } + + bmroot, err := bmArr.Root() + if err != nil { + return xerrors.Errorf("failed to root bls msgs: %w", err) + + } + + smroot, err := smArr.Root() + if err != nil { + return xerrors.Errorf("failed to root secp msgs: %w", err) + } + + mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{ + BlsMessages: bmroot, + SecpkMessages: smroot, + }) + if err != nil { + return xerrors.Errorf("failed to put msg meta: %w", err) + } + + if b.Header.Messages != mrcid { + return fmt.Errorf("messages didnt match message root in header") + } + + // Finally, flush. + err = vm.Copy(ctx, tmpbs, cs.ChainBlockstore(), mrcid) + if err != nil { + return xerrors.Errorf("failed to flush:%w", err) + } + + return nil +} + +// MsgsFromBlockTemplate extracts the different types of messages from a block +// template and populates the header for the next block to be proposed. +func MsgsFromBlockTemplate(ctx context.Context, sm *stmgr.StateManager, next *types.BlockHeader, + pts *types.TipSet, bt *api.BlockTemplate) ([]*types.Message, []*types.SignedMessage, error) { + + var blsMessages []*types.Message + var secpkMessages []*types.SignedMessage + + var blsMsgCids, secpkMsgCids []cid.Cid + var blsSigs []crypto.Signature + for _, msg := range bt.Messages { + if msg.Signature.Type == crypto.SigTypeBLS { + blsSigs = append(blsSigs, msg.Signature) + blsMessages = append(blsMessages, &msg.Message) + + c, err := sm.ChainStore().PutMessage(ctx, &msg.Message) + if err != nil { + return nil, nil, err + } + + blsMsgCids = append(blsMsgCids, c) + } else if msg.Signature.Type == crypto.SigTypeSecp256k1 { + c, err := sm.ChainStore().PutMessage(ctx, msg) + if err != nil { + return nil, nil, err + } + + secpkMsgCids = append(secpkMsgCids, c) + secpkMessages = append(secpkMessages, msg) + + } else { + return nil, nil, xerrors.Errorf("unknown sig type: %d", msg.Signature.Type) + } + } + + store := sm.ChainStore().ActorStore(ctx) + blsmsgroot, err := ToMessagesArray(store, blsMsgCids) + if err != nil { + return nil, nil, xerrors.Errorf("building bls amt: %w", err) + } + secpkmsgroot, err := ToMessagesArray(store, secpkMsgCids) + if err != nil { + return nil, nil, xerrors.Errorf("building secpk amt: %w", err) + } + + mmcid, err := store.Put(store.Context(), &types.MsgMeta{ + BlsMessages: blsmsgroot, + SecpkMessages: secpkmsgroot, + }) + if err != nil { + return nil, nil, err + } + next.Messages = mmcid + + aggSig, err := AggregateSignatures(blsSigs) + if err != nil { + return nil, nil, err + } + + next.BLSAggregate = aggSig + pweight, err := sm.ChainStore().Weight(ctx, pts) + if err != nil { + return nil, nil, err + } + next.ParentWeight = pweight + + baseFee, err := sm.ChainStore().ComputeBaseFee(ctx, pts) + if err != nil { + return nil, nil, xerrors.Errorf("computing base fee: %w", err) + } + next.ParentBaseFee = baseFee + + return blsMessages, secpkMessages, err + +} + +// Basic sanity-checks performed when a block is proposed locally. +func validateLocalBlock(ctx context.Context, msg *pubsub.Message) (pubsub.ValidationResult, string) { + stats.Record(ctx, metrics.BlockPublished.M(1)) + + if size := msg.Size(); size > 1<<20-1<<15 { + log.Errorf("ignoring oversize block (%dB)", size) + return pubsub.ValidationIgnore, "oversize_block" + } + + blk, what, err := decodeAndCheckBlock(msg) + if err != nil { + log.Errorf("got invalid local block: %s", err) + return pubsub.ValidationIgnore, what + } + + msg.ValidatorData = blk + stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) + return pubsub.ValidationAccept, "" +} + +func decodeAndCheckBlock(msg *pubsub.Message) (*types.BlockMsg, string, error) { + blk, err := types.DecodeBlockMsg(msg.GetData()) + if err != nil { + return nil, "invalid", xerrors.Errorf("error decoding block: %w", err) + } + + if count := len(blk.BlsMessages) + len(blk.SecpkMessages); count > build.BlockMessageLimit { + return nil, "too_many_messages", fmt.Errorf("block contains too many messages (%d)", count) + } + + // make sure we have a signature + if blk.Header.BlockSig == nil { + return nil, "missing_signature", fmt.Errorf("block without a signature") + } + + return blk, "", nil +} + +func validateMsgMeta(ctx context.Context, msg *types.BlockMsg) error { + // TODO there has to be a simpler way to do this without the blockstore dance + // block headers use adt0 + store := blockadt.WrapStore(ctx, cbor.NewCborStore(bstore.NewMemory())) + bmArr := blockadt.MakeEmptyArray(store) + smArr := blockadt.MakeEmptyArray(store) + + for i, m := range msg.BlsMessages { + c := cbg.CborCid(m) + if err := bmArr.Set(uint64(i), &c); err != nil { + return err + } + } + + for i, m := range msg.SecpkMessages { + c := cbg.CborCid(m) + if err := smArr.Set(uint64(i), &c); err != nil { + return err + } + } + + bmroot, err := bmArr.Root() + if err != nil { + return err + } + + smroot, err := smArr.Root() + if err != nil { + return err + } + + mrcid, err := store.Put(store.Context(), &types.MsgMeta{ + BlsMessages: bmroot, + SecpkMessages: smroot, + }) + + if err != nil { + return err + } + + if msg.Header.Messages != mrcid { + return fmt.Errorf("messages didn't match root cid in header") + } + + return nil +} diff --git a/chain/consensus/filcns/filecoin.go b/chain/consensus/filcns/filecoin.go index de3cf7cf7..24aa3ffb4 100644 --- a/chain/consensus/filcns/filecoin.go +++ b/chain/consensus/filcns/filecoin.go @@ -4,18 +4,11 @@ import ( "bytes" "context" "errors" - "fmt" "os" - "strings" "time" - "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" - pubsub "github.com/libp2p/go-libp2p-pubsub" - cbg "github.com/whyrusleeping/cbor-gen" - "go.opencensus.io/stats" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -23,25 +16,20 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" - blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof" - bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" - "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/rand" - "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/lib/async" "github.com/filecoin-project/lotus/lib/sigs" - "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -125,17 +113,6 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock) log.Warn("Got block from the future, but within threshold", h.Timestamp, build.Clock.Now().Unix()) } - msgsCheck := async.Err(func() error { - if b.Cid() == build.WhitelistedBlock { - return nil - } - - if err := filec.checkBlockMessages(ctx, b, baseTs); err != nil { - return xerrors.Errorf("block had invalid messages: %w", err) - } - return nil - }) - minerCheck := async.Err(func() error { if err := filec.minerIsValid(ctx, h.Miner, baseTs); err != nil { return xerrors.Errorf("minerIsValid failed: %w", err) @@ -143,17 +120,6 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock) return nil }) - baseFeeCheck := async.Err(func() error { - baseFee, err := filec.store.ComputeBaseFee(ctx, baseTs) - if err != nil { - return xerrors.Errorf("computing base fee: %w", err) - } - if types.BigCmp(baseFee, b.Header.ParentBaseFee) != 0 { - return xerrors.Errorf("base fee doesn't match: %s (header) != %s (computed)", - b.Header.ParentBaseFee, baseFee) - } - return nil - }) pweight, err := filec.store.Weight(ctx, baseTs) if err != nil { return xerrors.Errorf("getting parent weight: %w", err) @@ -164,34 +130,6 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock) b.Header.ParentWeight, pweight) } - stateRootCheck := async.Err(func() error { - stateroot, precp, err := filec.sm.TipSetState(ctx, baseTs) - if err != nil { - return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err) - } - - if stateroot != h.ParentStateRoot { - msgs, err := filec.store.MessagesForTipset(ctx, baseTs) - if err != nil { - log.Error("failed to load messages for tipset during tipset state mismatch error: ", err) - } else { - log.Warn("Messages for tipset with mismatching state:") - for i, m := range msgs { - mm := m.VMMessage() - log.Warnf("Message[%d]: from=%s to=%s method=%d params=%x", i, mm.From, mm.To, mm.Method, mm.Params) - } - } - - return xerrors.Errorf("parent state root did not match computed state (%s != %s)", h.ParentStateRoot, stateroot) - } - - if precp != h.ParentMessageReceipts { - return xerrors.Errorf("parent receipts root did not match computed value (%s != %s)", precp, h.ParentMessageReceipts) - } - - return nil - }) - // Stuff that needs worker address waddr, err := stmgr.GetMinerWorkerRaw(ctx, filec.sm, lbst, h.Miner) if err != nil { @@ -253,10 +191,11 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock) }) blockSigCheck := async.Err(func() error { - if err := sigs.CheckBlockSignature(ctx, h, waddr); err != nil { + if err := filec.VerifyBlockSignature(ctx, h, waddr); err != nil { return xerrors.Errorf("check block signature failed: %w", err) } return nil + }) beaconValuesCheck := async.Err(func() error { @@ -305,44 +244,17 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock) return nil }) - await := []async.ErrorFuture{ + commonChecks := consensus.CommonBlkChecks(ctx, filec.sm, filec.store, b, baseTs) + await := append([]async.ErrorFuture{ minerCheck, tktsCheck, blockSigCheck, beaconValuesCheck, wproofCheck, winnerCheck, - msgsCheck, - baseFeeCheck, - stateRootCheck, - } + }, commonChecks...) - var merr error - for _, fut := range await { - if err := fut.AwaitContext(ctx); err != nil { - merr = multierror.Append(merr, err) - } - } - if merr != nil { - mulErr := merr.(*multierror.Error) - mulErr.ErrorFormat = func(es []error) string { - if len(es) == 1 { - return fmt.Sprintf("1 error occurred:\n\t* %+v\n\n", es[0]) - } - - points := make([]string, len(es)) - for i, err := range es { - points[i] = fmt.Sprintf("* %+v", err) - } - - return fmt.Sprintf( - "%d errors occurred:\n\t%s\n\n", - len(es), strings.Join(points, "\n\t")) - } - return mulErr - } - - return nil + return consensus.RunAsyncChecks(ctx, await) } func blockSanityChecks(h *types.BlockHeader) error { @@ -433,178 +345,6 @@ func (filec *FilecoinEC) VerifyWinningPoStProof(ctx context.Context, nv network. return nil } -// TODO: We should extract this somewhere else and make the message pool and miner use the same logic -func (filec *FilecoinEC) checkBlockMessages(ctx context.Context, b *types.FullBlock, baseTs *types.TipSet) error { - { - var sigCids []cid.Cid // this is what we get for people not wanting the marshalcbor method on the cid type - var pubks [][]byte - - for _, m := range b.BlsMessages { - sigCids = append(sigCids, m.Cid()) - - pubk, err := filec.sm.GetBlsPublicKey(ctx, m.From, baseTs) - if err != nil { - return xerrors.Errorf("failed to load bls public to validate block: %w", err) - } - - pubks = append(pubks, pubk) - } - - if err := consensus.VerifyBlsAggregate(ctx, b.Header.BLSAggregate, sigCids, pubks); err != nil { - return xerrors.Errorf("bls aggregate signature was invalid: %w", err) - } - } - - nonces := make(map[address.Address]uint64) - - stateroot, _, err := filec.sm.TipSetState(ctx, baseTs) - if err != nil { - return xerrors.Errorf("failed to compute tipsettate for %s: %w", baseTs.Key(), err) - } - - st, err := state.LoadStateTree(filec.store.ActorStore(ctx), stateroot) - if err != nil { - return xerrors.Errorf("failed to load base state tree: %w", err) - } - - nv := filec.sm.GetNetworkVersion(ctx, b.Header.Height) - pl := vm.PricelistByEpoch(b.Header.Height) - var sumGasLimit int64 - checkMsg := func(msg types.ChainMsg) error { - m := msg.VMMessage() - - // Phase 1: syntactic validation, as defined in the spec - minGas := pl.OnChainMessage(msg.ChainLength()) - if err := m.ValidForBlockInclusion(minGas.Total(), nv); err != nil { - return xerrors.Errorf("msg %s invalid for block inclusion: %w", m.Cid(), err) - } - - // ValidForBlockInclusion checks if any single message does not exceed BlockGasLimit - // So below is overflow safe - sumGasLimit += m.GasLimit - if sumGasLimit > build.BlockGasLimit { - return xerrors.Errorf("block gas limit exceeded") - } - - // Phase 2: (Partial) semantic validation: - // the sender exists and is an account actor, and the nonces make sense - var sender address.Address - if nv >= network.Version13 { - sender, err = st.LookupID(m.From) - if err != nil { - return xerrors.Errorf("failed to lookup sender %s: %w", m.From, err) - } - } else { - sender = m.From - } - - if _, ok := nonces[sender]; !ok { - // `GetActor` does not validate that this is an account actor. - act, err := st.GetActor(sender) - if err != nil { - return xerrors.Errorf("failed to get actor: %w", err) - } - - if !builtin.IsAccountActor(act.Code) { - return xerrors.New("Sender must be an account actor") - } - nonces[sender] = act.Nonce - } - - if nonces[sender] != m.Nonce { - return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[sender], m.Nonce) - } - nonces[sender]++ - - return nil - } - - // Validate message arrays in a temporary blockstore. - tmpbs := bstore.NewMemory() - tmpstore := blockadt.WrapStore(ctx, cbor.NewCborStore(tmpbs)) - - bmArr := blockadt.MakeEmptyArray(tmpstore) - for i, m := range b.BlsMessages { - if err := checkMsg(m); err != nil { - return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err) - } - - c, err := store.PutMessage(ctx, tmpbs, m) - if err != nil { - return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) - } - - k := cbg.CborCid(c) - if err := bmArr.Set(uint64(i), &k); err != nil { - return xerrors.Errorf("failed to put bls message at index %d: %w", i, err) - } - } - - smArr := blockadt.MakeEmptyArray(tmpstore) - for i, m := range b.SecpkMessages { - if filec.sm.GetNetworkVersion(ctx, b.Header.Height) >= network.Version14 { - if m.Signature.Type != crypto.SigTypeSecp256k1 { - return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) - } - } - - if err := checkMsg(m); err != nil { - return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) - } - - // `From` being an account actor is only validated inside the `vm.ResolveToKeyAddr` call - // in `StateManager.ResolveToKeyAddress` here (and not in `checkMsg`). - kaddr, err := filec.sm.ResolveToKeyAddress(ctx, m.Message.From, baseTs) - if err != nil { - return xerrors.Errorf("failed to resolve key addr: %w", err) - } - - if err := sigs.Verify(&m.Signature, kaddr, m.Message.Cid().Bytes()); err != nil { - return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err) - } - - c, err := store.PutMessage(ctx, tmpbs, m) - if err != nil { - return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) - } - k := cbg.CborCid(c) - if err := smArr.Set(uint64(i), &k); err != nil { - return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err) - } - } - - bmroot, err := bmArr.Root() - if err != nil { - return xerrors.Errorf("failed to root bls msgs: %w", err) - - } - - smroot, err := smArr.Root() - if err != nil { - return xerrors.Errorf("failed to root secp msgs: %w", err) - } - - mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{ - BlsMessages: bmroot, - SecpkMessages: smroot, - }) - if err != nil { - return xerrors.Errorf("failed to put msg meta: %w", err) - } - - if b.Header.Messages != mrcid { - return fmt.Errorf("messages didnt match message root in header") - } - - // Finally, flush. - err = vm.Copy(ctx, tmpbs, filec.store.ChainBlockstore(), mrcid) - if err != nil { - return xerrors.Errorf("failed to flush:%w", err) - } - - return nil -} - func (filec *FilecoinEC) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool { if filec.genesis == nil { return false @@ -660,140 +400,7 @@ func VerifyVRF(ctx context.Context, worker address.Address, vrfBase, vrfproof [] var ErrSoftFailure = errors.New("soft validation failure") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") -func (filec *FilecoinEC) ValidateBlockPubsub(ctx context.Context, self bool, msg *pubsub.Message) (pubsub.ValidationResult, string) { - if self { - return filec.validateLocalBlock(ctx, msg) - } - - // track validation time - begin := build.Clock.Now() - defer func() { - log.Debugf("block validation time: %s", build.Clock.Since(begin)) - }() - - stats.Record(ctx, metrics.BlockReceived.M(1)) - - recordFailureFlagPeer := func(what string) { - // bv.Validate will flag the peer in that case - panic(what) - } - - blk, what, err := filec.decodeAndCheckBlock(msg) - if err != nil { - log.Error("got invalid block over pubsub: ", err) - recordFailureFlagPeer(what) - return pubsub.ValidationReject, what - } - - // validate the block meta: the Message CID in the header must match the included messages - err = filec.validateMsgMeta(ctx, blk) - if err != nil { - log.Warnf("error validating message metadata: %s", err) - recordFailureFlagPeer("invalid_block_meta") - return pubsub.ValidationReject, "invalid_block_meta" - } - - reject, err := filec.validateBlockHeader(ctx, blk.Header) - if err != nil { - if reject == "" { - log.Warn("ignoring block msg: ", err) - return pubsub.ValidationIgnore, reject - } - recordFailureFlagPeer(reject) - return pubsub.ValidationReject, reject - } - - // all good, accept the block - msg.ValidatorData = blk - stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) - return pubsub.ValidationAccept, "" -} - -func (filec *FilecoinEC) validateLocalBlock(ctx context.Context, msg *pubsub.Message) (pubsub.ValidationResult, string) { - stats.Record(ctx, metrics.BlockPublished.M(1)) - - if size := msg.Size(); size > 1<<20-1<<15 { - log.Errorf("ignoring oversize block (%dB)", size) - return pubsub.ValidationIgnore, "oversize_block" - } - - blk, what, err := filec.decodeAndCheckBlock(msg) - if err != nil { - log.Errorf("got invalid local block: %s", err) - return pubsub.ValidationIgnore, what - } - - msg.ValidatorData = blk - stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) - return pubsub.ValidationAccept, "" -} - -func (filec *FilecoinEC) decodeAndCheckBlock(msg *pubsub.Message) (*types.BlockMsg, string, error) { - blk, err := types.DecodeBlockMsg(msg.GetData()) - if err != nil { - return nil, "invalid", xerrors.Errorf("error decoding block: %w", err) - } - - if count := len(blk.BlsMessages) + len(blk.SecpkMessages); count > build.BlockMessageLimit { - return nil, "too_many_messages", fmt.Errorf("block contains too many messages (%d)", count) - } - - // make sure we have a signature - if blk.Header.BlockSig == nil { - return nil, "missing_signature", fmt.Errorf("block without a signature") - } - - return blk, "", nil -} - -func (filec *FilecoinEC) validateMsgMeta(ctx context.Context, msg *types.BlockMsg) error { - // TODO there has to be a simpler way to do this without the blockstore dance - // block headers use adt0 - store := blockadt.WrapStore(ctx, cbor.NewCborStore(bstore.NewMemory())) - bmArr := blockadt.MakeEmptyArray(store) - smArr := blockadt.MakeEmptyArray(store) - - for i, m := range msg.BlsMessages { - c := cbg.CborCid(m) - if err := bmArr.Set(uint64(i), &c); err != nil { - return err - } - } - - for i, m := range msg.SecpkMessages { - c := cbg.CborCid(m) - if err := smArr.Set(uint64(i), &c); err != nil { - return err - } - } - - bmroot, err := bmArr.Root() - if err != nil { - return err - } - - smroot, err := smArr.Root() - if err != nil { - return err - } - - mrcid, err := store.Put(store.Context(), &types.MsgMeta{ - BlsMessages: bmroot, - SecpkMessages: smroot, - }) - - if err != nil { - return err - } - - if msg.Header.Messages != mrcid { - return fmt.Errorf("messages didn't match root cid in header") - } - - return nil -} - -func (filec *FilecoinEC) validateBlockHeader(ctx context.Context, b *types.BlockHeader) (rejectReason string, err error) { +func (filec *FilecoinEC) ValidateBlockHeader(ctx context.Context, b *types.BlockHeader) (rejectReason string, err error) { // we want to ensure that it is a block from a known miner; we reject blocks from unknown miners // to prevent spam attacks. @@ -868,4 +475,27 @@ func (filec *FilecoinEC) isChainNearSynced() bool { return build.Clock.Since(timestampTime) < 6*time.Hour } +func (filec *FilecoinEC) VerifyBlockSignature(ctx context.Context, h *types.BlockHeader, + addr address.Address) error { + return sigs.CheckBlockSignature(ctx, h, addr) +} + +func (filec *FilecoinEC) SignBlock(ctx context.Context, w api.Wallet, + addr address.Address, next *types.BlockHeader) error { + + nosigbytes, err := next.SigningBytes() + if err != nil { + return xerrors.Errorf("failed to get signing bytes for block: %w", err) + } + + sig, err := w.WalletSign(ctx, addr, nosigbytes, api.MsgMeta{ + Type: api.MTBlock, + }) + if err != nil { + return xerrors.Errorf("failed to sign new block: %w", err) + } + next.BlockSig = sig + return nil +} + var _ consensus.Consensus = &FilecoinEC{} diff --git a/chain/consensus/filcns/mine.go b/chain/consensus/filcns/mine.go index 35e38883d..68d4cc88c 100644 --- a/chain/consensus/filcns/mine.go +++ b/chain/consensus/filcns/mine.go @@ -3,11 +3,8 @@ package filcns import ( "context" - "github.com/ipfs/go-cid" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/stmgr" @@ -49,87 +46,16 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api. ParentMessageReceipts: recpts, } - var blsMessages []*types.Message - var secpkMessages []*types.SignedMessage - - var blsMsgCids, secpkMsgCids []cid.Cid - var blsSigs []crypto.Signature - for _, msg := range bt.Messages { - if msg.Signature.Type == crypto.SigTypeBLS { - blsSigs = append(blsSigs, msg.Signature) - blsMessages = append(blsMessages, &msg.Message) - - c, err := filec.sm.ChainStore().PutMessage(ctx, &msg.Message) - if err != nil { - return nil, err - } - - blsMsgCids = append(blsMsgCids, c) - } else if msg.Signature.Type == crypto.SigTypeSecp256k1 { - c, err := filec.sm.ChainStore().PutMessage(ctx, msg) - if err != nil { - return nil, err - } - - secpkMsgCids = append(secpkMsgCids, c) - secpkMessages = append(secpkMessages, msg) - - } else { - return nil, xerrors.Errorf("unknown sig type: %d", msg.Signature.Type) - } - } - - store := filec.sm.ChainStore().ActorStore(ctx) - blsmsgroot, err := consensus.ToMessagesArray(store, blsMsgCids) + blsMessages, secpkMessages, err := consensus.MsgsFromBlockTemplate(ctx, filec.sm, next, pts, bt) if err != nil { - return nil, xerrors.Errorf("building bls amt: %w", err) - } - secpkmsgroot, err := consensus.ToMessagesArray(store, secpkMsgCids) - if err != nil { - return nil, xerrors.Errorf("building secpk amt: %w", err) + return nil, xerrors.Errorf("failed to process messages from block template: %w", err) } - mmcid, err := store.Put(store.Context(), &types.MsgMeta{ - BlsMessages: blsmsgroot, - SecpkMessages: secpkmsgroot, - }) - if err != nil { - return nil, err - } - next.Messages = mmcid - - aggSig, err := consensus.AggregateSignatures(blsSigs) - if err != nil { - return nil, err - } - - next.BLSAggregate = aggSig - pweight, err := filec.sm.ChainStore().Weight(ctx, pts) - if err != nil { - return nil, err - } - next.ParentWeight = pweight - - baseFee, err := filec.sm.ChainStore().ComputeBaseFee(ctx, pts) - if err != nil { - return nil, xerrors.Errorf("computing base fee: %w", err) - } - next.ParentBaseFee = baseFee - - nosigbytes, err := next.SigningBytes() - if err != nil { - return nil, xerrors.Errorf("failed to get signing bytes for block: %w", err) - } - - sig, err := w.WalletSign(ctx, worker, nosigbytes, api.MsgMeta{ - Type: api.MTBlock, - }) + filec.SignBlock(ctx, w, worker, next) if err != nil { return nil, xerrors.Errorf("failed to sign new block: %w", err) } - next.BlockSig = sig - fullBlock := &types.FullBlock{ Header: next, BlsMessages: blsMessages, diff --git a/chain/consensus/iface.go b/chain/consensus/iface.go index 06dc0a113..ebb8f450e 100644 --- a/chain/consensus/iface.go +++ b/chain/consensus/iface.go @@ -4,17 +4,74 @@ import ( "context" pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.opencensus.io/stats" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) type Consensus interface { ValidateBlock(ctx context.Context, b *types.FullBlock) (err error) - ValidateBlockPubsub(ctx context.Context, self bool, msg *pubsub.Message) (pubsub.ValidationResult, string) + ValidateBlockHeader(ctx context.Context, b *types.BlockHeader) (rejectReason string, err error) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool CreateBlock(ctx context.Context, w api.Wallet, bt *api.BlockTemplate) (*types.FullBlock, error) + SignBlock(ctx context.Context, w api.Wallet, addr address.Address, next *types.BlockHeader) error + VerifyBlockSignature(ctx context.Context, h *types.BlockHeader, addr address.Address) error +} + +// ValidateBlockPubsub implements the common checks performed by all consensus implementations +// when a block is received through the pubsub channel. +func ValidateBlockPubsub(ctx context.Context, cns Consensus, self bool, msg *pubsub.Message) (pubsub.ValidationResult, string) { + if self { + return validateLocalBlock(ctx, msg) + } + + // track validation time + begin := build.Clock.Now() + defer func() { + log.Debugf("block validation time: %s", build.Clock.Since(begin)) + }() + + stats.Record(ctx, metrics.BlockReceived.M(1)) + + recordFailureFlagPeer := func(what string) { + // bv.Validate will flag the peer in that case + panic(what) + } + + blk, what, err := decodeAndCheckBlock(msg) + if err != nil { + log.Error("got invalid block over pubsub: ", err) + recordFailureFlagPeer(what) + return pubsub.ValidationReject, what + } + + // validate the block meta: the Message CID in the header must match the included messages + err = validateMsgMeta(ctx, blk) + if err != nil { + log.Warnf("error validating message metadata: %s", err) + recordFailureFlagPeer("invalid_block_meta") + return pubsub.ValidationReject, "invalid_block_meta" + } + + reject, err := cns.ValidateBlockHeader(ctx, blk.Header) + if err != nil { + if reject == "" { + log.Warn("ignoring block msg: ", err) + return pubsub.ValidationIgnore, reject + } + recordFailureFlagPeer(reject) + return pubsub.ValidationReject, reject + } + + // all good, accept the block + msg.ValidatorData = blk + stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) + return pubsub.ValidationAccept, "" } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b8427e036..2eb846d80 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -273,7 +273,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub }() var what string - res, what = bv.consensus.ValidateBlockPubsub(ctx, pid == bv.self, msg) + res, what = consensus.ValidateBlockPubsub(ctx, bv.consensus, pid == bv.self, msg) if res == pubsub.ValidationAccept { // it's a good block! make sure we've only seen it once if count := bv.recvBlocks.add(msg.ValidatorData.(*types.BlockMsg).Cid()); count > 0 {