chain: Don't use SyncBootstrap

This commit is contained in:
Łukasz Magiera 2019-07-26 20:16:57 +02:00 committed by whyrusleeping
parent 2be7bc5025
commit fae0422de6
2 changed files with 112 additions and 132 deletions

View File

@ -129,19 +129,19 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
}
if opts.IncludeMessages {
log.Error("INCLUDING MESSAGES IN SYNC RESPONSE")
log.Info("INCLUDING MESSAGES IN SYNC RESPONSE")
msgs, mincl, err := bss.gatherMessages(ts)
if err != nil {
return nil, err
}
log.Errorf("messages: ", msgs)
log.Infof("messages: ", msgs)
bst.Messages = msgs
bst.MsgIncludes = mincl
}
if opts.IncludeBlocks {
log.Error("INCLUDING BLOCKS IN SYNC RESPONSE")
log.Info("INCLUDING BLOCKS IN SYNC RESPONSE")
bst.Blocks = ts.Blocks()
}
@ -165,7 +165,7 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
if err != nil {
return nil, nil, err
}
log.Errorf("MESSAGES FOR BLOCK: %d", len(msgs))
log.Infof("MESSAGES FOR BLOCK: %d", len(msgs))
msgindexes := make([]int, 0, len(msgs))
for _, m := range msgs {

View File

@ -5,6 +5,8 @@ import (
"fmt"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -34,9 +36,6 @@ type Syncer struct {
// The known Genesis tipset
Genesis *types.TipSet
// the current mode the syncer is in
syncMode SyncMode
syncLock sync.Mutex
// TipSets known to be invalid
@ -65,7 +64,6 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e
}
return &Syncer{
syncMode: Bootstrap,
Genesis: gent,
Bsync: bsync,
peerHeads: make(map[peer.ID]*types.TipSet),
@ -75,19 +73,11 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e
}, nil
}
type SyncMode int
const (
Unknown = SyncMode(iota)
Bootstrap
CaughtUp
)
type BadTipSetCache struct {
badBlocks map[cid.Cid]struct{}
}
type BlockSet struct {
/*type BlockSet struct {
tset map[uint64]*types.TipSet
head *types.TipSet
}
@ -120,7 +110,7 @@ func (bs *BlockSet) PersistTo(cs *store.ChainStore) error {
func (bs *BlockSet) Head() *types.TipSet {
return bs.head
}
}*/
const BootstrapPeerThreshold = 1
@ -134,12 +124,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
if from == syncer.self {
// TODO: this is kindof a hack...
log.Infof("got block from ourselves")
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
if syncer.syncMode == Bootstrap {
syncer.syncMode = CaughtUp
}
if err := syncer.SyncCaughtUp(fts); err != nil {
log.Errorf("failed to sync our own block: %s", err)
}
@ -152,19 +137,9 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.Bsync.AddPeer(from)
go func() {
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
switch syncer.syncMode {
case Bootstrap:
syncer.SyncBootstrap()
case CaughtUp:
if err := syncer.SyncCaughtUp(fts); err != nil {
log.Errorf("sync error: %s", err)
}
case Unknown:
panic("invalid syncer state")
}
}()
}
@ -191,12 +166,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
func (syncer *Syncer) SyncBootstrap() {
fmt.Println("Sync bootstrap!")
defer fmt.Println("bye bye sync bootstrap")
ctx := context.Background()
if syncer.syncMode == CaughtUp {
log.Errorf("Called SyncBootstrap while in caught up mode")
return
}
selectedHead, err := syncer.selectHead(syncer.peerHeads)
if err != nil {
@ -266,66 +235,12 @@ func (syncer *Syncer) SyncBootstrap() {
}
}
// Fetch all the messages for all the blocks in this chain
windowSize := uint64(10)
for i := uint64(0); i <= selectedHead.Height(); i += windowSize {
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
cst := hamt.CSTFromBstore(bs)
nextHeight := i + windowSize - 1
if nextHeight > selectedHead.Height() {
nextHeight = selectedHead.Height()
}
next := blockSet[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return
}
for bsi := 0; bsi < len(bstips); bsi++ {
cur := blockSet[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fmt.Println("that loop: ", bsi, len(bstips))
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", selectedHead.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return
}
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return
}
}
for _, bst := range bstips {
for _, m := range bst.Messages {
if _, err := cst.Put(context.TODO(), m); err != nil {
log.Error("failed to persist messages: ", err)
return
}
}
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
log.Errorf("failed to persist temp blocks: %s", err)
return
}
}
head := blockSet[len(blockSet)-1]
log.Errorf("Finished syncing! new head: %s", head.Cids())
if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil {
log.Errorf("MaybeTakeHeavierTipSet failed: %s", err)
}
syncer.head = head
syncer.syncMode = CaughtUp
}
func reverse(tips []*types.TipSet) []*types.TipSet {
@ -472,6 +387,9 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
// SyncCaughtUp is used to stay in sync once caught up to
// the rest of the network.
func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error {
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
ts := maybeHead.TipSet()
if syncer.Genesis.Equals(ts) {
return nil
@ -581,8 +499,10 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
chain := []*store.FullTipSet{fts}
cur := fts.TipSet()
startHeight := syncer.head.Height()
for {
ts, err := syncer.store.LoadTipSet(cur.Parents())
_, err := syncer.store.LoadTipSet(cur.Parents())
if err != nil {
// <TODO: cleanup>
// TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating
@ -590,15 +510,38 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
blockSet := []*types.TipSet{cur}
at := cur.Cids()
for blockSet[len(blockSet)-1].Height() > syncer.head.Height() {
// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > startHeight {
log.Warn("syncing local: ", at)
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
if err == bstore.ErrNotFound {
log.Error("not found: ", at)
break
}
log.Warn("loading local tipset: %s", err)
continue // TODO: verify
}
blockSet = append(blockSet, ts)
at = ts.Parents()
}
for blockSet[len(blockSet)-1].Height() > startHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("CaughtUp Get blocks")
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
log.Error("failed to get blocks: ", err)
panic("aaa")
// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}
for _, b := range blks {
@ -608,6 +551,23 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
at = blks[len(blks)-1].Parents()
}
if startHeight == 0 {
// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}
blockSet = reverse(blockSet)
genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
panic("We synced to the wrong chain")
}
}
for _, ts := range blockSet {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
@ -617,12 +577,64 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
}
}
// TODO: Message processing?
// Fetch all the messages for all the blocks in this chain
windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
cst := hamt.CSTFromBstore(bs)
nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
}
log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet))
next := blockSet[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
for bsi := 0; bsi < len(bstips); bsi++ {
cur := blockSet[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fmt.Println("that loop: ", bsi, len(bstips))
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", cur.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
for _, bst := range bstips {
for _, m := range bst.Messages {
if _, err := cst.Put(context.TODO(), m); err != nil {
log.Error("failed to persist messages: ", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
log.Errorf("failed to persist temp blocks: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
//log.Errorf("dont have parent blocks for sync tipset: %s", err)
//panic("should do something better, like fetch? or error?")
ts, err = syncer.store.LoadTipSet(cur.Parents())
_, err = syncer.store.LoadTipSet(cur.Parents())
if err != nil {
log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err)
panic("should do something better, like fetch? or error?")
@ -633,38 +645,6 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
return chain, nil // return the chain because we have this last block in our cache already.
if ts.Equals(syncer.Genesis) {
break
}
/*
if !syncer.Punctual(ts) {
syncer.bad.InvalidateChain(chain)
syncer.bad.InvalidateTipSet(ts)
return nil, errors.New("tipset forks too far back from head")
}
*/
chain = append(chain, fts)
log.Error("received unknown chain in caught up mode...")
panic("for now, we panic...")
has, err := syncer.store.Contains(ts)
if err != nil {
return nil, err
}
if has {
// Store has record of this tipset.
return chain, nil
}
/*
parent, err := syncer.FetchTipSet(context.TODO(), ts.Parents())
if err != nil {
return nil, err
}
ts = parent
*/
}
return chain, nil