fix tests and fix a bug in blocksync

This commit is contained in:
whyrusleeping 2019-09-06 13:03:28 -07:00
parent ed45d1c2b4
commit b5462542a8
8 changed files with 134 additions and 68 deletions

View File

@ -245,8 +245,10 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
panic("not handled")
case 203: // Internal Error
return fmt.Errorf("block sync peer errored: %s", res.Message)
case 204:
return fmt.Errorf("block sync request invalid: %s", res.Message)
default:
return fmt.Errorf("unrecognized response code")
return fmt.Errorf("unrecognized response code: %d", res.Status)
}
}
@ -261,10 +263,11 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
Options: BSOptBlocks,
}
var err error
var oerr error
for _, p := range perm {
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
oerr = err
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}
@ -272,12 +275,12 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
if res.Status == 0 {
return bs.processBlocksResponse(req, res)
}
err = bs.processStatus(req, res)
if err != nil {
oerr = bs.processStatus(req, res)
if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
}
}
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err)
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
}
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {

View File

@ -122,18 +122,19 @@ func NewGenerator() (*ChainGen, error) {
}
}
minercfg := &GenMinerCfg{
Workers: []address.Address{worker, worker},
Owners: []address.Address{worker, worker},
PeerIDs: []peer.ID{"peerID1", "peerID2"},
}
/*
minercfg := &GenMinerCfg{
Workers: []address.Address{worker, worker},
Owners: []address.Address{worker, worker},
PeerIDs: []peer.ID{"peerID1", "peerID2"},
Workers: []address.Address{worker},
Owners: []address.Address{worker},
PeerIDs: []peer.ID{"peerID1"},
}
*/
minercfg := &GenMinerCfg{
Workers: []address.Address{worker},
Owners: []address.Address{worker},
PeerIDs: []peer.ID{"peerID1"},
}
genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
worker: types.NewInt(50000),
@ -201,7 +202,6 @@ func (cg *ChainGen) GenesisCar() ([]byte, error) {
func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) {
pts := cg.curTipset.TipSet()
fmt.Println("checking winner:", m, ticks)
var lastTicket *types.Ticket
if len(ticks) == 0 {
lastTicket = pts.MinTicket()
@ -262,7 +262,6 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
for len(blks) == 0 {
for i, m := range cg.miners {
fmt.Println("Checking for winner: ", m)
proof, t, err := cg.nextBlockProof(context.TODO(), m, ticketSets[i])
if err != nil {
return nil, err
@ -270,7 +269,6 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
ticketSets[i] = append(ticketSets[i], t)
if proof != nil {
fmt.Println("WINNER!!!!", m)
fblk, err := cg.makeBlock(m, proof, ticketSets[i], msgs)
if err != nil {
return nil, xerrors.Errorf("making a block for next tipset failed: %w", err)
@ -285,8 +283,6 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
}
}
fmt.Println("num winners: ", len(blks))
cg.curTipset = store.NewFullTipSet(blks)
return &MinedTipSet{
@ -301,6 +297,7 @@ func (cg *ChainGen) makeBlock(m address.Address, eproof types.ElectionProof, tic
ts := parents.MinTimestamp() + (uint64(len(tickets)) * build.BlockDelay)
fmt.Println("Make block: ", parents.Height(), len(tickets))
fblk, err := MinerCreateBlock(context.TODO(), cg.sm, cg.w, m, parents, tickets, eproof, msgs, ts)
if err != nil {
return nil, err
@ -406,7 +403,6 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, ticks []*types.Ticket,
return false, nil, err
}
fmt.Println("chain randomness: ", r)
mworker, err := a.StateMinerWorker(ctx, miner, ts)
if err != nil {
return false, nil, xerrors.Errorf("failed to get miner worker: %w", err)

View File

@ -13,15 +13,19 @@ func testGeneration(t testing.TB, n int, msgs int) {
g.msgsPerBlock = msgs
var height int
for i := 0; i < n; i++ {
fmt.Println("LOOP: ", i)
mts, err := g.NextTipSet()
if err != nil {
t.Fatalf("error at H:%d, %s", i, err)
}
if mts.TipSet.TipSet().Height() != uint64(i+len(mts.TipSet.Blocks[0].Header.Tickets)) {
t.Fatal("wrong height")
ts := mts.TipSet.TipSet()
if ts.Height() != uint64(height+len(ts.Blocks()[0].Tickets)) {
t.Fatal("wrong height", ts.Height(), i, len(ts.Blocks()[0].Tickets), len(ts.Blocks()))
}
height += len(ts.Blocks()[0].Tickets)
}
}

View File

@ -2,9 +2,11 @@ package gen
import (
"context"
"github.com/filecoin-project/go-bls-sigs"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-hamt-ipld"
"fmt"
bls "github.com/filecoin-project/go-bls-sigs"
cid "github.com/ipfs/go-cid"
hamt "github.com/ipfs/go-hamt-ipld"
"github.com/pkg/errors"
"github.com/whyrusleeping/sharray"
"golang.org/x/xerrors"
@ -23,6 +25,7 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wal
return nil, errors.Wrap(err, "failed to load tipset state")
}
fmt.Println("HEIGHT CALC: ", parents.Height(), len(tickets))
height := parents.Height() + uint64(len(tickets))
vmi, err := vm.NewVM(st, height, miner, sm.ChainStore())

View File

@ -2,12 +2,14 @@ package stmgr
import (
"context"
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/state"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/vm"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
hamt "github.com/ipfs/go-hamt-ipld"
@ -18,13 +20,47 @@ var log = logging.Logger("chainstore")
type StateManager struct {
cs *store.ChainStore
stCache map[string]cid.Cid
stlk sync.Mutex
}
func NewStateManager(cs *store.ChainStore) *StateManager {
return &StateManager{cs}
return &StateManager{
cs: cs,
stCache: make(map[string]cid.Cid),
}
}
func cidsToKey(cids []cid.Cid) string {
var out string
for _, c := range cids {
out += c.KeyString()
}
return out
}
func (sm *StateManager) TipSetState(cids []cid.Cid) (cid.Cid, error) {
ck := cidsToKey(cids)
sm.stlk.Lock()
cached, ok := sm.stCache[ck]
sm.stlk.Unlock()
if ok {
return cached, nil
}
out, err := sm.computeTipSetState(cids)
if err != nil {
return cid.Undef, err
}
sm.stlk.Lock()
sm.stCache[ck] = out
sm.stlk.Unlock()
return out, nil
}
func (sm *StateManager) computeTipSetState(cids []cid.Cid) (cid.Cid, error) {
ctx := context.TODO()
ts, err := sm.cs.LoadTipSet(cids)
@ -39,12 +75,12 @@ func (sm *StateManager) TipSetState(cids []cid.Cid) (cid.Cid, error) {
pstate, err := sm.TipSetState(ts.Parents())
if err != nil {
return cid.Undef, err
return cid.Undef, xerrors.Errorf("recursive TipSetState failed: %w", err)
}
vmi, err := vm.NewVM(pstate, ts.Height(), address.Undef, sm.cs)
if err != nil {
return cid.Undef, err
return cid.Undef, xerrors.Errorf("instantiating VM failed: %w", err)
}
applied := make(map[cid.Cid]bool)
@ -53,7 +89,8 @@ func (sm *StateManager) TipSetState(cids []cid.Cid) (cid.Cid, error) {
bms, sms, err := sm.cs.MessagesForBlock(b)
if err != nil {
return cid.Undef, err
panic("stop a sec: " + err.Error())
return cid.Undef, xerrors.Errorf("failed to get messages for block: %w", err)
}
for _, m := range bms {

View File

@ -509,7 +509,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message,
cst := hamt.CSTFromBstore(cs.bs)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), b.Messages, &msgmeta); err != nil {
return nil, nil, err
return nil, nil, xerrors.Errorf("failed to load msgmeta: %w", err)
}
blscids, err := cs.readSharrayCids(msgmeta.BlsMessages)
@ -699,6 +699,7 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) {
if lb < len(tickets) {
panic("self sampling is bad")
t := tickets[len(tickets)-(1+lb)]
return t.VDFResult, nil
@ -706,9 +707,9 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tick
nv := lb - len(tickets)
nextCids := pts.Cids()
for {
fmt.Println("lookback looping: ", nv)
nts, err := cs.LoadTipSet(pts.Cids())
nts, err := cs.LoadTipSet(nextCids)
if err != nil {
return nil, err
}
@ -724,7 +725,6 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tick
// special case for lookback behind genesis block
// TODO(spec): this is not in the spec, need to sync that
if mtb.Height == 0 {
fmt.Println("Randomness from height 0: ", nv)
t := mtb.Tickets[0]
@ -735,5 +735,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tick
}
return rval, nil
}
nextCids = mtb.Parents
}
}

View File

@ -478,7 +478,6 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
}
return nil
}
func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
@ -506,11 +505,16 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*t
at = ts.Parents()
}
loop:
for blockSet[len(blockSet)-1].Height() > untilHeight {
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
fmt.Println("Get blocks")
if len(at) == 0 {
fmt.Println("Weird situation, about to request blocks with empty tipset")
fmt.Println("info: ", len(blockSet), blockSet[len(blockSet)-1].Height(), untilHeight)
}
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, 10)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
@ -524,11 +528,16 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*t
for _, b := range blks {
if b.Height() < untilHeight {
break
// REVIEW: this was an existing bug I think, if this got hit
// we would not append the remaining blocks to our blockset, but
// we would keep looping, and the outer for loop condition wouldnt trigger
// causing us to request the parents of the genesis block (aka, an empty cid set)
break loop
}
blockSet = append(blockSet, b)
}
fmt.Println("AT CHILD: ", blks[len(blks)-1].Height())
at = blks[len(blks)-1].Parents()
}
@ -547,6 +556,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
log.Errorf("failed to validate tipset: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
}
return nil
})
}
@ -572,10 +582,6 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
windowSize := 10
for i := len(headers) - 1; i >= 0; i -= windowSize {
// temp storage so we don't persist data we dont want to
ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
cst := hamt.CSTFromBstore(bs)
batchSize := windowSize
if i < batchSize {
@ -589,6 +595,11 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
}
for bsi := 0; bsi < len(bstips); bsi++ {
// temp storage so we don't persist data we dont want to
ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
cst := hamt.CSTFromBstore(bs)
this := headers[i-bsi]
bstip := bstips[len(bstips)-(bsi+1)]
fts, err := zipTipSetAndMessages(cst, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes)
@ -604,38 +615,37 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
if err := cb(fts); err != nil {
return err
}
if err := persistMessages(bs, bstip); err != nil {
return err
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
}
if err := persistMessages(bs, bstips); err != nil {
return err
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
}
return nil
}
func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error {
for _, bst := range bstips {
for _, m := range bst.BlsMessages {
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
for _, m := range bst.BlsMessages {
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
for _, m := range bst.SecpkMessages {
if m.Signature.Type != types.KTSecp256k1 {
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
}
for _, m := range bst.SecpkMessages {
if m.Signature.Type != types.KTSecp256k1 {
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
}

View File

@ -33,7 +33,6 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
ts := mts.TipSet.TipSet()
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
require.Equal(t, uint64(i+1), ts.Height(), "wrong height")
}
r, err := tu.g.YieldRepo()
@ -81,7 +80,7 @@ func prepSyncTest(t testing.TB, h int) *syncTestUtil {
}
tu.addSourceNode(h)
tu.checkHeight("source", source, h)
//tu.checkHeight("source", source, h)
// separate logs
fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b")
@ -181,6 +180,17 @@ func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
}
func (tu *syncTestUtil) compareSourceState(with int) {
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
require.NoError(tu.t, err)
targetHead, err := tu.nds[with].ChainHead(tu.ctx)
require.NoError(tu.t, err)
if !sourceHead.Equals(targetHead) {
fmt.Println("different chains: ", sourceHead.Height(), targetHead.Height())
tu.t.Fatalf("nodes were not synced correctly: %s != %s", sourceHead.Cids(), targetHead.Cids())
}
sourceAccounts, err := tu.nds[source].WalletList(tu.ctx)
require.NoError(tu.t, err)
@ -250,13 +260,13 @@ func TestSyncSimple(t *testing.T) {
tu := prepSyncTest(t, H)
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
//tu.checkHeight("client", client, 0)
require.NoError(t, tu.mn.LinkAll())
tu.connect(1, 0)
tu.waitUntilSync(0, client)
tu.checkHeight("client", client, H)
//tu.checkHeight("client", client, H)
tu.compareSourceState(client)
}
@ -266,7 +276,7 @@ func TestSyncMining(t *testing.T) {
tu := prepSyncTest(t, H)
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
//tu.checkHeight("client", client, 0)
require.NoError(t, tu.mn.LinkAll())
tu.connect(client, 0)
@ -275,11 +285,12 @@ func TestSyncMining(t *testing.T) {
fmt.Println("after wait until sync")
tu.checkHeight("client", client, H)
//tu.checkHeight("client", client, H)
tu.compareSourceState(client)
for i := 0; i < 5; i++ {
fmt.Println("MINE A NEW BLOCK")
tu.mineNewBlock(0)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)