From dc7c0fcabeb2b28ebd8444d760a5506ef26fd39b Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 1 Aug 2019 13:40:47 -0700 Subject: [PATCH] refactoring to accomodate the bls message changes in the spec --- api/api.go | 2 +- api/struct.go | 4 +- chain/blocksync.go | 130 ++++++++++++++++++++++++++----------- chain/gen/mining.go | 45 ++++++++++--- chain/gen/utils.go | 11 +++- chain/messagepool.go | 34 ++++++++-- chain/store/store.go | 104 ++++++++++++++++++++++++----- chain/sub/incoming.go | 17 +++-- chain/sync.go | 98 +++++++++++++++++++--------- chain/sync_test.go | 4 +- chain/types.go | 5 +- chain/types/blockheader.go | 6 ++ chain/types/fullblock.go | 5 +- chain/types/message.go | 9 +++ node/impl/full.go | 11 ++-- 15 files changed, 368 insertions(+), 117 deletions(-) diff --git a/api/api.go b/api/api.go index 9bb8a246c..34ebbf7f9 100644 --- a/api/api.go +++ b/api/api.go @@ -72,7 +72,7 @@ type FullNode interface { ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error) ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) - ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.SignedMessage, error) + ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.Message, []*types.SignedMessage, error) // if tipset is nil, we'll use heaviest ChainCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) diff --git a/api/struct.go b/api/struct.go index 321376376..69ddc6fc5 100644 --- a/api/struct.go +++ b/api/struct.go @@ -45,7 +45,7 @@ type FullNodeStruct struct { ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"` ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` - ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.SignedMessage, error) `perm:"read"` + ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.Message, []*types.SignedMessage, error) `perm:"read"` ChainCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` @@ -188,7 +188,7 @@ func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*types.B return c.Internal.ChainGetBlock(ctx, b) } -func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*types.SignedMessage, error) { +func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*types.Message, []*types.SignedMessage, error) { return c.Internal.ChainGetBlockMessages(ctx, b) } diff --git a/chain/blocksync.go b/chain/blocksync.go index b38c2a8a0..50c94f430 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/node/modules/dtypes" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" @@ -71,8 +72,11 @@ type BlockSyncResponse struct { type BSTipSet struct { Blocks []*types.BlockHeader - Messages []*types.SignedMessage - MsgIncludes [][]int + BlsMessages []*types.Message + BlsMsgIncludes [][]int + + SecpkMessages []*types.SignedMessage + SecpkMsgIncludes [][]int } func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { @@ -130,13 +134,15 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, } if opts.IncludeMessages { - msgs, mincl, err := bss.gatherMessages(ts) + bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts) if err != nil { - return nil, err + return nil, xerrors.Errorf("gather messages failed: %w", err) } - bst.Messages = msgs - bst.MsgIncludes = mincl + bst.BlsMessages = bmsgs + bst.BlsMsgIncludes = bmincl + bst.SecpkMessages = smsgs + bst.SecpkMsgIncludes = smincl } if opts.IncludeBlocks { @@ -153,33 +159,47 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, } } -func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMessage, [][]int, error) { - msgmap := make(map[cid.Cid]int) - var allmsgs []*types.SignedMessage - var msgincl [][]int +func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]int, []*types.SignedMessage, [][]int, error) { + blsmsgmap := make(map[cid.Cid]int) + secpkmsgmap := make(map[cid.Cid]int) + var secpkmsgs []*types.SignedMessage + var blsmsgs []*types.Message + var secpkincl, blsincl [][]int for _, b := range ts.Blocks() { - msgs, err := bss.cs.MessagesForBlock(b) + bmsgs, smsgs, err := bss.cs.MessagesForBlock(b) if err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } - log.Infof("MESSAGES FOR BLOCK: %d", len(msgs)) - msgindexes := make([]int, 0, len(msgs)) - for _, m := range msgs { - i, ok := msgmap[m.Cid()] + bmi := make([]int, 0, len(bmsgs)) + for _, m := range bmsgs { + i, ok := blsmsgmap[m.Cid()] if !ok { - i = len(allmsgs) - allmsgs = append(allmsgs, m) - msgmap[m.Cid()] = i + i = len(blsmsgs) + blsmsgs = append(blsmsgs, m) + blsmsgmap[m.Cid()] = i } - msgindexes = append(msgindexes, i) + bmi = append(bmi, i) } - msgincl = append(msgincl, msgindexes) + blsincl = append(blsincl, bmi) + + smi := make([]int, 0, len(smsgs)) + for _, m := range smsgs { + i, ok := secpkmsgmap[m.Cid()] + if !ok { + i = len(secpkmsgs) + secpkmsgs = append(secpkmsgs, m) + secpkmsgmap[m.Cid()] = i + } + + smi = append(smi, i) + } + secpkincl = append(secpkincl, smi) } - return allmsgs, msgincl, nil + return blsmsgs, blsincl, secpkmsgs, secpkincl, nil } type BlockSync struct { @@ -326,8 +346,8 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) { fb := &types.FullBlock{ Header: b, } - for _, mi := range bts.MsgIncludes[i] { - fb.Messages = append(fb.Messages, bts.Messages[mi]) + for _, mi := range bts.BlsMsgIncludes[i] { + fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) } fts.Blocks = append(fts.Blocks, fb) } @@ -404,9 +424,51 @@ func (bs *BlockSync) AddPeer(p peer.ID) { bs.syncPeers[p] = struct{}{} } -func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) { +func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) { + out := make([]*types.Message, len(cids)) + + err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { + msg, err := types.DecodeMessage(b.RawData()) + if err != nil { + return err + } + + if out[i] != nil { + return fmt.Errorf("received duplicate message") + } + + out[i] = msg + return nil + }) + if err != nil { + return nil, err + } + return out, nil +} + +func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, len(cids)) + err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { + smsg, err := types.DecodeSignedMessage(b.RawData()) + if err != nil { + return err + } + + if out[i] != nil { + return fmt.Errorf("received duplicate message") + } + + out[i] = smsg + return nil + }) + if err != nil { + return nil, err + } + return out, nil +} + +func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error { resp := bs.bserv.GetBlocks(context.TODO(), cids) m := make(map[cid.Cid]int) @@ -422,25 +484,19 @@ func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage break } - return nil, fmt.Errorf("failed to fetch all messages") + return fmt.Errorf("failed to fetch all messages") } - sm, err := types.DecodeSignedMessage(v.RawData()) - if err != nil { - return nil, err - } - ix, ok := m[sm.Cid()] + ix, ok := m[v.Cid()] if !ok { - return nil, fmt.Errorf("received message we didnt ask for") + return fmt.Errorf("received message we didnt ask for") } - if out[ix] != nil { - return nil, fmt.Errorf("received duplicate message") + if err := cb(ix, v); err != nil { + return err } - - out[ix] = sm } } - return out, nil + return nil } diff --git a/chain/gen/mining.go b/chain/gen/mining.go index 03e53e016..48f9fcd3c 100644 --- a/chain/gen/mining.go +++ b/chain/gen/mining.go @@ -41,22 +41,38 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A Height: height, } - var msgCids []cid.Cid + var blsMessages []*types.Message + var secpkMessages []*types.SignedMessage + + var blsMsgCids, secpkMsgCids []cid.Cid var blsSigs []types.Signature - var receipts []interface{} for _, msg := range msgs { if msg.Signature.TypeCode() == 2 { blsSigs = append(blsSigs, msg.Signature) + blsMessages = append(blsMessages, &msg.Message) c, err := cs.PutMessage(&msg.Message) if err != nil { return nil, err } - msgCids = append(msgCids, c) + blsMsgCids = append(blsMsgCids, c) } else { - msgCids = append(msgCids, msg.Cid()) + secpkMsgCids = append(secpkMsgCids, msg.Cid()) + secpkMessages = append(secpkMessages, msg) } + } + + var receipts []interface{} + for _, msg := range blsMessages { + rec, err := vmi.ApplyMessage(ctx, msg) + if err != nil { + return nil, errors.Wrap(err, "apply message failure") + } + + receipts = append(receipts, rec) + } + for _, msg := range secpkMessages { rec, err := vmi.ApplyMessage(ctx, &msg.Message) if err != nil { return nil, errors.Wrap(err, "apply message failure") @@ -66,11 +82,23 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A } cst := hamt.CSTFromBstore(cs.Blockstore()) - msgroot, err := sharray.Build(context.TODO(), 4, toIfArr(msgCids), cst) + blsmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(blsMsgCids), cst) if err != nil { return nil, err } - next.Messages = msgroot + secpkmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(secpkMsgCids), cst) + if err != nil { + return nil, err + } + + mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{ + BlsMessages: blsmsgroot, + SecpkMessages: secpkmsgroot, + }) + if err != nil { + return nil, err + } + next.Messages = mmcid rectroot, err := sharray.Build(context.TODO(), 4, receipts, cst) if err != nil { @@ -94,8 +122,9 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A next.ParentWeight = types.NewInt(pweight) fullBlock := &types.FullBlock{ - Header: next, - Messages: msgs, + Header: next, + BlsMessages: blsMessages, + SecpkMessages: secpkMessages, } return fullBlock, nil diff --git a/chain/gen/utils.go b/chain/gen/utils.go index a6454bd76..f7603e9ae 100644 --- a/chain/gen/utils.go +++ b/chain/gen/utils.go @@ -147,10 +147,19 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B } cst := hamt.CSTFromBstore(bs) + emptyroot, err := sharray.Build(context.TODO(), 4, []interface{}{}, cst) if err != nil { return nil, err } + mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{ + BlsMessages: emptyroot, + SecpkMessages: emptyroot, + }) + if err != nil { + return nil, err + } + fmt.Println("Empty Genesis root: ", emptyroot) b := &types.BlockHeader{ @@ -161,7 +170,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B Height: 0, ParentWeight: types.NewInt(0), StateRoot: stateroot, - Messages: emptyroot, + Messages: mmcid, MessageReceipts: emptyroot, } diff --git a/chain/messagepool.go b/chain/messagepool.go index 66a87fa34..7d5c3ca35 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -127,29 +127,55 @@ func (mp *MessagePool) Pending() []*types.SignedMessage { func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { for _, ts := range revert { for _, b := range ts.Blocks() { - msgs, err := mp.cs.MessagesForBlock(b) + bmsgs, smsgs, err := mp.cs.MessagesForBlock(b) if err != nil { return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height) } - for _, msg := range msgs { + for _, msg := range smsgs { if err := mp.Add(msg); err != nil { return err } } + + for _, msg := range bmsgs { + smsg := mp.RecoverSig(msg) + if smsg != nil { + if err := mp.Add(smsg); err != nil { + return err + } + } else { + log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) + } + } } } for _, ts := range apply { for _, b := range ts.Blocks() { - msgs, err := mp.cs.MessagesForBlock(b) + bmsgs, smsgs, err := mp.cs.MessagesForBlock(b) if err != nil { return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages) } - for _, msg := range msgs { + for _, msg := range smsgs { mp.Remove(msg) } + + for _, msg := range bmsgs { + smsg := mp.RecoverSig(msg) + if smsg != nil { + mp.Remove(smsg) + } else { + // TODO: this one is likely fine + log.Warnf("could not recover signature for bls message %s during a reorg apply", msg.Cid()) + } + } } } return nil } + +func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { + // TODO: persist signatures for BLS messages for a little while in case of reorgs + return nil +} diff --git a/chain/store/store.go b/chain/store/store.go index a6d724765..80dfe4860 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -328,7 +328,12 @@ func (cs *ChainStore) persistBlock(b *types.FullBlock) error { return err } - for _, m := range b.Messages { + for _, m := range b.BlsMessages { + if _, err := cs.PutMessage(m); err != nil { + return err + } + } + for _, m := range b.SecpkMessages { if _, err := cs.PutMessage(m); err != nil { return err } @@ -404,7 +409,17 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) { } -func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { +func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { + sb, err := cs.bs.Get(c) + if err != nil { + log.Errorf("get message get failed: %s: %s", c, err) + return nil, err + } + + return types.DecodeMessage(sb.RawData()) +} + +func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { sb, err := cs.bs.Get(c) if err != nil { log.Errorf("get message get failed: %s: %s", c, err) @@ -414,9 +429,9 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { return types.DecodeSignedMessage(sb.RawData()) } -func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, error) { +func (cs *ChainStore) readSharrayCids(root cid.Cid) ([]cid.Cid, error) { cst := hamt.CSTFromBstore(cs.bs) - shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst) + shar, err := sharray.Load(context.TODO(), root, 4, cst) if err != nil { return nil, errors.Wrap(err, "sharray load") } @@ -438,13 +453,34 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro return cids, nil } -func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) { - cids, err := cs.MessageCidsForBlock(b) - if err != nil { - return nil, errors.Wrap(err, "loading message cids for block") +func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { + cst := hamt.CSTFromBstore(cs.bs) + var msgmeta types.MsgMeta + if err := cst.Get(context.TODO(), b.Messages, &msgmeta); err != nil { + return nil, nil, err } - return cs.LoadMessagesFromCids(cids) + blscids, err := cs.readSharrayCids(msgmeta.BlsMessages) + if err != nil { + return nil, nil, errors.Wrap(err, "loading bls message cids for block") + } + + secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages) + if err != nil { + return nil, nil, errors.Wrap(err, "loading secpk message cids for block") + } + + blsmsgs, err := cs.LoadMessagesFromCids(blscids) + if err != nil { + return nil, nil, errors.Wrap(err, "loading bls messages for block") + } + + secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids) + if err != nil { + return nil, nil, errors.Wrap(err, "loading secpk messages for block") + } + + return blsmsgs, secpkmsgs, nil } func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) { @@ -472,8 +508,8 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec return &r, nil } -func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { - msgs := make([]*types.SignedMessage, 0, len(cids)) +func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) { + msgs := make([]*types.Message, 0, len(cids)) for i, c := range cids { m, err := cs.GetMessage(c) if err != nil { @@ -486,6 +522,20 @@ func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessa return msgs, nil } +func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { + msgs := make([]*types.SignedMessage, 0, len(cids)) + for i, c := range cids { + m, err := cs.GetSignedMessage(c) + if err != nil { + return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i) + } + + msgs = append(msgs, m) + } + + return msgs, nil +} + func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) { act, err := cs.GetActor(addr) if err != nil { @@ -560,17 +610,34 @@ func (cs *ChainStore) tipsetContainsMsg(ts *types.TipSet, msg cid.Cid) (cid.Cid, } func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) { - msgs, err := cs.MessageCidsForBlock(blk) - if err != nil { + cst := hamt.CSTFromBstore(cs.bs) + var msgmeta types.MsgMeta + if err := cst.Get(context.TODO(), blk.Messages, &msgmeta); err != nil { return nil, err } - for i, mc := range msgs { - if mc == msg { + blscids, err := cs.readSharrayCids(msgmeta.BlsMessages) + if err != nil { + return nil, errors.Wrap(err, "loading bls message cids for block") + } + + for i, c := range blscids { + if c == msg { return cs.GetReceipt(blk, i) } } + secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages) + if err != nil { + return nil, errors.Wrap(err, "loading secpk message cids for block") + } + + for i, c := range secpkcids { + if c == msg { + return cs.GetReceipt(blk, i+len(blscids)) + } + } + return nil, nil } @@ -582,7 +649,7 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { var out []*types.FullBlock for _, b := range ts.Blocks() { - msgs, err := cs.MessagesForBlock(b) + bmsgs, smsgs, err := cs.MessagesForBlock(b) if err != nil { // TODO: check for 'not found' errors, and only return nil if this // is actually a 'not found' error @@ -590,8 +657,9 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { } fb := &types.FullBlock{ - Header: b, - Messages: msgs, + Header: b, + BlsMessages: bmsgs, + SecpkMessages: smsgs, } out = append(out, fb) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index c10d6f2c8..06898eb70 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -28,15 +28,24 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha } go func() { - msgs, err := s.Bsync.FetchMessagesByCids(blk.Messages) + log.Info("about to fetch messages for block from pubsub") + bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) if err != nil { - log.Errorf("failed to fetch all messages for block received over pubusb: %s", err) + log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err) return } + + smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages) + if err != nil { + log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err) + return + } + log.Info("inform new block over pubsub") s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ - Header: blk.Header, - Messages: msgs, + Header: blk.Header, + BlsMessages: bmsgs, + SecpkMessages: smsgs, }) }() } diff --git a/chain/sync.go b/chain/sync.go index 9102a52a0..9a1ec0dd4 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -144,32 +144,57 @@ func copyBlockstore(from, to bstore.Blockstore) error { return nil } -func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []*types.SignedMessage, msgincl [][]int) (*store.FullTipSet, error) { - if len(ts.Blocks()) != len(msgincl) { +// TODO: this function effectively accepts unchecked input from the network, +// either validate it here, or ensure that its validated elsewhere (maybe make +// sure the blocksync code checks it?) +// maybe this code should actually live in blocksync?? +func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, allbmsgs []*types.Message, allsmsgs []*types.SignedMessage, bmi, smi [][]int) (*store.FullTipSet, error) { + if len(ts.Blocks()) != len(smi) || len(ts.Blocks()) != len(bmi) { return nil, fmt.Errorf("msgincl length didnt match tipset size") } fts := &store.FullTipSet{} for bi, b := range ts.Blocks() { - var msgs []*types.SignedMessage - var msgCids []interface{} - for _, m := range msgincl[bi] { - msgs = append(msgs, messages[m]) - msgCids = append(msgCids, messages[m].Cid()) + var smsgs []*types.SignedMessage + var smsgCids []interface{} + for _, m := range smi[bi] { + smsgs = append(smsgs, allsmsgs[m]) + smsgCids = append(smsgCids, allsmsgs[m].Cid()) } - mroot, err := sharray.Build(context.TODO(), 4, msgCids, cst) + smroot, err := sharray.Build(context.TODO(), 4, smsgCids, cst) if err != nil { return nil, err } - if b.Messages != mroot { + var bmsgs []*types.Message + var bmsgCids []interface{} + for _, m := range bmi[bi] { + bmsgs = append(bmsgs, allbmsgs[m]) + bmsgCids = append(bmsgCids, allbmsgs[m].Cid()) + } + + bmroot, err := sharray.Build(context.TODO(), 4, bmsgCids, cst) + if err != nil { + return nil, err + } + + mrcid, err := cst.Put(context.TODO(), &types.MsgMeta{ + BlsMessages: bmroot, + SecpkMessages: smroot, + }) + if err != nil { + return nil, err + } + + if b.Messages != mrcid { return nil, fmt.Errorf("messages didnt match message root in header") } fb := &types.FullBlock{ - Header: b, - Messages: msgs, + Header: b, + BlsMessages: bmsgs, + SecpkMessages: smsgs, } fts.Blocks = append(fts.Blocks, fb) @@ -238,14 +263,15 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro fts := &store.FullTipSet{} for _, b := range ts.Blocks() { - messages, err := syncer.store.MessagesForBlock(b) + bmsgs, smsgs, err := syncer.store.MessagesForBlock(b) if err != nil { return nil, err } fb := &types.FullBlock{ - Header: b, - Messages: messages, + Header: b, + BlsMessages: bmsgs, + SecpkMessages: smsgs, } fts.Blocks = append(fts.Blocks, fb) } @@ -313,10 +339,19 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err } var receipts []interface{} - for _, m := range b.Messages { + for i, m := range b.BlsMessages { + receipt, err := vmi.ApplyMessage(ctx, m) + if err != nil { + return xerrors.Errorf("failed executing bls message %d in block %s: %w", i, b.Header.Cid(), err) + } + + receipts = append(receipts, receipt) + } + + for i, m := range b.SecpkMessages { receipt, err := vmi.ApplyMessage(ctx, &m.Message) if err != nil { - return err + return xerrors.Errorf("failed executing secpk message %d in block %s: %w", i, b.Header.Cid(), err) } receipts = append(receipts, receipt) @@ -460,7 +495,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu for bsi := 0; bsi < len(bstips); bsi++ { this := headers[i+bsi] bstip := bstips[len(bstips)-(bsi+1)] - fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes) + fts, err := zipTipSetAndMessages(cst, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes) if err != nil { log.Error("zipping failed: ", err, bsi, i) log.Error("height: ", this.Height()) @@ -489,23 +524,22 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error { for _, bst := range bstips { - for _, m := range bst.Messages { - switch m.Signature.Type { - case types.KTBLS: - //log.Infof("putting BLS message: %s", m.Cid()) - if _, err := store.PutMessage(bs, &m.Message); err != nil { - log.Error("failed to persist messages: ", err) - return xerrors.Errorf("BLS message processing failed: %w", err) - } - case types.KTSecp256k1: - //log.Infof("putting secp256k1 message: %s", m.Cid()) - if _, err := store.PutMessage(bs, m); err != nil { - log.Error("failed to persist messages: ", err) - return xerrors.Errorf("secp256k1 message processing failed: %w", err) - } - default: + for _, m := range bst.BlsMessages { + //log.Infof("putting BLS message: %s", m.Cid()) + if _, err := store.PutMessage(bs, m); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("BLS message processing failed: %w", err) + } + } + for _, m := range bst.SecpkMessages { + if m.Signature.Type != types.KTSecp256k1 { return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) } + //log.Infof("putting secp256k1 message: %s", m.Cid()) + if _, err := store.PutMessage(bs, m); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("secp256k1 message processing failed: %w", err) + } } } diff --git a/chain/sync_test.go b/chain/sync_test.go index b44213cc9..da54a43f2 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -164,11 +164,11 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) { // -1 to match block.Height b.Header = tu.blocks[h-1].Header - for _, msg := range tu.blocks[h-1].Messages { + for _, msg := range tu.blocks[h-1].SecpkMessages { c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) require.NoError(tu.t, err) - b.Messages = append(b.Messages, c) + b.SecpkMessages = append(b.SecpkMessages, c) } require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) diff --git a/chain/types.go b/chain/types.go index 6a021a35a..e9c1da3d7 100644 --- a/chain/types.go +++ b/chain/types.go @@ -130,8 +130,9 @@ func (f *filecoinIpldNode) String() string { } type BlockMsg struct { - Header *types.BlockHeader - Messages []cid.Cid + Header *types.BlockHeader + BlsMessages []cid.Cid + SecpkMessages []cid.Cid } func DecodeBlockMsg(b []byte) (*BlockMsg, error) { diff --git a/chain/types/blockheader.go b/chain/types/blockheader.go index 9c953ca17..d4a2e126d 100644 --- a/chain/types/blockheader.go +++ b/chain/types/blockheader.go @@ -71,6 +71,7 @@ func init() { }, nil })). Complete()) + cbor.RegisterCborType(MsgMeta{}) } type Ticket []byte @@ -98,6 +99,11 @@ type BlockHeader struct { MessageReceipts cid.Cid } +type MsgMeta struct { + BlsMessages cid.Cid + SecpkMessages cid.Cid +} + func (b *BlockHeader) ToStorageBlock() (block.Block, error) { data, err := b.Serialize() if err != nil { diff --git a/chain/types/fullblock.go b/chain/types/fullblock.go index 286f17038..5511cea8c 100644 --- a/chain/types/fullblock.go +++ b/chain/types/fullblock.go @@ -3,8 +3,9 @@ package types import "github.com/ipfs/go-cid" type FullBlock struct { - Header *BlockHeader - Messages []*SignedMessage + Header *BlockHeader + BlsMessages []*Message + SecpkMessages []*SignedMessage } func (fb *FullBlock) Cid() cid.Cid { diff --git a/chain/types/message.go b/chain/types/message.go index 369fd7aed..b6d9008f5 100644 --- a/chain/types/message.go +++ b/chain/types/message.go @@ -114,3 +114,12 @@ func (m *Message) ToStorageBlock() (block.Block, error) { return block.NewBlockWithCid(data, c) } + +func (m *Message) Cid() cid.Cid { + b, err := m.ToStorageBlock() + if err != nil { + panic(fmt.Sprintf("failed to marshal message: %s", err)) + } + + return b.Cid() +} diff --git a/node/impl/full.go b/node/impl/full.go index 9afd68e0e..a1ace9be8 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -77,10 +77,10 @@ func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Bl return a.Chain.GetBlock(msg) } -func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*types.SignedMessage, error) { +func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*types.Message, []*types.SignedMessage, error) { b, err := a.Chain.GetBlock(msg) if err != nil { - return nil, err + return nil, nil, err } return a.Chain.MessagesForBlock(b) @@ -153,8 +153,11 @@ func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address var out chain.BlockMsg out.Header = fblk.Header - for _, msg := range fblk.Messages { - out.Messages = append(out.Messages, msg.Cid()) + for _, msg := range fblk.BlsMessages { + out.BlsMessages = append(out.BlsMessages, msg.Cid()) + } + for _, msg := range fblk.SecpkMessages { + out.SecpkMessages = append(out.SecpkMessages, msg.Cid()) } return &out, nil