diff --git a/api/api_full.go b/api/api_full.go index 290b87e93..cf67d8442 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -38,6 +38,7 @@ type FullNode interface { // syncer SyncState(context.Context) (*SyncState, error) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error + SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) // messages MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) @@ -286,6 +287,6 @@ const ( ) type MpoolUpdate struct { - Type MpoolChange + Type MpoolChange Message *types.SignedMessage } diff --git a/api/struct.go b/api/struct.go index 5a093eb00..7ea1b5835 100644 --- a/api/struct.go +++ b/api/struct.go @@ -51,8 +51,9 @@ type FullNodeStruct struct { ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"` ChainTipSetWeight func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` - SyncState func(context.Context) (*SyncState, error) `perm:"read"` - SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` + SyncState func(context.Context) (*SyncState, error) `perm:"read"` + SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` + SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` @@ -351,6 +352,10 @@ func (c *FullNodeStruct) SyncSubmitBlock(ctx context.Context, blk *types.BlockMs return c.Internal.SyncSubmitBlock(ctx, blk) } +func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { + return c.Internal.SyncIncomingBlocks(ctx) +} + func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*ChainSectorInfo, error) { return c.Internal.StateMinerSectors(ctx, addr, ts) } diff --git a/chain/messagepool.go b/chain/messagepool.go index e10e35556..56de6e607 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -101,7 +101,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { minGasPrice: types.NewInt(0), maxTxPoolSize: 100000, blsSigCache: cache, - changes: lps.New(50), + changes: lps.New(50), } sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { err := mp.HeadChange(rev, app) diff --git a/chain/sync.go b/chain/sync.go index aff1e6cee..6c3697eb6 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -19,6 +19,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" + "github.com/whyrusleeping/pubsub" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -35,6 +36,8 @@ import ( var log = logging.Logger("chain") +var localIncoming = "incoming" + type Syncer struct { // The heaviest known tipset in the network. @@ -58,6 +61,8 @@ type Syncer struct { syncLock sync.Mutex syncmgr *SyncManager + + incoming *pubsub.PubSub } func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) { @@ -78,6 +83,8 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) store: sm.ChainStore(), sm: sm, self: self, + + incoming: pubsub.New(50), } s.syncmgr = NewSyncManager(s.Sync) @@ -109,6 +116,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { } } + syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming) + if from == syncer.self { // TODO: this is kindof a hack... log.Debug("got block from ourselves") @@ -139,6 +148,31 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet()) } +func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { + sub := syncer.incoming.Sub(localIncoming) + out := make(chan *types.BlockHeader, 10) + + go func() { + for { + select { + case r := <-sub: + hs := r.([]*types.BlockHeader) + for _, h := range hs { + select { + case out <- h: + case <-ctx.Done(): + return + } + } + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { var bcids, scids []cbg.CBORMarshaler for _, m := range fblk.BlsMessages { diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go index 354e4bc67..e8e60b0af 100644 --- a/cmd/lotus-chainwatch/dot.go +++ b/cmd/lotus-chainwatch/dot.go @@ -29,12 +29,12 @@ var dotCmd = &cli.Command{ fmt.Println("digraph D {") for res.Next() { - var block,parent,miner string + var block, parent, miner string if err := res.Scan(&block, &parent, &miner); err != nil { return err } - col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli)) & 0x80808080 + 0x70707070 + col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0x80808080 + 0x70707070 fmt.Printf("%s [label = \"%s\", fillcolor = \"#%06x\", style=filled]\n%s -> %s\n", block, miner, col, block, parent) } @@ -46,4 +46,4 @@ var dotCmd = &cli.Command{ return nil }, -} \ No newline at end of file +} diff --git a/cmd/lotus-chainwatch/templates.go b/cmd/lotus-chainwatch/templates.go index 322498192..02e85d6b0 100644 --- a/cmd/lotus-chainwatch/templates.go +++ b/cmd/lotus-chainwatch/templates.go @@ -44,7 +44,7 @@ func newHandler(api api.FullNode, st *storage) (*handler, error) { "strings": h.strings, "messages": h.messages, - "param": func(string) string { return "" }, // replaced in request handler + "param": func(string) string { return "" }, // replaced in request handler } base := template.New("") @@ -232,7 +232,7 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess &r.GasLimit, &r.Method, &r.Params, - ); err != nil { + ); err != nil { return nil, err } @@ -250,6 +250,4 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess return } - - var _ http.Handler = &handler{} diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index b06d51da4..c84957ad7 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -54,7 +54,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro } if err := a.Syncer.ValidateMsgMeta(fb); err != nil { - xerrors.Errorf("provided messages did not match block: %w", err) + return xerrors.Errorf("provided messages did not match block: %w", err) } ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header}) @@ -73,3 +73,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro // TODO: anything else to do here? return a.PubSub.Publish("/fil/blocks", b) } + +func (a *SyncAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { + return a.Syncer.IncomingBlocks(ctx) +}