chain: more work on chain sync, sync testing

Łukasz Magiera 2019-07-30 15:55:36 +02:00 committed by whyrusleeping
parent b6439fa57d
commit 412a168151
5 changed files with 191 additions and 177 deletions

View File

@@ -88,7 +88,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
log.Errorf("failed to read block sync request: %s", err)
return
}
log.Errorf("block sync request for: %s %d", req.Start, req.RequestLength)
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
resp, err := bss.processRequest(&req)
if err != nil {

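The request logged above carries a starting tipset and a count of tipsets to walk back. A rough, self-contained sketch of that shape and the info-level logging introduced here; only `Start` and `RequestLength` are taken from the diff, all other names and types are stand-ins rather than the actual lotus blocksync definitions:

```go
package main

import "log"

// BlockSyncRequest is a stand-in for the request type handled above.
type BlockSyncRequest struct {
	Start         []string // CIDs of the tipset to start walking back from
	RequestLength uint64   // how many tipsets the requester wants
}

func handle(req *BlockSyncRequest) {
	// Incoming requests are routine traffic, so they are logged at info
	// level here rather than as errors (the change in the hunk above).
	log.Printf("block sync request for: %v %d", req.Start, req.RequestLength)
}

func main() {
	handle(&BlockSyncRequest{Start: []string{"<start-cid>"}, RequestLength: 10})
}
```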
View File

@@ -24,7 +24,7 @@ import (
var log = logging.Logger("gen")
const msgsPerBlock = 2
const msgsPerBlock = 20
type ChainGen struct {
accounts []address.Address
@@ -56,7 +56,8 @@ type mybs struct {
func (m mybs) Get(c cid.Cid) (block.Block, error) {
b, err := m.Blockstore.Get(c)
if err != nil {
log.Errorf("Get failed: %s %s", c, err)
// change to error for stacktraces, don't commit with that pls
log.Warnf("Get failed: %s %s", c, err)
return nil, err
}
@@ -97,7 +97,8 @@ func NewGenerator() (*ChainGen, error) {
return nil, err
}
banker, err := w.GenerateKey(types.KTBLS)
// KTBLS doesn't support signature verification or something like that yet
banker, err := w.GenerateKey(types.KTSecp256k1)
if err != nil {
return nil, err
}
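The `mybs` wrapper above embeds the node's blockstore so failed Gets are visible while tests run. A minimal, self-contained sketch of that same wrapping idea; the `Store` interface and names here are stand-ins, not the real go-ipfs-blockstore API:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Store is a stand-in for the blockstore interface being wrapped.
type Store interface {
	Get(key string) ([]byte, error)
}

type mapStore map[string][]byte

func (m mapStore) Get(key string) ([]byte, error) {
	b, ok := m[key]
	if !ok {
		return nil, errors.New("not found")
	}
	return b, nil
}

// loggingStore embeds a Store and logs failed lookups, the way mybs above
// logs failed blockstore Gets at warn level during test runs.
type loggingStore struct {
	Store
}

func (l loggingStore) Get(key string) ([]byte, error) {
	b, err := l.Store.Get(key)
	if err != nil {
		log.Printf("Get failed: %s %s", key, err)
		return nil, err
	}
	return b, nil
}

func main() {
	s := loggingStore{Store: mapStore{"a": []byte("block")}}
	fmt.Println(s.Get("a"))
	fmt.Println(s.Get("missing")) // logs the failure, then returns the error
}
```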

View File

@@ -479,10 +479,6 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
}
func (syncer *Syncer) Punctual(ts *types.TipSet) bool {
return true
}
func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) {
// fetch tipset and messages via bitswap
@@ -491,151 +487,148 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full
startHeight := syncer.head.Height()
for {
_, err := syncer.store.LoadTipSet(cur.Parents())
if err != nil {
// <TODO: cleanup>
// TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating
_, err := syncer.store.LoadTipSet(cur.Parents())
if err != nil {
// <TODO: cleanup>
// TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating
blockSet := []*types.TipSet{cur}
blockSet := []*types.TipSet{cur}
at := cur.Cids()
at := cur.Cids()
// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > startHeight {
log.Warn("syncing local: ", at)
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
if err == bstore.ErrNotFound {
log.Error("not found: ", at)
break
}
log.Warn("loading local tipset: %s", err)
continue // TODO: verify
}
blockSet = append(blockSet, ts)
at = ts.Parents()
}
for blockSet[len(blockSet)-1].Height() > startHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("CaughtUp Get blocks")
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
log.Error("failed to get blocks: ", err)
// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}
for _, b := range blks {
blockSet = append(blockSet, b)
}
at = blks[len(blks)-1].Parents()
}
if startHeight == 0 {
// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}
blockSet = reverse(blockSet)
genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
panic("We synced to the wrong chain")
}
}
for _, ts := range blockSet {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
log.Errorf("failed to persist synced blocks to the chainstore: %s", err)
panic("bbbbb")
}
}
}
// Fetch all the messages for all the blocks in this chain
windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
cst := hamt.CSTFromBstore(bs)
nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
}
log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet))
next := blockSet[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
for bsi := 0; bsi < len(bstips); bsi++ {
cur := blockSet[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fmt.Println("that loop: ", bsi, len(bstips))
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", cur.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
for _, bst := range bstips {
for _, m := range bst.Messages {
if _, err := cst.Put(context.TODO(), m); err != nil {
log.Error("failed to persist messages: ", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
log.Errorf("failed to persist temp blocks: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
//log.Errorf("dont have parent blocks for sync tipset: %s", err)
//panic("should do something better, like fetch? or error?")
_, err = syncer.store.LoadTipSet(cur.Parents())
// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > startHeight {
log.Warn("syncing local: ", at)
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err)
panic("should do something better, like fetch? or error?")
if err == bstore.ErrNotFound {
log.Info("tipset not found locally, starting sync: ", at)
break
}
log.Warn("loading local tipset: %s", err)
continue // TODO: verify
}
// </TODO>
blockSet = append(blockSet, ts)
at = ts.Parents()
}
return chain, nil // return the chain because we have this last block in our cache already.
for blockSet[len(blockSet)-1].Height() > startHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("Get blocks")
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
log.Error("failed to get blocks: ", err)
// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}
for _, b := range blks {
blockSet = append(blockSet, b)
}
at = blks[len(blks)-1].Parents()
}
if startHeight == 0 {
// hacks. in the case that we request X blocks starting at height X+1, we
// won't get the Genesis block in the returned blockset. This hacks around it
if blockSet[len(blockSet)-1].Height() != 0 {
blockSet = append(blockSet, syncer.Genesis)
}
blockSet = reverse(blockSet)
genesis := blockSet[0]
if !genesis.Equals(syncer.Genesis) {
// TODO: handle this...
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
panic("We synced to the wrong chain")
}
}
for _, ts := range blockSet {
for _, b := range ts.Blocks() {
if err := syncer.store.PersistBlockHeader(b); err != nil {
log.Errorf("failed to persist synced blocks to the chainstore: %s", err)
panic("bbbbb")
}
}
}
// Fetch all the messages for all the blocks in this chain
windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
cst := hamt.CSTFromBstore(bs)
nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
}
log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet))
next := blockSet[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
for bsi := 0; bsi < len(bstips); bsi++ {
cur := blockSet[i+uint64(bsi)]
bstip := bstips[len(bstips)-(bsi+1)]
fmt.Println("that loop: ", bsi, len(bstips))
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", cur.Height())
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
for _, bst := range bstips {
for _, m := range bst.Messages {
if _, err := cst.Put(context.TODO(), m); err != nil {
log.Error("failed to persist messages: ", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
log.Errorf("failed to persist temp blocks: %s", err)
return nil, xerrors.Errorf("message processing failed: %w", err)
}
}
//log.Errorf("dont have parent blocks for sync tipset: %s", err)
//panic("should do something better, like fetch? or error?")
_, err = syncer.store.LoadTipSet(cur.Parents())
if err != nil {
log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err)
panic("should do something better, like fetch? or error?")
}
// </TODO>
}
return chain, nil // return the chain because we have this last block in our cache already.
return chain, nil
}
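The message-fetch loop above walks the chain in fixed-size windows before calling GetChainMessages. A minimal, self-contained sketch of just that window arithmetic; windowSize matches the code above, the example chain height is an assumption for illustration:

```go
package main

import "fmt"

func main() {
	const windowSize = uint64(10)
	head := uint64(23) // hypothetical chain height for illustration

	for i := uint64(0); i <= head; i += windowSize {
		nextHeight := i + windowSize - 1
		if nextHeight > head {
			nextHeight = head // clamp the last window to the chain head
		}
		// Each iteration covers heights [i, nextHeight], i.e. it asks for
		// (nextHeight+1)-i tipsets, mirroring the GetChainMessages call above.
		fmt.Printf("window %d..%d -> %d tipsets\n", i, nextHeight, (nextHeight+1)-i)
	}
}
```

For a head of 23 this prints windows 0..9, 10..19, and a final clamped window 20..23 of 4 tipsets.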

View File

@@ -6,6 +6,7 @@ import (
"testing"
"time"
logging "github.com/ipfs/go-log"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
@@ -16,6 +17,8 @@ import (
"github.com/filecoin-project/go-lotus/node/repo"
)
const source = 0
func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) {
g, err := gen.NewGenerator()
if err != nil {
@@ -48,7 +51,7 @@ type syncTestUtil struct {
nds []api.FullNode
}
func (tu *syncTestUtil) addSourceNode(gen int) int {
func (tu *syncTestUtil) addSourceNode(gen int) {
if tu.genesis != nil {
tu.t.Fatal("source node already exists")
}
@@ -67,11 +70,14 @@ func (tu *syncTestUtil) addSourceNode(gen int) int {
require.NoError(tu.t, err)
tu.genesis = genesis
tu.nds = append(tu.nds, out)
return len(tu.nds) - 1
tu.nds = append(tu.nds, out) // always at 0
}
func (tu *syncTestUtil) addClientNode() int {
if tu.genesis == nil {
tu.t.Fatal("source doesn't exists")
}
var out api.FullNode
err := node.New(tu.ctx,
@@ -88,49 +94,62 @@ func (tu *syncTestUtil) addClientNode() int {
return len(tu.nds) - 1
}
func (tu *syncTestUtil) connect(from, to int) {
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
require.NoError(tu.t, err)
err = tu.nds[from].NetConnect(tu.ctx, toPI)
require.NoError(tu.t, err)
}
func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
b, err := tu.nds[n].ChainHead(tu.ctx)
require.NoError(tu.t, err)
require.Equal(tu.t, uint64(h), b.Height())
fmt.Printf("%s H: %d\n", name, b.Height())
}
func (tu *syncTestUtil) compareSourceState(with int) {
sourceAccounts, err := tu.nds[source].WalletList(tu.ctx)
require.NoError(tu.t, err)
for _, addr := range sourceAccounts {
sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr)
require.NoError(tu.t, err)
actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr)
require.NoError(tu.t, err)
require.Equal(tu.t, sourceBalance, actBalance)
}
}
func TestSyncSimple(t *testing.T) {
H := 3
logging.SetLogLevel("*", "INFO")
H := 20
ctx := context.Background()
mn := mocknet.New(ctx)
tu := &syncTestUtil{
t: t,
ctx: ctx,
mn: mn,
mn: mocknet.New(ctx),
}
source := tu.addSourceNode(H)
b, err := tu.nds[source].ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, uint64(H), b.Height())
fmt.Printf("source H: %d\n", b.Height())
tu.addSourceNode(H)
tu.checkHeight("source", source, H)
// separate logs
fmt.Println("///////////////////////////////////////////////////")
fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b")
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
require.NoError(t, mn.LinkAll())
cb, err := tu.nds[client].ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, uint64(0), cb.Height())
fmt.Printf("client H: %d\n", cb.Height())
sourcePI, err := tu.nds[source].NetAddrsListen(ctx)
require.NoError(t, err)
err = tu.nds[client].NetConnect(ctx, sourcePI)
require.NoError(t, err)
require.NoError(t, tu.mn.LinkAll())
tu.connect(1, 0)
time.Sleep(time.Second * 1)
cb, err = tu.nds[client].ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, uint64(H), cb.Height())
fmt.Printf("client H: %d\n", cb.Height())
tu.checkHeight("client", client, H)
tu.compareSourceState(client)
}
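TestSyncSimple above sleeps a fixed second before checking the client's head, which can be timing-sensitive. One hedged alternative, not part of this commit, is to poll until the expected height is reached; in the test the condition closure would check the client's ChainHead height against H instead of the toy counter used here:

```go
package main

import (
	"fmt"
	"time"
)

// waitFor polls cond every interval until it returns true or timeout elapses.
func waitFor(timeout, interval time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	// Toy condition standing in for "client chain head reached height H".
	n := 0
	ok := waitFor(time.Second, 10*time.Millisecond, func() bool {
		n++
		return n > 5
	})
	fmt.Println("synced:", ok)
}
```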

View File

@@ -93,7 +93,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
HeaviestTipSetWeight: weight,
GenesisHash: gen.Cid(),
}
fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids())
fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids(), hts.Height())
fmt.Println("hello message genesis: ", gen.Cid())
if err := cborrpc.WriteCborRPC(s, hmsg); err != nil {