WIP: ensure blocks make it into repo before pushing to pubsub

This commit is contained in:
whyrusleeping 2019-10-14 23:21:37 +09:00
parent f9dde3d10a
commit 7c26e3c35f
7 changed files with 53 additions and 27 deletions

View File

@ -50,8 +50,7 @@ type FullNode interface {
// ChainNotify returns channel with chain head updates // ChainNotify returns channel with chain head updates
// First message is guaranteed to be of len == 1, and type == 'current' // First message is guaranteed to be of len == 1, and type == 'current'
ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization ChainHead(context.Context) (*types.TipSet, error)
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error) ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error)
@ -65,9 +64,9 @@ type FullNode interface {
// syncer // syncer
SyncState(context.Context) (*SyncState, error) SyncState(context.Context) (*SyncState, error)
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error
// messages // messages
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push

View File

@ -38,7 +38,6 @@ type FullNodeStruct struct {
Internal struct { Internal struct {
ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"` ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
@ -51,7 +50,8 @@ type FullNodeStruct struct {
ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"` ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"`
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"` ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"` SyncState func(context.Context) (*SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
@ -228,10 +228,6 @@ func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Addr
return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts) return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts)
} }
func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
return c.Internal.ChainSubmitBlock(ctx, blk)
}
func (c *FullNodeStruct) ChainHead(ctx context.Context) (*types.TipSet, error) { func (c *FullNodeStruct) ChainHead(ctx context.Context) (*types.TipSet, error) {
return c.Internal.ChainHead(ctx) return c.Internal.ChainHead(ctx)
} }
@ -324,6 +320,10 @@ func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) {
return c.Internal.SyncState(ctx) return c.Internal.SyncState(ctx)
} }
func (c *FullNodeStruct) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
return c.Internal.SyncSubmitBlock(ctx, blk)
}
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) { func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address) ([]*SectorInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr) return c.Internal.StateMinerSectors(ctx, addr)
} }

View File

@ -147,6 +147,14 @@ func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
return nil return nil
} }
func (syncer *Syncer) LocalPeer() peer.ID {
return syncer.self
}
func (syncer *Syncer) ChainStore() *store.ChainStore {
return syncer.store
}
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
// TODO: search for other blocks that could form a tipset with this block // TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead // and then send that tipset to InformNewHead

View File

@ -115,8 +115,7 @@ func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool)
b.BlsMessages = append(b.BlsMessages, c) b.BlsMessages = append(b.BlsMessages, c)
} }
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) require.NoError(tu.t, tu.nds[to].SyncSubmitBlock(tu.ctx, &b))
} }
if wait { if wait {

View File

@ -27,6 +27,7 @@ type api struct {
fx.In fx.In
full.ChainAPI full.ChainAPI
full.SyncAPI
full.MpoolAPI full.MpoolAPI
full.WalletAPI full.WalletAPI
full.StateAPI full.StateAPI
@ -183,7 +184,7 @@ func (m *Miner) mine(ctx context.Context) {
"time", time.Now(), "duration", time.Now().Sub(btime)) "time", time.Now(), "duration", time.Now().Sub(btime))
} }
if err := m.api.ChainSubmitBlock(ctx, b); err != nil { if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err) log.Errorf("failed to submit newly mined block: %s", err)
} }
} else { } else {

View File

@ -9,7 +9,6 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
) )
@ -18,26 +17,13 @@ type ChainAPI struct {
WalletAPI WalletAPI
Chain *store.ChainStore Chain *store.ChainStore
PubSub *pubsub.PubSub
} }
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil return a.Chain.SubHeadChanges(ctx), nil
} }
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
// TODO: should we have some sort of fast path to adding a local block?
b, err := blk.Serialize()
if err != nil {
return xerrors.Errorf("serializing block for pubsub publishing failed: %w", err)
}
// TODO: anything else to do here?
return a.PubSub.Publish("/fil/blocks", b)
}
func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) { func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) {
return a.Chain.GetHeaviestTipSet(), nil return a.Chain.GetHeaviestTipSet(), nil
} }

View File

@ -5,13 +5,18 @@ import (
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors"
) )
type SyncAPI struct { type SyncAPI struct {
fx.In fx.In
Syncer *chain.Syncer Syncer *chain.Syncer
PubSub *pubsub.PubSub
} }
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
@ -23,3 +28,31 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
Height: ss.Height, Height: ss.Height,
}, nil }, nil
} }
func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
// TODO: should we have some sort of fast path to adding a local block?
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(blk.BlsMessages)
if err != nil {
return xerrors.Errorf("failed to load bls messages: %w", err)
}
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(blk.SecpkMessages)
if err != nil {
return xerrors.Errorf("failed to load secpk message: %w", err)
}
fb := &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}
a.Syncer.InformNewBlock(a.Syncer.LocalPeer(), fb)
b, err := blk.Serialize()
if err != nil {
return xerrors.Errorf("serializing block for pubsub publishing failed: %w", err)
}
// TODO: anything else to do here?
return a.PubSub.Publish("/fil/blocks", b)
}