diff --git a/chain/blocksync.go b/chain/blocksync.go index dd38148a3..d9751ca23 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -129,19 +129,19 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, } if opts.IncludeMessages { - log.Error("INCLUDING MESSAGES IN SYNC RESPONSE") + log.Info("INCLUDING MESSAGES IN SYNC RESPONSE") msgs, mincl, err := bss.gatherMessages(ts) if err != nil { return nil, err } - log.Errorf("messages: ", msgs) + log.Infof("messages: ", msgs) bst.Messages = msgs bst.MsgIncludes = mincl } if opts.IncludeBlocks { - log.Error("INCLUDING BLOCKS IN SYNC RESPONSE") + log.Info("INCLUDING BLOCKS IN SYNC RESPONSE") bst.Blocks = ts.Blocks() } @@ -165,7 +165,7 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe if err != nil { return nil, nil, err } - log.Errorf("MESSAGES FOR BLOCK: %d", len(msgs)) + log.Infof("MESSAGES FOR BLOCK: %d", len(msgs)) msgindexes := make([]int, 0, len(msgs)) for _, m := range msgs { diff --git a/chain/sync.go b/chain/sync.go index 01ddc0b17..3e24e9591 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -34,9 +36,6 @@ type Syncer struct { // The known Genesis tipset Genesis *types.TipSet - // the current mode the syncer is in - syncMode SyncMode - syncLock sync.Mutex // TipSets known to be invalid @@ -65,7 +64,6 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e } return &Syncer{ - syncMode: Bootstrap, Genesis: gent, Bsync: bsync, peerHeads: make(map[peer.ID]*types.TipSet), @@ -75,19 +73,11 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e }, nil } -type SyncMode int - -const ( - Unknown = SyncMode(iota) - Bootstrap - CaughtUp -) - type BadTipSetCache struct { badBlocks map[cid.Cid]struct{} } -type BlockSet struct { +/*type BlockSet struct { tset map[uint64]*types.TipSet head *types.TipSet } @@ -120,7 +110,7 @@ func (bs *BlockSet) PersistTo(cs *store.ChainStore) error { func (bs *BlockSet) Head() *types.TipSet { return bs.head -} +}*/ const BootstrapPeerThreshold = 1 @@ -134,12 +124,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { if from == syncer.self { // TODO: this is kindof a hack... log.Infof("got block from ourselves") - syncer.syncLock.Lock() - defer syncer.syncLock.Unlock() - if syncer.syncMode == Bootstrap { - syncer.syncMode = CaughtUp - } if err := syncer.SyncCaughtUp(fts); err != nil { log.Errorf("failed to sync our own block: %s", err) } @@ -152,18 +137,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) go func() { - syncer.syncLock.Lock() - defer syncer.syncLock.Unlock() - - switch syncer.syncMode { - case Bootstrap: - syncer.SyncBootstrap() - case CaughtUp: - if err := syncer.SyncCaughtUp(fts); err != nil { - log.Errorf("sync error: %s", err) - } - case Unknown: - panic("invalid syncer state") + if err := syncer.SyncCaughtUp(fts); err != nil { + log.Errorf("sync error: %s", err) } }() } @@ -191,12 +166,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { func (syncer *Syncer) SyncBootstrap() { fmt.Println("Sync bootstrap!") defer fmt.Println("bye bye sync bootstrap") - ctx := context.Background() - - if syncer.syncMode == CaughtUp { - log.Errorf("Called SyncBootstrap while in caught up mode") - return - } selectedHead, err := syncer.selectHead(syncer.peerHeads) if err != nil { @@ -266,66 +235,12 @@ func (syncer *Syncer) SyncBootstrap() { } } - // Fetch all the messages for all the blocks in this chain - - windowSize := uint64(10) - for i := uint64(0); i <= selectedHead.Height(); i += windowSize { - bs := bstore.NewBlockstore(dstore.NewMapDatastore()) - cst := hamt.CSTFromBstore(bs) - - nextHeight := i + windowSize - 1 - if nextHeight > selectedHead.Height() { - nextHeight = selectedHead.Height() - } - - next := blockSet[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i) - if err != nil { - log.Errorf("failed to fetch messages: %s", err) - return - } - - for bsi := 0; bsi < len(bstips); bsi++ { - cur := blockSet[i+uint64(bsi)] - bstip := bstips[len(bstips)-(bsi+1)] - fmt.Println("that loop: ", bsi, len(bstips)) - fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) - if err != nil { - log.Error("zipping failed: ", err, bsi, i) - log.Error("height: ", selectedHead.Height()) - log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) - return - } - - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return - } - } - - for _, bst := range bstips { - for _, m := range bst.Messages { - if _, err := cst.Put(context.TODO(), m); err != nil { - log.Error("failed to persist messages: ", err) - return - } - } - } - - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { - log.Errorf("failed to persist temp blocks: %s", err) - return - } - } - head := blockSet[len(blockSet)-1] log.Errorf("Finished syncing! new head: %s", head.Cids()) if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil { log.Errorf("MaybeTakeHeavierTipSet failed: %s", err) } syncer.head = head - syncer.syncMode = CaughtUp } func reverse(tips []*types.TipSet) []*types.TipSet { @@ -472,6 +387,9 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro // SyncCaughtUp is used to stay in sync once caught up to // the rest of the network. func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error { + syncer.syncLock.Lock() + defer syncer.syncLock.Unlock() + ts := maybeHead.TipSet() if syncer.Genesis.Equals(ts) { return nil @@ -581,8 +499,10 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full chain := []*store.FullTipSet{fts} cur := fts.TipSet() + startHeight := syncer.head.Height() + for { - ts, err := syncer.store.LoadTipSet(cur.Parents()) + _, err := syncer.store.LoadTipSet(cur.Parents()) if err != nil { // // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating @@ -590,15 +510,38 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full blockSet := []*types.TipSet{cur} at := cur.Cids() - for blockSet[len(blockSet)-1].Height() > syncer.head.Height() { + + // If, for some reason, we have a suffix of the chain locally, handle that here + for blockSet[len(blockSet)-1].Height() > startHeight { + log.Warn("syncing local: ", at) + ts, err := syncer.store.LoadTipSet(at) + if err != nil { + if err == bstore.ErrNotFound { + log.Error("not found: ", at) + break + } + log.Warn("loading local tipset: %s", err) + continue // TODO: verify + } + + blockSet = append(blockSet, ts) + at = ts.Parents() + } + + for blockSet[len(blockSet)-1].Height() > startHeight { // NB: GetBlocks validates that the blocks are in-fact the ones we // requested, and that they are correctly linked to eachother. It does // not validate any state transitions fmt.Println("CaughtUp Get blocks") blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) if err != nil { + // Most likely our peers aren't fully synced yet, but forwarded + // new block message (ideally we'd find better peers) + log.Error("failed to get blocks: ", err) - panic("aaa") + + // This error will only be logged above, + return nil, xerrors.Errorf("failed to get blocks: %w", err) } for _, b := range blks { @@ -608,6 +551,23 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full at = blks[len(blks)-1].Parents() } + if startHeight == 0 { + // hacks. in the case that we request X blocks starting at height X+1, we + // won't get the Genesis block in the returned blockset. This hacks around it + if blockSet[len(blockSet)-1].Height() != 0 { + blockSet = append(blockSet, syncer.Genesis) + } + + blockSet = reverse(blockSet) + + genesis := blockSet[0] + if !genesis.Equals(syncer.Genesis) { + // TODO: handle this... + log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) + panic("We synced to the wrong chain") + } + } + for _, ts := range blockSet { for _, b := range ts.Blocks() { if err := syncer.store.PersistBlockHeader(b); err != nil { @@ -617,12 +577,64 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full } } - // TODO: Message processing? + // Fetch all the messages for all the blocks in this chain + + windowSize := uint64(10) + for i := uint64(0); i <= cur.Height(); i += windowSize { + bs := bstore.NewBlockstore(dstore.NewMapDatastore()) + cst := hamt.CSTFromBstore(bs) + + nextHeight := i + windowSize - 1 + if nextHeight > cur.Height() { + nextHeight = cur.Height() + } + + log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) + next := blockSet[nextHeight] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + if err != nil { + log.Errorf("failed to fetch messages: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + for bsi := 0; bsi < len(bstips); bsi++ { + cur := blockSet[i+uint64(bsi)] + bstip := bstips[len(bstips)-(bsi+1)] + fmt.Println("that loop: ", bsi, len(bstips)) + fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) + if err != nil { + log.Error("zipping failed: ", err, bsi, i) + log.Error("height: ", cur.Height()) + log.Error("bstips: ", bstips) + log.Error("next height: ", nextHeight) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + for _, bst := range bstips { + for _, m := range bst.Messages { + if _, err := cst.Put(context.TODO(), m); err != nil { + log.Error("failed to persist messages: ", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + } + + if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + log.Errorf("failed to persist temp blocks: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } //log.Errorf("dont have parent blocks for sync tipset: %s", err) //panic("should do something better, like fetch? or error?") - ts, err = syncer.store.LoadTipSet(cur.Parents()) + _, err = syncer.store.LoadTipSet(cur.Parents()) if err != nil { log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) panic("should do something better, like fetch? or error?") @@ -633,38 +645,6 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full return chain, nil // return the chain because we have this last block in our cache already. - if ts.Equals(syncer.Genesis) { - break - } - - /* - if !syncer.Punctual(ts) { - syncer.bad.InvalidateChain(chain) - syncer.bad.InvalidateTipSet(ts) - return nil, errors.New("tipset forks too far back from head") - } - */ - - chain = append(chain, fts) - log.Error("received unknown chain in caught up mode...") - panic("for now, we panic...") - - has, err := syncer.store.Contains(ts) - if err != nil { - return nil, err - } - if has { - // Store has record of this tipset. - return chain, nil - } - - /* - parent, err := syncer.FetchTipSet(context.TODO(), ts.Parents()) - if err != nil { - return nil, err - } - ts = parent - */ } return chain, nil