api.SyncIncomingBlocks

This commit is contained in:
Łukasz Magiera 2019-11-18 22:39:07 +01:00
parent 333847f7c9
commit d3b980ef58
7 changed files with 54 additions and 12 deletions

View File

@ -38,6 +38,7 @@ type FullNode interface {
// syncer // syncer
SyncState(context.Context) (*SyncState, error) SyncState(context.Context) (*SyncState, error)
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error)
// messages // messages
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
@ -286,6 +287,6 @@ const (
) )
type MpoolUpdate struct { type MpoolUpdate struct {
Type MpoolChange Type MpoolChange
Message *types.SignedMessage Message *types.SignedMessage
} }

View File

@ -51,8 +51,9 @@ type FullNodeStruct struct {
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"` ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainTipSetWeight func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` ChainTipSetWeight func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"` SyncState func(context.Context) (*SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
@ -351,6 +352,10 @@ func (c *FullNodeStruct) SyncSubmitBlock(ctx context.Context, blk *types.BlockMs
return c.Internal.SyncSubmitBlock(ctx, blk) return c.Internal.SyncSubmitBlock(ctx, blk)
} }
func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return c.Internal.SyncIncomingBlocks(ctx)
}
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*ChainSectorInfo, error) { func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*ChainSectorInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr, ts) return c.Internal.StateMinerSectors(ctx, addr, ts)
} }

View File

@ -101,7 +101,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
minGasPrice: types.NewInt(0), minGasPrice: types.NewInt(0),
maxTxPoolSize: 100000, maxTxPoolSize: 100000,
blsSigCache: cache, blsSigCache: cache,
changes: lps.New(50), changes: lps.New(50),
} }
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app) err := mp.HeadChange(rev, app)

View File

@ -19,6 +19,7 @@ import (
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"github.com/whyrusleeping/pubsub"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -35,6 +36,8 @@ import (
var log = logging.Logger("chain") var log = logging.Logger("chain")
var localIncoming = "incoming"
type Syncer struct { type Syncer struct {
// The heaviest known tipset in the network. // The heaviest known tipset in the network.
@ -58,6 +61,8 @@ type Syncer struct {
syncLock sync.Mutex syncLock sync.Mutex
syncmgr *SyncManager syncmgr *SyncManager
incoming *pubsub.PubSub
} }
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) { func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
@ -78,6 +83,8 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
store: sm.ChainStore(), store: sm.ChainStore(),
sm: sm, sm: sm,
self: self, self: self,
incoming: pubsub.New(50),
} }
s.syncmgr = NewSyncManager(s.Sync) s.syncmgr = NewSyncManager(s.Sync)
@ -109,6 +116,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
} }
} }
syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming)
if from == syncer.self { if from == syncer.self {
// TODO: this is kindof a hack... // TODO: this is kindof a hack...
log.Debug("got block from ourselves") log.Debug("got block from ourselves")
@ -139,6 +148,31 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet()) syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
} }
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
sub := syncer.incoming.Sub(localIncoming)
out := make(chan *types.BlockHeader, 10)
go func() {
for {
select {
case r := <-sub:
hs := r.([]*types.BlockHeader)
for _, h := range hs {
select {
case out <- h:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
var bcids, scids []cbg.CBORMarshaler var bcids, scids []cbg.CBORMarshaler
for _, m := range fblk.BlsMessages { for _, m := range fblk.BlsMessages {

View File

@ -29,12 +29,12 @@ var dotCmd = &cli.Command{
fmt.Println("digraph D {") fmt.Println("digraph D {")
for res.Next() { for res.Next() {
var block,parent,miner string var block, parent, miner string
if err := res.Scan(&block, &parent, &miner); err != nil { if err := res.Scan(&block, &parent, &miner); err != nil {
return err return err
} }
col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli)) & 0x80808080 + 0x70707070 col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0x80808080 + 0x70707070
fmt.Printf("%s [label = \"%s\", fillcolor = \"#%06x\", style=filled]\n%s -> %s\n", block, miner, col, block, parent) fmt.Printf("%s [label = \"%s\", fillcolor = \"#%06x\", style=filled]\n%s -> %s\n", block, miner, col, block, parent)
} }
@ -46,4 +46,4 @@ var dotCmd = &cli.Command{
return nil return nil
}, },
} }

View File

@ -44,7 +44,7 @@ func newHandler(api api.FullNode, st *storage) (*handler, error) {
"strings": h.strings, "strings": h.strings,
"messages": h.messages, "messages": h.messages,
"param": func(string) string { return "" }, // replaced in request handler "param": func(string) string { return "" }, // replaced in request handler
} }
base := template.New("") base := template.New("")
@ -232,7 +232,7 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess
&r.GasLimit, &r.GasLimit,
&r.Method, &r.Method,
&r.Params, &r.Params,
); err != nil { ); err != nil {
return nil, err return nil, err
} }
@ -250,6 +250,4 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess
return return
} }
var _ http.Handler = &handler{} var _ http.Handler = &handler{}

View File

@ -54,7 +54,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
} }
if err := a.Syncer.ValidateMsgMeta(fb); err != nil { if err := a.Syncer.ValidateMsgMeta(fb); err != nil {
xerrors.Errorf("provided messages did not match block: %w", err) return xerrors.Errorf("provided messages did not match block: %w", err)
} }
ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header}) ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
@ -73,3 +73,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
// TODO: anything else to do here? // TODO: anything else to do here?
return a.PubSub.Publish("/fil/blocks", b) return a.PubSub.Publish("/fil/blocks", b)
} }
func (a *SyncAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return a.Syncer.IncomingBlocks(ctx)
}