From c8478ddd3f5565c78e48eaa36083f895d5acfd46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Jul 2019 18:13:25 +0200 Subject: [PATCH 01/14] chain: Make fetching parents in collectChainCaughtUp sort of work --- chain/sync.go | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index a4cbc629f..01ddc0b17 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -584,8 +584,51 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full for { ts, err := syncer.store.LoadTipSet(cur.Parents()) if err != nil { - log.Errorf("dont have parent blocks for sync tipset: %s", err) - panic("should do something better, like fetch? or error?") + // + // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating + + blockSet := []*types.TipSet{cur} + + at := cur.Cids() + for blockSet[len(blockSet)-1].Height() > syncer.head.Height() { + // NB: GetBlocks validates that the blocks are in-fact the ones we + // requested, and that they are correctly linked to eachother. It does + // not validate any state transitions + fmt.Println("CaughtUp Get blocks") + blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) + if err != nil { + log.Error("failed to get blocks: ", err) + panic("aaa") + } + + for _, b := range blks { + blockSet = append(blockSet, b) + } + + at = blks[len(blks)-1].Parents() + } + + for _, ts := range blockSet { + for _, b := range ts.Blocks() { + if err := syncer.store.PersistBlockHeader(b); err != nil { + log.Errorf("failed to persist synced blocks to the chainstore: %s", err) + panic("bbbbb") + } + } + } + + // TODO: Message processing? + + //log.Errorf("dont have parent blocks for sync tipset: %s", err) + //panic("should do something better, like fetch? or error?") + + ts, err = syncer.store.LoadTipSet(cur.Parents()) + if err != nil { + log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) + panic("should do something better, like fetch? or error?") + } + + // } return chain, nil // return the chain because we have this last block in our cache already. From 2f5cdf1d151b910a5795401af93c6fa400617c8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Jul 2019 18:14:01 +0200 Subject: [PATCH 02/14] Use default address for mining --- cli/miner.go | 3 +-- lotuspond/front/src/Block.js | 1 + lotuspond/front/src/FullNode.js | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cli/miner.go b/cli/miner.go index d4ed66b9c..a18bc9d50 100644 --- a/cli/miner.go +++ b/cli/miner.go @@ -3,7 +3,6 @@ package cli import ( "fmt" - "github.com/pkg/errors" "gopkg.in/urfave/cli.v2" ) @@ -29,7 +28,7 @@ var minerStart = &cli.Command{ // TODO: this address needs to be the address of an actual miner maddr, err := api.WalletDefaultAddress(ctx) if err != nil { - return errors.Wrap(err, "failed to create miner address") + return err } if err := api.MinerStart(ctx, maddr); err != nil { diff --git a/lotuspond/front/src/Block.js b/lotuspond/front/src/Block.js index 8cc2e9dd1..a97493bbd 100644 --- a/lotuspond/front/src/Block.js +++ b/lotuspond/front/src/Block.js @@ -26,6 +26,7 @@ class Block extends React.Component {
Height: {head.Height}
Parents:
Weight: {head.ParentWeight}
+
Miner: {head.Miner}
Messages: {head.Messages['/']} {/*TODO: link to message explorer */}
Receipts: {head.MessageReceipts['/']}
State Root: {head.StateRoot['/']}
diff --git a/lotuspond/front/src/FullNode.js b/lotuspond/front/src/FullNode.js index d0f12d881..43da62bd2 100644 --- a/lotuspond/front/src/FullNode.js +++ b/lotuspond/front/src/FullNode.js @@ -101,8 +101,13 @@ class FullNode extends React.Component { async startMining() { // TODO: Use actual miner address // see cli/miner.go + let addr = "t0523423423" // in case we have no wallets + if (this.state.defaultAddr) { + addr = this.state.defaultAddr + } + this.setState({mining: true}) - await this.state.client.call("Filecoin.MinerStart", ["t0523423423"]) + await this.state.client.call("Filecoin.MinerStart", [addr]) } render() { From e27d4358485483c30ace4420dc5784c60b6a163f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Jul 2019 18:31:07 +0200 Subject: [PATCH 03/14] blocksync: retry GetBlocks with many peers --- chain/blocksync.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index 808d325db..a2284c31a 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -10,6 +10,7 @@ import ( bserv "github.com/ipfs/go-blockservice" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" + "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -220,9 +221,19 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) Options: BSOptBlocks, } - res, err := bs.sendRequestToPeer(ctx, peers[perm[0]], req) + var err error + var res *BlockSyncResponse + for _, p := range perm { + res, err = bs.sendRequestToPeer(ctx, peers[p], req) + if err == nil { + break + } + log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) + + //TODO: also do the status check here + } if err != nil { - return nil, err + return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err) } switch res.Status { From 2be7bc502551d81fd42dfd953d668b5d53a8823f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Jul 2019 18:47:18 +0200 Subject: [PATCH 04/14] blocksync: retry GetBlocks with error response too --- chain/blocksync.go | 47 ++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index a2284c31a..dd38148a3 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -210,6 +210,23 @@ func (bs *BlockSync) getPeers() []peer.ID { return out } +func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) { + switch res.Status { + case 0: // Success + return bs.processBlocksResponse(req, res) + case 101: // Partial Response + panic("not handled") + case 201: // req.Start not found + return nil, fmt.Errorf("not found") + case 202: // Go Away + panic("not handled") + case 203: // Internal Error + return nil, fmt.Errorf("block sync peer errored: %s", res.Message) + default: + return nil, fmt.Errorf("unrecognized response code") + } +} + func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) { peers := bs.getPeers() perm := rand.Perm(len(peers)) @@ -225,31 +242,17 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) var res *BlockSyncResponse for _, p := range perm { res, err = bs.sendRequestToPeer(ctx, peers[p], req) - if err == nil { - break + if err != nil { + log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) + continue } - log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) - //TODO: also do the status check here - } - if err != nil { - return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err) - } - - switch res.Status { - case 0: // Success - return bs.processBlocksResponse(req, res) - case 101: // Partial Response - panic("not handled") - case 201: // req.Start not found - return nil, fmt.Errorf("not found") - case 202: // Go Away - panic("not handled") - case 203: // Internal Error - return nil, fmt.Errorf("block sync peer errored: %s", res.Message) - default: - return nil, fmt.Errorf("unrecognized response code") + ts, err := bs.processStatus(req, res) + if err == nil { + return ts, nil + } } + return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err) } func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) { From fae0422de6ca2251434eb29facab9928e20aa632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Jul 2019 20:16:57 +0200 Subject: [PATCH 05/14] chain: Don't use SyncBootstrap --- chain/blocksync.go | 8 +- chain/sync.go | 236 +++++++++++++++++++++------------------------ 2 files changed, 112 insertions(+), 132 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index dd38148a3..d9751ca23 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -129,19 +129,19 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, } if opts.IncludeMessages { - log.Error("INCLUDING MESSAGES IN SYNC RESPONSE") + log.Info("INCLUDING MESSAGES IN SYNC RESPONSE") msgs, mincl, err := bss.gatherMessages(ts) if err != nil { return nil, err } - log.Errorf("messages: ", msgs) + log.Infof("messages: ", msgs) bst.Messages = msgs bst.MsgIncludes = mincl } if opts.IncludeBlocks { - log.Error("INCLUDING BLOCKS IN SYNC RESPONSE") + log.Info("INCLUDING BLOCKS IN SYNC RESPONSE") bst.Blocks = ts.Blocks() } @@ -165,7 +165,7 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe if err != nil { return nil, nil, err } - log.Errorf("MESSAGES FOR BLOCK: %d", len(msgs)) + log.Infof("MESSAGES FOR BLOCK: %d", len(msgs)) msgindexes := make([]int, 0, len(msgs)) for _, m := range msgs { diff --git a/chain/sync.go b/chain/sync.go index 01ddc0b17..3e24e9591 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -34,9 +36,6 @@ type Syncer struct { // The known Genesis tipset Genesis *types.TipSet - // the current mode the syncer is in - syncMode SyncMode - syncLock sync.Mutex // TipSets known to be invalid @@ -65,7 +64,6 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e } return &Syncer{ - syncMode: Bootstrap, Genesis: gent, Bsync: bsync, peerHeads: make(map[peer.ID]*types.TipSet), @@ -75,19 +73,11 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e }, nil } -type SyncMode int - -const ( - Unknown = SyncMode(iota) - Bootstrap - CaughtUp -) - type BadTipSetCache struct { badBlocks map[cid.Cid]struct{} } -type BlockSet struct { +/*type BlockSet struct { tset map[uint64]*types.TipSet head *types.TipSet } @@ -120,7 +110,7 @@ func (bs *BlockSet) PersistTo(cs *store.ChainStore) error { func (bs *BlockSet) Head() *types.TipSet { return bs.head -} +}*/ const BootstrapPeerThreshold = 1 @@ -134,12 +124,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { if from == syncer.self { // TODO: this is kindof a hack... log.Infof("got block from ourselves") - syncer.syncLock.Lock() - defer syncer.syncLock.Unlock() - if syncer.syncMode == Bootstrap { - syncer.syncMode = CaughtUp - } if err := syncer.SyncCaughtUp(fts); err != nil { log.Errorf("failed to sync our own block: %s", err) } @@ -152,18 +137,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) go func() { - syncer.syncLock.Lock() - defer syncer.syncLock.Unlock() - - switch syncer.syncMode { - case Bootstrap: - syncer.SyncBootstrap() - case CaughtUp: - if err := syncer.SyncCaughtUp(fts); err != nil { - log.Errorf("sync error: %s", err) - } - case Unknown: - panic("invalid syncer state") + if err := syncer.SyncCaughtUp(fts); err != nil { + log.Errorf("sync error: %s", err) } }() } @@ -191,12 +166,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { func (syncer *Syncer) SyncBootstrap() { fmt.Println("Sync bootstrap!") defer fmt.Println("bye bye sync bootstrap") - ctx := context.Background() - - if syncer.syncMode == CaughtUp { - log.Errorf("Called SyncBootstrap while in caught up mode") - return - } selectedHead, err := syncer.selectHead(syncer.peerHeads) if err != nil { @@ -266,66 +235,12 @@ func (syncer *Syncer) SyncBootstrap() { } } - // Fetch all the messages for all the blocks in this chain - - windowSize := uint64(10) - for i := uint64(0); i <= selectedHead.Height(); i += windowSize { - bs := bstore.NewBlockstore(dstore.NewMapDatastore()) - cst := hamt.CSTFromBstore(bs) - - nextHeight := i + windowSize - 1 - if nextHeight > selectedHead.Height() { - nextHeight = selectedHead.Height() - } - - next := blockSet[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i) - if err != nil { - log.Errorf("failed to fetch messages: %s", err) - return - } - - for bsi := 0; bsi < len(bstips); bsi++ { - cur := blockSet[i+uint64(bsi)] - bstip := bstips[len(bstips)-(bsi+1)] - fmt.Println("that loop: ", bsi, len(bstips)) - fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) - if err != nil { - log.Error("zipping failed: ", err, bsi, i) - log.Error("height: ", selectedHead.Height()) - log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) - return - } - - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return - } - } - - for _, bst := range bstips { - for _, m := range bst.Messages { - if _, err := cst.Put(context.TODO(), m); err != nil { - log.Error("failed to persist messages: ", err) - return - } - } - } - - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { - log.Errorf("failed to persist temp blocks: %s", err) - return - } - } - head := blockSet[len(blockSet)-1] log.Errorf("Finished syncing! new head: %s", head.Cids()) if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil { log.Errorf("MaybeTakeHeavierTipSet failed: %s", err) } syncer.head = head - syncer.syncMode = CaughtUp } func reverse(tips []*types.TipSet) []*types.TipSet { @@ -472,6 +387,9 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro // SyncCaughtUp is used to stay in sync once caught up to // the rest of the network. func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error { + syncer.syncLock.Lock() + defer syncer.syncLock.Unlock() + ts := maybeHead.TipSet() if syncer.Genesis.Equals(ts) { return nil @@ -581,8 +499,10 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full chain := []*store.FullTipSet{fts} cur := fts.TipSet() + startHeight := syncer.head.Height() + for { - ts, err := syncer.store.LoadTipSet(cur.Parents()) + _, err := syncer.store.LoadTipSet(cur.Parents()) if err != nil { // // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating @@ -590,15 +510,38 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full blockSet := []*types.TipSet{cur} at := cur.Cids() - for blockSet[len(blockSet)-1].Height() > syncer.head.Height() { + + // If, for some reason, we have a suffix of the chain locally, handle that here + for blockSet[len(blockSet)-1].Height() > startHeight { + log.Warn("syncing local: ", at) + ts, err := syncer.store.LoadTipSet(at) + if err != nil { + if err == bstore.ErrNotFound { + log.Error("not found: ", at) + break + } + log.Warn("loading local tipset: %s", err) + continue // TODO: verify + } + + blockSet = append(blockSet, ts) + at = ts.Parents() + } + + for blockSet[len(blockSet)-1].Height() > startHeight { // NB: GetBlocks validates that the blocks are in-fact the ones we // requested, and that they are correctly linked to eachother. It does // not validate any state transitions fmt.Println("CaughtUp Get blocks") blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) if err != nil { + // Most likely our peers aren't fully synced yet, but forwarded + // new block message (ideally we'd find better peers) + log.Error("failed to get blocks: ", err) - panic("aaa") + + // This error will only be logged above, + return nil, xerrors.Errorf("failed to get blocks: %w", err) } for _, b := range blks { @@ -608,6 +551,23 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full at = blks[len(blks)-1].Parents() } + if startHeight == 0 { + // hacks. in the case that we request X blocks starting at height X+1, we + // won't get the Genesis block in the returned blockset. This hacks around it + if blockSet[len(blockSet)-1].Height() != 0 { + blockSet = append(blockSet, syncer.Genesis) + } + + blockSet = reverse(blockSet) + + genesis := blockSet[0] + if !genesis.Equals(syncer.Genesis) { + // TODO: handle this... + log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) + panic("We synced to the wrong chain") + } + } + for _, ts := range blockSet { for _, b := range ts.Blocks() { if err := syncer.store.PersistBlockHeader(b); err != nil { @@ -617,12 +577,64 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full } } - // TODO: Message processing? + // Fetch all the messages for all the blocks in this chain + + windowSize := uint64(10) + for i := uint64(0); i <= cur.Height(); i += windowSize { + bs := bstore.NewBlockstore(dstore.NewMapDatastore()) + cst := hamt.CSTFromBstore(bs) + + nextHeight := i + windowSize - 1 + if nextHeight > cur.Height() { + nextHeight = cur.Height() + } + + log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) + next := blockSet[nextHeight] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + if err != nil { + log.Errorf("failed to fetch messages: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + for bsi := 0; bsi < len(bstips); bsi++ { + cur := blockSet[i+uint64(bsi)] + bstip := bstips[len(bstips)-(bsi+1)] + fmt.Println("that loop: ", bsi, len(bstips)) + fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) + if err != nil { + log.Error("zipping failed: ", err, bsi, i) + log.Error("height: ", cur.Height()) + log.Error("bstips: ", bstips) + log.Error("next height: ", nextHeight) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + for _, bst := range bstips { + for _, m := range bst.Messages { + if _, err := cst.Put(context.TODO(), m); err != nil { + log.Error("failed to persist messages: ", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + } + + if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + log.Errorf("failed to persist temp blocks: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } //log.Errorf("dont have parent blocks for sync tipset: %s", err) //panic("should do something better, like fetch? or error?") - ts, err = syncer.store.LoadTipSet(cur.Parents()) + _, err = syncer.store.LoadTipSet(cur.Parents()) if err != nil { log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) panic("should do something better, like fetch? or error?") @@ -633,38 +645,6 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full return chain, nil // return the chain because we have this last block in our cache already. - if ts.Equals(syncer.Genesis) { - break - } - - /* - if !syncer.Punctual(ts) { - syncer.bad.InvalidateChain(chain) - syncer.bad.InvalidateTipSet(ts) - return nil, errors.New("tipset forks too far back from head") - } - */ - - chain = append(chain, fts) - log.Error("received unknown chain in caught up mode...") - panic("for now, we panic...") - - has, err := syncer.store.Contains(ts) - if err != nil { - return nil, err - } - if has { - // Store has record of this tipset. - return chain, nil - } - - /* - parent, err := syncer.FetchTipSet(context.TODO(), ts.Parents()) - if err != nil { - return nil, err - } - ts = parent - */ } return chain, nil From 7f5cba1749c0cbfc4a98976f4a20ca66d6ca4012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Jul 2019 21:34:09 +0200 Subject: [PATCH 06/14] chain gen: YieldRepo --- chain/gen/gen.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 5d7b223ca..c4e01db1a 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -31,6 +31,9 @@ type ChainGen struct { curBlock *types.FullBlock miner address.Address + + r repo.Repo + lr repo.LockedRepo } type mybs struct { @@ -48,18 +51,23 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) { } func NewGenerator() (*ChainGen, error) { - mr := repo.NewMemory(nil) lr, err := mr.Lock() if err != nil { return nil, err } - ds, err := lr.Datastore("/blocks") + ds, err := lr.Datastore("/metadata") if err != nil { return nil, err } - bs := mybs{blockstore.NewBlockstore(ds)} + + bds, err := lr.Datastore("/blocks") + if err != nil { + return nil, err + } + + bs := mybs{blockstore.NewBlockstore(bds)} ks, err := lr.KeyStore() if err != nil { @@ -105,6 +113,9 @@ func NewGenerator() (*ChainGen, error) { genesis: genb.Genesis, miner: miner, curBlock: genfb, + + r: mr, + lr: lr, } return gen, nil @@ -144,3 +155,10 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) { return fblk, nil } + +func (cg *ChainGen) YieldRepo() (repo.Repo, error) { + if err := cg.lr.Close(); err != nil { + return nil, err + } + return cg.r, nil +} From a656aea7fec32876aa8b630ef8c6f8577180d7ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Jul 2019 21:34:34 +0200 Subject: [PATCH 07/14] chain: WIP Generated node sync test --- chain/sync_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++ node/node_test.go | 2 +- node/opts_test.go | 21 ------------ node/testopts.go | 20 ++++++++++++ 4 files changed, 102 insertions(+), 22 deletions(-) create mode 100644 chain/sync_test.go delete mode 100644 node/opts_test.go create mode 100644 node/testopts.go diff --git a/chain/sync_test.go b/chain/sync_test.go new file mode 100644 index 000000000..acd700fa7 --- /dev/null +++ b/chain/sync_test.go @@ -0,0 +1,81 @@ +package chain_test + +import ( + "bytes" + "context" + "fmt" + "testing" + + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/gen" + "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/modules" + modtest "github.com/filecoin-project/go-lotus/node/modules/testing" + "github.com/filecoin-project/go-lotus/node/repo" +) + +func repoWithChain(t *testing.T, h int) repo.Repo { + g, err := gen.NewGenerator() + if err != nil { + t.Fatal(err) + } + + for i := 0; i < h; i++ { + b, err := g.NextBlock() + if err != nil { + t.Fatalf("error at H:%d, %s", i, err) + } + if b.Header.Height != uint64(i+1) { + t.Fatal("wrong height") + } + } + + r, err := g.YieldRepo() + if err != nil { + t.Fatal(err) + } + return r +} + +func TestSyncSimple(t *testing.T) { + ctx := context.Background() + + var genbuf bytes.Buffer + var source api.FullNode + var client api.FullNode + + mn := mocknet.New(ctx) + + err := node.New(ctx, + node.FullAPI(&source), + node.Online(), + node.Repo(repoWithChain(t, 20)), + node.MockHost(mn), + + node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genbuf)), + ) + if err != nil { + t.Fatal(err) + } + + b, err := source.ChainHead(ctx) + if err != nil { + t.Fatal(err) + } + fmt.Println(b.Height()) + + err = node.New(ctx, + node.FullAPI(&client), + node.Online(), + node.Repo(repo.NewMemory(nil)), + node.MockHost(mn), + + node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes())), + ) + if err != nil { + t.Fatal(err) + } + +} diff --git a/node/node_test.go b/node/node_test.go index a9fda6378..f120d508e 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -40,7 +40,7 @@ func builder(t *testing.T, n int) []api.FullNode { node.FullAPI(&out[i]), node.Online(), node.Repo(repo.NewMemory(nil)), - MockHost(mn), + node.MockHost(mn), genesis, ) diff --git a/node/opts_test.go b/node/opts_test.go deleted file mode 100644 index 60e024a84..000000000 --- a/node/opts_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package node_test - -import ( - "errors" - - "github.com/filecoin-project/go-lotus/node" - - "github.com/filecoin-project/go-lotus/node/modules/lp2p" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" -) - -func MockHost(mn mocknet.Mocknet) node.Option { - return node.Options( - node.ApplyIf(func(s *node.Settings) bool { return !s.Online }, - node.Error(errors.New("MockHost must be specified after Online")), - ), - - node.Override(new(lp2p.RawHost), lp2p.MockHost), - node.Override(new(mocknet.Mocknet), mn), - ) -} diff --git a/node/testopts.go b/node/testopts.go new file mode 100644 index 000000000..5ec7a66bc --- /dev/null +++ b/node/testopts.go @@ -0,0 +1,20 @@ +package node + +import ( + "errors" + + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + + "github.com/filecoin-project/go-lotus/node/modules/lp2p" +) + +func MockHost(mn mocknet.Mocknet) Option { + return Options( + ApplyIf(func(s *Settings) bool { return !s.Online }, + Error(errors.New("MockHost must be specified after Online")), + ), + + Override(new(lp2p.RawHost), lp2p.MockHost), + Override(new(mocknet.Mocknet), mn), + ) +} From 9ef5e1266e6d4c7275be866542bd79521715571f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 13:45:48 +0200 Subject: [PATCH 08/14] chain: Test basinc sync on generated chain --- chain/gen/gen.go | 26 ++++++++++++++++-- chain/sync_test.go | 68 ++++++++++++++++++++++++++++------------------ node/builder.go | 1 + 3 files changed, 65 insertions(+), 30 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index c4e01db1a..9b08ce133 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -1,8 +1,14 @@ package gen import ( + "bytes" "context" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-car" + offline "github.com/ipfs/go-ipfs-exchange-offline" + "github.com/ipfs/go-merkledag" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -32,7 +38,7 @@ type ChainGen struct { miner address.Address - r repo.Repo + r repo.Repo lr repo.LockedRepo } @@ -67,7 +73,7 @@ func NewGenerator() (*ChainGen, error) { return nil, err } - bs := mybs{blockstore.NewBlockstore(bds)} + bs := mybs{blockstore.NewIdStore(blockstore.NewBlockstore(bds))} ks, err := lr.KeyStore() if err != nil { @@ -114,7 +120,7 @@ func NewGenerator() (*ChainGen, error) { miner: miner, curBlock: genfb, - r: mr, + r: mr, lr: lr, } @@ -125,6 +131,20 @@ func (cg *ChainGen) Genesis() *types.BlockHeader { return cg.genesis } +func (cg *ChainGen) GenesisCar() ([]byte, error) { + offl := offline.Exchange(cg.bs) + blkserv := blockservice.New(cg.bs, offl) + dserv := merkledag.NewDAGService(blkserv) + + out := new(bytes.Buffer) + + if err := car.WriteCar(context.TODO(), dserv, []cid.Cid{cg.Genesis().Cid()}, out); err != nil { + return nil, err + } + + return out.Bytes(), nil +} + func (cg *ChainGen) nextBlockProof() (address.Address, types.ElectionProof, []types.Ticket, error) { return cg.miner, []byte("cat in a box"), []types.Ticket{types.Ticket("im a ticket, promise")}, nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index acd700fa7..546d83c88 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -1,22 +1,22 @@ package chain_test import ( - "bytes" "context" "fmt" "testing" + "time" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/node" "github.com/filecoin-project/go-lotus/node/modules" - modtest "github.com/filecoin-project/go-lotus/node/modules/testing" "github.com/filecoin-project/go-lotus/node/repo" ) -func repoWithChain(t *testing.T, h int) repo.Repo { +func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { g, err := gen.NewGenerator() if err != nil { t.Fatal(err) @@ -24,47 +24,44 @@ func repoWithChain(t *testing.T, h int) repo.Repo { for i := 0; i < h; i++ { b, err := g.NextBlock() - if err != nil { - t.Fatalf("error at H:%d, %s", i, err) - } - if b.Header.Height != uint64(i+1) { - t.Fatal("wrong height") - } + require.NoError(t, err) + require.Equal(t, uint64(i+1), b.Header.Height, "wrong height") } r, err := g.YieldRepo() - if err != nil { - t.Fatal(err) - } - return r + require.NoError(t, err) + + genb, err := g.GenesisCar() + require.NoError(t, err) + + return r, genb } func TestSyncSimple(t *testing.T) { ctx := context.Background() - var genbuf bytes.Buffer var source api.FullNode var client api.FullNode mn := mocknet.New(ctx) + sourceRepo, genesis := repoWithChain(t, 20) + err := node.New(ctx, node.FullAPI(&source), node.Online(), - node.Repo(repoWithChain(t, 20)), + node.Repo(sourceRepo), node.MockHost(mn), - node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genbuf)), + node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) b, err := source.ChainHead(ctx) - if err != nil { - t.Fatal(err) - } - fmt.Println(b.Height()) + require.NoError(t, err) + + require.Equal(t, uint64(20), b.Height()) + fmt.Printf("source H: %d\n", b.Height()) err = node.New(ctx, node.FullAPI(&client), @@ -72,10 +69,27 @@ func TestSyncSimple(t *testing.T) { node.Repo(repo.NewMemory(nil)), node.MockHost(mn), - node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes())), + node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + require.NoError(t, mn.LinkAll()) + + cb, err := client.ChainHead(ctx) + require.NoError(t, err) + require.Equal(t, uint64(0), cb.Height()) + fmt.Printf("client H: %d\n", cb.Height()) + + sourcePI, err := source.NetAddrsListen(ctx) + require.NoError(t, err) + + err = client.NetConnect(ctx, sourcePI) + require.NoError(t, err) + + time.Sleep(time.Second) + + cb, err = client.ChainHead(ctx) + require.NoError(t, err) + require.Equal(t, uint64(20), cb.Height()) + fmt.Printf("client H: %d\n", cb.Height()) } diff --git a/node/builder.go b/node/builder.go index c41ea1e31..1cc62101b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -335,6 +335,7 @@ func New(ctx context.Context, opts ...Option) error { // on this context, and implement closing logic through lifecycles // correctly if err := app.Start(ctx); err != nil { + // comment fx.NopLogger few lines above for easier debugging return err } From b6439fa57d08847debc701179ad74550de9e53a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 15:20:40 +0200 Subject: [PATCH 09/14] chain: Test sync with messages --- chain/gen/gen.go | 73 ++++++++++++++++++++++++++++---- chain/sync.go | 10 ----- chain/sync_test.go | 101 +++++++++++++++++++++++++++++++-------------- chain/vm/vm.go | 2 +- 4 files changed, 137 insertions(+), 49 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 9b08ce133..f5d6ae8c9 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -3,6 +3,7 @@ package gen import ( "bytes" "context" + "sync/atomic" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-car" @@ -23,6 +24,8 @@ import ( var log = logging.Logger("gen") +const msgsPerBlock = 2 + type ChainGen struct { accounts []address.Address @@ -32,11 +35,15 @@ type ChainGen struct { cs *store.ChainStore - genesis *types.BlockHeader - + genesis *types.BlockHeader curBlock *types.FullBlock - miner address.Address + w *wallet.Wallet + + miner address.Address + receivers []address.Address + banker address.Address + bankerNonce uint64 r repo.Repo lr repo.LockedRepo @@ -95,7 +102,16 @@ func NewGenerator() (*ChainGen, error) { return nil, err } + receievers := make([]address.Address, msgsPerBlock) + for r := range receievers { + receievers[r], err = w.GenerateKey(types.KTBLS) + if err != nil { + return nil, err + } + } + genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{ + miner: types.NewInt(5), banker: types.NewInt(90000000), }) if err != nil { @@ -104,8 +120,6 @@ func NewGenerator() (*ChainGen, error) { cs := store.NewChainStore(bs, ds) - msgsPerBlock := 10 - genfb := &types.FullBlock{Header: genb.Genesis} if err := cs.SetGenesis(genb.Genesis); err != nil { @@ -117,8 +131,13 @@ func NewGenerator() (*ChainGen, error) { cs: cs, msgsPerBlock: msgsPerBlock, genesis: genb.Genesis, - miner: miner, - curBlock: genfb, + w: w, + + miner: miner, + banker: banker, + receivers: receievers, + + curBlock: genfb, r: mr, lr: lr, @@ -155,7 +174,45 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) { return nil, err } - var msgs []*types.SignedMessage + // make some transfers from banker + + msgs := make([]*types.SignedMessage, cg.msgsPerBlock) + for m := range msgs { + msg := types.Message{ + To: cg.receivers[m], + From: cg.banker, + + Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1, + + Value: types.NewInt(uint64(m + 1)), + + Method: 0, + + GasLimit: types.NewInt(10000), + GasPrice: types.NewInt(0), + } + + unsigned, err := msg.Serialize() + if err != nil { + return nil, err + } + + sig, err := cg.w.Sign(cg.banker, unsigned) + if err != nil { + return &types.FullBlock{}, err + } + + msgs[m] = &types.SignedMessage{ + Message: msg, + Signature: *sig, + } + + if _, err := cg.cs.PutMessage(msgs[m]); err != nil { + return nil, err + } + } + + // create block parents, err := types.NewTipSet([]*types.BlockHeader{cg.curBlock.Header}) if err != nil { diff --git a/chain/sync.go b/chain/sync.go index 3e24e9591..106e9e803 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -143,16 +143,6 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { }() } -func (syncer *Syncer) GetPeers() []peer.ID { - syncer.peerHeadsLk.Lock() - defer syncer.peerHeadsLk.Unlock() - var out []peer.ID - for p, _ := range syncer.peerHeads { - out = append(out, p) - } - return out -} - func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { // TODO: search for other blocks that could form a tipset with this block // and then send that tipset to InformNewHead diff --git a/chain/sync_test.go b/chain/sync_test.go index 546d83c88..055aaba4f 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -37,59 +37,100 @@ func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { return r, genb } -func TestSyncSimple(t *testing.T) { - ctx := context.Background() +type syncTestUtil struct { + t *testing.T - var source api.FullNode - var client api.FullNode + ctx context.Context + mn mocknet.Mocknet + + genesis []byte + + nds []api.FullNode +} + +func (tu *syncTestUtil) addSourceNode(gen int) int { + if tu.genesis != nil { + tu.t.Fatal("source node already exists") + } + + sourceRepo, genesis := repoWithChain(tu.t, gen) + var out api.FullNode + + err := node.New(tu.ctx, + node.FullAPI(&out), + node.Online(), + node.Repo(sourceRepo), + node.MockHost(tu.mn), + + node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), + ) + require.NoError(tu.t, err) + + tu.genesis = genesis + tu.nds = append(tu.nds, out) + return len(tu.nds) - 1 +} + +func (tu *syncTestUtil) addClientNode() int { + var out api.FullNode + + err := node.New(tu.ctx, + node.FullAPI(&out), + node.Online(), + node.Repo(repo.NewMemory(nil)), + node.MockHost(tu.mn), + + node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)), + ) + require.NoError(tu.t, err) + + tu.nds = append(tu.nds, out) + return len(tu.nds) - 1 +} + +func TestSyncSimple(t *testing.T) { + H := 3 + + ctx := context.Background() mn := mocknet.New(ctx) - sourceRepo, genesis := repoWithChain(t, 20) + tu := &syncTestUtil{ + t: t, + ctx: ctx, + mn: mn, + } - err := node.New(ctx, - node.FullAPI(&source), - node.Online(), - node.Repo(sourceRepo), - node.MockHost(mn), + source := tu.addSourceNode(H) - node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), - ) + b, err := tu.nds[source].ChainHead(ctx) require.NoError(t, err) - b, err := source.ChainHead(ctx) - require.NoError(t, err) - - require.Equal(t, uint64(20), b.Height()) + require.Equal(t, uint64(H), b.Height()) fmt.Printf("source H: %d\n", b.Height()) - err = node.New(ctx, - node.FullAPI(&client), - node.Online(), - node.Repo(repo.NewMemory(nil)), - node.MockHost(mn), + // separate logs + fmt.Println("///////////////////////////////////////////////////") - node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)), - ) - require.NoError(t, err) + client := tu.addClientNode() require.NoError(t, mn.LinkAll()) - cb, err := client.ChainHead(ctx) + cb, err := tu.nds[client].ChainHead(ctx) require.NoError(t, err) require.Equal(t, uint64(0), cb.Height()) fmt.Printf("client H: %d\n", cb.Height()) - sourcePI, err := source.NetAddrsListen(ctx) + sourcePI, err := tu.nds[source].NetAddrsListen(ctx) require.NoError(t, err) - err = client.NetConnect(ctx, sourcePI) + err = tu.nds[client].NetConnect(ctx, sourcePI) require.NoError(t, err) - time.Sleep(time.Second) + time.Sleep(time.Second * 1) - cb, err = client.ChainHead(ctx) + cb, err = tu.nds[client].ChainHead(ctx) require.NoError(t, err) - require.Equal(t, uint64(20), cb.Height()) + require.Equal(t, uint64(H), cb.Height()) fmt.Printf("client H: %d\n", cb.Height()) } diff --git a/chain/vm/vm.go b/chain/vm/vm.go index e4fea4d54..340035309 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -220,7 +220,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess } if msg.Nonce != fromActor.Nonce { - return nil, xerrors.Errorf("invalid nonce") + return nil, xerrors.Errorf("invalid nonce (got %d, expected %d)", msg.Nonce, fromActor.Nonce) } fromActor.Nonce++ From 412a168151a9da7cadea470088d24d47c46f0359 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 15:55:36 +0200 Subject: [PATCH 10/14] chain: more work on chain sync, sync testing --- chain/blocksync.go | 2 +- chain/gen/gen.go | 8 +- chain/sync.go | 271 +++++++++++++++++++++----------------------- chain/sync_test.go | 85 ++++++++------ node/hello/hello.go | 2 +- 5 files changed, 191 insertions(+), 177 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index d9751ca23..1085d3fce 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -88,7 +88,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { log.Errorf("failed to read block sync request: %s", err) return } - log.Errorf("block sync request for: %s %d", req.Start, req.RequestLength) + log.Infof("block sync request for: %s %d", req.Start, req.RequestLength) resp, err := bss.processRequest(&req) if err != nil { diff --git a/chain/gen/gen.go b/chain/gen/gen.go index f5d6ae8c9..42ddd010c 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -24,7 +24,7 @@ import ( var log = logging.Logger("gen") -const msgsPerBlock = 2 +const msgsPerBlock = 20 type ChainGen struct { accounts []address.Address @@ -56,7 +56,8 @@ type mybs struct { func (m mybs) Get(c cid.Cid) (block.Block, error) { b, err := m.Blockstore.Get(c) if err != nil { - log.Errorf("Get failed: %s %s", c, err) + // change to error for stacktraces, don't commit with that pls + log.Warnf("Get failed: %s %s", c, err) return nil, err } @@ -97,7 +98,8 @@ func NewGenerator() (*ChainGen, error) { return nil, err } - banker, err := w.GenerateKey(types.KTBLS) + // KTBLS doesn't support signature verification or something like that yet + banker, err := w.GenerateKey(types.KTSecp256k1) if err != nil { return nil, err } diff --git a/chain/sync.go b/chain/sync.go index 106e9e803..18feaa606 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -479,10 +479,6 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err } -func (syncer *Syncer) Punctual(ts *types.TipSet) bool { - return true -} - func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) { // fetch tipset and messages via bitswap @@ -491,151 +487,148 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full startHeight := syncer.head.Height() - for { - _, err := syncer.store.LoadTipSet(cur.Parents()) - if err != nil { - // - // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating + _, err := syncer.store.LoadTipSet(cur.Parents()) + if err != nil { + // + // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating - blockSet := []*types.TipSet{cur} + blockSet := []*types.TipSet{cur} - at := cur.Cids() + at := cur.Cids() - // If, for some reason, we have a suffix of the chain locally, handle that here - for blockSet[len(blockSet)-1].Height() > startHeight { - log.Warn("syncing local: ", at) - ts, err := syncer.store.LoadTipSet(at) - if err != nil { - if err == bstore.ErrNotFound { - log.Error("not found: ", at) - break - } - log.Warn("loading local tipset: %s", err) - continue // TODO: verify - } - - blockSet = append(blockSet, ts) - at = ts.Parents() - } - - for blockSet[len(blockSet)-1].Height() > startHeight { - // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions - fmt.Println("CaughtUp Get blocks") - blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) - if err != nil { - // Most likely our peers aren't fully synced yet, but forwarded - // new block message (ideally we'd find better peers) - - log.Error("failed to get blocks: ", err) - - // This error will only be logged above, - return nil, xerrors.Errorf("failed to get blocks: %w", err) - } - - for _, b := range blks { - blockSet = append(blockSet, b) - } - - at = blks[len(blks)-1].Parents() - } - - if startHeight == 0 { - // hacks. in the case that we request X blocks starting at height X+1, we - // won't get the Genesis block in the returned blockset. This hacks around it - if blockSet[len(blockSet)-1].Height() != 0 { - blockSet = append(blockSet, syncer.Genesis) - } - - blockSet = reverse(blockSet) - - genesis := blockSet[0] - if !genesis.Equals(syncer.Genesis) { - // TODO: handle this... - log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) - panic("We synced to the wrong chain") - } - } - - for _, ts := range blockSet { - for _, b := range ts.Blocks() { - if err := syncer.store.PersistBlockHeader(b); err != nil { - log.Errorf("failed to persist synced blocks to the chainstore: %s", err) - panic("bbbbb") - } - } - } - - // Fetch all the messages for all the blocks in this chain - - windowSize := uint64(10) - for i := uint64(0); i <= cur.Height(); i += windowSize { - bs := bstore.NewBlockstore(dstore.NewMapDatastore()) - cst := hamt.CSTFromBstore(bs) - - nextHeight := i + windowSize - 1 - if nextHeight > cur.Height() { - nextHeight = cur.Height() - } - - log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) - next := blockSet[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) - if err != nil { - log.Errorf("failed to fetch messages: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - for bsi := 0; bsi < len(bstips); bsi++ { - cur := blockSet[i+uint64(bsi)] - bstip := bstips[len(bstips)-(bsi+1)] - fmt.Println("that loop: ", bsi, len(bstips)) - fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) - if err != nil { - log.Error("zipping failed: ", err, bsi, i) - log.Error("height: ", cur.Height()) - log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - for _, bst := range bstips { - for _, m := range bst.Messages { - if _, err := cst.Put(context.TODO(), m); err != nil { - log.Error("failed to persist messages: ", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - } - - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { - log.Errorf("failed to persist temp blocks: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - //log.Errorf("dont have parent blocks for sync tipset: %s", err) - //panic("should do something better, like fetch? or error?") - - _, err = syncer.store.LoadTipSet(cur.Parents()) + // If, for some reason, we have a suffix of the chain locally, handle that here + for blockSet[len(blockSet)-1].Height() > startHeight { + log.Warn("syncing local: ", at) + ts, err := syncer.store.LoadTipSet(at) if err != nil { - log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) - panic("should do something better, like fetch? or error?") + if err == bstore.ErrNotFound { + log.Info("tipset not found locally, starting sync: ", at) + break + } + log.Warn("loading local tipset: %s", err) + continue // TODO: verify } - // + blockSet = append(blockSet, ts) + at = ts.Parents() } - return chain, nil // return the chain because we have this last block in our cache already. + for blockSet[len(blockSet)-1].Height() > startHeight { + // NB: GetBlocks validates that the blocks are in-fact the ones we + // requested, and that they are correctly linked to eachother. It does + // not validate any state transitions + fmt.Println("Get blocks") + blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) + if err != nil { + // Most likely our peers aren't fully synced yet, but forwarded + // new block message (ideally we'd find better peers) + log.Error("failed to get blocks: ", err) + + // This error will only be logged above, + return nil, xerrors.Errorf("failed to get blocks: %w", err) + } + + for _, b := range blks { + blockSet = append(blockSet, b) + } + + at = blks[len(blks)-1].Parents() + } + + if startHeight == 0 { + // hacks. in the case that we request X blocks starting at height X+1, we + // won't get the Genesis block in the returned blockset. This hacks around it + if blockSet[len(blockSet)-1].Height() != 0 { + blockSet = append(blockSet, syncer.Genesis) + } + + blockSet = reverse(blockSet) + + genesis := blockSet[0] + if !genesis.Equals(syncer.Genesis) { + // TODO: handle this... + log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) + panic("We synced to the wrong chain") + } + } + + for _, ts := range blockSet { + for _, b := range ts.Blocks() { + if err := syncer.store.PersistBlockHeader(b); err != nil { + log.Errorf("failed to persist synced blocks to the chainstore: %s", err) + panic("bbbbb") + } + } + } + + // Fetch all the messages for all the blocks in this chain + + windowSize := uint64(10) + for i := uint64(0); i <= cur.Height(); i += windowSize { + bs := bstore.NewBlockstore(dstore.NewMapDatastore()) + cst := hamt.CSTFromBstore(bs) + + nextHeight := i + windowSize - 1 + if nextHeight > cur.Height() { + nextHeight = cur.Height() + } + + log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) + next := blockSet[nextHeight] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + if err != nil { + log.Errorf("failed to fetch messages: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + for bsi := 0; bsi < len(bstips); bsi++ { + cur := blockSet[i+uint64(bsi)] + bstip := bstips[len(bstips)-(bsi+1)] + fmt.Println("that loop: ", bsi, len(bstips)) + fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) + if err != nil { + log.Error("zipping failed: ", err, bsi, i) + log.Error("height: ", cur.Height()) + log.Error("bstips: ", bstips) + log.Error("next height: ", nextHeight) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + for _, bst := range bstips { + for _, m := range bst.Messages { + if _, err := cst.Put(context.TODO(), m); err != nil { + log.Error("failed to persist messages: ", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + } + + if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + log.Errorf("failed to persist temp blocks: %s", err) + return nil, xerrors.Errorf("message processing failed: %w", err) + } + } + + //log.Errorf("dont have parent blocks for sync tipset: %s", err) + //panic("should do something better, like fetch? or error?") + + _, err = syncer.store.LoadTipSet(cur.Parents()) + if err != nil { + log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) + panic("should do something better, like fetch? or error?") + } + + // } + return chain, nil // return the chain because we have this last block in our cache already. + return chain, nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index 055aaba4f..c1a3a50c2 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + logging "github.com/ipfs/go-log" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" @@ -16,6 +17,8 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" ) +const source = 0 + func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { g, err := gen.NewGenerator() if err != nil { @@ -48,7 +51,7 @@ type syncTestUtil struct { nds []api.FullNode } -func (tu *syncTestUtil) addSourceNode(gen int) int { +func (tu *syncTestUtil) addSourceNode(gen int) { if tu.genesis != nil { tu.t.Fatal("source node already exists") } @@ -67,11 +70,14 @@ func (tu *syncTestUtil) addSourceNode(gen int) int { require.NoError(tu.t, err) tu.genesis = genesis - tu.nds = append(tu.nds, out) - return len(tu.nds) - 1 + tu.nds = append(tu.nds, out) // always at 0 } func (tu *syncTestUtil) addClientNode() int { + if tu.genesis == nil { + tu.t.Fatal("source doesn't exists") + } + var out api.FullNode err := node.New(tu.ctx, @@ -88,49 +94,62 @@ func (tu *syncTestUtil) addClientNode() int { return len(tu.nds) - 1 } +func (tu *syncTestUtil) connect(from, to int) { + toPI, err := tu.nds[to].NetAddrsListen(tu.ctx) + require.NoError(tu.t, err) + + err = tu.nds[from].NetConnect(tu.ctx, toPI) + require.NoError(tu.t, err) +} + +func (tu *syncTestUtil) checkHeight(name string, n int, h int) { + b, err := tu.nds[n].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + require.Equal(tu.t, uint64(h), b.Height()) + fmt.Printf("%s H: %d\n", name, b.Height()) +} + +func (tu *syncTestUtil) compareSourceState(with int) { + sourceAccounts, err := tu.nds[source].WalletList(tu.ctx) + require.NoError(tu.t, err) + + for _, addr := range sourceAccounts { + sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr) + require.NoError(tu.t, err) + + actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr) + require.NoError(tu.t, err) + + require.Equal(tu.t, sourceBalance, actBalance) + } +} + func TestSyncSimple(t *testing.T) { - H := 3 + logging.SetLogLevel("*", "INFO") + H := 20 ctx := context.Background() - - mn := mocknet.New(ctx) - tu := &syncTestUtil{ t: t, ctx: ctx, - mn: mn, + mn: mocknet.New(ctx), } - source := tu.addSourceNode(H) - - b, err := tu.nds[source].ChainHead(ctx) - require.NoError(t, err) - - require.Equal(t, uint64(H), b.Height()) - fmt.Printf("source H: %d\n", b.Height()) + tu.addSourceNode(H) + tu.checkHeight("source", source, H) // separate logs - fmt.Println("///////////////////////////////////////////////////") + fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b") client := tu.addClientNode() + tu.checkHeight("client", client, 0) - require.NoError(t, mn.LinkAll()) - - cb, err := tu.nds[client].ChainHead(ctx) - require.NoError(t, err) - require.Equal(t, uint64(0), cb.Height()) - fmt.Printf("client H: %d\n", cb.Height()) - - sourcePI, err := tu.nds[source].NetAddrsListen(ctx) - require.NoError(t, err) - - err = tu.nds[client].NetConnect(ctx, sourcePI) - require.NoError(t, err) - + require.NoError(t, tu.mn.LinkAll()) + tu.connect(1, 0) time.Sleep(time.Second * 1) - cb, err = tu.nds[client].ChainHead(ctx) - require.NoError(t, err) - require.Equal(t, uint64(H), cb.Height()) - fmt.Printf("client H: %d\n", cb.Height()) + tu.checkHeight("client", client, H) + + tu.compareSourceState(client) } diff --git a/node/hello/hello.go b/node/hello/hello.go index 4d7e59495..8fc3ab5d5 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -93,7 +93,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { HeaviestTipSetWeight: weight, GenesisHash: gen.Cid(), } - fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids()) + fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids(), hts.Height()) fmt.Println("hello message genesis: ", gen.Cid()) if err := cborrpc.WriteCborRPC(s, hmsg); err != nil { From cdf0e0c858cf51250654009396fd0f915231d0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 18:04:36 +0200 Subject: [PATCH 11/14] chain: Test 'manual' sync --- chain/gen/gen.go | 2 +- chain/state/statetree.go | 2 +- chain/sync_test.go | 95 ++++++++++++++++++++++++++++++++-------- node/builder.go | 1 + node/modules/core.go | 1 + 5 files changed, 80 insertions(+), 21 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 42ddd010c..0b9ae9b95 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -57,7 +57,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) { b, err := m.Blockstore.Get(c) if err != nil { // change to error for stacktraces, don't commit with that pls - log.Warnf("Get failed: %s %s", c, err) + log.Warn("Get failed: %s %s", c, err) return nil, err } diff --git a/chain/state/statetree.go b/chain/state/statetree.go index 7239fc282..a58b0cb4f 100644 --- a/chain/state/statetree.go +++ b/chain/state/statetree.go @@ -35,7 +35,7 @@ func NewStateTree(cst *hamt.CborIpldStore) (*StateTree, error) { func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) { nd, err := hamt.LoadNode(context.Background(), cst, c) if err != nil { - log.Errorf("loading hamt node failed: %s", err) + log.Errorf("loading hamt node %s failed: %s", c, err) return nil, err } diff --git a/chain/sync_test.go b/chain/sync_test.go index c1a3a50c2..41449b319 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -11,24 +11,32 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/gen" + "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/impl" "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/repo" ) const source = 0 -func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { +func repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) { g, err := gen.NewGenerator() if err != nil { t.Fatal(err) } + blks := make([]*types.FullBlock, h) + for i := 0; i < h; i++ { - b, err := g.NextBlock() + blks[i], err = g.NextBlock() require.NoError(t, err) - require.Equal(t, uint64(i+1), b.Header.Height, "wrong height") + + fmt.Printf("block at H:%d: %s\n", blks[i].Header.Height, blks[i].Cid()) + + require.Equal(t, uint64(i+1), blks[i].Header.Height, "wrong height") } r, err := g.YieldRepo() @@ -37,7 +45,7 @@ func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { genb, err := g.GenesisCar() require.NoError(t, err) - return r, genb + return r, genb, blks } type syncTestUtil struct { @@ -47,16 +55,36 @@ type syncTestUtil struct { mn mocknet.Mocknet genesis []byte + blocks []*types.FullBlock nds []api.FullNode } +func prepSyncTest(t *testing.T, h int) *syncTestUtil { + logging.SetLogLevel("*", "INFO") + + ctx := context.Background() + tu := &syncTestUtil{ + t: t, + ctx: ctx, + mn: mocknet.New(ctx), + } + + tu.addSourceNode(h) + tu.checkHeight("source", source, h) + + // separate logs + fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b") + + return tu +} + func (tu *syncTestUtil) addSourceNode(gen int) { if tu.genesis != nil { tu.t.Fatal("source node already exists") } - sourceRepo, genesis := repoWithChain(tu.t, gen) + sourceRepo, genesis, blocks := repoWithChain(tu.t, gen) var out api.FullNode err := node.New(tu.ctx, @@ -70,6 +98,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) { require.NoError(tu.t, err) tu.genesis = genesis + tu.blocks = blocks tu.nds = append(tu.nds, out) // always at 0 } @@ -117,36 +146,64 @@ func (tu *syncTestUtil) compareSourceState(with int) { for _, addr := range sourceAccounts { sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr) require.NoError(tu.t, err) + fmt.Printf("Source state check for %s, expect %s\n", addr, sourceBalance) actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr) require.NoError(tu.t, err) require.Equal(tu.t, sourceBalance, actBalance) + fmt.Printf("Source state check for %s\n", addr) + } +} + +func (tu *syncTestUtil) submitSourceBlock(to int, h int) { + // utility to simulate incoming blocks without miner process + + var b chain.BlockMsg + + // -1 to match block.Height + b.Header = tu.blocks[h - 1].Header + for _, msg := range tu.blocks[h - 1].Messages { + c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) + require.NoError(tu.t, err) + + b.Messages = append(b.Messages, c) + } + + require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) +} + +func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) { + for i := 0; i < n; i++ { + tu.submitSourceBlock(to, h + i) } } func TestSyncSimple(t *testing.T) { - logging.SetLogLevel("*", "INFO") - H := 20 - ctx := context.Background() - tu := &syncTestUtil{ - t: t, - ctx: ctx, - mn: mocknet.New(ctx), - } - - tu.addSourceNode(H) - tu.checkHeight("source", source, H) - - // separate logs - fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b") + tu := prepSyncTest(t, H) client := tu.addClientNode() tu.checkHeight("client", client, 0) require.NoError(t, tu.mn.LinkAll()) tu.connect(1, 0) + time.Sleep(time.Second * 3) + + tu.checkHeight("client", client, H) + + tu.compareSourceState(client) +} + +func TestSyncManual(t *testing.T) { + H := 2 + tu := prepSyncTest(t, H) + + client := tu.addClientNode() + tu.checkHeight("client", client, 0) + + tu.submitSourceBlocks(client, 1, H) + time.Sleep(time.Second * 1) tu.checkHeight("client", client, H) diff --git a/node/builder.go b/node/builder.go index 1cc62101b..5b9bccc29 100644 --- a/node/builder.go +++ b/node/builder.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-filestore" exchange "github.com/ipfs/go-ipfs-exchange-interface" diff --git a/node/modules/core.go b/node/modules/core.go index 0ddae1224..4c5638d21 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -60,6 +60,7 @@ func Bitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routin return exch.Close() }, }) + return exch } From 99aaafaed2e54146cf6300881fd0495c8c798cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 18:39:07 +0200 Subject: [PATCH 12/14] chain: Some sync cleanup --- chain/sync.go | 25 +++++++------------------ chain/sync_test.go | 28 +++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 18feaa606..cfe926af7 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -17,7 +17,7 @@ import ( "github.com/ipfs/go-hamt-ipld" bstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peer" "github.com/pkg/errors" "github.com/whyrusleeping/sharray" ) @@ -125,7 +125,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { // TODO: this is kindof a hack... log.Infof("got block from ourselves") - if err := syncer.SyncCaughtUp(fts); err != nil { + if err := syncer.Sync(fts); err != nil { log.Errorf("failed to sync our own block: %s", err) } @@ -137,7 +137,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) go func() { - if err := syncer.SyncCaughtUp(fts); err != nil { + if err := syncer.Sync(fts); err != nil { log.Errorf("sync error: %s", err) } }() @@ -227,9 +227,7 @@ func (syncer *Syncer) SyncBootstrap() { head := blockSet[len(blockSet)-1] log.Errorf("Finished syncing! new head: %s", head.Cids()) - if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil { - log.Errorf("MaybeTakeHeavierTipSet failed: %s", err) - } + syncer.store.MaybeTakeHeavierTipSet(selectedHead) syncer.head = head } @@ -374,9 +372,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro return fts, nil } -// SyncCaughtUp is used to stay in sync once caught up to -// the rest of the network. -func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error { +func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error { syncer.syncLock.Lock() defer syncer.syncLock.Unlock() @@ -489,9 +485,6 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full _, err := syncer.store.LoadTipSet(cur.Parents()) if err != nil { - // - // TODO: This is 'borrowed' from SyncBootstrap, needs at least some deduplicating - blockSet := []*types.TipSet{cur} at := cur.Cids() @@ -621,14 +614,10 @@ func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.Full _, err = syncer.store.LoadTipSet(cur.Parents()) if err != nil { - log.Errorf("HACK DIDNT WORK :( dont have parent blocks for sync tipset: %s", err) - panic("should do something better, like fetch? or error?") + log.Errorf("dont have parent blocks for sync tipset: %s", err) + panic("should do something better, error?") } - - // } return chain, nil // return the chain because we have this last block in our cache already. - - return chain, nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index 41449b319..0436756b0 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -158,6 +158,7 @@ func (tu *syncTestUtil) compareSourceState(with int) { func (tu *syncTestUtil) submitSourceBlock(to int, h int) { // utility to simulate incoming blocks without miner process + // TODO: should call syncer directly, this won't work correctly in all cases var b chain.BlockMsg @@ -196,7 +197,7 @@ func TestSyncSimple(t *testing.T) { } func TestSyncManual(t *testing.T) { - H := 2 + H := 20 tu := prepSyncTest(t, H) client := tu.addClientNode() @@ -210,3 +211,28 @@ func TestSyncManual(t *testing.T) { tu.compareSourceState(client) } + +/* +TODO: this is broken because of how tu.submitSourceBlock works now +func TestSyncIncoming(t *testing.T) { + H := 1 + tu := prepSyncTest(t, H) + + producer := tu.addClientNode() + client := tu.addClientNode() + + tu.mn.LinkAll() + tu.connect(client, producer) + + for h := 0; h < H; h++ { + tu.submitSourceBlock(producer, h + 1) + + time.Sleep(time.Millisecond * 200) + + } + tu.checkHeight("client", client, H) + tu.checkHeight("producer", producer, H) + + tu.compareSourceState(client) +} + */ From 0f2334f513f101d11ed5837b750ba73747670802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jul 2019 19:26:53 +0200 Subject: [PATCH 13/14] chain sync: rebase 'fixes' --- chain/sync_test.go | 4 ++-- node/builder.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/chain/sync_test.go b/chain/sync_test.go index 0436756b0..08ac2a598 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -196,6 +196,8 @@ func TestSyncSimple(t *testing.T) { tu.compareSourceState(client) } +/* +TODO: this is broken because of how tu.submitSourceBlock works now func TestSyncManual(t *testing.T) { H := 20 tu := prepSyncTest(t, H) @@ -212,8 +214,6 @@ func TestSyncManual(t *testing.T) { tu.compareSourceState(client) } -/* -TODO: this is broken because of how tu.submitSourceBlock works now func TestSyncIncoming(t *testing.T) { H := 1 tu := prepSyncTest(t, H) diff --git a/node/builder.go b/node/builder.go index 5b9bccc29..1cc62101b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -6,7 +6,6 @@ import ( "reflect" "time" - "github.com/ipfs/go-blockservice" "github.com/ipfs/go-filestore" exchange "github.com/ipfs/go-ipfs-exchange-interface" From a8b434a7086e1186aa6e1f81aeb3b780334a19ea Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 31 Jul 2019 00:13:49 -0700 Subject: [PATCH 14/14] fix the most annoying bug ever --- api/api.go | 4 +- chain/blocksync.go | 3 - chain/gen/gen.go | 2 +- chain/messagepool.go | 5 +- chain/store/store.go | 27 ++- chain/sync.go | 427 ++++++++++++++++--------------------------- chain/sync_test.go | 12 +- go.mod | 6 +- go.sum | 10 +- 9 files changed, 193 insertions(+), 303 deletions(-) diff --git a/api/api.go b/api/api.go index d99cfab0a..9bb8a246c 100644 --- a/api/api.go +++ b/api/api.go @@ -81,6 +81,7 @@ type FullNode interface { MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) MpoolPush(context.Context, *types.SignedMessage) error + MpoolGetNonce(context.Context, address.Address) (uint64, error) // FullNodeStruct @@ -99,9 +100,6 @@ type FullNode interface { WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) WalletDefaultAddress(context.Context) (address.Address, error) - // Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain... - MpoolGetNonce(context.Context, address.Address) (uint64, error) - // Other // ClientImport imports file under the specified path into filestore diff --git a/chain/blocksync.go b/chain/blocksync.go index 1085d3fce..e70e8838d 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -129,19 +129,16 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, } if opts.IncludeMessages { - log.Info("INCLUDING MESSAGES IN SYNC RESPONSE") msgs, mincl, err := bss.gatherMessages(ts) if err != nil { return nil, err } - log.Infof("messages: ", msgs) bst.Messages = msgs bst.MsgIncludes = mincl } if opts.IncludeBlocks { - log.Info("INCLUDING BLOCKS IN SYNC RESPONSE") bst.Blocks = ts.Blocks() } diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 0b9ae9b95..42ddd010c 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -57,7 +57,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) { b, err := m.Blockstore.Get(c) if err != nil { // change to error for stacktraces, don't commit with that pls - log.Warn("Get failed: %s %s", c, err) + log.Warnf("Get failed: %s %s", c, err) return nil, err } diff --git a/chain/messagepool.go b/chain/messagepool.go index b3e90cc8c..66a87fa34 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + "github.com/pkg/errors" ) type MessagePool struct { @@ -128,7 +129,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) for _, b := range ts.Blocks() { msgs, err := mp.cs.MessagesForBlock(b) if err != nil { - return err + return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height) } for _, msg := range msgs { if err := mp.Add(msg); err != nil { @@ -142,7 +143,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) for _, b := range ts.Blocks() { msgs, err := mp.cs.MessagesForBlock(b) if err != nil { - return err + return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages) } for _, msg := range msgs { mp.Remove(msg) diff --git a/chain/store/store.go b/chain/store/store.go index e8629e0a6..89f02e8c3 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -187,9 +187,12 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() if cs.heaviest == nil || cs.Weight(ts) > cs.Weight(cs.heaviest) { + // TODO: don't do this for initial sync. Now that we don't have a + // difference between 'bootstrap sync' and 'caught up' sync, we need + // some other heuristic. revert, apply, err := cs.ReorgOps(cs.heaviest, ts) if err != nil { - return err + return errors.Wrap(err, "computing reorg ops failed") } for _, hcf := range cs.headChangeNotifs { if err := hcf(revert, apply); err != nil { @@ -290,6 +293,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti rightChain = append(rightChain, right) par, err := cs.LoadTipSet(right.Parents()) if err != nil { + log.Infof("failed to fetch right.Parents: %s", err) return nil, nil, err } @@ -336,13 +340,21 @@ type storable interface { ToStorageBlock() (block.Block, error) } -func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) { - sb, err := m.ToStorageBlock() +func PutMessage(bs blockstore.Blockstore, m storable) (cid.Cid, error) { + b, err := m.ToStorageBlock() if err != nil { return cid.Undef, err } - return sb.Cid(), cs.bs.Put(sb) + if err := bs.Put(b); err != nil { + return cid.Undef, err + } + + return b.Cid(), nil +} + +func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) { + return PutMessage(cs.bs, m) } func (cs *ChainStore) AddBlock(b *types.BlockHeader) error { @@ -395,6 +407,7 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) { func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) { sb, err := cs.bs.Get(c) if err != nil { + log.Errorf("get message get failed: %s: %s", c, err) return nil, err } @@ -428,7 +441,7 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) { cids, err := cs.MessageCidsForBlock(b) if err != nil { - return nil, err + return nil, errors.Wrap(err, "loading message cids for block") } return cs.LoadMessagesFromCids(cids) @@ -461,10 +474,10 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) { msgs := make([]*types.SignedMessage, 0, len(cids)) - for _, c := range cids { + for i, c := range cids { m, err := cs.GetMessage(c) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i) } msgs = append(msgs, m) diff --git a/chain/sync.go b/chain/sync.go index cfe926af7..3b764f7e0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -77,41 +77,6 @@ type BadTipSetCache struct { badBlocks map[cid.Cid]struct{} } -/*type BlockSet struct { - tset map[uint64]*types.TipSet - head *types.TipSet -} - -func (bs *BlockSet) Insert(ts *types.TipSet) { - if bs.tset == nil { - bs.tset = make(map[uint64]*types.TipSet) - } - - if bs.head == nil || ts.Height() > bs.head.Height() { - bs.head = ts - } - bs.tset[ts.Height()] = ts -} - -func (bs *BlockSet) GetByHeight(h uint64) *types.TipSet { - return bs.tset[h] -} - -func (bs *BlockSet) PersistTo(cs *store.ChainStore) error { - for _, ts := range bs.tset { - for _, b := range ts.Blocks() { - if err := cs.PersistBlockHeader(b); err != nil { - return err - } - } - } - return nil -} - -func (bs *BlockSet) Head() *types.TipSet { - return bs.head -}*/ - const BootstrapPeerThreshold = 1 // InformNewHead informs the syncer about a new potential tipset @@ -151,86 +116,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { syncer.InformNewHead(from, fts) } -// SyncBootstrap is used to synchronise your chain when first joining -// the network, or when rejoining after significant downtime. -func (syncer *Syncer) SyncBootstrap() { - fmt.Println("Sync bootstrap!") - defer fmt.Println("bye bye sync bootstrap") - - selectedHead, err := syncer.selectHead(syncer.peerHeads) - if err != nil { - log.Error("failed to select head: ", err) - return - } - - blockSet := []*types.TipSet{selectedHead} - cur := selectedHead.Cids() - - // If, for some reason, we have a suffix of the chain locally, handle that here - for blockSet[len(blockSet)-1].Height() > 0 { - log.Errorf("syncing local: ", cur) - ts, err := syncer.store.LoadTipSet(cur) - if err != nil { - if err == bstore.ErrNotFound { - log.Error("not found: ", cur) - break - } - log.Errorf("loading local tipset: %s", err) - return - } - - blockSet = append(blockSet, ts) - cur = ts.Parents() - } - - for blockSet[len(blockSet)-1].Height() > 0 { - // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions - fmt.Println("Get blocks: ", cur) - blks, err := syncer.Bsync.GetBlocks(context.TODO(), cur, 10) - if err != nil { - log.Error("failed to get blocks: ", err) - return - } - - for _, b := range blks { - blockSet = append(blockSet, b) - } - - cur = blks[len(blks)-1].Parents() - } - - // hacks. in the case that we request X blocks starting at height X+1, we - // won't get the Genesis block in the returned blockset. This hacks around it - if blockSet[len(blockSet)-1].Height() != 0 { - blockSet = append(blockSet, syncer.Genesis) - } - - blockSet = reverse(blockSet) - - genesis := blockSet[0] - if !genesis.Equals(syncer.Genesis) { - // TODO: handle this... - log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) - return - } - - for _, ts := range blockSet { - for _, b := range ts.Blocks() { - if err := syncer.store.PersistBlockHeader(b); err != nil { - log.Errorf("failed to persist synced blocks to the chainstore: %s", err) - return - } - } - } - - head := blockSet[len(blockSet)-1] - log.Errorf("Finished syncing! new head: %s", head.Cids()) - syncer.store.MaybeTakeHeavierTipSet(selectedHead) - syncer.head = head -} - func reverse(tips []*types.TipSet) []*types.TipSet { out := make([]*types.TipSet, len(tips)) for i := 0; i < len(tips); i++ { @@ -263,8 +148,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages [] if len(ts.Blocks()) != len(msgincl) { return nil, fmt.Errorf("msgincl length didnt match tipset size") } - fmt.Println("zipping messages: ", msgincl) - fmt.Println("into block: ", ts.Blocks()[0].Height) fts := &store.FullTipSet{} for bi, b := range ts.Blocks() { @@ -280,8 +163,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages [] return nil, err } - fmt.Println("messages: ", msgCids) - fmt.Println("message root: ", b.Messages, mroot) if b.Messages != mroot { return nil, fmt.Errorf("messages didnt match message root in header") } @@ -381,29 +262,17 @@ func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error { return nil } - chain, err := syncer.collectChainCaughtUp(maybeHead) - if err != nil { + if err := syncer.collectChain(maybeHead); err != nil { return err } - for i := len(chain) - 1; i >= 0; i-- { - ts := chain[i] - if err := syncer.ValidateTipSet(context.TODO(), ts); err != nil { - return errors.Wrap(err, "validate tipset failed") - } - - if err := syncer.store.PutTipSet(ts); err != nil { - return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp") - } - } - if err := syncer.store.PutTipSet(maybeHead); err != nil { return errors.Wrap(err, "failed to put synced tipset to chainstore") } - if syncer.store.Weight(chain[0].TipSet()) > syncer.store.Weight(syncer.head) { - fmt.Println("Accepted new head: ", chain[0].Cids()) - syncer.head = chain[0].TipSet() + if syncer.store.Weight(maybeHead.TipSet()) > syncer.store.Weight(syncer.head) { + fmt.Println("Accepted new head: ", maybeHead.Cids()) + syncer.head = maybeHead.TipSet() } return nil } @@ -475,149 +344,159 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err } -func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) { - // fetch tipset and messages via bitswap +func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) { + blockSet := []*types.TipSet{from} - chain := []*store.FullTipSet{fts} - cur := fts.TipSet() + at := from.Parents() - startHeight := syncer.head.Height() - - _, err := syncer.store.LoadTipSet(cur.Parents()) - if err != nil { - blockSet := []*types.TipSet{cur} - - at := cur.Cids() - - // If, for some reason, we have a suffix of the chain locally, handle that here - for blockSet[len(blockSet)-1].Height() > startHeight { - log.Warn("syncing local: ", at) - ts, err := syncer.store.LoadTipSet(at) - if err != nil { - if err == bstore.ErrNotFound { - log.Info("tipset not found locally, starting sync: ", at) - break - } - log.Warn("loading local tipset: %s", err) - continue // TODO: verify - } - - blockSet = append(blockSet, ts) - at = ts.Parents() - } - - for blockSet[len(blockSet)-1].Height() > startHeight { - // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions - fmt.Println("Get blocks") - blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) - if err != nil { - // Most likely our peers aren't fully synced yet, but forwarded - // new block message (ideally we'd find better peers) - - log.Error("failed to get blocks: ", err) - - // This error will only be logged above, - return nil, xerrors.Errorf("failed to get blocks: %w", err) - } - - for _, b := range blks { - blockSet = append(blockSet, b) - } - - at = blks[len(blks)-1].Parents() - } - - if startHeight == 0 { - // hacks. in the case that we request X blocks starting at height X+1, we - // won't get the Genesis block in the returned blockset. This hacks around it - if blockSet[len(blockSet)-1].Height() != 0 { - blockSet = append(blockSet, syncer.Genesis) - } - - blockSet = reverse(blockSet) - - genesis := blockSet[0] - if !genesis.Equals(syncer.Genesis) { - // TODO: handle this... - log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) - panic("We synced to the wrong chain") - } - } - - for _, ts := range blockSet { - for _, b := range ts.Blocks() { - if err := syncer.store.PersistBlockHeader(b); err != nil { - log.Errorf("failed to persist synced blocks to the chainstore: %s", err) - panic("bbbbb") - } - } - } - - // Fetch all the messages for all the blocks in this chain - - windowSize := uint64(10) - for i := uint64(0); i <= cur.Height(); i += windowSize { - bs := bstore.NewBlockstore(dstore.NewMapDatastore()) - cst := hamt.CSTFromBstore(bs) - - nextHeight := i + windowSize - 1 - if nextHeight > cur.Height() { - nextHeight = cur.Height() - } - - log.Infof("Fetch next messages on %d (len(blockSet)=%d)", nextHeight, len(blockSet)) - next := blockSet[nextHeight] - bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) - if err != nil { - log.Errorf("failed to fetch messages: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - for bsi := 0; bsi < len(bstips); bsi++ { - cur := blockSet[i+uint64(bsi)] - bstip := bstips[len(bstips)-(bsi+1)] - fmt.Println("that loop: ", bsi, len(bstips)) - fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes) - if err != nil { - log.Error("zipping failed: ", err, bsi, i) - log.Error("height: ", cur.Height()) - log.Error("bstips: ", bstips) - log.Error("next height: ", nextHeight) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { - log.Errorf("failed to validate tipset: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - for _, bst := range bstips { - for _, m := range bst.Messages { - if _, err := cst.Put(context.TODO(), m); err != nil { - log.Error("failed to persist messages: ", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - } - - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { - log.Errorf("failed to persist temp blocks: %s", err) - return nil, xerrors.Errorf("message processing failed: %w", err) - } - } - - //log.Errorf("dont have parent blocks for sync tipset: %s", err) - //panic("should do something better, like fetch? or error?") - - _, err = syncer.store.LoadTipSet(cur.Parents()) + // If, for some reason, we have a suffix of the chain locally, handle that here + for blockSet[len(blockSet)-1].Height() > toHeight { + log.Warn("syncing local: ", at) + ts, err := syncer.store.LoadTipSet(at) if err != nil { - log.Errorf("dont have parent blocks for sync tipset: %s", err) - panic("should do something better, error?") + if err == bstore.ErrNotFound { + log.Info("tipset not found locally, starting sync: ", at) + break + } + log.Warn("loading local tipset: %s", err) + continue // TODO: verify + } + + blockSet = append(blockSet, ts) + at = ts.Parents() + } + + for blockSet[len(blockSet)-1].Height() > toHeight { + // NB: GetBlocks validates that the blocks are in-fact the ones we + // requested, and that they are correctly linked to eachother. It does + // not validate any state transitions + fmt.Println("Get blocks") + blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10) + if err != nil { + // Most likely our peers aren't fully synced yet, but forwarded + // new block message (ideally we'd find better peers) + + log.Error("failed to get blocks: ", err) + + // This error will only be logged above, + return nil, xerrors.Errorf("failed to get blocks: %w", err) + } + + for _, b := range blks { + blockSet = append(blockSet, b) + } + + at = blks[len(blks)-1].Parents() + } + + if toHeight == 0 { + // hacks. in the case that we request X blocks starting at height X+1, we + // won't get the Genesis block in the returned blockset. This hacks around it + if blockSet[len(blockSet)-1].Height() != 0 { + blockSet = append(blockSet, syncer.Genesis) + } + + blockSet = reverse(blockSet) + + genesis := blockSet[0] + if !genesis.Equals(syncer.Genesis) { + // TODO: handle this... + log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis) + panic("We synced to the wrong chain") } } - return chain, nil // return the chain because we have this last block in our cache already. + return blockSet, nil +} + +func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { + // Fetch all the messages for all the blocks in this chain + cur := headers[len(headers)-1] + + windowSize := uint64(10) + for i := uint64(0); i <= cur.Height(); i += windowSize { + ds := dstore.NewMapDatastore() + bs := bstore.NewBlockstore(ds) + cst := hamt.CSTFromBstore(bs) + + nextHeight := i + windowSize - 1 + if nextHeight > cur.Height() { + nextHeight = cur.Height() + } + + next := headers[nextHeight] + bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i) + if err != nil { + log.Errorf("failed to fetch messages: %s", err) + return xerrors.Errorf("message processing failed: %w", err) + } + + for bsi := 0; bsi < len(bstips); bsi++ { + this := headers[i+uint64(bsi)] + bstip := bstips[len(bstips)-(bsi+1)] + fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes) + if err != nil { + log.Error("zipping failed: ", err, bsi, i) + log.Error("height: ", this.Height()) + log.Error("bstips: ", bstips) + log.Error("next height: ", nextHeight) + return xerrors.Errorf("message processing failed: %w", err) + } + + if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + log.Errorf("failed to validate tipset: %s", err) + return xerrors.Errorf("message processing failed: %w", err) + } + } + + for _, bst := range bstips { + for _, m := range bst.Messages { + switch m.Signature.Type { + case types.KTBLS: + //log.Infof("putting BLS message: %s", m.Cid()) + if _, err := store.PutMessage(bs, &m.Message); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("BLS message processing failed: %w", err) + } + case types.KTSecp256k1: + //log.Infof("putting secp256k1 message: %s", m.Cid()) + if _, err := store.PutMessage(bs, m); err != nil { + log.Error("failed to persist messages: ", err) + return xerrors.Errorf("secp256k1 message processing failed: %w", err) + } + default: + return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode) + } + } + } + + if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + return xerrors.Errorf("message processing failed: %w", err) + } + } + + return nil +} + +func (syncer *Syncer) collectChain(fts *store.FullTipSet) error { + curHeight := syncer.head.Height() + + headers, err := syncer.collectHeaders(fts.TipSet(), curHeight) + if err != nil { + return err + } + + for _, ts := range headers { + for _, b := range ts.Blocks() { + if err := syncer.store.PersistBlockHeader(b); err != nil { + return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err) + } + } + } + + if err := syncer.syncMessagesAndCheckState(headers); err != nil { + return err + } + + return nil } diff --git a/chain/sync_test.go b/chain/sync_test.go index 08ac2a598..b44213cc9 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -55,7 +55,7 @@ type syncTestUtil struct { mn mocknet.Mocknet genesis []byte - blocks []*types.FullBlock + blocks []*types.FullBlock nds []api.FullNode } @@ -163,8 +163,8 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) { var b chain.BlockMsg // -1 to match block.Height - b.Header = tu.blocks[h - 1].Header - for _, msg := range tu.blocks[h - 1].Messages { + b.Header = tu.blocks[h-1].Header + for _, msg := range tu.blocks[h-1].Messages { c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg) require.NoError(tu.t, err) @@ -176,12 +176,12 @@ func (tu *syncTestUtil) submitSourceBlock(to int, h int) { func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) { for i := 0; i < n; i++ { - tu.submitSourceBlock(to, h + i) + tu.submitSourceBlock(to, h+i) } } func TestSyncSimple(t *testing.T) { - H := 20 + H := 21 tu := prepSyncTest(t, H) client := tu.addClientNode() @@ -235,4 +235,4 @@ func TestSyncIncoming(t *testing.T) { tu.compareSourceState(client) } - */ +*/ diff --git a/go.mod b/go.mod index 454dc74d4..e46c1852e 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/ipfs/go-ds-badger v0.0.5 github.com/ipfs/go-filestore v0.0.2 github.com/ipfs/go-fs-lock v0.0.1 - github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f + github.com/ipfs/go-hamt-ipld v0.0.10 github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.1 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 @@ -61,7 +61,7 @@ require ( github.com/multiformats/go-multiaddr-net v0.0.1 github.com/multiformats/go-multihash v0.0.6 github.com/pkg/errors v0.8.1 - github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 + github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a github.com/prometheus/common v0.6.0 github.com/smartystreets/assertions v1.0.1 // indirect github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect @@ -77,7 +77,7 @@ require ( golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect - golang.org/x/sys v0.0.0-20190726002231-94b544f455ef // indirect + golang.org/x/sys v0.0.0-20190730183949-1393eb018365 // indirect golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8 launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect diff --git a/go.sum b/go.sum index 0f87ce619..4708c9385 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,8 @@ github.com/ipfs/go-filestore v0.0.2 h1:pcYwpjtXXwirtbjBXKVJM9CTa9F7/8v1EkfnDaHTO github.com/ipfs/go-filestore v0.0.2/go.mod h1:KnZ41qJsCt2OX2mxZS0xsK3Psr0/oB93HMMssLujjVc= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y= -github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f h1:CpQZA1HsuaRQaFIUq7h/KqSyclyp/LrpcyifPnKRT2k= -github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk= +github.com/ipfs/go-hamt-ipld v0.0.10 h1:jmJGsV/8OPpBEmO+b1nAPpqX8SG2kLeYveKk8F7IxG4= +github.com/ipfs/go-hamt-ipld v0.0.10/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk= github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= @@ -461,6 +461,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 h1:2m16U/rLwVaRdz7ANkHtHTodP3zTP3N451MADg64x5k= github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= +github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a h1:TdavzKWkPcC2G+6rKJclm/JfrWC6WZFfLUR7EJJX8MA= +github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -622,8 +624,8 @@ golang.org/x/sys v0.0.0-20190524122548-abf6ff778158/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190526052359-791d8a0f4d09/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190726002231-94b544f455ef h1:vwqipsjwy3Y8/PQk/LmiaFjos8aOnU6Tt6oRXKD3org= -golang.org/x/sys v0.0.0-20190726002231-94b544f455ef/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190730183949-1393eb018365 h1:SaXEMXhWzMJThc05vu6uh61Q245r4KaWMrsTedk0FDc= +golang.org/x/sys v0.0.0-20190730183949-1393eb018365/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=