fix the most annoying bug ever
parent 0f2334f513
commit a8b434a708
@@ -81,6 +81,7 @@ type FullNode interface {
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error
MpoolGetNonce(context.Context, address.Address) (uint64, error)

// FullNodeStruct

@@ -99,9 +100,6 @@ type FullNode interface {
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletDefaultAddress(context.Context) (address.Address, error)

// Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain...
MpoolGetNonce(context.Context, address.Address) (uint64, error)

// Other

// ClientImport imports file under the specified path into filestore
@@ -129,19 +129,16 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
}

if opts.IncludeMessages {
log.Info("INCLUDING MESSAGES IN SYNC RESPONSE")
msgs, mincl, err := bss.gatherMessages(ts)
if err != nil {
return nil, err
}
log.Infof("messages: ", msgs)

bst.Messages = msgs
bst.MsgIncludes = mincl
}

if opts.IncludeBlocks {
log.Info("INCLUDING BLOCKS IN SYNC RESPONSE")
bst.Blocks = ts.Blocks()
}
@@ -57,7 +57,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) {
b, err := m.Blockstore.Get(c)
if err != nil {
// change to error for stacktraces, don't commit with that pls
log.Warn("Get failed: %s %s", c, err)
log.Warnf("Get failed: %s %s", c, err)
return nil, err
}
@@ -6,6 +6,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/pkg/errors"
)

type MessagePool struct {
@@ -128,7 +129,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b)
if err != nil {
return err
return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height)
}
for _, msg := range msgs {
if err := mp.Add(msg); err != nil {

@@ -142,7 +143,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b)
if err != nil {
return err
return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages)
}
for _, msg := range msgs {
mp.Remove(msg)
@@ -187,9 +187,12 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
if cs.heaviest == nil || cs.Weight(ts) > cs.Weight(cs.heaviest) {
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return err
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {

@@ -290,6 +293,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
rightChain = append(rightChain, right)
par, err := cs.LoadTipSet(right.Parents())
if err != nil {
log.Infof("failed to fetch right.Parents: %s", err)
return nil, nil, err
}

@@ -336,13 +340,21 @@ type storable interface {
ToStorageBlock() (block.Block, error)
}

func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
sb, err := m.ToStorageBlock()
func PutMessage(bs blockstore.Blockstore, m storable) (cid.Cid, error) {
b, err := m.ToStorageBlock()
if err != nil {
return cid.Undef, err
}

return sb.Cid(), cs.bs.Put(sb)
if err := bs.Put(b); err != nil {
return cid.Undef, err
}

return b.Cid(), nil
}

func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
return PutMessage(cs.bs, m)
}

func (cs *ChainStore) AddBlock(b *types.BlockHeader) error {

@@ -395,6 +407,7 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) {
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) {
sb, err := cs.bs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}

@@ -428,7 +441,7 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) {
cids, err := cs.MessageCidsForBlock(b)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "loading message cids for block")
}

return cs.LoadMessagesFromCids(cids)

@@ -461,10 +474,10 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec

func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0, len(cids))
for _, c := range cids {
for i, c := range cids {
m, err := cs.GetMessage(c)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i)
}

msgs = append(msgs, m)
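The store.go hunks above extract PutMessage into a free function over a blockstore, with the ChainStore method kept as a thin wrapper; that lets other packages (the syncer below) persist messages into a temporary blockstore. A minimal, self-contained sketch of that shape follows; Block, Blockstore, memStore and msg here are simplified stand-ins for illustration, not the real go-lotus/cid types.

// Sketch only: simplified stand-ins for the types used in the diff above.
package main

import "fmt"

type Block struct {
	Cid  string // stand-in for cid.Cid
	Data []byte
}

type Blockstore interface {
	Put(Block) error
}

type storable interface {
	ToStorageBlock() (Block, error)
}

// PutMessage is the free helper: serialize, store, return the CID.
func PutMessage(bs Blockstore, m storable) (string, error) {
	b, err := m.ToStorageBlock()
	if err != nil {
		return "", err
	}
	if err := bs.Put(b); err != nil {
		return "", err
	}
	return b.Cid, nil
}

// ChainStore keeps its method as a thin wrapper, matching the diff.
type ChainStore struct{ bs Blockstore }

func (cs *ChainStore) PutMessage(m storable) (string, error) {
	return PutMessage(cs.bs, m)
}

// memStore and msg exist only so the sketch runs.
type memStore map[string][]byte

func (s memStore) Put(b Block) error { s[b.Cid] = b.Data; return nil }

type msg string

func (m msg) ToStorageBlock() (Block, error) {
	return Block{Cid: "cid-" + string(m), Data: []byte(m)}, nil
}

func main() {
	cs := &ChainStore{bs: memStore{}}
	c, err := cs.PutMessage(msg("hello"))
	fmt.Println(c, err) // cid-hello <nil>
}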
427 chain/sync.go
@@ -77,41 +77,6 @@ type BadTipSetCache struct {
badBlocks map[cid.Cid]struct{}
}

/*type BlockSet struct {
tset map[uint64]*types.TipSet
head *types.TipSet
}

func (bs *BlockSet) Insert(ts *types.TipSet) {
if bs.tset == nil {
bs.tset = make(map[uint64]*types.TipSet)
}

if bs.head == nil || ts.Height() > bs.head.Height() {
bs.head = ts
}
bs.tset[ts.Height()] = ts
}

func (bs *BlockSet) GetByHeight(h uint64) *types.TipSet {
return bs.tset[h]
}

func (bs *BlockSet) PersistTo(cs *store.ChainStore) error {
for _, ts := range bs.tset {
for _, b := range ts.Blocks() {
if err := cs.PersistBlockHeader(b); err != nil {
return err
}
}
}
return nil
}

func (bs *BlockSet) Head() *types.TipSet {
return bs.head
}*/

const BootstrapPeerThreshold = 1

// InformNewHead informs the syncer about a new potential tipset
@@ -151,86 +116,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
syncer.InformNewHead(from, fts)
}

// SyncBootstrap is used to synchronise your chain when first joining
// the network, or when rejoining after significant downtime.
func (syncer *Syncer) SyncBootstrap() {
fmt.Println("Sync bootstrap!")
defer fmt.Println("bye bye sync bootstrap")

selectedHead, err := syncer.selectHead(syncer.peerHeads)
if err != nil {
log.Error("failed to select head: ", err)
return
}

blockSet := []*types.TipSet{selectedHead}
cur := selectedHead.Cids()

// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > 0 {
log.Errorf("syncing local: ", cur)
ts, err := syncer.store.LoadTipSet(cur)
if err != nil {
if err == bstore.ErrNotFound {
log.Error("not found: ", cur)
break
}
log.Errorf("loading local tipset: %s", err)
return
}

blockSet = append(blockSet, ts)
cur = ts.Parents()
}

for blockSet[len(blockSet)-1].Height() > 0 {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("Get blocks: ", cur)
blks, err := syncer.Bsync.GetBlocks(context.TODO(), cur, 10)
if err != nil {
log.Error("failed to get blocks: ", err)
return
}

for _, b := range blks {
blockSet = append(blockSet, b)
}

cur = blks[len(blks)-1].Parents()
}

// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}

blockSet = reverse(blockSet)

genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
return
}

for _, ts := range blockSet {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
log.Errorf("failed to persist synced blocks to the chainstore: %s", err)
return
}
}
}

head := blockSet[len(blockSet)-1]
log.Errorf("Finished syncing! new head: %s", head.Cids())
syncer.store.MaybeTakeHeavierTipSet(selectedHead)
syncer.head = head
}

func reverse(tips []*types.TipSet) []*types.TipSet {
out := make([]*types.TipSet, len(tips))
for i := 0; i < len(tips); i++ {
@@ -263,8 +148,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []
if len(ts.Blocks()) != len(msgincl) {
return nil, fmt.Errorf("msgincl length didnt match tipset size")
}
fmt.Println("zipping messages: ", msgincl)
fmt.Println("into block: ", ts.Blocks()[0].Height)

fts := &store.FullTipSet{}
for bi, b := range ts.Blocks() {

@@ -280,8 +163,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []
return nil, err
}

fmt.Println("messages: ", msgCids)
fmt.Println("message root: ", b.Messages, mroot)
if b.Messages != mroot {
return nil, fmt.Errorf("messages didnt match message root in header")
}
@@ -381,29 +262,17 @@ func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error {
return nil
}

chain, err := syncer.collectChainCaughtUp(maybeHead)
if err != nil {
if err := syncer.collectChain(maybeHead); err != nil {
return err
}

for i := len(chain) - 1; i >= 0; i-- {
ts := chain[i]
if err := syncer.ValidateTipSet(context.TODO(), ts); err != nil {
return errors.Wrap(err, "validate tipset failed")
}

if err := syncer.store.PutTipSet(ts); err != nil {
return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp")
}
}

if err := syncer.store.PutTipSet(maybeHead); err != nil {
return errors.Wrap(err, "failed to put synced tipset to chainstore")
}

if syncer.store.Weight(chain[0].TipSet()) > syncer.store.Weight(syncer.head) {
fmt.Println("Accepted new head: ", chain[0].Cids())
syncer.head = chain[0].TipSet()
if syncer.store.Weight(maybeHead.TipSet()) > syncer.store.Weight(syncer.head) {
fmt.Println("Accepted new head: ", maybeHead.Cids())
syncer.head = maybeHead.TipSet()
}
return nil
}
@@ -475,149 +344,159 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err

}

func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) {
// fetch tipset and messages via bitswap
func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) {
blockSet := []*types.TipSet{from}

chain := []*store.FullTipSet{fts}
cur := fts.TipSet()
at := from.Parents()

startHeight := syncer.head.Height()

_, err := syncer.store.LoadTipSet(cur.Parents())
if err != nil {
blockSet := []*types.TipSet{cur}

at := cur.Cids()

// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > startHeight {
log.Warn("syncing local: ", at)
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
if err == bstore.ErrNotFound {
log.Info("tipset not found locally, starting sync: ", at)
break
}
log.Warn("loading local tipset: %s", err)
continue // TODO: verify
}

blockSet = append(blockSet, ts)
at = ts.Parents()
}

for blockSet[len(blockSet)-1].Height() > startHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("Get blocks")
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)

log.Error("failed to get blocks: ", err)

// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}

for _, b := range blks {
blockSet = append(blockSet, b)
}

at = blks[len(blks)-1].Parents()
}

if startHeight == 0 {
// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}

blockSet = reverse(blockSet)

genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
panic("We synced to the wrong chain")
}
}

for _, ts := range blockSet {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
log.Errorf("failed to persist synced blocks to the chainstore: %s", err)
panic("bbbbb")
}
}
}

// Fetch all the messages for all the blocks in this chain

windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
cst := hamt.CSTFromBstore(bs)

nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
}

log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet))
next := blockSet[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}

for bsi := 0; bsi < len(bstips); bsi++ {
cur := blockSet[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fmt.Println("that loop: ", bsi, len(bstips))
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", cur.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return nil, xerrors.Errorf("message processing failed: %w", err)
}

if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}

for _, bst := range bstips {
for _, m := range bst.Messages {
if _, err := cst.Put(context.TODO(), m); err != nil {
log.Error("failed to persist messages: ", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
}

if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
log.Errorf("failed to persist temp blocks: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}

//log.Errorf("dont have parent blocks for sync tipset: %s", err)
//panic("should do something better, like fetch? or error?")

_, err = syncer.store.LoadTipSet(cur.Parents())
// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > toHeight {
log.Warn("syncing local: ", at)
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
log.Errorf("dont have parent blocks for sync tipset: %s", err)
panic("should do something better, error?")
if err == bstore.ErrNotFound {
log.Info("tipset not found locally, starting sync: ", at)
break
}
log.Warn("loading local tipset: %s", err)
continue // TODO: verify
}

blockSet = append(blockSet, ts)
at = ts.Parents()
}

for blockSet[len(blockSet)-1].Height() > toHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("Get blocks")
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)

log.Error("failed to get blocks: ", err)

// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}

for _, b := range blks {
blockSet = append(blockSet, b)
}

at = blks[len(blks)-1].Parents()
}

if toHeight == 0 {
// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}

blockSet = reverse(blockSet)

genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
panic("We synced to the wrong chain")
}
}

return chain, nil // return the chain because we have this last block in our cache already.
return blockSet, nil
}

func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
// Fetch all the messages for all the blocks in this chain
cur := headers[len(headers)-1]

windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
cst := hamt.CSTFromBstore(bs)

nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
}

next := headers[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
}

for bsi := 0; bsi < len(bstips); bsi++ {
this := headers[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", this.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return xerrors.Errorf("message processing failed: %w", err)
}

if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
}
}

for _, bst := range bstips {
for _, m := range bst.Messages {
switch m.Signature.Type {
case types.KTBLS:
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, &m.Message); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
case types.KTSecp256k1:
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
default:
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
}
}
}

if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
}

return nil
}

func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
curHeight := syncer.head.Height()

headers, err := syncer.collectHeaders(fts.TipSet(), curHeight)
if err != nil {
return err
}

for _, ts := range headers {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
}
}
}

if err := syncer.syncMessagesAndCheckState(headers); err != nil {
return err
}

return nil
}
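The heart of the fix is the signature-type switch in syncMessagesAndCheckState above: BLS messages are persisted as the bare Message (presumably because their signatures are aggregated at the block level), while secp256k1 messages are persisted as the full SignedMessage. A hedged, self-contained sketch of that dispatch follows; Message, Signature, SignedMessage and the store helper are simplified stand-ins, not the real go-lotus types.

// Sketch only: illustrates the per-signature-type storage choice.
package main

import "fmt"

const (
	KTBLS       = "bls"
	KTSecp256k1 = "secp256k1"
)

type Message struct{ Nonce uint64 }

type Signature struct {
	Type string
	Data []byte
}

type SignedMessage struct {
	Message   Message
	Signature Signature
}

// store stands in for PutMessage over a temporary blockstore.
func store(what string, v interface{}) {
	fmt.Printf("storing %s: %+v\n", what, v)
}

func persist(m *SignedMessage) error {
	switch m.Signature.Type {
	case KTBLS:
		// Only the unsigned message goes into the store.
		store("BLS message", &m.Message)
	case KTSecp256k1:
		// The whole signed message is stored.
		store("secp256k1 message", m)
	default:
		return fmt.Errorf("unknown signature type: %q", m.Signature.Type)
	}
	return nil
}

func main() {
	_ = persist(&SignedMessage{Message: Message{Nonce: 1}, Signature: Signature{Type: KTBLS}})
	_ = persist(&SignedMessage{Message: Message{Nonce: 2}, Signature: Signature{Type: KTSecp256k1}})
}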
@@ -55,7 +55,7 @@ type syncTestUtil struct {
mn mocknet.Mocknet

genesis []byte
blocks []*types.FullBlock
blocks []*types.FullBlock

nds []api.FullNode
}

@@ -163,8 +163,8 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
var b chain.BlockMsg

// -1 to match block.Height
b.Header = tu.blocks[h - 1].Header
for _, msg := range tu.blocks[h - 1].Messages {
b.Header = tu.blocks[h-1].Header
for _, msg := range tu.blocks[h-1].Messages {
c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg)
require.NoError(tu.t, err)

@@ -176,12 +176,12 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) {

func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
for i := 0; i < n; i++ {
tu.submitSourceBlock(to, h + i)
tu.submitSourceBlock(to, h+i)
}
}

func TestSyncSimple(t *testing.T) {
H := 20
H := 21
tu := prepSyncTest(t, H)

client := tu.addClientNode()

@@ -235,4 +235,4 @@ func TestSyncIncoming(t *testing.T) {

tu.compareSourceState(client)
}
*/
*/
6 go.mod
@@ -21,7 +21,7 @@ require (
github.com/ipfs/go-ds-badger v0.0.5
github.com/ipfs/go-filestore v0.0.2
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f
github.com/ipfs/go-hamt-ipld v0.0.10
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1

@@ -61,7 +61,7 @@ require (
github.com/multiformats/go-multiaddr-net v0.0.1
github.com/multiformats/go-multihash v0.0.6
github.com/pkg/errors v0.8.1
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a
github.com/prometheus/common v0.6.0
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect

@@ -77,7 +77,7 @@ require (
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef // indirect
golang.org/x/sys v0.0.0-20190730183949-1393eb018365 // indirect
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
10 go.sum
@@ -157,8 +157,8 @@ github.com/ipfs/go-filestore v0.0.2 h1:pcYwpjtXXwirtbjBXKVJM9CTa9F7/8v1EkfnDaHTO
github.com/ipfs/go-filestore v0.0.2/go.mod h1:KnZ41qJsCt2OX2mxZS0xsK3Psr0/oB93HMMssLujjVc=
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f h1:CpQZA1HsuaRQaFIUq7h/KqSyclyp/LrpcyifPnKRT2k=
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk=
github.com/ipfs/go-hamt-ipld v0.0.10 h1:jmJGsV/8OPpBEmO+b1nAPpqX8SG2kLeYveKk8F7IxG4=
github.com/ipfs/go-hamt-ipld v0.0.10/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk=
github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=

@@ -461,6 +461,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 h1:2m16U/rLwVaRdz7ANkHtHTodP3zTP3N451MADg64x5k=
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a h1:TdavzKWkPcC2G+6rKJclm/JfrWC6WZFfLUR7EJJX8MA=
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=

@@ -622,8 +624,8 @@ golang.org/x/sys v0.0.0-20190524122548-abf6ff778158/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190526052359-791d8a0f4d09/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef h1:vwqipsjwy3Y8/PQk/LmiaFjos8aOnU6Tt6oRXKD3org=
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190730183949-1393eb018365 h1:SaXEMXhWzMJThc05vu6uh61Q245r4KaWMrsTedk0FDc=
golang.org/x/sys v0.0.0-20190730183949-1393eb018365/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=