Merge pull request #104 from filecoin-project/feat/chain-notifs

expose head change notifications through api
This commit is contained in:
Whyrusleeping 2019-07-29 11:36:04 -07:00 committed by GitHub
commit 54c78877bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 5 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -64,6 +65,7 @@ type FullNode interface {
Common Common
// chain // chain
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error) ChainGetRandomness(context.Context, *types.TipSet) ([]byte, error)

View File

@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -37,6 +38,7 @@ type FullNodeStruct struct {
CommonStruct CommonStruct
Internal struct { Internal struct {
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"` ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet) ([]byte, error) `perm:"read"`
@ -179,6 +181,10 @@ func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) (
return c.Internal.ChainGetBlockMessages(ctx, b) return c.Internal.ChainGetBlockMessages(ctx, b)
} }
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
return c.Internal.ChainNotify(ctx)
}
var _ Common = &CommonStruct{} var _ Common = &CommonStruct{}
var _ FullNode = &FullNodeStruct{} var _ FullNode = &FullNodeStruct{}
var _ StorageMiner = &StorageMinerStruct{} var _ StorageMiner = &StorageMinerStruct{}

View File

@ -106,13 +106,21 @@ type HeadChange struct {
Val *types.TipSet Val *types.TipSet
} }
func (cs *ChainStore) SubHeadChanges() chan *HeadChange { func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
subch := cs.bestTips.Sub("headchange") subch := cs.bestTips.Sub("headchange")
out := make(chan *HeadChange, 16) out := make(chan *HeadChange, 16)
go func() { go func() {
defer close(out) defer close(out)
for val := range subch { for {
select {
case val, ok := <-subch:
if !ok {
return
}
out <- val.(*HeadChange) out <- val.(*HeadChange)
case <-ctx.Done():
cs.bestTips.Unsub(subch)
}
} }
}() }()
return out return out
@ -465,7 +473,7 @@ func (cs *ChainStore) GetActor(addr address.Address) (*types.Actor, error) {
} }
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) { func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid, *types.MessageReceipt, error) {
tsub := cs.SubHeadChanges() tsub := cs.SubHeadChanges(ctx)
head := cs.GetHeaviestTipSet() head := cs.GetHeaviestTipSet()

1
go.mod
View File

@ -7,7 +7,6 @@ require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8 github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543
github.com/filecoin-project/go-sectorbuilder v0.0.0-20190725115349-9b090e700325
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/ipfs/go-bitswap v0.1.5 github.com/ipfs/go-bitswap v0.1.5

View File

@ -33,6 +33,10 @@ type FullNodeAPI struct {
Wallet *wallet.Wallet Wallet *wallet.Wallet
} }
func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil
}
func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error {
if err := a.Chain.AddBlock(blk.Header); err != nil { if err := a.Chain.AddBlock(blk.Header); err != nil {
return err return err