Merge pull request #113 from filecoin-project/feat/bls-spec-change-refactor
refactoring to accomodate the bls message changes in the spec
This commit is contained in:
		
						commit
						3e768742a2
					
				| @ -41,6 +41,11 @@ type MsgWait struct { | ||||
| 	Receipt types.MessageReceipt | ||||
| } | ||||
| 
 | ||||
| type BlockMessages struct { | ||||
| 	BlsMessages   []*types.Message | ||||
| 	SecpkMessages []*types.SignedMessage | ||||
| } | ||||
| 
 | ||||
| type Common interface { | ||||
| 	// Auth
 | ||||
| 	AuthVerify(ctx context.Context, token string) ([]string, error) | ||||
| @ -72,7 +77,7 @@ type FullNode interface { | ||||
| 	ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error) | ||||
| 	ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) | ||||
| 	ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) | ||||
| 	ChainGetBlockMessages(context.Context, cid.Cid) ([]*types.SignedMessage, error) | ||||
| 	ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) | ||||
| 
 | ||||
| 	// if tipset is nil, we'll use heaviest
 | ||||
| 	ChainCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) | ||||
|  | ||||
| @ -45,7 +45,7 @@ type FullNodeStruct struct { | ||||
| 		ChainGetRandomness    func(context.Context, *types.TipSet) ([]byte, error)                                `perm:"read"` | ||||
| 		ChainWaitMsg          func(context.Context, cid.Cid) (*MsgWait, error)                                    `perm:"read"` | ||||
| 		ChainGetBlock         func(context.Context, cid.Cid) (*types.BlockHeader, error)                          `perm:"read"` | ||||
| 		ChainGetBlockMessages func(context.Context, cid.Cid) ([]*types.SignedMessage, error)                      `perm:"read"` | ||||
| 		ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error)                              `perm:"read"` | ||||
| 		ChainCall             func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` | ||||
| 
 | ||||
| 		MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` | ||||
| @ -188,7 +188,7 @@ func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*types.B | ||||
| 	return c.Internal.ChainGetBlock(ctx, b) | ||||
| } | ||||
| 
 | ||||
| func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*types.SignedMessage, error) { | ||||
| func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) (*BlockMessages, error) { | ||||
| 	return c.Internal.ChainGetBlockMessages(ctx, b) | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -17,6 +17,7 @@ import ( | ||||
| 	"github.com/filecoin-project/go-lotus/lib/cborrpc" | ||||
| 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" | ||||
| 
 | ||||
| 	blocks "github.com/ipfs/go-block-format" | ||||
| 	"github.com/ipfs/go-cid" | ||||
| 	cbor "github.com/ipfs/go-ipld-cbor" | ||||
| 	inet "github.com/libp2p/go-libp2p-core/network" | ||||
| @ -71,8 +72,11 @@ type BlockSyncResponse struct { | ||||
| type BSTipSet struct { | ||||
| 	Blocks []*types.BlockHeader | ||||
| 
 | ||||
| 	Messages    []*types.SignedMessage | ||||
| 	MsgIncludes [][]int | ||||
| 	BlsMessages    []*types.Message | ||||
| 	BlsMsgIncludes [][]int | ||||
| 
 | ||||
| 	SecpkMessages    []*types.SignedMessage | ||||
| 	SecpkMsgIncludes [][]int | ||||
| } | ||||
| 
 | ||||
| func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { | ||||
| @ -130,13 +134,15 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, | ||||
| 		} | ||||
| 
 | ||||
| 		if opts.IncludeMessages { | ||||
| 			msgs, mincl, err := bss.gatherMessages(ts) | ||||
| 			bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 				return nil, xerrors.Errorf("gather messages failed: %w", err) | ||||
| 			} | ||||
| 
 | ||||
| 			bst.Messages = msgs | ||||
| 			bst.MsgIncludes = mincl | ||||
| 			bst.BlsMessages = bmsgs | ||||
| 			bst.BlsMsgIncludes = bmincl | ||||
| 			bst.SecpkMessages = smsgs | ||||
| 			bst.SecpkMsgIncludes = smincl | ||||
| 		} | ||||
| 
 | ||||
| 		if opts.IncludeBlocks { | ||||
| @ -153,33 +159,47 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMessage, [][]int, error) { | ||||
| 	msgmap := make(map[cid.Cid]int) | ||||
| 	var allmsgs []*types.SignedMessage | ||||
| 	var msgincl [][]int | ||||
| func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]int, []*types.SignedMessage, [][]int, error) { | ||||
| 	blsmsgmap := make(map[cid.Cid]int) | ||||
| 	secpkmsgmap := make(map[cid.Cid]int) | ||||
| 	var secpkmsgs []*types.SignedMessage | ||||
| 	var blsmsgs []*types.Message | ||||
| 	var secpkincl, blsincl [][]int | ||||
| 
 | ||||
| 	for _, b := range ts.Blocks() { | ||||
| 		msgs, err := bss.cs.MessagesForBlock(b) | ||||
| 		bmsgs, smsgs, err := bss.cs.MessagesForBlock(b) | ||||
| 		if err != nil { | ||||
| 			return nil, nil, err | ||||
| 			return nil, nil, nil, nil, err | ||||
| 		} | ||||
| 		log.Infof("MESSAGES FOR BLOCK: %d", len(msgs)) | ||||
| 
 | ||||
| 		msgindexes := make([]int, 0, len(msgs)) | ||||
| 		for _, m := range msgs { | ||||
| 			i, ok := msgmap[m.Cid()] | ||||
| 		bmi := make([]int, 0, len(bmsgs)) | ||||
| 		for _, m := range bmsgs { | ||||
| 			i, ok := blsmsgmap[m.Cid()] | ||||
| 			if !ok { | ||||
| 				i = len(allmsgs) | ||||
| 				allmsgs = append(allmsgs, m) | ||||
| 				msgmap[m.Cid()] = i | ||||
| 				i = len(blsmsgs) | ||||
| 				blsmsgs = append(blsmsgs, m) | ||||
| 				blsmsgmap[m.Cid()] = i | ||||
| 			} | ||||
| 
 | ||||
| 			msgindexes = append(msgindexes, i) | ||||
| 			bmi = append(bmi, i) | ||||
| 		} | ||||
| 		msgincl = append(msgincl, msgindexes) | ||||
| 		blsincl = append(blsincl, bmi) | ||||
| 
 | ||||
| 		smi := make([]int, 0, len(smsgs)) | ||||
| 		for _, m := range smsgs { | ||||
| 			i, ok := secpkmsgmap[m.Cid()] | ||||
| 			if !ok { | ||||
| 				i = len(secpkmsgs) | ||||
| 				secpkmsgs = append(secpkmsgs, m) | ||||
| 				secpkmsgmap[m.Cid()] = i | ||||
| 			} | ||||
| 
 | ||||
| 			smi = append(smi, i) | ||||
| 		} | ||||
| 		secpkincl = append(secpkincl, smi) | ||||
| 	} | ||||
| 
 | ||||
| 	return allmsgs, msgincl, nil | ||||
| 	return blsmsgs, blsincl, secpkmsgs, secpkincl, nil | ||||
| } | ||||
| 
 | ||||
| type BlockSync struct { | ||||
| @ -326,8 +346,8 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) { | ||||
| 		fb := &types.FullBlock{ | ||||
| 			Header: b, | ||||
| 		} | ||||
| 		for _, mi := range bts.MsgIncludes[i] { | ||||
| 			fb.Messages = append(fb.Messages, bts.Messages[mi]) | ||||
| 		for _, mi := range bts.BlsMsgIncludes[i] { | ||||
| 			fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) | ||||
| 		} | ||||
| 		fts.Blocks = append(fts.Blocks, fb) | ||||
| 	} | ||||
| @ -404,9 +424,51 @@ func (bs *BlockSync) AddPeer(p peer.ID) { | ||||
| 	bs.syncPeers[p] = struct{}{} | ||||
| } | ||||
| 
 | ||||
| func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) { | ||||
| func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) { | ||||
| 	out := make([]*types.Message, len(cids)) | ||||
| 
 | ||||
| 	err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { | ||||
| 		msg, err := types.DecodeMessage(b.RawData()) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		if out[i] != nil { | ||||
| 			return fmt.Errorf("received duplicate message") | ||||
| 		} | ||||
| 
 | ||||
| 		out[i] = msg | ||||
| 		return nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
| 
 | ||||
| func (bs *BlockSync) FetchSignedMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) { | ||||
| 	out := make([]*types.SignedMessage, len(cids)) | ||||
| 
 | ||||
| 	err := bs.fetchCids(ctx, cids, func(i int, b blocks.Block) error { | ||||
| 		smsg, err := types.DecodeSignedMessage(b.RawData()) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		if out[i] != nil { | ||||
| 			return fmt.Errorf("received duplicate message") | ||||
| 		} | ||||
| 
 | ||||
| 		out[i] = smsg | ||||
| 		return nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
| 
 | ||||
| func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, blocks.Block) error) error { | ||||
| 	resp := bs.bserv.GetBlocks(context.TODO(), cids) | ||||
| 
 | ||||
| 	m := make(map[cid.Cid]int) | ||||
| @ -422,25 +484,19 @@ func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage | ||||
| 					break | ||||
| 				} | ||||
| 
 | ||||
| 				return nil, fmt.Errorf("failed to fetch all messages") | ||||
| 				return fmt.Errorf("failed to fetch all messages") | ||||
| 			} | ||||
| 
 | ||||
| 			sm, err := types.DecodeSignedMessage(v.RawData()) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 			ix, ok := m[sm.Cid()] | ||||
| 			ix, ok := m[v.Cid()] | ||||
| 			if !ok { | ||||
| 				return nil, fmt.Errorf("received message we didnt ask for") | ||||
| 				return fmt.Errorf("received message we didnt ask for") | ||||
| 			} | ||||
| 
 | ||||
| 			if out[ix] != nil { | ||||
| 				return nil, fmt.Errorf("received duplicate message") | ||||
| 			if err := cb(ix, v); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 
 | ||||
| 			out[ix] = sm | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return out, nil | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -41,22 +41,38 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A | ||||
| 		Height:  height, | ||||
| 	} | ||||
| 
 | ||||
| 	var msgCids []cid.Cid | ||||
| 	var blsMessages []*types.Message | ||||
| 	var secpkMessages []*types.SignedMessage | ||||
| 
 | ||||
| 	var blsMsgCids, secpkMsgCids []cid.Cid | ||||
| 	var blsSigs []types.Signature | ||||
| 	var receipts []interface{} | ||||
| 	for _, msg := range msgs { | ||||
| 		if msg.Signature.TypeCode() == 2 { | ||||
| 			blsSigs = append(blsSigs, msg.Signature) | ||||
| 			blsMessages = append(blsMessages, &msg.Message) | ||||
| 
 | ||||
| 			c, err := cs.PutMessage(&msg.Message) | ||||
| 			if err != nil { | ||||
| 				return nil, err | ||||
| 			} | ||||
| 
 | ||||
| 			msgCids = append(msgCids, c) | ||||
| 			blsMsgCids = append(blsMsgCids, c) | ||||
| 		} else { | ||||
| 			msgCids = append(msgCids, msg.Cid()) | ||||
| 			secpkMsgCids = append(secpkMsgCids, msg.Cid()) | ||||
| 			secpkMessages = append(secpkMessages, msg) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	var receipts []interface{} | ||||
| 	for _, msg := range blsMessages { | ||||
| 		rec, err := vmi.ApplyMessage(ctx, msg) | ||||
| 		if err != nil { | ||||
| 			return nil, errors.Wrap(err, "apply message failure") | ||||
| 		} | ||||
| 
 | ||||
| 		receipts = append(receipts, rec) | ||||
| 	} | ||||
| 	for _, msg := range secpkMessages { | ||||
| 		rec, err := vmi.ApplyMessage(ctx, &msg.Message) | ||||
| 		if err != nil { | ||||
| 			return nil, errors.Wrap(err, "apply message failure") | ||||
| @ -66,11 +82,23 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A | ||||
| 	} | ||||
| 
 | ||||
| 	cst := hamt.CSTFromBstore(cs.Blockstore()) | ||||
| 	msgroot, err := sharray.Build(context.TODO(), 4, toIfArr(msgCids), cst) | ||||
| 	blsmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(blsMsgCids), cst) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	next.Messages = msgroot | ||||
| 	secpkmsgroot, err := sharray.Build(context.TODO(), 4, toIfArr(secpkMsgCids), cst) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{ | ||||
| 		BlsMessages:   blsmsgroot, | ||||
| 		SecpkMessages: secpkmsgroot, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	next.Messages = mmcid | ||||
| 
 | ||||
| 	rectroot, err := sharray.Build(context.TODO(), 4, receipts, cst) | ||||
| 	if err != nil { | ||||
| @ -94,8 +122,9 @@ func MinerCreateBlock(ctx context.Context, cs *store.ChainStore, miner address.A | ||||
| 	next.ParentWeight = types.NewInt(pweight) | ||||
| 
 | ||||
| 	fullBlock := &types.FullBlock{ | ||||
| 		Header:   next, | ||||
| 		Messages: msgs, | ||||
| 		Header:        next, | ||||
| 		BlsMessages:   blsMessages, | ||||
| 		SecpkMessages: secpkMessages, | ||||
| 	} | ||||
| 
 | ||||
| 	return fullBlock, nil | ||||
|  | ||||
| @ -147,10 +147,19 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B | ||||
| 	} | ||||
| 
 | ||||
| 	cst := hamt.CSTFromBstore(bs) | ||||
| 
 | ||||
| 	emptyroot, err := sharray.Build(context.TODO(), 4, []interface{}{}, cst) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	mmcid, err := cst.Put(context.TODO(), &types.MsgMeta{ | ||||
| 		BlsMessages:   emptyroot, | ||||
| 		SecpkMessages: emptyroot, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	fmt.Println("Empty Genesis root: ", emptyroot) | ||||
| 
 | ||||
| 	b := &types.BlockHeader{ | ||||
| @ -161,7 +170,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B | ||||
| 		Height:          0, | ||||
| 		ParentWeight:    types.NewInt(0), | ||||
| 		StateRoot:       stateroot, | ||||
| 		Messages:        emptyroot, | ||||
| 		Messages:        mmcid, | ||||
| 		MessageReceipts: emptyroot, | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
| @ -127,29 +127,55 @@ func (mp *MessagePool) Pending() []*types.SignedMessage { | ||||
| func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error { | ||||
| 	for _, ts := range revert { | ||||
| 		for _, b := range ts.Blocks() { | ||||
| 			msgs, err := mp.cs.MessagesForBlock(b) | ||||
| 			bmsgs, smsgs, err := mp.cs.MessagesForBlock(b) | ||||
| 			if err != nil { | ||||
| 				return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height) | ||||
| 			} | ||||
| 			for _, msg := range msgs { | ||||
| 			for _, msg := range smsgs { | ||||
| 				if err := mp.Add(msg); err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			for _, msg := range bmsgs { | ||||
| 				smsg := mp.RecoverSig(msg) | ||||
| 				if smsg != nil { | ||||
| 					if err := mp.Add(smsg); err != nil { | ||||
| 						return err | ||||
| 					} | ||||
| 				} else { | ||||
| 					log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid()) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	for _, ts := range apply { | ||||
| 		for _, b := range ts.Blocks() { | ||||
| 			msgs, err := mp.cs.MessagesForBlock(b) | ||||
| 			bmsgs, smsgs, err := mp.cs.MessagesForBlock(b) | ||||
| 			if err != nil { | ||||
| 				return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages) | ||||
| 			} | ||||
| 			for _, msg := range msgs { | ||||
| 			for _, msg := range smsgs { | ||||
| 				mp.Remove(msg) | ||||
| 			} | ||||
| 
 | ||||
| 			for _, msg := range bmsgs { | ||||
| 				smsg := mp.RecoverSig(msg) | ||||
| 				if smsg != nil { | ||||
| 					mp.Remove(smsg) | ||||
| 				} else { | ||||
| 					// TODO: this one is likely fine
 | ||||
| 					log.Warnf("could not recover signature for bls message %s during a reorg apply", msg.Cid()) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage { | ||||
| 	// TODO: persist signatures for BLS messages for a little while in case of reorgs
 | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @ -328,7 +328,12 @@ func (cs *ChainStore) persistBlock(b *types.FullBlock) error { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	for _, m := range b.Messages { | ||||
| 	for _, m := range b.BlsMessages { | ||||
| 		if _, err := cs.PutMessage(m); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	for _, m := range b.SecpkMessages { | ||||
| 		if _, err := cs.PutMessage(m); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| @ -404,7 +409,17 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) { | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { | ||||
| func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { | ||||
| 	sb, err := cs.bs.Get(c) | ||||
| 	if err != nil { | ||||
| 		log.Errorf("get message get failed: %s: %s", c, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return types.DecodeMessage(sb.RawData()) | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { | ||||
| 	sb, err := cs.bs.Get(c) | ||||
| 	if err != nil { | ||||
| 		log.Errorf("get message get failed: %s: %s", c, err) | ||||
| @ -414,9 +429,9 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { | ||||
| 	return types.DecodeSignedMessage(sb.RawData()) | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, error) { | ||||
| func (cs *ChainStore) readSharrayCids(root cid.Cid) ([]cid.Cid, error) { | ||||
| 	cst := hamt.CSTFromBstore(cs.bs) | ||||
| 	shar, err := sharray.Load(context.TODO(), b.Messages, 4, cst) | ||||
| 	shar, err := sharray.Load(context.TODO(), root, 4, cst) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "sharray load") | ||||
| 	} | ||||
| @ -438,13 +453,34 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro | ||||
| 	return cids, nil | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) { | ||||
| 	cids, err := cs.MessageCidsForBlock(b) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "loading message cids for block") | ||||
| func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { | ||||
| 	cst := hamt.CSTFromBstore(cs.bs) | ||||
| 	var msgmeta types.MsgMeta | ||||
| 	if err := cst.Get(context.TODO(), b.Messages, &msgmeta); err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return cs.LoadMessagesFromCids(cids) | ||||
| 	blscids, err := cs.readSharrayCids(msgmeta.BlsMessages) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, errors.Wrap(err, "loading bls message cids for block") | ||||
| 	} | ||||
| 
 | ||||
| 	secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, errors.Wrap(err, "loading secpk message cids for block") | ||||
| 	} | ||||
| 
 | ||||
| 	blsmsgs, err := cs.LoadMessagesFromCids(blscids) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, errors.Wrap(err, "loading bls messages for block") | ||||
| 	} | ||||
| 
 | ||||
| 	secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, errors.Wrap(err, "loading secpk messages for block") | ||||
| 	} | ||||
| 
 | ||||
| 	return blsmsgs, secpkmsgs, nil | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) { | ||||
| @ -472,8 +508,8 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec | ||||
| 	return &r, nil | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { | ||||
| 	msgs := make([]*types.SignedMessage, 0, len(cids)) | ||||
| func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) { | ||||
| 	msgs := make([]*types.Message, 0, len(cids)) | ||||
| 	for i, c := range cids { | ||||
| 		m, err := cs.GetMessage(c) | ||||
| 		if err != nil { | ||||
| @ -486,6 +522,20 @@ func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessa | ||||
| 	return msgs, nil | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { | ||||
| 	msgs := make([]*types.SignedMessage, 0, len(cids)) | ||||
| 	for i, c := range cids { | ||||
| 		m, err := cs.GetSignedMessage(c) | ||||
| 		if err != nil { | ||||
| 			return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i) | ||||
| 		} | ||||
| 
 | ||||
| 		msgs = append(msgs, m) | ||||
| 	} | ||||
| 
 | ||||
| 	return msgs, nil | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) GetBalance(addr address.Address) (types.BigInt, error) { | ||||
| 	act, err := cs.GetActor(addr) | ||||
| 	if err != nil { | ||||
| @ -560,17 +610,34 @@ func (cs *ChainStore) tipsetContainsMsg(ts *types.TipSet, msg cid.Cid) (cid.Cid, | ||||
| } | ||||
| 
 | ||||
| func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*types.MessageReceipt, error) { | ||||
| 	msgs, err := cs.MessageCidsForBlock(blk) | ||||
| 	if err != nil { | ||||
| 	cst := hamt.CSTFromBstore(cs.bs) | ||||
| 	var msgmeta types.MsgMeta | ||||
| 	if err := cst.Get(context.TODO(), blk.Messages, &msgmeta); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	for i, mc := range msgs { | ||||
| 		if mc == msg { | ||||
| 	blscids, err := cs.readSharrayCids(msgmeta.BlsMessages) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "loading bls message cids for block") | ||||
| 	} | ||||
| 
 | ||||
| 	for i, c := range blscids { | ||||
| 		if c == msg { | ||||
| 			return cs.GetReceipt(blk, i) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	secpkcids, err := cs.readSharrayCids(msgmeta.SecpkMessages) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "loading secpk message cids for block") | ||||
| 	} | ||||
| 
 | ||||
| 	for i, c := range secpkcids { | ||||
| 		if c == msg { | ||||
| 			return cs.GetReceipt(blk, i+len(blscids)) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| @ -582,7 +649,7 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { | ||||
| 	var out []*types.FullBlock | ||||
| 
 | ||||
| 	for _, b := range ts.Blocks() { | ||||
| 		msgs, err := cs.MessagesForBlock(b) | ||||
| 		bmsgs, smsgs, err := cs.MessagesForBlock(b) | ||||
| 		if err != nil { | ||||
| 			// TODO: check for 'not found' errors, and only return nil if this
 | ||||
| 			// is actually a 'not found' error
 | ||||
| @ -590,8 +657,9 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { | ||||
| 		} | ||||
| 
 | ||||
| 		fb := &types.FullBlock{ | ||||
| 			Header:   b, | ||||
| 			Messages: msgs, | ||||
| 			Header:        b, | ||||
| 			BlsMessages:   bmsgs, | ||||
| 			SecpkMessages: smsgs, | ||||
| 		} | ||||
| 
 | ||||
| 		out = append(out, fb) | ||||
|  | ||||
| @ -28,15 +28,24 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha | ||||
| 		} | ||||
| 
 | ||||
| 		go func() { | ||||
| 			msgs, err := s.Bsync.FetchMessagesByCids(blk.Messages) | ||||
| 			log.Info("about to fetch messages for block from pubsub") | ||||
| 			bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) | ||||
| 			if err != nil { | ||||
| 				log.Errorf("failed to fetch all messages for block received over pubusb: %s", err) | ||||
| 				log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err) | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages) | ||||
| 			if err != nil { | ||||
| 				log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err) | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			log.Info("inform new block over pubsub") | ||||
| 			s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ | ||||
| 				Header:   blk.Header, | ||||
| 				Messages: msgs, | ||||
| 				Header:        blk.Header, | ||||
| 				BlsMessages:   bmsgs, | ||||
| 				SecpkMessages: smsgs, | ||||
| 			}) | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| @ -142,32 +142,57 @@ func copyBlockstore(from, to bstore.Blockstore) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []*types.SignedMessage, msgincl [][]int) (*store.FullTipSet, error) { | ||||
| 	if len(ts.Blocks()) != len(msgincl) { | ||||
| // TODO: this function effectively accepts unchecked input from the network,
 | ||||
| // either validate it here, or ensure that its validated elsewhere (maybe make
 | ||||
| // sure the blocksync code checks it?)
 | ||||
| // maybe this code should actually live in blocksync??
 | ||||
| func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, allbmsgs []*types.Message, allsmsgs []*types.SignedMessage, bmi, smi [][]int) (*store.FullTipSet, error) { | ||||
| 	if len(ts.Blocks()) != len(smi) || len(ts.Blocks()) != len(bmi) { | ||||
| 		return nil, fmt.Errorf("msgincl length didnt match tipset size") | ||||
| 	} | ||||
| 
 | ||||
| 	fts := &store.FullTipSet{} | ||||
| 	for bi, b := range ts.Blocks() { | ||||
| 		var msgs []*types.SignedMessage | ||||
| 		var msgCids []interface{} | ||||
| 		for _, m := range msgincl[bi] { | ||||
| 			msgs = append(msgs, messages[m]) | ||||
| 			msgCids = append(msgCids, messages[m].Cid()) | ||||
| 		var smsgs []*types.SignedMessage | ||||
| 		var smsgCids []interface{} | ||||
| 		for _, m := range smi[bi] { | ||||
| 			smsgs = append(smsgs, allsmsgs[m]) | ||||
| 			smsgCids = append(smsgCids, allsmsgs[m].Cid()) | ||||
| 		} | ||||
| 
 | ||||
| 		mroot, err := sharray.Build(context.TODO(), 4, msgCids, cst) | ||||
| 		smroot, err := sharray.Build(context.TODO(), 4, smsgCids, cst) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		if b.Messages != mroot { | ||||
| 		var bmsgs []*types.Message | ||||
| 		var bmsgCids []interface{} | ||||
| 		for _, m := range bmi[bi] { | ||||
| 			bmsgs = append(bmsgs, allbmsgs[m]) | ||||
| 			bmsgCids = append(bmsgCids, allbmsgs[m].Cid()) | ||||
| 		} | ||||
| 
 | ||||
| 		bmroot, err := sharray.Build(context.TODO(), 4, bmsgCids, cst) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		mrcid, err := cst.Put(context.TODO(), &types.MsgMeta{ | ||||
| 			BlsMessages:   bmroot, | ||||
| 			SecpkMessages: smroot, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		if b.Messages != mrcid { | ||||
| 			return nil, fmt.Errorf("messages didnt match message root in header") | ||||
| 		} | ||||
| 
 | ||||
| 		fb := &types.FullBlock{ | ||||
| 			Header:   b, | ||||
| 			Messages: msgs, | ||||
| 			Header:        b, | ||||
| 			BlsMessages:   bmsgs, | ||||
| 			SecpkMessages: smsgs, | ||||
| 		} | ||||
| 
 | ||||
| 		fts.Blocks = append(fts.Blocks, fb) | ||||
| @ -236,14 +261,15 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro | ||||
| 
 | ||||
| 	fts := &store.FullTipSet{} | ||||
| 	for _, b := range ts.Blocks() { | ||||
| 		messages, err := syncer.store.MessagesForBlock(b) | ||||
| 		bmsgs, smsgs, err := syncer.store.MessagesForBlock(b) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		fb := &types.FullBlock{ | ||||
| 			Header:   b, | ||||
| 			Messages: messages, | ||||
| 			Header:        b, | ||||
| 			BlsMessages:   bmsgs, | ||||
| 			SecpkMessages: smsgs, | ||||
| 		} | ||||
| 		fts.Blocks = append(fts.Blocks, fb) | ||||
| 	} | ||||
| @ -307,10 +333,19 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err | ||||
| 	} | ||||
| 
 | ||||
| 	var receipts []interface{} | ||||
| 	for _, m := range b.Messages { | ||||
| 	for i, m := range b.BlsMessages { | ||||
| 		receipt, err := vmi.ApplyMessage(ctx, m) | ||||
| 		if err != nil { | ||||
| 			return xerrors.Errorf("failed executing bls message %d in block %s: %w", i, b.Header.Cid(), err) | ||||
| 		} | ||||
| 
 | ||||
| 		receipts = append(receipts, receipt) | ||||
| 	} | ||||
| 
 | ||||
| 	for i, m := range b.SecpkMessages { | ||||
| 		receipt, err := vmi.ApplyMessage(ctx, &m.Message) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 			return xerrors.Errorf("failed executing secpk message %d in block %s: %w", i, b.Header.Cid(), err) | ||||
| 		} | ||||
| 
 | ||||
| 		receipts = append(receipts, receipt.MessageReceipt) | ||||
| @ -454,7 +489,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu | ||||
| 		for bsi := 0; bsi < len(bstips); bsi++ { | ||||
| 			this := headers[i+bsi] | ||||
| 			bstip := bstips[len(bstips)-(bsi+1)] | ||||
| 			fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes) | ||||
| 			fts, err := zipTipSetAndMessages(cst, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes) | ||||
| 			if err != nil { | ||||
| 				log.Error("zipping failed: ", err, bsi, i) | ||||
| 				log.Error("height: ", this.Height()) | ||||
| @ -483,23 +518,22 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu | ||||
| 
 | ||||
| func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error { | ||||
| 	for _, bst := range bstips { | ||||
| 		for _, m := range bst.Messages { | ||||
| 			switch m.Signature.Type { | ||||
| 			case types.KTBLS: | ||||
| 				//log.Infof("putting BLS message: %s", m.Cid())
 | ||||
| 				if _, err := store.PutMessage(bs, &m.Message); err != nil { | ||||
| 					log.Error("failed to persist messages: ", err) | ||||
| 					return xerrors.Errorf("BLS message processing failed: %w", err) | ||||
| 				} | ||||
| 			case types.KTSecp256k1: | ||||
| 				//log.Infof("putting secp256k1 message: %s", m.Cid())
 | ||||
| 				if _, err := store.PutMessage(bs, m); err != nil { | ||||
| 					log.Error("failed to persist messages: ", err) | ||||
| 					return xerrors.Errorf("secp256k1 message processing failed: %w", err) | ||||
| 				} | ||||
| 			default: | ||||
| 		for _, m := range bst.BlsMessages { | ||||
| 			//log.Infof("putting BLS message: %s", m.Cid())
 | ||||
| 			if _, err := store.PutMessage(bs, m); err != nil { | ||||
| 				log.Error("failed to persist messages: ", err) | ||||
| 				return xerrors.Errorf("BLS message processing failed: %w", err) | ||||
| 			} | ||||
| 		} | ||||
| 		for _, m := range bst.SecpkMessages { | ||||
| 			if m.Signature.Type != types.KTSecp256k1 { | ||||
| 				return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) | ||||
| 			} | ||||
| 			//log.Infof("putting secp256k1 message: %s", m.Cid())
 | ||||
| 			if _, err := store.PutMessage(bs, m); err != nil { | ||||
| 				log.Error("failed to persist messages: ", err) | ||||
| 				return xerrors.Errorf("secp256k1 message processing failed: %w", err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
| @ -164,11 +164,11 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) { | ||||
| 
 | ||||
| 	// -1 to match block.Height
 | ||||
| 	b.Header = tu.blocks[h-1].Header | ||||
| 	for _, msg := range tu.blocks[h-1].Messages { | ||||
| 	for _, msg := range tu.blocks[h-1].SecpkMessages { | ||||
| 		c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) | ||||
| 		require.NoError(tu.t, err) | ||||
| 
 | ||||
| 		b.Messages = append(b.Messages, c) | ||||
| 		b.SecpkMessages = append(b.SecpkMessages, c) | ||||
| 	} | ||||
| 
 | ||||
| 	require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) | ||||
|  | ||||
| @ -130,8 +130,9 @@ func (f *filecoinIpldNode) String() string { | ||||
| } | ||||
| 
 | ||||
| type BlockMsg struct { | ||||
| 	Header   *types.BlockHeader | ||||
| 	Messages []cid.Cid | ||||
| 	Header        *types.BlockHeader | ||||
| 	BlsMessages   []cid.Cid | ||||
| 	SecpkMessages []cid.Cid | ||||
| } | ||||
| 
 | ||||
| func DecodeBlockMsg(b []byte) (*BlockMsg, error) { | ||||
|  | ||||
| @ -71,6 +71,7 @@ func init() { | ||||
| 				}, nil | ||||
| 			})). | ||||
| 		Complete()) | ||||
| 	cbor.RegisterCborType(MsgMeta{}) | ||||
| } | ||||
| 
 | ||||
| type Ticket []byte | ||||
| @ -98,6 +99,11 @@ type BlockHeader struct { | ||||
| 	MessageReceipts cid.Cid | ||||
| } | ||||
| 
 | ||||
| type MsgMeta struct { | ||||
| 	BlsMessages   cid.Cid | ||||
| 	SecpkMessages cid.Cid | ||||
| } | ||||
| 
 | ||||
| func (b *BlockHeader) ToStorageBlock() (block.Block, error) { | ||||
| 	data, err := b.Serialize() | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -3,8 +3,9 @@ package types | ||||
| import "github.com/ipfs/go-cid" | ||||
| 
 | ||||
| type FullBlock struct { | ||||
| 	Header   *BlockHeader | ||||
| 	Messages []*SignedMessage | ||||
| 	Header        *BlockHeader | ||||
| 	BlsMessages   []*Message | ||||
| 	SecpkMessages []*SignedMessage | ||||
| } | ||||
| 
 | ||||
| func (fb *FullBlock) Cid() cid.Cid { | ||||
|  | ||||
| @ -114,3 +114,12 @@ func (m *Message) ToStorageBlock() (block.Block, error) { | ||||
| 
 | ||||
| 	return block.NewBlockWithCid(data, c) | ||||
| } | ||||
| 
 | ||||
| func (m *Message) Cid() cid.Cid { | ||||
| 	b, err := m.ToStorageBlock() | ||||
| 	if err != nil { | ||||
| 		panic(fmt.Sprintf("failed to marshal message: %s", err)) | ||||
| 	} | ||||
| 
 | ||||
| 	return b.Cid() | ||||
| } | ||||
|  | ||||
| @ -88,11 +88,13 @@ var chainGetBlock = &cli.Command{ | ||||
| 
 | ||||
| 		cblock := struct { | ||||
| 			types.BlockHeader | ||||
| 			Messages []*types.SignedMessage | ||||
| 			BlsMessages   []*types.Message | ||||
| 			SecpkMessages []*types.SignedMessage | ||||
| 		}{} | ||||
| 
 | ||||
| 		cblock.BlockHeader = *blk | ||||
| 		cblock.Messages = msgs | ||||
| 		cblock.BlsMessages = msgs.BlsMessages | ||||
| 		cblock.SecpkMessages = msgs.SecpkMessages | ||||
| 
 | ||||
| 		out, err := json.MarshalIndent(cblock, "", "  ") | ||||
| 		if err != nil { | ||||
|  | ||||
| @ -77,13 +77,21 @@ func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Bl | ||||
| 	return a.Chain.GetBlock(msg) | ||||
| } | ||||
| 
 | ||||
| func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*types.SignedMessage, error) { | ||||
| func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { | ||||
| 	b, err := a.Chain.GetBlock(msg) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return a.Chain.MessagesForBlock(b) | ||||
| 	bmsgs, smsgs, err := a.Chain.MessagesForBlock(b) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return &api.BlockMessages{ | ||||
| 		BlsMessages:   bmsgs, | ||||
| 		SecpkMessages: smsgs, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func (a *FullNodeAPI) ChainCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { | ||||
| @ -154,8 +162,11 @@ func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address | ||||
| 
 | ||||
| 	var out chain.BlockMsg | ||||
| 	out.Header = fblk.Header | ||||
| 	for _, msg := range fblk.Messages { | ||||
| 		out.Messages = append(out.Messages, msg.Cid()) | ||||
| 	for _, msg := range fblk.BlsMessages { | ||||
| 		out.BlsMessages = append(out.BlsMessages, msg.Cid()) | ||||
| 	} | ||||
| 	for _, msg := range fblk.SecpkMessages { | ||||
| 		out.SecpkMessages = append(out.SecpkMessages, msg.Cid()) | ||||
| 	} | ||||
| 
 | ||||
| 	return &out, nil | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user