expose head change notifications through api

This commit is contained in:
whyrusleeping 2019-07-28 12:19:33 -07:00
parent f712f56c6d
commit 03f653b88e
5 changed files with 24 additions and 5 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-cid"
@ -64,6 +65,7 @@ type FullNode interface {
Common
// chain
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error)

View File

@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-cid"
@ -37,6 +38,7 @@ type FullNodeStruct struct {
CommonStruct
Internal struct {
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"`
@ -179,6 +181,10 @@ func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) (
return c.Internal.ChainGetBlockMessages(ctx, b)
}
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
return c.Internal.ChainNotify(ctx)
}
var _ Common = &CommonStruct{}
var _ FullNode = &FullNodeStruct{}
var _ StorageMiner = &StorageMinerStruct{}

View File

@ -106,13 +106,21 @@ type HeadChange struct {
Val *types.TipSet
}
func (cs *ChainStore) SubHeadChanges() chan *HeadChange {
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
subch := cs.bestTips.Sub("headchange")
out := make(chan *HeadChange, 16)
go func() {
defer close(out)
for val := range subch {
out <- val.(*HeadChange)
for {
select {
case val, ok := <-subch:
if !ok {
return
}
out <- val.(*HeadChange)
case <-ctx.Done():
cs.bestTips.Unsub(subch)
}
}
}()
return out
@ -465,7 +473,7 @@ func (cs *ChainStore) GetActor(addr address.Address) (*types.Actor, error) {
}
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
tsub := cs.SubHeadChanges()
tsub := cs.SubHeadChanges(ctx)
head := cs.GetHeaviestTipSet()

1
go.mod
View File

@ -7,7 +7,6 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543
github.com/filecoin-project/go-sectorbuilder v0.0.0-20190725115349-9b090e700325
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/gorilla/websocket v1.4.0
github.com/ipfs/go-bitswap v0.1.5

View File

@ -31,6 +31,10 @@ type FullNodeAPI struct {
Wallet *chain.Wallet
}
func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil
}
func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error {
if err := a.Chain.AddBlock(blk.Header); err != nil {
return err