Merge pull request #114 from filecoin-project/fix/sync-self

fix syncing new blocks we have locally when not connected to any peers
This commit is contained in:
Whyrusleeping 2019-08-01 18:00:29 -07:00 committed by GitHub
commit db005e99e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 34 deletions

View File

@ -296,7 +296,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
req := &BlockSyncRequest{
Start: h.Cids(),
RequestLength: count,
Options: BSOptMessages,
Options: BSOptMessages | BSOptBlocks,
}
var err error

View File

@ -577,3 +577,24 @@ func (cs *ChainStore) blockContainsMsg(blk *types.BlockHeader, msg cid.Cid) (*ty
func (cs *ChainStore) Blockstore() blockstore.Blockstore {
return cs.bs
}
func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
var out []*types.FullBlock
for _, b := range ts.Blocks() {
msgs, err := cs.MessagesForBlock(b)
if err != nil {
// TODO: check for 'not found' errors, and only return nil if this
// is actually a 'not found' error
return nil, nil
}
fb := &types.FullBlock{
Header: b,
Messages: msgs,
}
out = append(out, fb)
}
return NewFullTipSet(out), nil
}

View File

@ -410,64 +410,73 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, toHeight uint64) ([]*ty
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
// Fetch all the messages for all the blocks in this chain
cur := headers[len(headers)-1]
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
}
return nil
})
}
windowSize := uint64(10)
for i := uint64(0); i <= cur.Height(); i += windowSize {
// fills out each of the given tipsets with messages and calls the callback with it
func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
var beg int
// handle case where we have a prefix of these locally
for ; beg < len(headers); beg++ {
fts, err := syncer.store.TryFillTipSet(headers[beg])
if err != nil {
return err
}
if fts == nil {
break
}
if err := cb(fts); err != nil {
return err
}
}
headers = headers[beg:]
windowSize := 10
for i := 0; i < len(headers); i += windowSize {
// temp storage so we don't persist data we dont want to
ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
cst := hamt.CSTFromBstore(bs)
nextHeight := i + windowSize - 1
if nextHeight > cur.Height() {
nextHeight = cur.Height()
batchSize := windowSize
if i+batchSize >= len(headers) {
batchSize = (len(headers) - i) - 1
}
next := headers[nextHeight]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, (nextHeight+1)-i)
next := headers[i+batchSize]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1))
if err != nil {
log.Errorf("failed to fetch messages: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
}
for bsi := 0; bsi < len(bstips); bsi++ {
this := headers[i+uint64(bsi)]
this := headers[i+bsi]
bstip := bstips[len(bstips)-(bsi+1)]
fts, err := zipTipSetAndMessages(cst, this, bstip.Messages, bstip.MsgIncludes)
if err != nil {
log.Error("zipping failed: ", err, bsi, i)
log.Error("height: ", this.Height())
log.Error("bstip height: ", bstip.Blocks[0].Height)
log.Error("bstips: ", bstips)
log.Error("next height: ", nextHeight)
log.Error("next height: ", i+batchSize)
return xerrors.Errorf("message processing failed: %w", err)
}
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
log.Errorf("failed to validate tipset: %s", err)
return xerrors.Errorf("message processing failed: %w", err)
if err := cb(fts); err != nil {
return err
}
}
for _, bst := range bstips {
for _, m := range bst.Messages {
switch m.Signature.Type {
case types.KTBLS:
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, &m.Message); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
case types.KTSecp256k1:
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
default:
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
}
}
if err := persistMessages(bs, bstips); err != nil {
return err
}
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
@ -478,6 +487,31 @@ func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
return nil
}
func persistMessages(bs bstore.Blockstore, bstips []*BSTipSet) error {
for _, bst := range bstips {
for _, m := range bst.Messages {
switch m.Signature.Type {
case types.KTBLS:
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, &m.Message); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
case types.KTSecp256k1:
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Error("failed to persist messages: ", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
default:
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.TypeCode)
}
}
}
return nil
}
func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
curHeight := syncer.head.Height()