From 412a168151a9da7cadea470088d24d47c46f0359 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 15:55:36 +0200 Subject: [PATCH] chain: more work on chain sync, sync testing --- chain/blocksync.go | 2 +- chain/gen/gen.go | 8 +- chain/sync.go | 271 +++++++++++++++++++++----------------------- chain/sync_test.go | 85 ++++++++------ node/hello/hello.go | 2 +- 5 files changed, 191 insertions(+), 177 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index d9751ca23..1085d3fce 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -88,7 +88,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { log.Errorf("failed to read block sync request: %s", err) return } - log.Errorf("block sync request for: %s %d", req.Start, req.RequestLength) + log.Infof("block sync request for: %s %d", req.Start, req.RequestLength) resp, err := bss.processRequest(&req) if err != nil { diff --git a/chain/gen/gen.go b/chain/gen/gen.go index f5d6ae8c9..42ddd010c 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -24,7 +24,7 @@ import ( var log = logging.Logger("gen") -const msgsPerBlock = 2 +const msgsPerBlock = 20 type ChainGen struct { accounts []address.Address @@ -56,7 +56,8 @@ type mybs struct { func (m mybs) Get(c cid.Cid) (block.Block, error) { b, err := m.Blockstore.Get(c) if err != nil { - log.Errorf("Get failed: %s %s", c, err) + // change to error for stacktraces, don't commit with that pls + log.Warnf("Get failed: %s %s", c, err) return nil, err } @@ -97,7 +98,8 @@ func NewGenerator() (*ChainGen, error) { return nil, err } - banker, err := w.GenerateKey(types.KTBLS) + // KTBLS doesn't support signature verification or something like that yet + banker, err := w.GenerateKey(types.KTSecp256k1) if err != nil { return nil, err } diff --git a/chain/sync.go b/chain/sync.go index 106e9e803..18feaa606 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -479,10 +479,6 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err } -func (syncer *Syncer) Punctual(ts *types.TipSet) bool { - return true -} - func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) { // fetch tipset and messages via bitswap @@ -491,151 +487,148 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full startHeight := syncer.head.Height() - for { - _, err := syncer.store.LoadTipSet(cur.Parents()) - if err != nil { - // - // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating + _, err := syncer.store.LoadTipSet(cur.Parents()) + if err != nil { + // + // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating - blockSet := []*types.TipSet{cur} + blockSet := []*types.TipSet{cur} - at := cur.Cids() + at := cur.Cids() - // If, for some reason, we have a suffix of the chain locally, handle that here - for blockSet[len(blockSet)-1].Height() > startHeight { - log.Warn("syncing local: ", at) - ts, err := syncer.store.LoadTipSet(at) - if err != nil { - if err == bstore.ErrNotFound { - log.Error("not found: ", at) - break - } - log.Warn("loading local tipset: %s", err) - continue // TODO: verify - } - - blockSet = append(blockSet, ts) - at = ts.Parents() - } - - for blockSet[len(blockSet)-1].Height() > startHeight { - // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions - fmt.Println("CaughtUp Get blocks") - blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) - if err != nil { - // Most likely our peers aren't fully synced yet, but forwarded - // new block message (ideally we'd find better peers) - - log.Error("failed to get blocks: ", err) - - // This error will only be logged above, - return nil, xerrors.Errorf("failed to get blocks: %w", err) - } - - for _, b := range blks { - blockSet = append(blockSet, b) - } - - at = blks[len(blks)-1].Parents() - } - - if startHeight == 0 { - // hacks. in the case that we request X blocks starting at height X+1, we - // won't get the Genesis block in the returned blockset. This hacks around it - if blockSet[len(blockSet)-1].Height() != 0 { - blockSet = append(blockSet, syncer.Genesis) - } - - blockSet = reverse(blockSet) - - genesis := blockSet[0] - if !genesis.Equals(syncer.Genesis) { - // TODO: handle this... - log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) - panic("We synced to the wrong chain") - } - } - - for _, ts := range blockSet { - for _, b := range ts.Blocks() { - if err := syncer.store.PersistBlockHeader(b); err != nil { - log.Errorf("failed to persist synced blocks to the chainstore: %s", err) - panic("bbbbb") - } - } - } - - // Fetch all the messages for all the blocks in this chain - - windowSize := uint64(10) - for i := uint64(0); i <= cur.Height(); i += windowSize { - bs := bstore.NewBlockstore(dstore.NewMapDatastore()) - cst := hamt.CSTFromBstore(bs) - - nextHeight := i + windowSize - 1 - if nextHeight > cur.Height() { - nextHeight = cur.Height() - } - - log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) - next := blockSet[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) - if err != nil { - log.Errorf("failed to fetch messages: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - for bsi := 0; bsi < len(bstips); bsi++ { - cur := blockSet[i+uint64(bsi)] - bstip := bstips[len(bstips)-(bsi+1)] - fmt.Println("that loop: ", bsi, len(bstips)) - fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) - if err != nil { - log.Error("zipping failed: ", err, bsi, i) - log.Error("height: ", cur.Height()) - log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - for _, bst := range bstips { - for _, m := range bst.Messages { - if _, err := cst.Put(context.TODO(), m); err != nil { - log.Error("failed to persist messages: ", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - } - - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { - log.Errorf("failed to persist temp blocks: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - //log.Errorf("dont have parent blocks for sync tipset: %s", err) - //panic("should do something better, like fetch? or error?") - - _, err = syncer.store.LoadTipSet(cur.Parents()) + // If, for some reason, we have a suffix of the chain locally, handle that here + for blockSet[len(blockSet)-1].Height() > startHeight { + log.Warn("syncing local: ", at) + ts, err := syncer.store.LoadTipSet(at) if err != nil { - log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) - panic("should do something better, like fetch? or error?") + if err == bstore.ErrNotFound { + log.Info("tipset not found locally, starting sync: ", at) + break + } + log.Warn("loading local tipset: %s", err) + continue // TODO: verify } - // + blockSet = append(blockSet, ts) + at = ts.Parents() } - return chain, nil // return the chain because we have this last block in our cache already. + for blockSet[len(blockSet)-1].Height() > startHeight { + // NB: GetBlocks validates that the blocks are in-fact the ones we + // requested, and that they are correctly linked to eachother. It does + // not validate any state transitions + fmt.Println("Get blocks") + blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) + if err != nil { + // Most likely our peers aren't fully synced yet, but forwarded + // new block message (ideally we'd find better peers) + log.Error("failed to get blocks: ", err) + + // This error will only be logged above, + return nil, xerrors.Errorf("failed to get blocks: %w", err) + } + + for _, b := range blks { + blockSet = append(blockSet, b) + } + + at = blks[len(blks)-1].Parents() + } + + if startHeight == 0 { + // hacks. in the case that we request X blocks starting at height X+1, we + // won't get the Genesis block in the returned blockset. This hacks around it + if blockSet[len(blockSet)-1].Height() != 0 { + blockSet = append(blockSet, syncer.Genesis) + } + + blockSet = reverse(blockSet) + + genesis := blockSet[0] + if !genesis.Equals(syncer.Genesis) { + // TODO: handle this... + log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) + panic("We synced to the wrong chain") + } + } + + for _, ts := range blockSet { + for _, b := range ts.Blocks() { + if err := syncer.store.PersistBlockHeader(b); err != nil { + log.Errorf("failed to persist synced blocks to the chainstore: %s", err) + panic("bbbbb") + } + } + } + + // Fetch all the messages for all the blocks in this chain + + windowSize := uint64(10) + for i := uint64(0); i <= cur.Height(); i += windowSize { + bs := bstore.NewBlockstore(dstore.NewMapDatastore()) + cst := hamt.CSTFromBstore(bs) + + nextHeight := i + windowSize - 1 + if nextHeight > cur.Height() { + nextHeight = cur.Height() + } + + log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) + next := blockSet[nextHeight] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + if err != nil { + log.Errorf("failed to fetch messages: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + for bsi := 0; bsi < len(bstips); bsi++ { + cur := blockSet[i+uint64(bsi)] + bstip := bstips[len(bstips)-(bsi+1)] + fmt.Println("that loop: ", bsi, len(bstips)) + fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) + if err != nil { + log.Error("zipping failed: ", err, bsi, i) + log.Error("height: ", cur.Height()) + log.Error("bstips: ", bstips) + log.Error("next height: ", nextHeight) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + for _, bst := range bstips { + for _, m := range bst.Messages { + if _, err := cst.Put(context.TODO(), m); err != nil { + log.Error("failed to persist messages: ", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + } + + if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + log.Errorf("failed to persist temp blocks: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + //log.Errorf("dont have parent blocks for sync tipset: %s", err) + //panic("should do something better, like fetch? or error?") + + _, err = syncer.store.LoadTipSet(cur.Parents()) + if err != nil { + log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) + panic("should do something better, like fetch? or error?") + } + + // } + return chain, nil // return the chain because we have this last block in our cache already. + return chain, nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index 055aaba4f..c1a3a50c2 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + logging "github.com/ipfs/go-log" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" @@ -16,6 +17,8 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" ) +const source = 0 + func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { g, err := gen.NewGenerator() if err != nil { @@ -48,7 +51,7 @@ type syncTestUtil struct { nds []api.FullNode } -func (tu *syncTestUtil) addSourceNode(gen int) int { +func (tu *syncTestUtil) addSourceNode(gen int) { if tu.genesis != nil { tu.t.Fatal("source node already exists") } @@ -67,11 +70,14 @@ func (tu *syncTestUtil) addSourceNode(gen int) int { require.NoError(tu.t, err) tu.genesis = genesis - tu.nds = append(tu.nds, out) - return len(tu.nds) - 1 + tu.nds = append(tu.nds, out) // always at 0 } func (tu *syncTestUtil) addClientNode() int { + if tu.genesis == nil { + tu.t.Fatal("source doesn't exists") + } + var out api.FullNode err := node.New(tu.ctx, @@ -88,49 +94,62 @@ func (tu *syncTestUtil) addClientNode() int { return len(tu.nds) - 1 } +func (tu *syncTestUtil) connect(from, to int) { + toPI, err := tu.nds[to].NetAddrsListen(tu.ctx) + require.NoError(tu.t, err) + + err = tu.nds[from].NetConnect(tu.ctx, toPI) + require.NoError(tu.t, err) +} + +func (tu *syncTestUtil) checkHeight(name string, n int, h int) { + b, err := tu.nds[n].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + require.Equal(tu.t, uint64(h), b.Height()) + fmt.Printf("%s H: %d\n", name, b.Height()) +} + +func (tu *syncTestUtil) compareSourceState(with int) { + sourceAccounts, err := tu.nds[source].WalletList(tu.ctx) + require.NoError(tu.t, err) + + for _, addr := range sourceAccounts { + sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr) + require.NoError(tu.t, err) + + actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr) + require.NoError(tu.t, err) + + require.Equal(tu.t, sourceBalance, actBalance) + } +} + func TestSyncSimple(t *testing.T) { - H := 3 + logging.SetLogLevel("*", "INFO") + H := 20 ctx := context.Background() - - mn := mocknet.New(ctx) - tu := &syncTestUtil{ t: t, ctx: ctx, - mn: mn, + mn: mocknet.New(ctx), } - source := tu.addSourceNode(H) - - b, err := tu.nds[source].ChainHead(ctx) - require.NoError(t, err) - - require.Equal(t, uint64(H), b.Height()) - fmt.Printf("source H: %d\n", b.Height()) + tu.addSourceNode(H) + tu.checkHeight("source", source, H) // separate logs - fmt.Println("///////////////////////////////////////////////////") + fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b") client := tu.addClientNode() + tu.checkHeight("client", client, 0) - require.NoError(t, mn.LinkAll()) - - cb, err := tu.nds[client].ChainHead(ctx) - require.NoError(t, err) - require.Equal(t, uint64(0), cb.Height()) - fmt.Printf("client H: %d\n", cb.Height()) - - sourcePI, err := tu.nds[source].NetAddrsListen(ctx) - require.NoError(t, err) - - err = tu.nds[client].NetConnect(ctx, sourcePI) - require.NoError(t, err) - + require.NoError(t, tu.mn.LinkAll()) + tu.connect(1, 0) time.Sleep(time.Second * 1) - cb, err = tu.nds[client].ChainHead(ctx) - require.NoError(t, err) - require.Equal(t, uint64(H), cb.Height()) - fmt.Printf("client H: %d\n", cb.Height()) + tu.checkHeight("client", client, H) + + tu.compareSourceState(client) } diff --git a/node/hello/hello.go b/node/hello/hello.go index 4d7e59495..8fc3ab5d5 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -93,7 +93,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { HeaviestTipSetWeight: weight, GenesisHash: gen.Cid(), } - fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids()) + fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids(), hts.Height()) fmt.Println("hello message genesis: ", gen.Cid()) if err := cborrpc.WriteCborRPC(s, hmsg); err != nil {