clean up sync, make everything go in the same direction

whyrusleeping 2019-08-02 15:21:46 -07:00
parent af73ae0b89
commit 14f6cc8a0b
3 changed files with 116 additions and 50 deletions

View File

@@ -105,6 +105,12 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
 func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) {
     opts := ParseBSOptions(req.Options)
+    if len(req.Start) == 0 {
+        return &BlockSyncResponse{
+            Status: 204,
+        }, nil
+    }
     chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
     if err != nil {
         log.Error("encountered error while responding to block sync request: ", err)

View File

@@ -66,8 +66,8 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, error) {
         Genesis:   gent,
         Bsync:     bsync,
         peerHeads: make(map[peer.ID]*types.TipSet),
         store:     cs,
         self:      self,
     }, nil
 }
@@ -256,7 +256,7 @@ func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error {
     defer syncer.syncLock.Unlock()
     ts := maybeHead.TipSet()
-    if syncer.Genesis.Equals(ts) {
+    if syncer.Genesis.Equals(ts) || syncer.store.GetHeaviestTipSet().Equals(ts) {
         return nil
     }
@@ -289,9 +289,9 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
     h := b.Header
     stateroot, err := syncer.store.TipSetState(h.Parents)
     if err != nil {
-        log.Error("get tipsetstate failed: ", h.Height, h.Parents, err)
-        return err
+        return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err)
     }
     baseTs, err := syncer.store.LoadTipSet(b.Header.Parents)
     if err != nil {
         return err
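The switch from log-and-return to xerrors.Errorf with %w keeps the underlying error available to callers while adding context. A minimal illustration of that wrapping pattern (generic Go, not lotus code; the root error here is a stand-in):

package main

import (
	"fmt"

	"golang.org/x/xerrors"
)

var errNotFound = fmt.Errorf("tipset not found") // stand-in root cause

func lookup() error {
	// wrap with %w so callers can still match the original error
	return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", 42, "bafy...", errNotFound)
}

func main() {
	err := lookup()
	fmt.Println(err)                          // wrapped message with height and parents baked in
	fmt.Println(xerrors.Is(err, errNotFound)) // true: the cause is preserved through the wrap
}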
@@ -338,13 +338,16 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
 }
-func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) {
+func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
     blockSet := []*types.TipSet{from}
     at := from.Parents()
+    // we want to sync all the blocks until the height above the block we have
+    untilHeight := to.Height() + 1
     // If, for some reason, we have a suffix of the chain locally, handle that here
-    for blockSet[len(blockSet)-1].Height() > toHeight {
+    for blockSet[len(blockSet)-1].Height() > untilHeight {
         log.Warn("syncing local: ", at)
         ts, err := syncer.store.LoadTipSet(at)
         if err != nil {
@@ -360,7 +363,7 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) {
         at = ts.Parents()
     }
-    for blockSet[len(blockSet)-1].Height() > toHeight {
+    for blockSet[len(blockSet)-1].Height() > untilHeight {
         // NB: GetBlocks validates that the blocks are in-fact the ones we
         // requested, and that they are correctly linked to eachother. It does
         // not validate any state transitions
@@ -377,34 +380,43 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) {
         }
         for _, b := range blks {
+            if b.Height() < untilHeight {
+                break
+            }
             blockSet = append(blockSet, b)
         }
         at = blks[len(blks)-1].Parents()
     }
-    if toHeight == 0 {
-        // hacks. in the case that we request X blocks starting at height X+1, we
-        // won't get the Genesis block in the returned blockset. This hacks around it
-        if blockSet[len(blockSet)-1].Height() != 0 {
-            blockSet = append(blockSet, syncer.Genesis)
-        }
-        blockSet = reverse(blockSet)
-        genesis := blockSet[0]
-        if !genesis.Equals(syncer.Genesis) {
-            // TODO: handle this...
-            log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
-            panic("We synced to the wrong chain")
-        }
+    if !cidArrsEqual(blockSet[len(blockSet)-1].Parents(), to.Cids()) {
+        // TODO: handle the case where we are on a fork and its not a simple fast forward
+        return nil, xerrors.Errorf("synced header chain does not link to our best block")
     }
+    /*
+        if to.Height() == 0 {
+            // hacks. in the case that we request X blocks starting at height X+1, we
+            // won't get the Genesis block in the returned blockset. This hacks around it
+            if blockSet[len(blockSet)-1].Height() != 0 {
+                blockSet = append(blockSet, syncer.Genesis)
+            }
+            genesis := blockSet[len(blockSet)-1]
+            if !genesis.Equals(syncer.Genesis) {
+                // TODO: handle this...
+                log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
+                panic("We synced to the wrong chain")
+            }
+        }
+    */
     return blockSet, nil
 }
 func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
     return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
+        log.Warn("validating tipset: ", fts.TipSet().Height())
         if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
            log.Errorf("failed to validate tipset: %s", err)
            return xerrors.Errorf("message processing failed: %w", err)
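collectHeaders now checks that the fetched header chain actually links back to the tipset we consider our best block. cidArrsEqual itself is not part of this diff; below is a sketch of what such a helper could look like, assuming an order-sensitive comparison of CID slices (the real implementation may differ, for example by ignoring ordering):

package main

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
)

// cidArrsEqual is referenced in the hunk above but not shown in this commit;
// this is a guessed, order-sensitive version for illustration only.
func cidArrsEqual(a, b []cid.Cid) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if !a[i].Equals(b[i]) {
			return false
		}
	}
	return true
}

func main() {
	c1, _ := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
	fmt.Println(cidArrsEqual([]cid.Cid{c1}, []cid.Cid{c1})) // true
	fmt.Println(cidArrsEqual([]cid.Cid{c1}, nil))           // false
}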
@@ -415,9 +427,9 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
 // fills out each of the given tipsets with messages and calls the callback with it
 func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
-    var beg int
+    beg := len(headers) - 1
     // handle case where we have a prefix of these locally
-    for ; beg < len(headers); beg++ {
+    for ; beg >= 0; beg-- {
         fts, err := syncer.store.TryFillTipSet(headers[beg])
         if err != nil {
             return err
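Because headers are now ordered newest-to-oldest, the locally known portion sits at the tail of the slice, so this loop scans from the end; together with the headers[:beg+1] re-slice shown in the next hunk, it drops whatever could already be filled from local storage. A toy model of that net effect (the loop body elided from this hunk is assumed to break once TryFillTipSet cannot fill a tipset locally):

package main

import "fmt"

func main() {
	// headers newest-first; "known" marks tipsets we can already fill locally
	headers := []string{"h9", "h8", "h7", "h6", "h5"}
	known := map[string]bool{"h5": true, "h6": true} // oldest two already local

	beg := len(headers) - 1
	for ; beg >= 0; beg-- {
		if !known[headers[beg]] {
			break // assumption: stop at the first header we cannot fill locally
		}
		// in the real code, the locally filled tipset is handed to the callback here
	}
	headers = headers[:beg+1]
	fmt.Println(headers) // [h9 h8 h7]: only the part we still need to fetch messages for
}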
@@ -429,38 +441,46 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
             return err
         }
     }
-    headers = headers[beg:]
+    headers = headers[:beg+1]
     windowSize := 10
-    for i := 0; i < len(headers); i += windowSize {
+    for i := len(headers) - 1; i >= 0; i -= windowSize {
         // temp storage so we don't persist data we dont want to
         ds := dstore.NewMapDatastore()
         bs := bstore.NewBlockstore(ds)
         cst := hamt.CSTFromBstore(bs)
         batchSize := windowSize
-        if i+batchSize >= len(headers) {
-            batchSize = (len(headers) - i) - 1
+        if i < batchSize {
+            batchSize = i
         }
-        next := headers[i+batchSize]
+        next := headers[i-batchSize]
         bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1))
         if err != nil {
             log.Errorf("failed to fetch messages: %s", err)
             return xerrors.Errorf("message processing failed: %w", err)
         }
+        log.Infof("len bstips: %d, batchSize+1: %d, len(headers): %d", len(bstips), batchSize+1, len(headers))
+        if len(headers) == 2 {
+            log.Infof("headers[0]: %d", headers[0].Height())
+            log.Infof("headers[1]: %d", headers[1].Height())
+            log.Infof("bstips[0]: %d", bstips[0].Blocks[0].Height)
+            log.Infof("bstips[1]: %d", bstips[1].Blocks[0].Height)
+        }
         for bsi := 0; bsi < len(bstips); bsi++ {
-            this := headers[i+bsi]
+            this := headers[i-bsi]
             bstip := bstips[len(bstips)-(bsi+1)]
             fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes)
             if err != nil {
-                log.Error("zipping failed: ", err, bsi, i)
-                log.Error("height: ", this.Height())
-                log.Error("bstip height: ", bstip.Blocks[0].Height)
-                log.Error("bstips: ", bstips)
-                log.Error("next height: ", i+batchSize)
+                log.Warn("zipping failed: ", err, bsi, i)
+                log.Warn("height: ", this.Height())
+                log.Warn("bstip height: ", bstip.Blocks[0].Height)
+                log.Warn("bstips: ", bstips)
+                log.Warn("next height: ", i+batchSize)
                 return xerrors.Errorf("message processing failed: %w", err)
             }
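With headers ordered newest-to-oldest throughout, the message-fetch loop above walks the slice from its tail (the oldest tipset still missing) back toward index 0 in windows. A toy, self-contained walkthrough of the window arithmetic, using plain ints in place of tipsets (not lotus code):

package main

import "fmt"

func main() {
	// pretend heights, newest first, matching the new ordering of `headers`
	headers := []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
	windowSize := 4

	for i := len(headers) - 1; i >= 0; i -= windowSize {
		batchSize := windowSize
		if i < batchSize {
			batchSize = i // clamp the last window near the front of the slice
		}
		next := headers[i-batchSize] // newest entry of this window; the real code requests messages starting here
		// the window covers headers[i-batchSize .. i], i.e. batchSize+1 tipsets
		fmt.Printf("request %d tipsets starting at height %d -> window %v\n",
			batchSize+1, next, headers[i-batchSize:i+1])
	}
}

As written, consecutive windows share one boundary entry, which the printed output makes visible.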
@@ -507,9 +527,7 @@ func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error {
 }
 func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
-    curHeight := syncer.store.GetHeaviestTipSet().Height()
-    headers, err := syncer.collectHeaders(fts.TipSet(), curHeight)
+    headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet())
     if err != nil {
         return err
     }

View File

@@ -22,16 +22,12 @@ import (
 const source = 0
-func repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) {
-    g, err := gen.NewGenerator()
-    if err != nil {
-        t.Fatal(err)
-    }
+func (tu *syncTestUtil) repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) {
     blks := make([]*types.FullBlock, h)
     for i := 0; i < h; i++ {
-        blks[i], err = g.NextBlock()
+        var err error
+        blks[i], err = tu.g.NextBlock()
         require.NoError(t, err)
         fmt.Printf("block at H:%d: %s\n", blks[i].Header.Height, blks[i].Cid())
@@ -39,10 +35,10 @@ func repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) {
         require.Equal(t, uint64(i+1), blks[i].Header.Height, "wrong height")
     }
-    r, err := g.YieldRepo()
+    r, err := tu.g.YieldRepo()
     require.NoError(t, err)
-    genb, err := g.GenesisCar()
+    genb, err := tu.g.GenesisCar()
     require.NoError(t, err)
     return r, genb, blks
@@ -54,6 +50,8 @@ type syncTestUtil struct {
     ctx context.Context
     mn  mocknet.Mocknet
+    g *gen.ChainGen
     genesis []byte
     blocks  []*types.FullBlock
@@ -63,11 +61,17 @@ type syncTestUtil struct {
 func prepSyncTest(t *testing.T, h int) *syncTestUtil {
     logging.SetLogLevel("*", "INFO")
+    g, err := gen.NewGenerator()
+    if err != nil {
+        t.Fatal(err)
+    }
     ctx := context.Background()
     tu := &syncTestUtil{
         t:   t,
         ctx: ctx,
         mn:  mocknet.New(ctx),
+        g:   g,
     }
     tu.addSourceNode(h)
@@ -84,7 +88,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
         tu.t.Fatal("source node already exists")
     }
-    sourceRepo, genesis, blocks := repoWithChain(tu.t, gen)
+    sourceRepo, genesis, blocks := tu.repoWithChain(tu.t, gen)
     var out api.FullNode
     err := node.New(tu.ctx,
@@ -181,7 +185,7 @@ func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
 }
 func TestSyncSimple(t *testing.T) {
-    H := 21
+    H := 50
     tu := prepSyncTest(t, H)
     client := tu.addClientNode()
@@ -196,6 +200,44 @@ func TestSyncSimple(t *testing.T) {
     tu.compareSourceState(client)
 }
+func TestSyncMining(t *testing.T) {
+    H := 50
+    tu := prepSyncTest(t, H)
+    client := tu.addClientNode()
+    tu.checkHeight("client", client, 0)
+    require.NoError(t, tu.mn.LinkAll())
+    tu.connect(1, 0)
+    time.Sleep(time.Second)
+    tu.checkHeight("client", client, H)
+    tu.compareSourceState(client)
+    fblk, err := tu.g.NextBlock()
+    require.NoError(t, err)
+    for _, msg := range fblk.Messages {
+        require.NoError(t, tu.nds[0].MpoolPush(context.TODO(), msg))
+    }
+    require.NoError(t, tu.nds[0].ChainSubmitBlock(context.TODO(), fblkToBlkMsg(fblk)))
+    time.Sleep(time.Second)
+}
+func fblkToBlkMsg(fb *types.FullBlock) *chain.BlockMsg {
+    out := &chain.BlockMsg{
+        Header: fb.Header,
+    }
+    for _, msg := range fb.Messages {
+        out.Messages = append(out.Messages, msg.Cid())
+    }
+    return out
+}
 /*
 TODO: this is broken because of how tu.submitSourceBlock works now
 func TestSyncManual(t *testing.T) {