Merge pull request #206 from filecoin-project/feat/api-events

Make events subsystem work on top of API
This commit is contained in:
Łukasz Magiera 2019-09-18 20:43:48 +02:00 committed by GitHub
commit 4376b25b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 339 additions and 153 deletions

View File

@ -44,7 +44,10 @@ type FullNode interface {
Common
// chain
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
// ChainNotify returns channel with chain head updates
// First message is guaranteed to be of len == 1, and type == 'current'
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)

View File

@ -38,7 +38,7 @@ type FullNodeStruct struct {
CommonStruct
Internal struct {
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"`
ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
@ -279,7 +279,7 @@ func (c *FullNodeStruct) ChainGetBlockReceipts(ctx context.Context, b cid.Cid) (
return c.Internal.ChainGetBlockReceipts(ctx, b)
}
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return c.Internal.ChainNotify(ctx)
}

View File

@ -1,12 +1,17 @@
package events
import (
"context"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
)
@ -23,30 +28,32 @@ type heightHandler struct {
revert RevertHandler
}
type eventChainStore interface {
SubscribeHeadChanges(f func(rev, app []*types.TipSet) error)
GetHeaviestTipSet() *types.TipSet
MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
type eventApi interface {
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
}
type Events struct {
cs eventChainStore
api eventApi
tsc *tipSetCache
lk sync.Mutex
ready sync.WaitGroup
readyOnce sync.Once
heightEvents
calledEvents
}
func NewEvents(cs eventChainStore) *Events {
func NewEvents(ctx context.Context, api eventApi) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence)
tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
e := &Events{
cs: cs,
api: api,
tsc: tsc,
@ -60,7 +67,7 @@ func NewEvents(cs eventChainStore) *Events {
},
calledEvents: calledEvents{
cs: cs,
cs: api,
tsc: tsc,
gcConfidence: uint64(gcConfidence),
@ -72,14 +79,82 @@ func NewEvents(cs eventChainStore) *Events {
},
}
_ = e.tsc.add(cs.GetHeaviestTipSet())
cs.SubscribeHeadChanges(e.headChange)
e.ready.Add(1)
go e.listenHeadChanges(ctx)
e.ready.Wait()
// TODO: cleanup/gc goroutine
return e
}
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
if ctx.Err() != nil {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
}
}
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warn("tsc.add: adding current tipset failed: %w", err)
}
e.readyOnce.Do(func() {
e.ready.Done()
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
}
return nil
}
func (e *Events) headChange(rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")

View File

@ -1,6 +1,7 @@
package events
import (
"context"
"math"
"sync"
@ -53,7 +54,7 @@ type queuedEvent struct {
}
type calledEvents struct {
cs eventChainStore
cs eventApi
tsc *tipSetCache
gcConfidence uint64
@ -235,14 +236,15 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
continue
}
for _, m := range bmsgs {
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
if ok {
continue
@ -252,7 +254,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
consume(m)
}
for _, m := range smsgs {
for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()]
if ok {
continue

View File

@ -23,7 +23,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
// highest tipset is always the first (see cs.ReorgOps)
// highest tipset is always the first (see api.ReorgOps)
newH := app[0].Height()
for _, ts := range rev {

View File

@ -1,8 +1,12 @@
package events
import (
"context"
"fmt"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/store"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
@ -29,9 +33,14 @@ type fakeCS struct {
h uint64
tsc *tipSetCache
msgs map[cid.Cid]fakeMsg
msgs map[cid.Cid]fakeMsg
blkMsgs map[cid.Cid]cid.Cid
sub func(rev, app []*types.TipSet) error
sub func(rev, app []*types.TipSet)
}
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
panic("Not Implemented")
}
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
@ -50,24 +59,44 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
return ts
}
func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
if fcs.sub != nil {
fcs.t.Fatal("sub should be nil")
}
fcs.sub = f
}
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) {
out := make(chan []*store.HeadChange, 1)
out <- []*store.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}}
func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet {
return fcs.tsc.best()
}
fcs.sub = func(rev, app []*types.TipSet) {
notif := make([]*store.HeadChange, len(rev)+len(app))
func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
ms, ok := fcs.msgs[b.Messages]
if ok {
return ms.bmsgs, ms.smsgs, nil
for i, r := range rev {
notif[i] = &store.HeadChange{
Type: store.HCRevert,
Val: r,
}
}
for i, r := range app {
notif[i+len(rev)] = &store.HeadChange{
Type: store.HCApply,
Val: r,
}
}
out <- notif
}
return []*types.Message{}, []*types.SignedMessage{}, nil
return out, nil
}
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) {
messages, ok := fcs.blkMsgs[blk]
if !ok {
return &api.BlockMessages{}, nil
}
ms, ok := fcs.msgs[messages]
if !ok {
return &api.BlockMessages{}, nil
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
}
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
@ -102,32 +131,36 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
for i := 0; i < app; i++ {
fcs.h++
mc, _ := msgs[i]
if mc == cid.Undef {
mc, hasMsgs := msgs[i]
if !hasMsgs {
mc = dummyCid
}
ts := makeTs(fcs.t, fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts))
if hasMsgs {
fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc
}
apps[app-i-1] = ts
}
err := fcs.sub(revs, apps)
require.NoError(fcs.t, err)
fcs.sub(revs, apps)
time.Sleep(100 * time.Millisecond) // TODO: :c
}
var _ eventChainStore = &fakeCS{}
var _ eventApi = &fakeCS{}
func TestAt(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2 * build.ForkLengthThreshold),
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
@ -178,17 +211,82 @@ func TestAt(t *testing.T) {
require.Equal(t, false, reverted)
}
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 5, nil) // 6
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 5, nil) // 11
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 10, nil) // 11
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 11, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestCalled(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -380,12 +478,13 @@ func TestCalledTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -419,12 +518,13 @@ func TestCalledTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events = NewEvents(fcs)
events = NewEvents(context.Background(), fcs)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
@ -452,12 +552,13 @@ func TestCalledOrder(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)

View File

@ -1,24 +1,31 @@
package events
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types"
)
type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
// tipSetCache implements a simple ring-buffer cache to keep track of recent
// tipsets
type tipSetCache struct {
cache []*types.TipSet
start int
len int
storage tsByHFunc
}
func newTSCache(cap int) *tipSetCache {
func newTSCache(cap int, storage tsByHFunc) *tipSetCache {
return &tipSetCache{
cache: make([]*types.TipSet, cap),
start: 0,
len: 0,
storage: storage,
}
}
@ -66,9 +73,7 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height()
if height < tailH {
// TODO: we can try to walk parents, but that shouldn't happen in
// practice, so it's probably not worth implementing
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH)
return tsc.storage(context.TODO(), height, tsc.cache[tailH])
}
return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil

View File

@ -54,18 +54,23 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
for _, r := range rev {
cs.bestTips.Pub(&HeadChange{
notif := make([]*HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &HeadChange{
Type: HCRevert,
Val: r,
}, "headchange")
}
}
for _, r := range app {
cs.bestTips.Pub(&HeadChange{
for i, r := range app {
notif[i+len(rev)] = &HeadChange{
Type: HCApply,
Val: r,
}, "headchange")
}
}
cs.bestTips.Pub(notif, "headchange")
return nil
}
@ -112,18 +117,6 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error {
return nil
}
func (cs *ChainStore) SubNewTips() chan *types.TipSet {
subch := cs.bestTips.Sub("best")
out := make(chan *types.TipSet)
go func() {
defer close(out)
for val := range subch {
out <- val.(*types.TipSet)
}
}()
return out
}
const (
HCRevert = "revert"
HCApply = "apply"
@ -135,17 +128,17 @@ type HeadChange struct {
Val *types.TipSet
}
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange {
cs.pubLk.Lock()
subch := cs.bestTips.Sub("headchange")
head := cs.GetHeaviestTipSet()
cs.pubLk.Unlock()
out := make(chan *HeadChange, 16)
out <- &HeadChange{
out := make(chan []*HeadChange, 16)
out <- []*HeadChange{{
Type: HCCurrent,
Val: head,
}
}}
go func() {
defer close(out)
@ -156,8 +149,11 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
log.Warn("chain head sub exit loop")
return
}
if len(out) > 0 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
select {
case out <- val.(*HeadChange):
case out <- val.([]*HeadChange):
case <-ctx.Done():
}
case <-ctx.Done():
@ -610,20 +606,22 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid
for {
select {
case val, ok := <-tsub:
case notif, ok := <-tsub:
if !ok {
return cid.Undef, nil, ctx.Err()
}
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
for _, val := range notif {
switch val.Type {
case HCRevert:
continue
case HCApply:
bc, r, err := cs.tipsetContainsMsg(val.Val, mcid)
if err != nil {
return cid.Undef, nil, err
}
if r != nil {
return bc, r, nil
}
}
}
case <-ctx.Done():

View File

@ -223,9 +223,11 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) {
}
// TODO: some sort of timeout?
for c := range hc {
if c.Val.Equals(target) {
return
for n := range hc {
for _, c := range n {
if c.Val.Equals(target) {
return
}
}
}
}

View File

@ -22,7 +22,7 @@ type ChainAPI struct {
PubSub *pubsub.PubSub
}
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil
}
@ -109,5 +109,5 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]*
}
func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
panic("NYI")
return a.Chain.GetTipsetByHeight(ctx, h, ts)
}

View File

@ -2,11 +2,10 @@ package storage
import (
"context"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors"
"golang.org/x/xerrors"
@ -14,6 +13,7 @@ import (
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/events"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
@ -23,8 +23,11 @@ import (
var log = logging.Logger("storageminer")
const PoStConfidence = 0
type Miner struct {
api storageMinerApi
api storageMinerApi
events *events.Events
secst *sector.Store
commt *commitment.Tracker
@ -51,10 +54,12 @@ type storageMinerApi interface {
MpoolPush(context.Context, *types.SignedMessage) error
MpoolGetNonce(context.Context, address.Address) (uint64, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
@ -63,7 +68,8 @@ type storageMinerApi interface {
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) {
return &Miner{
api: api,
api: api,
maddr: addr,
h: h,
ds: ds,
@ -77,8 +83,15 @@ func (m *Miner) Run(ctx context.Context) error {
return errors.Wrap(err, "miner preflight checks failed")
}
m.events = events.NewEvents(ctx, m.api)
ts, err := m.api.ChainHead(ctx)
if err != nil {
return err
}
go m.handlePostingSealedSectors(ctx)
go m.runPoSt(ctx)
go m.schedulePoSt(ctx, ts)
return nil
}
@ -174,68 +187,54 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return nil
}
func (m *Miner) runPoSt(ctx context.Context) {
// TODO: most of this method can probably be replaced by the events module once it works on top of the api
notifs, err := m.api.ChainNotify(ctx)
func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs)
if err != nil {
// TODO: this is probably 'crash the node' level serious
log.Errorf("POST ROUTINE FAILED: failed to get chain notifications stream: %s", err)
log.Errorf("failed to get proving period end for miner: %s", err)
return
}
curhead := <-notifs
if curhead.Type != store.HCCurrent {
// TODO: this is probably 'crash the node' level serious
log.Warning("expected to get current best tipset from chain notifications stream")
if ppe == 0 {
log.Errorf("Proving period end == 0")
// TODO: we probably want to call schedulePoSt after the first commitSector call
return
}
postCtx, cancel := context.WithCancel(ctx)
postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead.Val)
log.Infof("Scheduling post at height %d", ppe)
// TODO: Should we set confidence to randomness lookback?
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
// TODO: Cancel post
return nil
}, PoStConfidence, ppe)
if err != nil {
log.Errorf("initial 'maybeDoPost' call failed: %s", err)
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return
}
}
for {
select {
case <-ctx.Done():
case ch, ok := <-notifs:
if !ok {
log.Warning("chain notifications stream terminated")
// TODO: attempt to restart it if the context isnt cancelled
return
}
func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
postWaitCh, _, err := m.maybeDoPost(context.TODO(), ts)
if err != nil {
return err
}
switch ch.Type {
case store.HCApply:
postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val)
if err != nil {
log.Errorf("maybeDoPost failed: %s", err)
return
}
case store.HCRevert:
if onBlock != nil {
if ch.Val.Contains(onBlock.Cid()) {
// Our post may now be invalid!
cancel() // probably the right thing to do?
}
}
case store.HCCurrent:
log.Warn("got 'current' chain notification in middle of stream")
}
case perr := <-postWaitCh:
if perr != nil {
log.Errorf("got error back from postWaitCh: %s", err)
// TODO: what do we even do here?
return
}
postWaitCh = nil
onBlock = nil
// yay?
log.Infof("post successfully submitted")
if postWaitCh == nil {
return errors.New("PoSt didn't start")
}
go func() {
err := <-postWaitCh
if err != nil {
log.Errorf("got error back from postWaitCh: %s", err)
return
}
}
log.Infof("post successfully submitted")
m.schedulePoSt(context.TODO(), ts)
}()
return nil
}
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) {
@ -245,6 +244,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
}
if ppe < ts.Height() {
log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height())
return nil, nil, nil
}