events: Build on chain API calls

This commit is contained in:
Łukasz Magiera 2019-09-18 13:01:52 +02:00
parent 3e45088812
commit fe7efa753d
10 changed files with 200 additions and 95 deletions

View File

@ -44,7 +44,10 @@ type FullNode interface {
Common
// chain
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
// ChainNotify returns channel with chain head updates
// First message is guaranteed to be of len == 1, and type == 'current'
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)

View File

@ -38,7 +38,7 @@ type FullNodeStruct struct {
CommonStruct
Internal struct {
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
@ -279,7 +279,7 @@ func (c *FullNodeStruct) ChainGetBlockReceipts(ctx context.Context, b cid.Cid) (
return c.Internal.ChainGetBlockReceipts(ctx, b)
}
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return c.Internal.ChainNotify(ctx)
}

View File

@ -1,12 +1,17 @@
package events
import (
"context"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
)
@ -23,30 +28,31 @@ type heightHandler struct {
revert RevertHandler
}
type eventChainStore interface {
SubscribeHeadChanges(f func(rev, app []*types.TipSet) error)
GetHeaviestTipSet() *types.TipSet
MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
type eventApi interface {
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
}
type Events struct {
cs eventChainStore
api eventApi
tsc *tipSetCache
lk sync.Mutex
ready sync.WaitGroup
readyOnce sync.Once
heightEvents
calledEvents
}
func NewEvents(cs eventChainStore) *Events {
func NewEvents(api eventApi) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence)
e := &Events{
cs: cs,
api: api,
tsc: tsc,
@ -60,7 +66,7 @@ func NewEvents(cs eventChainStore) *Events {
},
calledEvents: calledEvents{
cs: cs,
cs: api,
tsc: tsc,
gcConfidence: uint64(gcConfidence),
@ -72,14 +78,87 @@ func NewEvents(cs eventChainStore) *Events {
},
}
_ = e.tsc.add(cs.GetHeaviestTipSet())
cs.SubscribeHeadChanges(e.headChange)
e.ready.Add(1)
go e.listenHeadChanges(context.TODO())
e.ready.Wait()
// TODO: cleanup/gc goroutine
return e
}
func (e *Events) restartHeadChanges(ctx context.Context) {
go func() {
if ctx.Err() != nil {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
e.listenHeadChanges(ctx)
}()
}
func (e *Events) listenHeadChanges(ctx context.Context) {
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
log.Errorf("listenHeadChanges ChainNotify call failed: %s", err)
e.restartHeadChanges(ctx)
return
}
cur, ok := <-notifs // TODO: timeout?
if !ok {
log.Error("notification channel closed")
e.restartHeadChanges(ctx)
return
}
if len(cur) != 1 {
log.Errorf("unexpected initial head notification length: %d", len(cur))
e.restartHeadChanges(ctx)
return
}
if cur[0].Type != store.HCCurrent {
log.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
e.restartHeadChanges(ctx)
return
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warn("tsc.add: adding current tipset failed: %s", err)
}
e.readyOnce.Do(func() {
e.ready.Done()
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
}
log.Warn("listenHeadChanges loop quit")
e.restartHeadChanges(ctx)
}
func (e *Events) headChange(rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")

View File

@ -1,6 +1,7 @@
package events
import (
"context"
"math"
"sync"
@ -53,7 +54,7 @@ type queuedEvent struct {
}
type calledEvents struct {
cs eventChainStore
cs eventApi
tsc *tipSetCache
gcConfidence uint64
@ -235,14 +236,15 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Messages)
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
continue
}
for _, m := range bmsgs {
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
if ok {
continue
@ -252,7 +254,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
consume(m)
}
for _, m := range smsgs {
for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()]
if ok {
continue

View File

@ -23,7 +23,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
// highest tipset is always the first (see cs.ReorgOps)
// highest tipset is always the first (see api.ReorgOps)
newH := app[0].Height()
for _, ts := range rev {

View File

@ -1,8 +1,12 @@
package events
import (
"context"
"fmt"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/store"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
@ -31,7 +35,7 @@ type fakeCS struct {
msgs map[cid.Cid]fakeMsg
sub func(rev, app []*types.TipSet) error
sub func(rev, app []*types.TipSet)
}
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
@ -50,24 +54,39 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
return ts
}
func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
if fcs.sub != nil {
fcs.t.Fatal("sub should be nil")
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) {
out := make(chan []*store.HeadChange, 1)
out <- []*store.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}}
fcs.sub = func(rev, app []*types.TipSet) {
notif := make([]*store.HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &store.HeadChange{
Type: store.HCRevert,
Val: r,
}
}
for i, r := range app {
notif[i+len(rev)] = &store.HeadChange{
Type: store.HCApply,
Val: r,
}
}
out <- notif
}
fcs.sub = f
return out, nil
}
func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet {
return fcs.tsc.best()
}
func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
ms, ok := fcs.msgs[b.Messages]
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, messages cid.Cid) (*api.BlockMessages, error) {
ms, ok := fcs.msgs[messages]
if ok {
return ms.bmsgs, ms.smsgs, nil
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
}
return []*types.Message{}, []*types.SignedMessage{}, nil
return &api.BlockMessages{}, nil
}
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
@ -113,11 +132,11 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
apps[app-i-1] = ts
}
err := fcs.sub(revs, apps)
require.NoError(fcs.t, err)
fcs.sub(revs, apps)
time.Sleep(100 * time.Millisecond) // TODO: :c
}
var _ eventChainStore = &fakeCS{}
var _ eventApi = &fakeCS{}
func TestAt(t *testing.T) {
fcs := &fakeCS{

View File

@ -54,18 +54,23 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
for _, r := range rev {
cs.bestTips.Pub(&HeadChange{
notif := make([]*HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &HeadChange{
Type: HCRevert,
Val: r,
}, "headchange")
}
}
for _, r := range app {
cs.bestTips.Pub(&HeadChange{
for i, r := range app {
notif[i+len(rev)] = &HeadChange{
Type: HCApply,
Val: r,
}, "headchange")
}
}
cs.bestTips.Pub(notif, "headchange")
return nil
}
@ -112,18 +117,6 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error {
return nil
}
func (cs *ChainStore) SubNewTips() chan *types.TipSet {
subch := cs.bestTips.Sub("best")
out := make(chan *types.TipSet)
go func() {
defer close(out)
for val := range subch {
out <- val.(*types.TipSet)
}
}()
return out
}
const (
HCRevert = "revert"
HCApply = "apply"
@ -135,17 +128,17 @@ type HeadChange struct {
Val *types.TipSet
}
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange {
cs.pubLk.Lock()
subch := cs.bestTips.Sub("headchange")
head := cs.GetHeaviestTipSet()
cs.pubLk.Unlock()
out := make(chan *HeadChange, 16)
out <- &HeadChange{
out := make(chan []*HeadChange, 16)
out <- []*HeadChange{{
Type: HCCurrent,
Val: head,
}
}}
go func() {
defer close(out)
@ -156,8 +149,11 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
log.Warn("chain head sub exit loop")
return
}
if len(out) > 0 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
select {
case out <- val.(*HeadChange):
case out <- val.([]*HeadChange):
case <-ctx.Done():
}
case <-ctx.Done():
@ -610,20 +606,22 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid
for {
select {
case val, ok := <-tsub:
case notif, ok := <-tsub:
if !ok {
return cid.Undef, nil, ctx.Err()
}
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
for _, val := range notif {
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
}
}
case <-ctx.Done():

View File

@ -223,9 +223,11 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) {
}
// TODO: some sort of timeout?
for c := range hc {
if c.Val.Equals(target) {
return
for n := range hc {
for _, c := range n {
if c.Val.Equals(target) {
return
}
}
}
}

View File

@ -22,7 +22,7 @@ type ChainAPI struct {
PubSub *pubsub.PubSub
}
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil
}

View File

@ -52,7 +52,7 @@ type storageMinerApi interface {
MpoolGetNonce(context.Context, address.Address) (uint64, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
@ -184,14 +184,14 @@ func (m *Miner) runPoSt(ctx context.Context) {
}
curhead := <-notifs
if curhead.Type != store.HCCurrent {
if curhead[0].Type != store.HCCurrent {
// TODO: this is probably 'crash the node' level serious
log.Warning("expected to get current best tipset from chain notifications stream")
return
}
postCtx, cancel := context.WithCancel(ctx)
postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead.Val)
postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead[0].Val)
if err != nil {
log.Errorf("initial 'maybeDoPost' call failed: %s", err)
return
@ -200,29 +200,31 @@ func (m *Miner) runPoSt(ctx context.Context) {
for {
select {
case <-ctx.Done():
case ch, ok := <-notifs:
if !ok {
log.Warning("chain notifications stream terminated")
// TODO: attempt to restart it if the context isnt cancelled
return
}
switch ch.Type {
case store.HCApply:
postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val)
if err != nil {
log.Errorf("maybeDoPost failed: %s", err)
case notif, ok := <-notifs:
for _, ch := range notif {
if !ok {
log.Warning("chain notifications stream terminated")
// TODO: attempt to restart it if the context isnt cancelled
return
}
case store.HCRevert:
if onBlock != nil {
if ch.Val.Contains(onBlock.Cid()) {
// Our post may now be invalid!
cancel() // probably the right thing to do?
switch ch.Type {
case store.HCApply:
postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val)
if err != nil {
log.Errorf("maybeDoPost failed: %s", err)
return
}
case store.HCRevert:
if onBlock != nil {
if ch.Val.Contains(onBlock.Cid()) {
// Our post may now be invalid!
cancel() // probably the right thing to do?
}
}
case store.HCCurrent:
log.Warn("got 'current' chain notification in middle of stream")
}
case store.HCCurrent:
log.Warn("got 'current' chain notification in middle of stream")
}
case perr := <-postWaitCh:
if perr != nil {