refactoring to accomodate the bls message changes in the spec

This commit is contained in:
whyrusleeping 2019-08-01 13:40:47 -07:00
parent db005e99e0
commit dc7c0fcabe
15 changed files with 368 additions and 117 deletions

View File

@ -72,7 +72,7 @@ type FullNode interface {
ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error) ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.SignedMessage, error) ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.Message, []*types.SignedMessage, error)
// if tipset is nil, we'll use heaviest // if tipset is nil, we'll use heaviest
ChainCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) ChainCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error)

View File

@ -45,7 +45,7 @@ type FullNodeStruct struct {
ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"`
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.SignedMessage, error) `perm:"read"` ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.Message, []*types.SignedMessage, error) `perm:"read"`
ChainCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` ChainCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
@ -188,7 +188,7 @@ func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*types.B
return c.Internal.ChainGetBlock(ctx, b) return c.Internal.ChainGetBlock(ctx, b)
} }
func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*types.SignedMessage, error) { func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*types.Message, []*types.SignedMessage, error) {
return c.Internal.ChainGetBlockMessages(ctx, b) return c.Internal.ChainGetBlockMessages(ctx, b)
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/dtypes"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
@ -71,8 +72,11 @@ type BlockSyncResponse struct {
type BSTipSet struct { type BSTipSet struct {
Blocks []*types.BlockHeader Blocks []*types.BlockHeader
Messages []*types.SignedMessage BlsMessages []*types.Message
MsgIncludes [][]int BlsMsgIncludes [][]int
SecpkMessages []*types.SignedMessage
SecpkMsgIncludes [][]int
} }
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
@ -130,13 +134,15 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
} }
if opts.IncludeMessages { if opts.IncludeMessages {
msgs, mincl, err := bss.gatherMessages(ts) bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("gather messages failed: %w", err)
} }
bst.Messages = msgs bst.BlsMessages = bmsgs
bst.MsgIncludes = mincl bst.BlsMsgIncludes = bmincl
bst.SecpkMessages = smsgs
bst.SecpkMsgIncludes = smincl
} }
if opts.IncludeBlocks { if opts.IncludeBlocks {
@ -153,33 +159,47 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
} }
} }
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMessage, [][]int, error) { func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]int, []*types.SignedMessage, [][]int, error) {
msgmap := make(map[cid.Cid]int) blsmsgmap := make(map[cid.Cid]int)
var allmsgs []*types.SignedMessage secpkmsgmap := make(map[cid.Cid]int)
var msgincl [][]int var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
var secpkincl, blsincl [][]int
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
msgs, err := bss.cs.MessagesForBlock(b) bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, nil, nil, err
} }
log.Infof("MESSAGES FOR BLOCK: %d", len(msgs))
msgindexes := make([]int, 0, len(msgs)) bmi := make([]int, 0, len(bmsgs))
for _, m := range msgs { for _, m := range bmsgs {
i, ok := msgmap[m.Cid()] i, ok := blsmsgmap[m.Cid()]
if !ok { if !ok {
i = len(allmsgs) i = len(blsmsgs)
allmsgs = append(allmsgs, m) blsmsgs = append(blsmsgs, m)
msgmap[m.Cid()] = i blsmsgmap[m.Cid()] = i
} }
msgindexes = append(msgindexes, i) bmi = append(bmi, i)
} }
msgincl = append(msgincl, msgindexes) blsincl = append(blsincl, bmi)
smi := make([]int, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
if !ok {
i = len(secpkmsgs)
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
}
smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
} }
return allmsgs, msgincl, nil return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
} }
type BlockSync struct { type BlockSync struct {
@ -326,8 +346,8 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
fb := &types.FullBlock{ fb := &types.FullBlock{
Header: b, Header: b,
} }
for _, mi := range bts.MsgIncludes[i] { for _, mi := range bts.BlsMsgIncludes[i] {
fb.Messages = append(fb.Messages, bts.Messages[mi]) fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
} }
fts.Blocks = append(fts.Blocks, fb) fts.Blocks = append(fts.Blocks, fb)
} }
@ -404,9 +424,51 @@ func (bs *BlockSync) AddPeer(p peer.ID) {
bs.syncPeers[p] = struct{}{} bs.syncPeers[p] = struct{}{}
} }
func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) { func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
msg, err := types.DecodeMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = msg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids)) out := make([]*types.SignedMessage, len(cids))
err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error {
smsg, err := types.DecodeSignedMessage(b.RawData())
if err != nil {
return err
}
if out[i] != nil {
return fmt.Errorf("received duplicate message")
}
out[i] = smsg
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error {
resp := bs.bserv.GetBlocks(context.TODO(), cids) resp := bs.bserv.GetBlocks(context.TODO(), cids)
m := make(map[cid.Cid]int) m := make(map[cid.Cid]int)
@ -422,25 +484,19 @@ func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage
break break
} }
return nil, fmt.Errorf("failed to fetch all messages") return fmt.Errorf("failed to fetch all messages")
} }
sm, err := types.DecodeSignedMessage(v.RawData()) ix, ok := m[v.Cid()]
if err != nil {
return nil, err
}
ix, ok := m[sm.Cid()]
if !ok { if !ok {
return nil, fmt.Errorf("received message we didnt ask for") return fmt.Errorf("received message we didnt ask for")
} }
if out[ix] != nil { if err := cb(ix, v); err != nil {
return nil, fmt.Errorf("received duplicate message") return err
} }
out[ix] = sm
} }
} }
return out, nil return nil
} }

View File

@ -41,22 +41,38 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A
Height: height, Height: height,
} }
var msgCids []cid.Cid var blsMessages []*types.Message
var secpkMessages []*types.SignedMessage
var blsMsgCids, secpkMsgCids []cid.Cid
var blsSigs []types.Signature var blsSigs []types.Signature
var receipts []interface{}
for _, msg := range msgs { for _, msg := range msgs {
if msg.Signature.TypeCode() == 2 { if msg.Signature.TypeCode() == 2 {
blsSigs = append(blsSigs, msg.Signature) blsSigs = append(blsSigs, msg.Signature)
blsMessages = append(blsMessages, &msg.Message)
c, err := cs.PutMessage(&msg.Message) c, err := cs.PutMessage(&msg.Message)
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgCids = append(msgCids, c) blsMsgCids = append(blsMsgCids, c)
} else { } else {
msgCids = append(msgCids, msg.Cid()) secpkMsgCids = append(secpkMsgCids, msg.Cid())
secpkMessages = append(secpkMessages, msg)
} }
}
var receipts []interface{}
for _, msg := range blsMessages {
rec, err := vmi.ApplyMessage(ctx, msg)
if err != nil {
return nil, errors.Wrap(err, "apply message failure")
}
receipts = append(receipts, rec)
}
for _, msg := range secpkMessages {
rec, err := vmi.ApplyMessage(ctx, &msg.Message) rec, err := vmi.ApplyMessage(ctx, &msg.Message)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "apply message failure") return nil, errors.Wrap(err, "apply message failure")
@ -66,11 +82,23 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A
} }
cst := hamt.CSTFromBstore(cs.Blockstore()) cst := hamt.CSTFromBstore(cs.Blockstore())
msgroot, err := sharray.Build(context.TODO(), 4, toIfArr(msgCids), cst) blsmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(blsMsgCids), cst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
next.Messages = msgroot secpkmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(secpkMsgCids), cst)
if err != nil {
return nil, err
}
mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{
BlsMessages: blsmsgroot,
SecpkMessages: secpkmsgroot,
})
if err != nil {
return nil, err
}
next.Messages = mmcid
rectroot, err := sharray.Build(context.TODO(), 4, receipts, cst) rectroot, err := sharray.Build(context.TODO(), 4, receipts, cst)
if err != nil { if err != nil {
@ -94,8 +122,9 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A
next.ParentWeight = types.NewInt(pweight) next.ParentWeight = types.NewInt(pweight)
fullBlock := &types.FullBlock{ fullBlock := &types.FullBlock{
Header: next, Header: next,
Messages: msgs, BlsMessages: blsMessages,
SecpkMessages: secpkMessages,
} }
return fullBlock, nil return fullBlock, nil

View File

@ -147,10 +147,19 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B
} }
cst := hamt.CSTFromBstore(bs) cst := hamt.CSTFromBstore(bs)
emptyroot, err := sharray.Build(context.TODO(), 4, []interface{}{}, cst) emptyroot, err := sharray.Build(context.TODO(), 4, []interface{}{}, cst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{
BlsMessages: emptyroot,
SecpkMessages: emptyroot,
})
if err != nil {
return nil, err
}
fmt.Println("Empty Genesis root: ", emptyroot) fmt.Println("Empty Genesis root: ", emptyroot)
b := &types.BlockHeader{ b := &types.BlockHeader{
@ -161,7 +170,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B
Height: 0, Height: 0,
ParentWeight: types.NewInt(0), ParentWeight: types.NewInt(0),
StateRoot: stateroot, StateRoot: stateroot,
Messages: emptyroot, Messages: mmcid,
MessageReceipts: emptyroot, MessageReceipts: emptyroot,
} }

View File

@ -127,29 +127,55 @@ func (mp *MessagePool) Pending() []*types.SignedMessage {
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
for _, ts := range revert { for _, ts := range revert {
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b) bmsgs, smsgs, err := mp.cs.MessagesForBlock(b)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height) return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height)
} }
for _, msg := range msgs { for _, msg := range smsgs {
if err := mp.Add(msg); err != nil { if err := mp.Add(msg); err != nil {
return err return err
} }
} }
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
if err := mp.Add(smsg); err != nil {
return err
}
} else {
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
}
}
} }
} }
for _, ts := range apply { for _, ts := range apply {
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b) bmsgs, smsgs, err := mp.cs.MessagesForBlock(b)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages) return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages)
} }
for _, msg := range msgs { for _, msg := range smsgs {
mp.Remove(msg) mp.Remove(msg)
} }
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
mp.Remove(smsg)
} else {
// TODO: this one is likely fine
log.Warnf("could not recover signature for bls message %s during a reorg apply", msg.Cid())
}
}
} }
} }
return nil return nil
} }
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
// TODO: persist signatures for BLS messages for a little while in case of reorgs
return nil
}

View File

@ -328,7 +328,12 @@ func (cs *ChainStore) persistBlock(b *types.FullBlock) error {
return err return err
} }
for _, m := range b.Messages { for _, m := range b.BlsMessages {
if _, err := cs.PutMessage(m); err != nil {
return err
}
}
for _, m := range b.SecpkMessages {
if _, err := cs.PutMessage(m); err != nil { if _, err := cs.PutMessage(m); err != nil {
return err return err
} }
@ -404,7 +409,17 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) {
} }
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeMessage(sb.RawData())
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
sb, err := cs.bs.Get(c) sb, err := cs.bs.Get(c)
if err != nil { if err != nil {
log.Errorf("get message get failed: %s: %s", c, err) log.Errorf("get message get failed: %s: %s", c, err)
@ -414,9 +429,9 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) {
return types.DecodeSignedMessage(sb.RawData()) return types.DecodeSignedMessage(sb.RawData())
} }
func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, error) { func (cs *ChainStore) readSharrayCids(root cid.Cid) ([]cid.Cid, error) {
cst := hamt.CSTFromBstore(cs.bs) cst := hamt.CSTFromBstore(cs.bs)
shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst) shar, err := sharray.Load(context.TODO(), root, 4, cst)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "sharray load") return nil, errors.Wrap(err, "sharray load")
} }
@ -438,13 +453,34 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro
return cids, nil return cids, nil
} }
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) { func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
cids, err := cs.MessageCidsForBlock(b) cst := hamt.CSTFromBstore(cs.bs)
if err != nil { var msgmeta types.MsgMeta
return nil, errors.Wrap(err, "loading message cids for block") if err := cst.Get(context.TODO(), b.Messages, &msgmeta); err != nil {
return nil, nil, err
} }
return cs.LoadMessagesFromCids(cids) blscids, err := cs.readSharrayCids(msgmeta.BlsMessages)
if err != nil {
return nil, nil, errors.Wrap(err, "loading bls message cids for block")
}
secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages)
if err != nil {
return nil, nil, errors.Wrap(err, "loading secpk message cids for block")
}
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
if err != nil {
return nil, nil, errors.Wrap(err, "loading bls messages for block")
}
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
if err != nil {
return nil, nil, errors.Wrap(err, "loading secpk messages for block")
}
return blsmsgs, secpkmsgs, nil
} }
func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) { func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
@ -472,8 +508,8 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec
return &r, nil return &r, nil
} }
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) {
msgs := make([]*types.SignedMessage, 0, len(cids)) msgs := make([]*types.Message, 0, len(cids))
for i, c := range cids { for i, c := range cids {
m, err := cs.GetMessage(c) m, err := cs.GetMessage(c)
if err != nil { if err != nil {
@ -486,6 +522,20 @@ func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessa
return msgs, nil return msgs, nil
} }
func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0, len(cids))
for i, c := range cids {
m, err := cs.GetSignedMessage(c)
if err != nil {
return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i)
}
msgs = append(msgs, m)
}
return msgs, nil
}
func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) { func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) {
act, err := cs.GetActor(addr) act, err := cs.GetActor(addr)
if err != nil { if err != nil {
@ -560,17 +610,34 @@ func (cs *ChainStore) tipsetContainsMsg(ts *types.TipSet, msg cid.Cid) (cid.Cid,
} }
func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) { func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) {
msgs, err := cs.MessageCidsForBlock(blk) cst := hamt.CSTFromBstore(cs.bs)
if err != nil { var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), blk.Messages, &msgmeta); err != nil {
return nil, err return nil, err
} }
for i, mc := range msgs { blscids, err := cs.readSharrayCids(msgmeta.BlsMessages)
if mc == msg { if err != nil {
return nil, errors.Wrap(err, "loading bls message cids for block")
}
for i, c := range blscids {
if c == msg {
return cs.GetReceipt(blk, i) return cs.GetReceipt(blk, i)
} }
} }
secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages)
if err != nil {
return nil, errors.Wrap(err, "loading secpk message cids for block")
}
for i, c := range secpkcids {
if c == msg {
return cs.GetReceipt(blk, i+len(blscids))
}
}
return nil, nil return nil, nil
} }
@ -582,7 +649,7 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
var out []*types.FullBlock var out []*types.FullBlock
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
msgs, err := cs.MessagesForBlock(b) bmsgs, smsgs, err := cs.MessagesForBlock(b)
if err != nil { if err != nil {
// TODO: check for 'not found' errors, and only return nil if this // TODO: check for 'not found' errors, and only return nil if this
// is actually a 'not found' error // is actually a 'not found' error
@ -590,8 +657,9 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
} }
fb := &types.FullBlock{ fb := &types.FullBlock{
Header: b, Header: b,
Messages: msgs, BlsMessages: bmsgs,
SecpkMessages: smsgs,
} }
out = append(out, fb) out = append(out, fb)

View File

@ -28,15 +28,24 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
} }
go func() { go func() {
msgs, err := s.Bsync.FetchMessagesByCids(blk.Messages) log.Info("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil { if err != nil {
log.Errorf("failed to fetch all messages for block received over pubusb: %s", err) log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
return return
} }
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err)
return
}
log.Info("inform new block over pubsub") log.Info("inform new block over pubsub")
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header, Header: blk.Header,
Messages: msgs, BlsMessages: bmsgs,
SecpkMessages: smsgs,
}) })
}() }()
} }

View File

@ -144,32 +144,57 @@ func copyBlockstore(from, to bstore.Blockstore) error {
return nil return nil
} }
func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []*types.SignedMessage, msgincl [][]int) (*store.FullTipSet, error) { // TODO: this function effectively accepts unchecked input from the network,
if len(ts.Blocks()) != len(msgincl) { // either validate it here, or ensure that its validated elsewhere (maybe make
// sure the blocksync code checks it?)
// maybe this code should actually live in blocksync??
func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, allbmsgs []*types.Message, allsmsgs []*types.SignedMessage, bmi, smi [][]int) (*store.FullTipSet, error) {
if len(ts.Blocks()) != len(smi) || len(ts.Blocks()) != len(bmi) {
return nil, fmt.Errorf("msgincl length didnt match tipset size") return nil, fmt.Errorf("msgincl length didnt match tipset size")
} }
fts := &store.FullTipSet{} fts := &store.FullTipSet{}
for bi, b := range ts.Blocks() { for bi, b := range ts.Blocks() {
var msgs []*types.SignedMessage var smsgs []*types.SignedMessage
var msgCids []interface{} var smsgCids []interface{}
for _, m := range msgincl[bi] { for _, m := range smi[bi] {
msgs = append(msgs, messages[m]) smsgs = append(smsgs, allsmsgs[m])
msgCids = append(msgCids, messages[m].Cid()) smsgCids = append(smsgCids, allsmsgs[m].Cid())
} }
mroot, err := sharray.Build(context.TODO(), 4, msgCids, cst) smroot, err := sharray.Build(context.TODO(), 4, smsgCids, cst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if b.Messages != mroot { var bmsgs []*types.Message
var bmsgCids []interface{}
for _, m := range bmi[bi] {
bmsgs = append(bmsgs, allbmsgs[m])
bmsgCids = append(bmsgCids, allbmsgs[m].Cid())
}
bmroot, err := sharray.Build(context.TODO(), 4, bmsgCids, cst)
if err != nil {
return nil, err
}
mrcid, err := cst.Put(context.TODO(), &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return nil, err
}
if b.Messages != mrcid {
return nil, fmt.Errorf("messages didnt match message root in header") return nil, fmt.Errorf("messages didnt match message root in header")
} }
fb := &types.FullBlock{ fb := &types.FullBlock{
Header: b, Header: b,
Messages: msgs, BlsMessages: bmsgs,
SecpkMessages: smsgs,
} }
fts.Blocks = append(fts.Blocks, fb) fts.Blocks = append(fts.Blocks, fb)
@ -238,14 +263,15 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
fts := &store.FullTipSet{} fts := &store.FullTipSet{}
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
messages, err := syncer.store.MessagesForBlock(b) bmsgs, smsgs, err := syncer.store.MessagesForBlock(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fb := &types.FullBlock{ fb := &types.FullBlock{
Header: b, Header: b,
Messages: messages, BlsMessages: bmsgs,
SecpkMessages: smsgs,
} }
fts.Blocks = append(fts.Blocks, fb) fts.Blocks = append(fts.Blocks, fb)
} }
@ -313,10 +339,19 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
} }
var receipts []interface{} var receipts []interface{}
for _, m := range b.Messages { for i, m := range b.BlsMessages {
receipt, err := vmi.ApplyMessage(ctx, m)
if err != nil {
return xerrors.Errorf("failed executing bls message %d in block %s: %w", i, b.Header.Cid(), err)
}
receipts = append(receipts, receipt)
}
for i, m := range b.SecpkMessages {
receipt, err := vmi.ApplyMessage(ctx, &m.Message) receipt, err := vmi.ApplyMessage(ctx, &m.Message)
if err != nil { if err != nil {
return err return xerrors.Errorf("failed executing secpk message %d in block %s: %w", i, b.Header.Cid(), err)
} }
receipts = append(receipts, receipt) receipts = append(receipts, receipt)
@ -460,7 +495,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
for bsi := 0; bsi < len(bstips); bsi++ { for bsi := 0; bsi < len(bstips); bsi++ {
this := headers[i+bsi] this := headers[i+bsi]
bstip := bstips[len(bstips)-(bsi+1)] bstip := bstips[len(bstips)-(bsi+1)]
fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes) fts, err := zipTipSetAndMessages(cst, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes)
if err != nil { if err != nil {
log.Error("zipping failed: ", err, bsi, i) log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", this.Height()) log.Error("height: ", this.Height())
@ -489,23 +524,22 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error { func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error {
for _, bst := range bstips { for _, bst := range bstips {
for _, m := range bst.Messages { for _, m := range bst.BlsMessages {
switch m.Signature.Type { //log.Infof("putting BLS message: %s", m.Cid())
case types.KTBLS: if _, err := store.PutMessage(bs, m); err != nil {
//log.Infof("putting BLS message: %s", m.Cid()) log.Error("failed to persist messages: ", err)
if _, err := store.PutMessage(bs, &m.Message); err != nil { return xerrors.Errorf("BLS message processing failed: %w", err)
log.Error("failed to persist messages: ", err) }
return xerrors.Errorf("BLS message processing failed: %w", err) }
} for _, m := range bst.SecpkMessages {
case types.KTSecp256k1: if m.Signature.Type != types.KTSecp256k1 {
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
default:
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
} }
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
} }
} }

View File

@ -164,11 +164,11 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
// -1 to match block.Height // -1 to match block.Height
b.Header = tu.blocks[h-1].Header b.Header = tu.blocks[h-1].Header
for _, msg := range tu.blocks[h-1].Messages { for _, msg := range tu.blocks[h-1].SecpkMessages {
c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg)
require.NoError(tu.t, err) require.NoError(tu.t, err)
b.Messages = append(b.Messages, c) b.SecpkMessages = append(b.SecpkMessages, c)
} }
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))

View File

@ -130,8 +130,9 @@ func (f *filecoinIpldNode) String() string {
} }
type BlockMsg struct { type BlockMsg struct {
Header *types.BlockHeader Header *types.BlockHeader
Messages []cid.Cid BlsMessages []cid.Cid
SecpkMessages []cid.Cid
} }
func DecodeBlockMsg(b []byte) (*BlockMsg, error) { func DecodeBlockMsg(b []byte) (*BlockMsg, error) {

View File

@ -71,6 +71,7 @@ func init() {
}, nil }, nil
})). })).
Complete()) Complete())
cbor.RegisterCborType(MsgMeta{})
} }
type Ticket []byte type Ticket []byte
@ -98,6 +99,11 @@ type BlockHeader struct {
MessageReceipts cid.Cid MessageReceipts cid.Cid
} }
type MsgMeta struct {
BlsMessages cid.Cid
SecpkMessages cid.Cid
}
func (b *BlockHeader) ToStorageBlock() (block.Block, error) { func (b *BlockHeader) ToStorageBlock() (block.Block, error) {
data, err := b.Serialize() data, err := b.Serialize()
if err != nil { if err != nil {

View File

@ -3,8 +3,9 @@ package types
import "github.com/ipfs/go-cid" import "github.com/ipfs/go-cid"
type FullBlock struct { type FullBlock struct {
Header *BlockHeader Header *BlockHeader
Messages []*SignedMessage BlsMessages []*Message
SecpkMessages []*SignedMessage
} }
func (fb *FullBlock) Cid() cid.Cid { func (fb *FullBlock) Cid() cid.Cid {

View File

@ -114,3 +114,12 @@ func (m *Message) ToStorageBlock() (block.Block, error) {
return block.NewBlockWithCid(data, c) return block.NewBlockWithCid(data, c)
} }
func (m *Message) Cid() cid.Cid {
b, err := m.ToStorageBlock()
if err != nil {
panic(fmt.Sprintf("failed to marshal message: %s", err))
}
return b.Cid()
}

View File

@ -77,10 +77,10 @@ func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Bl
return a.Chain.GetBlock(msg) return a.Chain.GetBlock(msg)
} }
func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*types.SignedMessage, error) { func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*types.Message, []*types.SignedMessage, error) {
b, err := a.Chain.GetBlock(msg) b, err := a.Chain.GetBlock(msg)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return a.Chain.MessagesForBlock(b) return a.Chain.MessagesForBlock(b)
@ -153,8 +153,11 @@ func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address
var out chain.BlockMsg var out chain.BlockMsg
out.Header = fblk.Header out.Header = fblk.Header
for _, msg := range fblk.Messages { for _, msg := range fblk.BlsMessages {
out.Messages = append(out.Messages, msg.Cid()) out.BlsMessages = append(out.BlsMessages, msg.Cid())
}
for _, msg := range fblk.SecpkMessages {
out.SecpkMessages = append(out.SecpkMessages, msg.Cid())
} }
return &out, nil return &out, nil