Merge pull request #370 from filecoin-project/fix/chain-sync-test-fail
Fix chain sync test flakiness
This commit is contained in:
commit
9a238b7d01
@ -50,8 +50,7 @@ type FullNode interface {
|
|||||||
// ChainNotify returns channel with chain head updates
|
// ChainNotify returns channel with chain head updates
|
||||||
// First message is guaranteed to be of len == 1, and type == 'current'
|
// First message is guaranteed to be of len == 1, and type == 'current'
|
||||||
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
|
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
|
||||||
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
|
|
||||||
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
|
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
|
||||||
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
|
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
|
||||||
ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error)
|
ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error)
|
||||||
@ -65,9 +64,9 @@ type FullNode interface {
|
|||||||
|
|
||||||
// syncer
|
// syncer
|
||||||
SyncState(context.Context) (*SyncState, error)
|
SyncState(context.Context) (*SyncState, error)
|
||||||
|
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error
|
||||||
|
|
||||||
// messages
|
// messages
|
||||||
|
|
||||||
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
|
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
|
||||||
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
|
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
|
||||||
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
|
||||||
|
@ -38,7 +38,6 @@ type FullNodeStruct struct {
|
|||||||
|
|
||||||
Internal struct {
|
Internal struct {
|
||||||
ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
|
ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
|
||||||
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
|
|
||||||
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
|
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
|
||||||
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
|
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
|
||||||
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||||
@ -51,7 +50,8 @@ type FullNodeStruct struct {
|
|||||||
ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"`
|
ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"`
|
||||||
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
|
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
|
||||||
|
|
||||||
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
|
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
|
||||||
|
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
|
||||||
|
|
||||||
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
|
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
|
||||||
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
|
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
|
||||||
@ -228,10 +228,6 @@ func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Addr
|
|||||||
return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts)
|
return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
|
||||||
return c.Internal.ChainSubmitBlock(ctx, blk)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FullNodeStruct) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
func (c *FullNodeStruct) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||||
return c.Internal.ChainHead(ctx)
|
return c.Internal.ChainHead(ctx)
|
||||||
}
|
}
|
||||||
@ -324,6 +320,10 @@ func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) {
|
|||||||
return c.Internal.SyncState(ctx)
|
return c.Internal.SyncState(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *FullNodeStruct) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
||||||
|
return c.Internal.SyncSubmitBlock(ctx, blk)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) {
|
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) {
|
||||||
return c.Internal.StateMinerSectors(ctx, addr)
|
return c.Internal.StateMinerSectors(ctx, addr)
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range fts.Blocks {
|
for _, b := range fts.Blocks {
|
||||||
if err := syncer.validateMsgMeta(b); err != nil {
|
if err := syncer.ValidateMsgMeta(b); err != nil {
|
||||||
log.Warnf("invalid block received: %s", err)
|
log.Warnf("invalid block received: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
|
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
|
||||||
var bcids, scids []cbg.CBORMarshaler
|
var bcids, scids []cbg.CBORMarshaler
|
||||||
for _, m := range fblk.BlsMessages {
|
for _, m := range fblk.BlsMessages {
|
||||||
c := cbg.CborCid(m.Cid())
|
c := cbg.CborCid(m.Cid())
|
||||||
@ -147,6 +147,14 @@ func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) LocalPeer() peer.ID {
|
||||||
|
return syncer.self
|
||||||
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) ChainStore() *store.ChainStore {
|
||||||
|
return syncer.store
|
||||||
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
||||||
// TODO: search for other blocks that could form a tipset with this block
|
// TODO: search for other blocks that could form a tipset with this block
|
||||||
// and then send that tipset to InformNewHead
|
// and then send that tipset to InformNewHead
|
||||||
|
@ -32,9 +32,6 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
blks[i] = mts.TipSet
|
blks[i] = mts.TipSet
|
||||||
|
|
||||||
ts := mts.TipSet.TipSet()
|
|
||||||
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := tu.g.YieldRepo()
|
r, err := tu.g.YieldRepo()
|
||||||
@ -94,8 +91,38 @@ func (tu *syncTestUtil) Shutdown() {
|
|||||||
tu.cancel()
|
tu.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) printHeads() {
|
||||||
|
for i, n := range tu.nds {
|
||||||
|
head, err := n.ChainHead(tu.ctx)
|
||||||
|
if err != nil {
|
||||||
|
tu.t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Node %d: %s\n", i, head.Cids())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool) {
|
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool) {
|
||||||
// TODO: would be great if we could pass a whole tipset here...
|
// TODO: would be great if we could pass a whole tipset here...
|
||||||
|
tu.pushTsExpectErr(to, fts, false)
|
||||||
|
|
||||||
|
if wait {
|
||||||
|
start := time.Now()
|
||||||
|
h, err := tu.nds[to].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
for !h.Equals(fts.TipSet()) {
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
h, err = tu.nds[to].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
if time.Since(start) > time.Second*10 {
|
||||||
|
tu.t.Fatal("took too long waiting for block to be accepted")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) pushTsExpectErr(to int, fts *store.FullTipSet, experr bool) {
|
||||||
for _, fb := range fts.Blocks {
|
for _, fb := range fts.Blocks {
|
||||||
var b types.BlockMsg
|
var b types.BlockMsg
|
||||||
|
|
||||||
@ -115,27 +142,17 @@ func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool)
|
|||||||
b.BlsMessages = append(b.BlsMessages, c)
|
b.BlsMessages = append(b.BlsMessages, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
|
err := tu.nds[to].SyncSubmitBlock(tu.ctx, &b)
|
||||||
|
if experr {
|
||||||
}
|
require.Error(tu.t, err, "expected submit block to fail")
|
||||||
|
} else {
|
||||||
if wait {
|
|
||||||
start := time.Now()
|
|
||||||
h, err := tu.nds[to].ChainHead(tu.ctx)
|
|
||||||
require.NoError(tu.t, err)
|
|
||||||
for !h.Equals(fts.TipSet()) {
|
|
||||||
time.Sleep(time.Millisecond * 50)
|
|
||||||
h, err = tu.nds[to].ChainHead(tu.ctx)
|
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
if time.Since(start) > time.Second*10 {
|
|
||||||
tu.t.Fatal("took too long waiting for block to be accepted")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait bool) *store.FullTipSet {
|
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait, fail bool) *store.FullTipSet {
|
||||||
if miners == nil {
|
if miners == nil {
|
||||||
for i := range tu.g.Miners {
|
for i := range tu.g.Miners {
|
||||||
miners = append(miners, i)
|
miners = append(miners, i)
|
||||||
@ -152,13 +169,17 @@ func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int
|
|||||||
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
|
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
tu.pushFtsAndWait(src, mts.TipSet, wait)
|
if fail {
|
||||||
|
tu.pushTsExpectErr(src, mts.TipSet, true)
|
||||||
|
} else {
|
||||||
|
tu.pushFtsAndWait(src, mts.TipSet, wait)
|
||||||
|
}
|
||||||
|
|
||||||
return mts.TipSet
|
return mts.TipSet
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
|
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
|
||||||
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true)
|
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true, false)
|
||||||
tu.g.CurTipset = mts
|
tu.g.CurTipset = mts
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,7 +218,9 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
|
|||||||
|
|
||||||
lastTs := blocks[len(blocks)-1].Blocks
|
lastTs := blocks[len(blocks)-1].Blocks
|
||||||
for _, lastB := range lastTs {
|
for _, lastB := range lastTs {
|
||||||
err = out.(*impl.FullNodeAPI).ChainAPI.Chain.AddBlock(lastB.Header)
|
cs := out.(*impl.FullNodeAPI).ChainAPI.Chain
|
||||||
|
require.NoError(tu.t, cs.AddToTipSetTracker(lastB.Header))
|
||||||
|
err = cs.AddBlock(lastB.Header)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -368,12 +391,17 @@ func TestSyncBadTimestamp(t *testing.T) {
|
|||||||
return pts.MinTimestamp() + 2
|
return pts.MinTimestamp() + 2
|
||||||
}
|
}
|
||||||
|
|
||||||
a1 := tu.mineOnBlock(base, 0, nil, false)
|
fmt.Println("BASE: ", base.Cids())
|
||||||
|
tu.printHeads()
|
||||||
|
|
||||||
|
a1 := tu.mineOnBlock(base, 0, nil, false, true)
|
||||||
|
|
||||||
tu.g.Timestamper = nil
|
tu.g.Timestamper = nil
|
||||||
tu.g.ResyncBankerNonce(a1.TipSet())
|
tu.g.ResyncBankerNonce(a1.TipSet())
|
||||||
|
|
||||||
a2 := tu.mineOnBlock(base, 0, nil, true)
|
fmt.Println("After mine bad block!")
|
||||||
|
tu.printHeads()
|
||||||
|
a2 := tu.mineOnBlock(base, 0, nil, true, false)
|
||||||
|
|
||||||
tu.waitUntilSync(0, client)
|
tu.waitUntilSync(0, client)
|
||||||
|
|
||||||
@ -427,16 +455,16 @@ func TestSyncFork(t *testing.T) {
|
|||||||
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
|
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
|
||||||
|
|
||||||
// The two nodes fork at this point into 'a' and 'b'
|
// The two nodes fork at this point into 'a' and 'b'
|
||||||
a1 := tu.mineOnBlock(base, p1, []int{0}, true)
|
a1 := tu.mineOnBlock(base, p1, []int{0}, true, false)
|
||||||
a := tu.mineOnBlock(a1, p1, []int{0}, true)
|
a := tu.mineOnBlock(a1, p1, []int{0}, true, false)
|
||||||
a = tu.mineOnBlock(a, p1, []int{0}, true)
|
a = tu.mineOnBlock(a, p1, []int{0}, true, false)
|
||||||
|
|
||||||
tu.g.ResyncBankerNonce(a1.TipSet())
|
tu.g.ResyncBankerNonce(a1.TipSet())
|
||||||
// chain B will now be heaviest
|
// chain B will now be heaviest
|
||||||
b := tu.mineOnBlock(base, p2, []int{1}, true)
|
b := tu.mineOnBlock(base, p2, []int{1}, true, false)
|
||||||
b = tu.mineOnBlock(b, p2, []int{1}, true)
|
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
|
||||||
b = tu.mineOnBlock(b, p2, []int{1}, true)
|
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
|
||||||
b = tu.mineOnBlock(b, p2, []int{1}, true)
|
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
|
||||||
|
|
||||||
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
|
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
|
||||||
fmt.Println("B: ", b.Cids(), b.TipSet().Height())
|
fmt.Println("B: ", b.Cids(), b.TipSet().Height())
|
||||||
|
@ -27,6 +27,7 @@ type api struct {
|
|||||||
fx.In
|
fx.In
|
||||||
|
|
||||||
full.ChainAPI
|
full.ChainAPI
|
||||||
|
full.SyncAPI
|
||||||
full.MpoolAPI
|
full.MpoolAPI
|
||||||
full.WalletAPI
|
full.WalletAPI
|
||||||
full.StateAPI
|
full.StateAPI
|
||||||
@ -183,7 +184,7 @@ func (m *Miner) mine(ctx context.Context) {
|
|||||||
"time", time.Now(), "duration", time.Now().Sub(btime))
|
"time", time.Now(), "duration", time.Now().Sub(btime))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.api.ChainSubmitBlock(ctx, b); err != nil {
|
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
|
||||||
log.Errorf("failed to submit newly mined block: %s", err)
|
log.Errorf("failed to submit newly mined block: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -18,26 +17,13 @@ type ChainAPI struct {
|
|||||||
|
|
||||||
WalletAPI
|
WalletAPI
|
||||||
|
|
||||||
Chain *store.ChainStore
|
Chain *store.ChainStore
|
||||||
PubSub *pubsub.PubSub
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
|
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
|
||||||
return a.Chain.SubHeadChanges(ctx), nil
|
return a.Chain.SubHeadChanges(ctx), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
|
||||||
// TODO: should we have some sort of fast path to adding a local block?
|
|
||||||
|
|
||||||
b, err := blk.Serialize()
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("serializing block for pubsub publishing failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: anything else to do here?
|
|
||||||
return a.PubSub.Publish("/fil/blocks", b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) {
|
func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) {
|
||||||
return a.Chain.GetHeaviestTipSet(), nil
|
return a.Chain.GetHeaviestTipSet(), nil
|
||||||
}
|
}
|
||||||
|
@ -5,13 +5,18 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
"github.com/filecoin-project/go-lotus/chain"
|
"github.com/filecoin-project/go-lotus/chain"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SyncAPI struct {
|
type SyncAPI struct {
|
||||||
fx.In
|
fx.In
|
||||||
|
|
||||||
Syncer *chain.Syncer
|
Syncer *chain.Syncer
|
||||||
|
PubSub *pubsub.PubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
||||||
@ -23,3 +28,42 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
|||||||
Height: ss.Height,
|
Height: ss.Height,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
||||||
|
// TODO: should we have some sort of fast path to adding a local block?
|
||||||
|
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(blk.BlsMessages)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to load bls messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(blk.SecpkMessages)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to load secpk message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fb := &types.FullBlock{
|
||||||
|
Header: blk.Header,
|
||||||
|
BlsMessages: bmsgs,
|
||||||
|
SecpkMessages: smsgs,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.Syncer.ValidateMsgMeta(fb); err != nil {
|
||||||
|
xerrors.Errorf("provided messages did not match block: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("somehow failed to make a tipset out of a single block: %w", err)
|
||||||
|
}
|
||||||
|
if err := a.Syncer.Sync(ctx, ts); err != nil {
|
||||||
|
return xerrors.Errorf("sync to submitted block failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := blk.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("serializing block for pubsub publishing failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: anything else to do here?
|
||||||
|
return a.PubSub.Publish("/fil/blocks", b)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user