Merge pull request #93 from filecoin-project/fix/catch-up-many
Single mode sync
This commit is contained in:
commit
785199c3d8
@ -81,6 +81,7 @@ type FullNode interface {
|
||||
|
||||
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
|
||||
MpoolPush(context.Context, *types.SignedMessage) error
|
||||
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
||||
|
||||
// FullNodeStruct
|
||||
|
||||
@ -99,9 +100,6 @@ type FullNode interface {
|
||||
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
|
||||
WalletDefaultAddress(context.Context) (address.Address, error)
|
||||
|
||||
// Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain...
|
||||
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
||||
|
||||
// Other
|
||||
|
||||
// ClientImport imports file under the specified path into filestore
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
@ -87,7 +88,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||
log.Errorf("failed to read block sync request: %s", err)
|
||||
return
|
||||
}
|
||||
log.Errorf("block sync request for: %s %d", req.Start, req.RequestLength)
|
||||
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
|
||||
|
||||
resp, err := bss.processRequest(&req)
|
||||
if err != nil {
|
||||
@ -128,19 +129,16 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
|
||||
}
|
||||
|
||||
if opts.IncludeMessages {
|
||||
log.Error("INCLUDING MESSAGES IN SYNC RESPONSE")
|
||||
msgs, mincl, err := bss.gatherMessages(ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Errorf("messages: ", msgs)
|
||||
|
||||
bst.Messages = msgs
|
||||
bst.MsgIncludes = mincl
|
||||
}
|
||||
|
||||
if opts.IncludeBlocks {
|
||||
log.Error("INCLUDING BLOCKS IN SYNC RESPONSE")
|
||||
bst.Blocks = ts.Blocks()
|
||||
}
|
||||
|
||||
@ -164,7 +162,7 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Errorf("MESSAGES FOR BLOCK: %d", len(msgs))
|
||||
log.Infof("MESSAGES FOR BLOCK: %d", len(msgs))
|
||||
|
||||
msgindexes := make([]int, 0, len(msgs))
|
||||
for _, m := range msgs {
|
||||
@ -209,22 +207,7 @@ func (bs *BlockSync) getPeers() []peer.ID {
|
||||
return out
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: tipset,
|
||||
RequestLength: uint64(count),
|
||||
Options: BSOptBlocks,
|
||||
}
|
||||
|
||||
res, err := bs.sendRequestToPeer(ctx, peers[perm[0]], req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
|
||||
switch res.Status {
|
||||
case 0: // Success
|
||||
return bs.processBlocksResponse(req, res)
|
||||
@ -241,6 +224,34 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
req := &BlockSyncRequest{
|
||||
Start: tipset,
|
||||
RequestLength: uint64(count),
|
||||
Options: BSOptBlocks,
|
||||
}
|
||||
|
||||
var err error
|
||||
var res *BlockSyncResponse
|
||||
for _, p := range perm {
|
||||
res, err = bs.sendRequestToPeer(ctx, peers[p], req)
|
||||
if err != nil {
|
||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
ts, err := bs.processStatus(req, res)
|
||||
if err == nil {
|
||||
return ts, nil
|
||||
}
|
||||
}
|
||||
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
|
||||
// TODO: round robin through these peers on error
|
||||
|
||||
|
123
chain/gen/gen.go
123
chain/gen/gen.go
@ -1,7 +1,14 @@
|
||||
package gen
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-car"
|
||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
@ -17,6 +24,8 @@ import (
|
||||
|
||||
var log = logging.Logger("gen")
|
||||
|
||||
const msgsPerBlock = 20
|
||||
|
||||
type ChainGen struct {
|
||||
accounts []address.Address
|
||||
|
||||
@ -26,11 +35,18 @@ type ChainGen struct {
|
||||
|
||||
cs *store.ChainStore
|
||||
|
||||
genesis *types.BlockHeader
|
||||
|
||||
genesis *types.BlockHeader
|
||||
curBlock *types.FullBlock
|
||||
|
||||
miner address.Address
|
||||
w *wallet.Wallet
|
||||
|
||||
miner address.Address
|
||||
receivers []address.Address
|
||||
banker address.Address
|
||||
bankerNonce uint64
|
||||
|
||||
r repo.Repo
|
||||
lr repo.LockedRepo
|
||||
}
|
||||
|
||||
type mybs struct {
|
||||
@ -40,7 +56,8 @@ type mybs struct {
|
||||
func (m mybs) Get(c cid.Cid) (block.Block, error) {
|
||||
b, err := m.Blockstore.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("Get failed: %s %s", c, err)
|
||||
// change to error for stacktraces, don't commit with that pls
|
||||
log.Warnf("Get failed: %s %s", c, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -48,18 +65,23 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) {
|
||||
}
|
||||
|
||||
func NewGenerator() (*ChainGen, error) {
|
||||
|
||||
mr := repo.NewMemory(nil)
|
||||
lr, err := mr.Lock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds, err := lr.Datastore("/blocks")
|
||||
ds, err := lr.Datastore("/metadata")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bs := mybs{blockstore.NewBlockstore(ds)}
|
||||
|
||||
bds, err := lr.Datastore("/blocks")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bs := mybs{blockstore.NewIdStore(blockstore.NewBlockstore(bds))}
|
||||
|
||||
ks, err := lr.KeyStore()
|
||||
if err != nil {
|
||||
@ -76,12 +98,22 @@ func NewGenerator() (*ChainGen, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
banker, err := w.GenerateKey(types.KTBLS)
|
||||
// KTBLS doesn't support signature verification or something like that yet
|
||||
banker, err := w.GenerateKey(types.KTSecp256k1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
receievers := make([]address.Address, msgsPerBlock)
|
||||
for r := range receievers {
|
||||
receievers[r], err = w.GenerateKey(types.KTBLS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
|
||||
miner: types.NewInt(5),
|
||||
banker: types.NewInt(90000000),
|
||||
})
|
||||
if err != nil {
|
||||
@ -90,8 +122,6 @@ func NewGenerator() (*ChainGen, error) {
|
||||
|
||||
cs := store.NewChainStore(bs, ds)
|
||||
|
||||
msgsPerBlock := 10
|
||||
|
||||
genfb := &types.FullBlock{Header: genb.Genesis}
|
||||
|
||||
if err := cs.SetGenesis(genb.Genesis); err != nil {
|
||||
@ -103,8 +133,16 @@ func NewGenerator() (*ChainGen, error) {
|
||||
cs: cs,
|
||||
msgsPerBlock: msgsPerBlock,
|
||||
genesis: genb.Genesis,
|
||||
miner: miner,
|
||||
curBlock: genfb,
|
||||
w: w,
|
||||
|
||||
miner: miner,
|
||||
banker: banker,
|
||||
receivers: receievers,
|
||||
|
||||
curBlock: genfb,
|
||||
|
||||
r: mr,
|
||||
lr: lr,
|
||||
}
|
||||
|
||||
return gen, nil
|
||||
@ -114,6 +152,20 @@ func (cg *ChainGen) Genesis() *types.BlockHeader {
|
||||
return cg.genesis
|
||||
}
|
||||
|
||||
func (cg *ChainGen) GenesisCar() ([]byte, error) {
|
||||
offl := offline.Exchange(cg.bs)
|
||||
blkserv := blockservice.New(cg.bs, offl)
|
||||
dserv := merkledag.NewDAGService(blkserv)
|
||||
|
||||
out := new(bytes.Buffer)
|
||||
|
||||
if err := car.WriteCar(context.TODO(), dserv, []cid.Cid{cg.Genesis().Cid()}, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out.Bytes(), nil
|
||||
}
|
||||
|
||||
func (cg *ChainGen) nextBlockProof() (address.Address, types.ElectionProof, []types.Ticket, error) {
|
||||
return cg.miner, []byte("cat in a box"), []types.Ticket{types.Ticket("im a ticket, promise")}, nil
|
||||
}
|
||||
@ -124,7 +176,45 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var msgs []*types.SignedMessage
|
||||
// make some transfers from banker
|
||||
|
||||
msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
|
||||
for m := range msgs {
|
||||
msg := types.Message{
|
||||
To: cg.receivers[m],
|
||||
From: cg.banker,
|
||||
|
||||
Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1,
|
||||
|
||||
Value: types.NewInt(uint64(m + 1)),
|
||||
|
||||
Method: 0,
|
||||
|
||||
GasLimit: types.NewInt(10000),
|
||||
GasPrice: types.NewInt(0),
|
||||
}
|
||||
|
||||
unsigned, err := msg.Serialize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sig, err := cg.w.Sign(cg.banker, unsigned)
|
||||
if err != nil {
|
||||
return &types.FullBlock{}, err
|
||||
}
|
||||
|
||||
msgs[m] = &types.SignedMessage{
|
||||
Message: msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
|
||||
if _, err := cg.cs.PutMessage(msgs[m]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// create block
|
||||
|
||||
parents, err := types.NewTipSet([]*types.BlockHeader{cg.curBlock.Header})
|
||||
if err != nil {
|
||||
@ -144,3 +234,10 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) {
|
||||
|
||||
return fblk, nil
|
||||
}
|
||||
|
||||
func (cg *ChainGen) YieldRepo() (repo.Repo, error) {
|
||||
if err := cg.lr.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cg.r, nil
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type MessagePool struct {
|
||||
@ -128,7 +129,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
||||
for _, b := range ts.Blocks() {
|
||||
msgs, err := mp.cs.MessagesForBlock(b)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height)
|
||||
}
|
||||
for _, msg := range msgs {
|
||||
if err := mp.Add(msg); err != nil {
|
||||
@ -142,7 +143,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
|
||||
for _, b := range ts.Blocks() {
|
||||
msgs, err := mp.cs.MessagesForBlock(b)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages)
|
||||
}
|
||||
for _, msg := range msgs {
|
||||
mp.Remove(msg)
|
||||
|
@ -35,7 +35,7 @@ func NewStateTree(cst *hamt.CborIpldStore) (*StateTree, error) {
|
||||
func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) {
|
||||
nd, err := hamt.LoadNode(context.Background(), cst, c)
|
||||
if err != nil {
|
||||
log.Errorf("loading hamt node failed: %s", err)
|
||||
log.Errorf("loading hamt node %s failed: %s", c, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -187,9 +187,12 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
|
||||
cs.heaviestLk.Lock()
|
||||
defer cs.heaviestLk.Unlock()
|
||||
if cs.heaviest == nil || cs.Weight(ts) > cs.Weight(cs.heaviest) {
|
||||
// TODO: don't do this for initial sync. Now that we don't have a
|
||||
// difference between 'bootstrap sync' and 'caught up' sync, we need
|
||||
// some other heuristic.
|
||||
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(err, "computing reorg ops failed")
|
||||
}
|
||||
for _, hcf := range cs.headChangeNotifs {
|
||||
if err := hcf(revert, apply); err != nil {
|
||||
@ -290,6 +293,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
|
||||
rightChain = append(rightChain, right)
|
||||
par, err := cs.LoadTipSet(right.Parents())
|
||||
if err != nil {
|
||||
log.Infof("failed to fetch right.Parents: %s", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
@ -336,13 +340,21 @@ type storable interface {
|
||||
ToStorageBlock() (block.Block, error)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
|
||||
sb, err := m.ToStorageBlock()
|
||||
func PutMessage(bs blockstore.Blockstore, m storable) (cid.Cid, error) {
|
||||
b, err := m.ToStorageBlock()
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
return sb.Cid(), cs.bs.Put(sb)
|
||||
if err := bs.Put(b); err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
return b.Cid(), nil
|
||||
}
|
||||
|
||||
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
|
||||
return PutMessage(cs.bs, m)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) AddBlock(b *types.BlockHeader) error {
|
||||
@ -395,6 +407,7 @@ func (cs *ChainStore) TipSetState(cids []cid.Cid) (cid.Cid, error) {
|
||||
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.SignedMessage, error) {
|
||||
sb, err := cs.bs.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("get message get failed: %s: %s", c, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -428,7 +441,7 @@ func (cs *ChainStore) MessageCidsForBlock(b *types.BlockHeader) ([]cid.Cid, erro
|
||||
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.SignedMessage, error) {
|
||||
cids, err := cs.MessageCidsForBlock(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "loading message cids for block")
|
||||
}
|
||||
|
||||
return cs.LoadMessagesFromCids(cids)
|
||||
@ -461,10 +474,10 @@ func (cs *ChainStore) GetReceipt(b *types.BlockHeader, i int) (*types.MessageRec
|
||||
|
||||
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
|
||||
msgs := make([]*types.SignedMessage, 0, len(cids))
|
||||
for _, c := range cids {
|
||||
for i, c := range cids {
|
||||
m, err := cs.GetMessage(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrapf(err, "failed to get message: (%s):%d", c, i)
|
||||
}
|
||||
|
||||
msgs = append(msgs, m)
|
||||
|
448
chain/sync.go
448
chain/sync.go
@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/actors"
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
@ -15,7 +17,7 @@ import (
|
||||
"github.com/ipfs/go-hamt-ipld"
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/whyrusleeping/sharray"
|
||||
)
|
||||
@ -34,9 +36,6 @@ type Syncer struct {
|
||||
// The known Genesis tipset
|
||||
Genesis *types.TipSet
|
||||
|
||||
// the current mode the syncer is in
|
||||
syncMode SyncMode
|
||||
|
||||
syncLock sync.Mutex
|
||||
|
||||
// TipSets known to be invalid
|
||||
@ -65,7 +64,6 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e
|
||||
}
|
||||
|
||||
return &Syncer{
|
||||
syncMode: Bootstrap,
|
||||
Genesis: gent,
|
||||
Bsync: bsync,
|
||||
peerHeads: make(map[peer.ID]*types.TipSet),
|
||||
@ -75,53 +73,10 @@ func NewSyncer(cs *store.ChainStore, bsync *BlockSync, self peer.ID) (*Syncer, e
|
||||
}, nil
|
||||
}
|
||||
|
||||
type SyncMode int
|
||||
|
||||
const (
|
||||
Unknown = SyncMode(iota)
|
||||
Bootstrap
|
||||
CaughtUp
|
||||
)
|
||||
|
||||
type BadTipSetCache struct {
|
||||
badBlocks map[cid.Cid]struct{}
|
||||
}
|
||||
|
||||
type BlockSet struct {
|
||||
tset map[uint64]*types.TipSet
|
||||
head *types.TipSet
|
||||
}
|
||||
|
||||
func (bs *BlockSet) Insert(ts *types.TipSet) {
|
||||
if bs.tset == nil {
|
||||
bs.tset = make(map[uint64]*types.TipSet)
|
||||
}
|
||||
|
||||
if bs.head == nil || ts.Height() > bs.head.Height() {
|
||||
bs.head = ts
|
||||
}
|
||||
bs.tset[ts.Height()] = ts
|
||||
}
|
||||
|
||||
func (bs *BlockSet) GetByHeight(h uint64) *types.TipSet {
|
||||
return bs.tset[h]
|
||||
}
|
||||
|
||||
func (bs *BlockSet) PersistTo(cs *store.ChainStore) error {
|
||||
for _, ts := range bs.tset {
|
||||
for _, b := range ts.Blocks() {
|
||||
if err := cs.PersistBlockHeader(b); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *BlockSet) Head() *types.TipSet {
|
||||
return bs.head
|
||||
}
|
||||
|
||||
const BootstrapPeerThreshold = 1
|
||||
|
||||
// InformNewHead informs the syncer about a new potential tipset
|
||||
@ -134,13 +89,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
||||
if from == syncer.self {
|
||||
// TODO: this is kindof a hack...
|
||||
log.Infof("got block from ourselves")
|
||||
syncer.syncLock.Lock()
|
||||
defer syncer.syncLock.Unlock()
|
||||
|
||||
if syncer.syncMode == Bootstrap {
|
||||
syncer.syncMode = CaughtUp
|
||||
}
|
||||
if err := syncer.SyncCaughtUp(fts); err != nil {
|
||||
if err := syncer.Sync(fts); err != nil {
|
||||
log.Errorf("failed to sync our own block: %s", err)
|
||||
}
|
||||
|
||||
@ -152,32 +102,12 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
||||
syncer.Bsync.AddPeer(from)
|
||||
|
||||
go func() {
|
||||
syncer.syncLock.Lock()
|
||||
defer syncer.syncLock.Unlock()
|
||||
|
||||
switch syncer.syncMode {
|
||||
case Bootstrap:
|
||||
syncer.SyncBootstrap()
|
||||
case CaughtUp:
|
||||
if err := syncer.SyncCaughtUp(fts); err != nil {
|
||||
log.Errorf("sync error: %s", err)
|
||||
}
|
||||
case Unknown:
|
||||
panic("invalid syncer state")
|
||||
if err := syncer.Sync(fts); err != nil {
|
||||
log.Errorf("sync error: %s", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (syncer *Syncer) GetPeers() []peer.ID {
|
||||
syncer.peerHeadsLk.Lock()
|
||||
defer syncer.peerHeadsLk.Unlock()
|
||||
var out []peer.ID
|
||||
for p, _ := range syncer.peerHeads {
|
||||
out = append(out, p)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
||||
// TODO: search for other blocks that could form a tipset with this block
|
||||
// and then send that tipset to InformNewHead
|
||||
@ -186,148 +116,6 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
||||
syncer.InformNewHead(from, fts)
|
||||
}
|
||||
|
||||
// SyncBootstrap is used to synchronise your chain when first joining
|
||||
// the network, or when rejoining after significant downtime.
|
||||
func (syncer *Syncer) SyncBootstrap() {
|
||||
fmt.Println("Sync bootstrap!")
|
||||
defer fmt.Println("bye bye sync bootstrap")
|
||||
ctx := context.Background()
|
||||
|
||||
if syncer.syncMode == CaughtUp {
|
||||
log.Errorf("Called SyncBootstrap while in caught up mode")
|
||||
return
|
||||
}
|
||||
|
||||
selectedHead, err := syncer.selectHead(syncer.peerHeads)
|
||||
if err != nil {
|
||||
log.Error("failed to select head: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
blockSet := []*types.TipSet{selectedHead}
|
||||
cur := selectedHead.Cids()
|
||||
|
||||
// If, for some reason, we have a suffix of the chain locally, handle that here
|
||||
for blockSet[len(blockSet)-1].Height() > 0 {
|
||||
log.Errorf("syncing local: ", cur)
|
||||
ts, err := syncer.store.LoadTipSet(cur)
|
||||
if err != nil {
|
||||
if err == bstore.ErrNotFound {
|
||||
log.Error("not found: ", cur)
|
||||
break
|
||||
}
|
||||
log.Errorf("loading local tipset: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
blockSet = append(blockSet, ts)
|
||||
cur = ts.Parents()
|
||||
}
|
||||
|
||||
for blockSet[len(blockSet)-1].Height() > 0 {
|
||||
// NB: GetBlocks validates that the blocks are in-fact the ones we
|
||||
// requested, and that they are correctly linked to eachother. It does
|
||||
// not validate any state transitions
|
||||
fmt.Println("Get blocks: ", cur)
|
||||
blks, err := syncer.Bsync.GetBlocks(context.TODO(), cur, 10)
|
||||
if err != nil {
|
||||
log.Error("failed to get blocks: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, b := range blks {
|
||||
blockSet = append(blockSet, b)
|
||||
}
|
||||
|
||||
cur = blks[len(blks)-1].Parents()
|
||||
}
|
||||
|
||||
// hacks. in the case that we request X blocks starting at height X+1, we
|
||||
// won't get the Genesis block in the returned blockset. This hacks around it
|
||||
if blockSet[len(blockSet)-1].Height() != 0 {
|
||||
blockSet = append(blockSet, syncer.Genesis)
|
||||
}
|
||||
|
||||
blockSet = reverse(blockSet)
|
||||
|
||||
genesis := blockSet[0]
|
||||
if !genesis.Equals(syncer.Genesis) {
|
||||
// TODO: handle this...
|
||||
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
|
||||
return
|
||||
}
|
||||
|
||||
for _, ts := range blockSet {
|
||||
for _, b := range ts.Blocks() {
|
||||
if err := syncer.store.PersistBlockHeader(b); err != nil {
|
||||
log.Errorf("failed to persist synced blocks to the chainstore: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch all the messages for all the blocks in this chain
|
||||
|
||||
windowSize := uint64(10)
|
||||
for i := uint64(0); i <= selectedHead.Height(); i += windowSize {
|
||||
bs := bstore.NewBlockstore(dstore.NewMapDatastore())
|
||||
cst := hamt.CSTFromBstore(bs)
|
||||
|
||||
nextHeight := i + windowSize - 1
|
||||
if nextHeight > selectedHead.Height() {
|
||||
nextHeight = selectedHead.Height()
|
||||
}
|
||||
|
||||
next := blockSet[nextHeight]
|
||||
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch messages: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
for bsi := 0; bsi < len(bstips); bsi++ {
|
||||
cur := blockSet[i+uint64(bsi)]
|
||||
bstip := bstips[len(bstips)-(bsi+1)]
|
||||
fmt.Println("that loop: ", bsi, len(bstips))
|
||||
fts, err := zipTipSetAndMessages(cst, cur, bstip.Messages, bstip.MsgIncludes)
|
||||
if err != nil {
|
||||
log.Error("zipping failed: ", err, bsi, i)
|
||||
log.Error("height: ", selectedHead.Height())
|
||||
log.Error("bstips: ", bstips)
|
||||
log.Error("next height: ", nextHeight)
|
||||
return
|
||||
}
|
||||
|
||||
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
|
||||
log.Errorf("failed to validate tipset: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for _, bst := range bstips {
|
||||
for _, m := range bst.Messages {
|
||||
if _, err := cst.Put(context.TODO(), m); err != nil {
|
||||
log.Error("failed to persist messages: ", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
|
||||
log.Errorf("failed to persist temp blocks: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
head := blockSet[len(blockSet)-1]
|
||||
log.Errorf("Finished syncing! new head: %s", head.Cids())
|
||||
if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil {
|
||||
log.Errorf("MaybeTakeHeavierTipSet failed: %s", err)
|
||||
}
|
||||
syncer.head = head
|
||||
syncer.syncMode = CaughtUp
|
||||
}
|
||||
|
||||
func reverse(tips []*types.TipSet) []*types.TipSet {
|
||||
out := make([]*types.TipSet, len(tips))
|
||||
for i := 0; i < len(tips); i++ {
|
||||
@ -360,8 +148,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []
|
||||
if len(ts.Blocks()) != len(msgincl) {
|
||||
return nil, fmt.Errorf("msgincl length didnt match tipset size")
|
||||
}
|
||||
fmt.Println("zipping messages: ", msgincl)
|
||||
fmt.Println("into block: ", ts.Blocks()[0].Height)
|
||||
|
||||
fts := &store.FullTipSet{}
|
||||
for bi, b := range ts.Blocks() {
|
||||
@ -377,8 +163,6 @@ func zipTipSetAndMessages(cst *hamt.CborIpldStore, ts *types.TipSet, messages []
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Println("messages: ", msgCids)
|
||||
fmt.Println("message root: ", b.Messages, mroot)
|
||||
if b.Messages != mroot {
|
||||
return nil, fmt.Errorf("messages didnt match message root in header")
|
||||
}
|
||||
@ -469,37 +253,26 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
|
||||
return fts, nil
|
||||
}
|
||||
|
||||
// SyncCaughtUp is used to stay in sync once caught up to
|
||||
// the rest of the network.
|
||||
func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error {
|
||||
func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error {
|
||||
syncer.syncLock.Lock()
|
||||
defer syncer.syncLock.Unlock()
|
||||
|
||||
ts := maybeHead.TipSet()
|
||||
if syncer.Genesis.Equals(ts) {
|
||||
return nil
|
||||
}
|
||||
|
||||
chain, err := syncer.collectChainCaughtUp(maybeHead)
|
||||
if err != nil {
|
||||
if err := syncer.collectChain(maybeHead); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := len(chain) - 1; i >= 0; i-- {
|
||||
ts := chain[i]
|
||||
if err := syncer.ValidateTipSet(context.TODO(), ts); err != nil {
|
||||
return errors.Wrap(err, "validate tipset failed")
|
||||
}
|
||||
|
||||
if err := syncer.store.PutTipSet(ts); err != nil {
|
||||
return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp")
|
||||
}
|
||||
}
|
||||
|
||||
if err := syncer.store.PutTipSet(maybeHead); err != nil {
|
||||
return errors.Wrap(err, "failed to put synced tipset to chainstore")
|
||||
}
|
||||
|
||||
if syncer.store.Weight(chain[0].TipSet()) > syncer.store.Weight(syncer.head) {
|
||||
fmt.Println("Accepted new head: ", chain[0].Cids())
|
||||
syncer.head = chain[0].TipSet()
|
||||
if syncer.store.Weight(maybeHead.TipSet()) > syncer.store.Weight(syncer.head) {
|
||||
fmt.Println("Accepted new head: ", maybeHead.Cids())
|
||||
syncer.head = maybeHead.TipSet()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -571,58 +344,159 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
|
||||
|
||||
}
|
||||
|
||||
func (syncer *Syncer) Punctual(ts *types.TipSet) bool {
|
||||
return true
|
||||
}
|
||||
func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*types.TipSet, error) {
|
||||
blockSet := []*types.TipSet{from}
|
||||
|
||||
func (syncer *Syncer) collectChainCaughtUp(fts *store.FullTipSet) ([]*store.FullTipSet, error) {
|
||||
// fetch tipset and messages via bitswap
|
||||
at := from.Parents()
|
||||
|
||||
chain := []*store.FullTipSet{fts}
|
||||
cur := fts.TipSet()
|
||||
|
||||
for {
|
||||
ts, err := syncer.store.LoadTipSet(cur.Parents())
|
||||
// If, for some reason, we have a suffix of the chain locally, handle that here
|
||||
for blockSet[len(blockSet)-1].Height() > toHeight {
|
||||
log.Warn("syncing local: ", at)
|
||||
ts, err := syncer.store.LoadTipSet(at)
|
||||
if err != nil {
|
||||
log.Errorf("dont have parent blocks for sync tipset: %s", err)
|
||||
panic("should do something better, like fetch? or error?")
|
||||
}
|
||||
|
||||
return chain, nil // return the chain because we have this last block in our cache already.
|
||||
|
||||
if ts.Equals(syncer.Genesis) {
|
||||
break
|
||||
}
|
||||
|
||||
/*
|
||||
if !syncer.Punctual(ts) {
|
||||
syncer.bad.InvalidateChain(chain)
|
||||
syncer.bad.InvalidateTipSet(ts)
|
||||
return nil, errors.New("tipset forks too far back from head")
|
||||
if err == bstore.ErrNotFound {
|
||||
log.Info("tipset not found locally, starting sync: ", at)
|
||||
break
|
||||
}
|
||||
*/
|
||||
|
||||
chain = append(chain, fts)
|
||||
log.Error("received unknown chain in caught up mode...")
|
||||
panic("for now, we panic...")
|
||||
|
||||
has, err := syncer.store.Contains(ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if has {
|
||||
// Store has record of this tipset.
|
||||
return chain, nil
|
||||
log.Warn("loading local tipset: %s", err)
|
||||
continue // TODO: verify
|
||||
}
|
||||
|
||||
/*
|
||||
parent, err := syncer.FetchTipSet(context.TODO(), ts.Parents())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ts = parent
|
||||
*/
|
||||
blockSet = append(blockSet, ts)
|
||||
at = ts.Parents()
|
||||
}
|
||||
|
||||
return chain, nil
|
||||
for blockSet[len(blockSet)-1].Height() > toHeight {
|
||||
// NB: GetBlocks validates that the blocks are in-fact the ones we
|
||||
// requested, and that they are correctly linked to eachother. It does
|
||||
// not validate any state transitions
|
||||
fmt.Println("Get blocks")
|
||||
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
|
||||
if err != nil {
|
||||
// Most likely our peers aren't fully synced yet, but forwarded
|
||||
// new block message (ideally we'd find better peers)
|
||||
|
||||
log.Error("failed to get blocks: ", err)
|
||||
|
||||
// This error will only be logged above,
|
||||
return nil, xerrors.Errorf("failed to get blocks: %w", err)
|
||||
}
|
||||
|
||||
for _, b := range blks {
|
||||
blockSet = append(blockSet, b)
|
||||
}
|
||||
|
||||
at = blks[len(blks)-1].Parents()
|
||||
}
|
||||
|
||||
if toHeight == 0 {
|
||||
// hacks. in the case that we request X blocks starting at height X+1, we
|
||||
// won't get the Genesis block in the returned blockset. This hacks around it
|
||||
if blockSet[len(blockSet)-1].Height() != 0 {
|
||||
blockSet = append(blockSet, syncer.Genesis)
|
||||
}
|
||||
|
||||
blockSet = reverse(blockSet)
|
||||
|
||||
genesis := blockSet[0]
|
||||
if !genesis.Equals(syncer.Genesis) {
|
||||
// TODO: handle this...
|
||||
log.Errorf("We synced to the wrong chain! %s != %s", genesis, syncer.Genesis)
|
||||
panic("We synced to the wrong chain")
|
||||
}
|
||||
}
|
||||
|
||||
return blockSet, nil
|
||||
}
|
||||
|
||||
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
|
||||
// Fetch all the messages for all the blocks in this chain
|
||||
cur := headers[len(headers)-1]
|
||||
|
||||
windowSize := uint64(10)
|
||||
for i := uint64(0); i <= cur.Height(); i += windowSize {
|
||||
ds := dstore.NewMapDatastore()
|
||||
bs := bstore.NewBlockstore(ds)
|
||||
cst := hamt.CSTFromBstore(bs)
|
||||
|
||||
nextHeight := i + windowSize - 1
|
||||
if nextHeight > cur.Height() {
|
||||
nextHeight = cur.Height()
|
||||
}
|
||||
|
||||
next := headers[nextHeight]
|
||||
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch messages: %s", err)
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
|
||||
for bsi := 0; bsi < len(bstips); bsi++ {
|
||||
this := headers[i+uint64(bsi)]
|
||||
bstip := bstips[len(bstips)-(bsi+1)]
|
||||
fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes)
|
||||
if err != nil {
|
||||
log.Error("zipping failed: ", err, bsi, i)
|
||||
log.Error("height: ", this.Height())
|
||||
log.Error("bstips: ", bstips)
|
||||
log.Error("next height: ", nextHeight)
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
|
||||
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
|
||||
log.Errorf("failed to validate tipset: %s", err)
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, bst := range bstips {
|
||||
for _, m := range bst.Messages {
|
||||
switch m.Signature.Type {
|
||||
case types.KTBLS:
|
||||
//log.Infof("putting BLS message: %s", m.Cid())
|
||||
if _, err := store.PutMessage(bs, &m.Message); err != nil {
|
||||
log.Error("failed to persist messages: ", err)
|
||||
return xerrors.Errorf("BLS message processing failed: %w", err)
|
||||
}
|
||||
case types.KTSecp256k1:
|
||||
//log.Infof("putting secp256k1 message: %s", m.Cid())
|
||||
if _, err := store.PutMessage(bs, m); err != nil {
|
||||
log.Error("failed to persist messages: ", err)
|
||||
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
|
||||
}
|
||||
default:
|
||||
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
|
||||
curHeight := syncer.head.Height()
|
||||
|
||||
headers, err := syncer.collectHeaders(fts.TipSet(), curHeight)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, ts := range headers {
|
||||
for _, b := range ts.Blocks() {
|
||||
if err := syncer.store.PersistBlockHeader(b); err != nil {
|
||||
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := syncer.syncMessagesAndCheckState(headers); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
238
chain/sync_test.go
Normal file
238
chain/sync_test.go
Normal file
@ -0,0 +1,238 @@
|
||||
package chain_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/api"
|
||||
"github.com/filecoin-project/go-lotus/chain"
|
||||
"github.com/filecoin-project/go-lotus/chain/gen"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/node"
|
||||
"github.com/filecoin-project/go-lotus/node/impl"
|
||||
"github.com/filecoin-project/go-lotus/node/modules"
|
||||
"github.com/filecoin-project/go-lotus/node/repo"
|
||||
)
|
||||
|
||||
const source = 0
|
||||
|
||||
func repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) {
|
||||
g, err := gen.NewGenerator()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
blks := make([]*types.FullBlock, h)
|
||||
|
||||
for i := 0; i < h; i++ {
|
||||
blks[i], err = g.NextBlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("block at H:%d: %s\n", blks[i].Header.Height, blks[i].Cid())
|
||||
|
||||
require.Equal(t, uint64(i+1), blks[i].Header.Height, "wrong height")
|
||||
}
|
||||
|
||||
r, err := g.YieldRepo()
|
||||
require.NoError(t, err)
|
||||
|
||||
genb, err := g.GenesisCar()
|
||||
require.NoError(t, err)
|
||||
|
||||
return r, genb, blks
|
||||
}
|
||||
|
||||
type syncTestUtil struct {
|
||||
t *testing.T
|
||||
|
||||
ctx context.Context
|
||||
mn mocknet.Mocknet
|
||||
|
||||
genesis []byte
|
||||
blocks []*types.FullBlock
|
||||
|
||||
nds []api.FullNode
|
||||
}
|
||||
|
||||
func prepSyncTest(t *testing.T, h int) *syncTestUtil {
|
||||
logging.SetLogLevel("*", "INFO")
|
||||
|
||||
ctx := context.Background()
|
||||
tu := &syncTestUtil{
|
||||
t: t,
|
||||
ctx: ctx,
|
||||
mn: mocknet.New(ctx),
|
||||
}
|
||||
|
||||
tu.addSourceNode(h)
|
||||
tu.checkHeight("source", source, h)
|
||||
|
||||
// separate logs
|
||||
fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b")
|
||||
|
||||
return tu
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) addSourceNode(gen int) {
|
||||
if tu.genesis != nil {
|
||||
tu.t.Fatal("source node already exists")
|
||||
}
|
||||
|
||||
sourceRepo, genesis, blocks := repoWithChain(tu.t, gen)
|
||||
var out api.FullNode
|
||||
|
||||
err := node.New(tu.ctx,
|
||||
node.FullAPI(&out),
|
||||
node.Online(),
|
||||
node.Repo(sourceRepo),
|
||||
node.MockHost(tu.mn),
|
||||
|
||||
node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)),
|
||||
)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
tu.genesis = genesis
|
||||
tu.blocks = blocks
|
||||
tu.nds = append(tu.nds, out) // always at 0
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) addClientNode() int {
|
||||
if tu.genesis == nil {
|
||||
tu.t.Fatal("source doesn't exists")
|
||||
}
|
||||
|
||||
var out api.FullNode
|
||||
|
||||
err := node.New(tu.ctx,
|
||||
node.FullAPI(&out),
|
||||
node.Online(),
|
||||
node.Repo(repo.NewMemory(nil)),
|
||||
node.MockHost(tu.mn),
|
||||
|
||||
node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)),
|
||||
)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
tu.nds = append(tu.nds, out)
|
||||
return len(tu.nds) - 1
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) connect(from, to int) {
|
||||
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
err = tu.nds[from].NetConnect(tu.ctx, toPI)
|
||||
require.NoError(tu.t, err)
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
|
||||
b, err := tu.nds[n].ChainHead(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
require.Equal(tu.t, uint64(h), b.Height())
|
||||
fmt.Printf("%s H: %d\n", name, b.Height())
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) compareSourceState(with int) {
|
||||
sourceAccounts, err := tu.nds[source].WalletList(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
for _, addr := range sourceAccounts {
|
||||
sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr)
|
||||
require.NoError(tu.t, err)
|
||||
fmt.Printf("Source state check for %s, expect %s\n", addr, sourceBalance)
|
||||
|
||||
actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
require.Equal(tu.t, sourceBalance, actBalance)
|
||||
fmt.Printf("Source state check <OK> for %s\n", addr)
|
||||
}
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
|
||||
// utility to simulate incoming blocks without miner process
|
||||
// TODO: should call syncer directly, this won't work correctly in all cases
|
||||
|
||||
var b chain.BlockMsg
|
||||
|
||||
// -1 to match block.Height
|
||||
b.Header = tu.blocks[h-1].Header
|
||||
for _, msg := range tu.blocks[h-1].Messages {
|
||||
c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
b.Messages = append(b.Messages, c)
|
||||
}
|
||||
|
||||
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
|
||||
}
|
||||
|
||||
func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
|
||||
for i := 0; i < n; i++ {
|
||||
tu.submitSourceBlock(to, h+i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncSimple(t *testing.T) {
|
||||
H := 21
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
client := tu.addClientNode()
|
||||
tu.checkHeight("client", client, 0)
|
||||
|
||||
require.NoError(t, tu.mn.LinkAll())
|
||||
tu.connect(1, 0)
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
tu.checkHeight("client", client, H)
|
||||
|
||||
tu.compareSourceState(client)
|
||||
}
|
||||
|
||||
/*
|
||||
TODO: this is broken because of how tu.submitSourceBlock works now
|
||||
func TestSyncManual(t *testing.T) {
|
||||
H := 20
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
client := tu.addClientNode()
|
||||
tu.checkHeight("client", client, 0)
|
||||
|
||||
tu.submitSourceBlocks(client, 1, H)
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
|
||||
tu.checkHeight("client", client, H)
|
||||
|
||||
tu.compareSourceState(client)
|
||||
}
|
||||
|
||||
func TestSyncIncoming(t *testing.T) {
|
||||
H := 1
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
producer := tu.addClientNode()
|
||||
client := tu.addClientNode()
|
||||
|
||||
tu.mn.LinkAll()
|
||||
tu.connect(client, producer)
|
||||
|
||||
for h := 0; h < H; h++ {
|
||||
tu.submitSourceBlock(producer, h + 1)
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
|
||||
}
|
||||
tu.checkHeight("client", client, H)
|
||||
tu.checkHeight("producer", producer, H)
|
||||
|
||||
tu.compareSourceState(client)
|
||||
}
|
||||
*/
|
@ -220,7 +220,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
|
||||
}
|
||||
|
||||
if msg.Nonce != fromActor.Nonce {
|
||||
return nil, xerrors.Errorf("invalid nonce")
|
||||
return nil, xerrors.Errorf("invalid nonce (got %d, expected %d)", msg.Nonce, fromActor.Nonce)
|
||||
}
|
||||
fromActor.Nonce++
|
||||
|
||||
|
@ -3,7 +3,6 @@ package cli
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
)
|
||||
|
||||
@ -29,7 +28,7 @@ var minerStart = &cli.Command{
|
||||
// TODO: this address needs to be the address of an actual miner
|
||||
maddr, err := api.WalletDefaultAddress(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create miner address")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := api.MinerStart(ctx, maddr); err != nil {
|
||||
|
6
go.mod
6
go.mod
@ -21,7 +21,7 @@ require (
|
||||
github.com/ipfs/go-ds-badger v0.0.5
|
||||
github.com/ipfs/go-filestore v0.0.2
|
||||
github.com/ipfs/go-fs-lock v0.0.1
|
||||
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f
|
||||
github.com/ipfs/go-hamt-ipld v0.0.10
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1
|
||||
github.com/ipfs/go-ipfs-chunker v0.0.1
|
||||
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
|
||||
@ -61,7 +61,7 @@ require (
|
||||
github.com/multiformats/go-multiaddr-net v0.0.1
|
||||
github.com/multiformats/go-multihash v0.0.6
|
||||
github.com/pkg/errors v0.8.1
|
||||
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14
|
||||
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a
|
||||
github.com/prometheus/common v0.6.0
|
||||
github.com/smartystreets/assertions v1.0.1 // indirect
|
||||
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
|
||||
@ -77,7 +77,7 @@ require (
|
||||
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
|
||||
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef // indirect
|
||||
golang.org/x/sys v0.0.0-20190730183949-1393eb018365 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
|
||||
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
|
||||
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
|
||||
|
10
go.sum
10
go.sum
@ -157,8 +157,8 @@ github.com/ipfs/go-filestore v0.0.2 h1:pcYwpjtXXwirtbjBXKVJM9CTa9F7/8v1EkfnDaHTO
|
||||
github.com/ipfs/go-filestore v0.0.2/go.mod h1:KnZ41qJsCt2OX2mxZS0xsK3Psr0/oB93HMMssLujjVc=
|
||||
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
|
||||
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
|
||||
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f h1:CpQZA1HsuaRQaFIUq7h/KqSyclyp/LrpcyifPnKRT2k=
|
||||
github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk=
|
||||
github.com/ipfs/go-hamt-ipld v0.0.10 h1:jmJGsV/8OPpBEmO+b1nAPpqX8SG2kLeYveKk8F7IxG4=
|
||||
github.com/ipfs/go-hamt-ipld v0.0.10/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
|
||||
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
|
||||
@ -461,6 +461,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
|
||||
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 h1:2m16U/rLwVaRdz7ANkHtHTodP3zTP3N451MADg64x5k=
|
||||
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
|
||||
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a h1:TdavzKWkPcC2G+6rKJclm/JfrWC6WZFfLUR7EJJX8MA=
|
||||
github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
|
||||
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
|
||||
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
@ -622,8 +624,8 @@ golang.org/x/sys v0.0.0-20190524122548-abf6ff778158/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20190526052359-791d8a0f4d09/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef h1:vwqipsjwy3Y8/PQk/LmiaFjos8aOnU6Tt6oRXKD3org=
|
||||
golang.org/x/sys v0.0.0-20190726002231-94b544f455ef/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190730183949-1393eb018365 h1:SaXEMXhWzMJThc05vu6uh61Q245r4KaWMrsTedk0FDc=
|
||||
golang.org/x/sys v0.0.0-20190730183949-1393eb018365/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
|
@ -26,6 +26,7 @@ class Block extends React.Component {
|
||||
<div>Height: {head.Height}</div>
|
||||
<div>Parents: <BlockLinks cids={head.Parents} conn={this.props.conn} mountWindow={this.props.mountWindow}/></div>
|
||||
<div>Weight: {head.ParentWeight}</div>
|
||||
<div>Miner: {head.Miner}</div>
|
||||
<div>Messages: {head.Messages['/']} {/*TODO: link to message explorer */}</div>
|
||||
<div>Receipts: {head.MessageReceipts['/']}</div>
|
||||
<div>State Root: {head.StateRoot['/']}</div>
|
||||
|
@ -101,8 +101,13 @@ class FullNode extends React.Component {
|
||||
async startMining() {
|
||||
// TODO: Use actual miner address
|
||||
// see cli/miner.go
|
||||
let addr = "t0523423423" // in case we have no wallets
|
||||
if (this.state.defaultAddr) {
|
||||
addr = this.state.defaultAddr
|
||||
}
|
||||
|
||||
this.setState({mining: true})
|
||||
await this.state.client.call("Filecoin.MinerStart", ["t0523423423"])
|
||||
await this.state.client.call("Filecoin.MinerStart", [addr])
|
||||
}
|
||||
|
||||
render() {
|
||||
|
@ -335,6 +335,7 @@ func New(ctx context.Context, opts ...Option) error {
|
||||
// on this context, and implement closing logic through lifecycles
|
||||
// correctly
|
||||
if err := app.Start(ctx); err != nil {
|
||||
// comment fx.NopLogger few lines above for easier debugging
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
|
||||
HeaviestTipSetWeight: weight,
|
||||
GenesisHash: gen.Cid(),
|
||||
}
|
||||
fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids())
|
||||
fmt.Println("SENDING HELLO MESSAGE: ", hts.Cids(), hts.Height())
|
||||
fmt.Println("hello message genesis: ", gen.Cid())
|
||||
|
||||
if err := cborrpc.WriteCborRPC(s, hmsg); err != nil {
|
||||
|
@ -60,6 +60,7 @@ func Bitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routin
|
||||
return exch.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return exch
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ func builder(t *testing.T, n int) []api.FullNode {
|
||||
node.FullAPI(&out[i]),
|
||||
node.Online(),
|
||||
node.Repo(repo.NewMemory(nil)),
|
||||
MockHost(mn),
|
||||
node.MockHost(mn),
|
||||
|
||||
genesis,
|
||||
)
|
||||
|
@ -1,21 +0,0 @@
|
||||
package node_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/node"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
|
||||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
|
||||
)
|
||||
|
||||
func MockHost(mn mocknet.Mocknet) node.Option {
|
||||
return node.Options(
|
||||
node.ApplyIf(func(s *node.Settings) bool { return !s.Online },
|
||||
node.Error(errors.New("MockHost must be specified after Online")),
|
||||
),
|
||||
|
||||
node.Override(new(lp2p.RawHost), lp2p.MockHost),
|
||||
node.Override(new(mocknet.Mocknet), mn),
|
||||
)
|
||||
}
|
20
node/testopts.go
Normal file
20
node/testopts.go
Normal file
@ -0,0 +1,20 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
|
||||
)
|
||||
|
||||
func MockHost(mn mocknet.Mocknet) Option {
|
||||
return Options(
|
||||
ApplyIf(func(s *Settings) bool { return !s.Online },
|
||||
Error(errors.New("MockHost must be specified after Online")),
|
||||
),
|
||||
|
||||
Override(new(lp2p.RawHost), lp2p.MockHost),
|
||||
Override(new(mocknet.Mocknet), mn),
|
||||
)
|
||||
}
|
Loading…
Reference in New Issue
Block a user