diff --git a/api/api.go b/api/api.go index a924379b2..950dc5fa2 100644 --- a/api/api.go +++ b/api/api.go @@ -44,7 +44,10 @@ type FullNode interface { Common // chain - ChainNotify(context.Context) (<-chan *store.HeadChange, error) + + // ChainNotify returns channel with chain head updates + // First message is guaranteed to be of len == 1, and type == 'current' + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) diff --git a/api/struct.go b/api/struct.go index 0a1c6f4d5..9c73f08da 100644 --- a/api/struct.go +++ b/api/struct.go @@ -38,7 +38,7 @@ type FullNodeStruct struct { CommonStruct Internal struct { - ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"` + ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"` ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` @@ -279,7 +279,7 @@ func (c *FullNodeStruct) ChainGetBlockReceipts(ctx context.Context, b cid.Cid) ( return c.Internal.ChainGetBlockReceipts(ctx, b) } -func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { +func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return c.Internal.ChainNotify(ctx) } diff --git a/chain/events/events.go b/chain/events/events.go index f333de9d1..c58ab4e81 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -1,12 +1,17 @@ package events import ( + "context" "sync" + "time" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" ) @@ -23,30 +28,32 @@ type heightHandler struct { revert RevertHandler } -type eventChainStore interface { - SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) - - GetHeaviestTipSet() *types.TipSet - MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) +type eventApi interface { + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) } type Events struct { - cs eventChainStore + api eventApi tsc *tipSetCache lk sync.Mutex + ready sync.WaitGroup + readyOnce sync.Once + heightEvents calledEvents } -func NewEvents(cs eventChainStore) *Events { +func NewEvents(ctx context.Context, api eventApi) *Events { gcConfidence := 2 * build.ForkLengthThreshold - tsc := newTSCache(gcConfidence) + tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight) e := &Events{ - cs: cs, + api: api, tsc: tsc, @@ -60,7 +67,7 @@ func NewEvents(cs eventChainStore) *Events { }, calledEvents: calledEvents{ - cs: cs, + cs: api, tsc: tsc, gcConfidence: uint64(gcConfidence), @@ -72,14 +79,82 @@ func NewEvents(cs eventChainStore) *Events { }, } - _ = e.tsc.add(cs.GetHeaviestTipSet()) - cs.SubscribeHeadChanges(e.headChange) + e.ready.Add(1) + + go e.listenHeadChanges(ctx) + + e.ready.Wait() // TODO: cleanup/gc goroutine return e } +func (e *Events) listenHeadChanges(ctx context.Context) { + for { + if err := e.listenHeadChangesOnce(ctx); err != nil { + log.Errorf("listen head changes errored: %s", err) + } else { + log.Warn("listenHeadChanges quit") + } + if ctx.Err() != nil { + log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) + return + } + time.Sleep(time.Second) + log.Info("restarting listenHeadChanges") + } +} + +func (e *Events) listenHeadChangesOnce(ctx context.Context) error { + notifs, err := e.api.ChainNotify(ctx) + if err != nil { + // TODO: retry + return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err) + } + + cur, ok := <-notifs // TODO: timeout? + if !ok { + return xerrors.Errorf("notification channel closed") + } + + if len(cur) != 1 { + return xerrors.Errorf("unexpected initial head notification length: %d", len(cur)) + } + + if cur[0].Type != store.HCCurrent { + return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type) + } + + if err := e.tsc.add(cur[0].Val); err != nil { + log.Warn("tsc.add: adding current tipset failed: %w", err) + } + + e.readyOnce.Do(func() { + e.ready.Done() + }) + + for notif := range notifs { + var rev, app []*types.TipSet + for _, notif := range notif { + switch notif.Type { + case store.HCRevert: + rev = append(rev, notif.Val) + case store.HCApply: + app = append(app, notif.Val) + default: + log.Warnf("unexpected head change notification type: '%s'", notif.Type) + } + } + + if err := e.headChange(rev, app); err != nil { + log.Warnf("headChange failed: %s", err) + } + } + + return nil +} + func (e *Events) headChange(rev, app []*types.TipSet) error { if len(app) == 0 { return xerrors.New("events.headChange expected at least one applied tipset") diff --git a/chain/events/events_called.go b/chain/events/events_called.go index f4789c6e1..3caab9f17 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -1,6 +1,7 @@ package events import ( + "context" "math" "sync" @@ -53,7 +54,7 @@ type queuedEvent struct { } type calledEvents struct { - cs eventChainStore + cs eventApi tsc *tipSetCache gcConfidence uint64 @@ -235,14 +236,15 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa seen := map[cid.Cid]struct{}{} for _, tsb := range ts.Blocks() { - bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb) + + msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) if err != nil { log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) // this is quite bad, but probably better than missing all the other updates continue } - for _, m := range bmsgs { + for _, m := range msgs.BlsMessages { _, ok := seen[m.Cid()] if ok { continue @@ -252,7 +254,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa consume(m) } - for _, m := range smsgs { + for _, m := range msgs.SecpkMessages { _, ok := seen[m.Message.Cid()] if ok { continue diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 8f12f626d..0826e6fc2 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -23,7 +23,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { e.lk.Lock() defer e.lk.Unlock() - // highest tipset is always the first (see cs.ReorgOps) + // highest tipset is always the first (see api.ReorgOps) newH := app[0].Height() for _, ts := range rev { diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 37a1d653a..4628b5fae 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1,8 +1,12 @@ package events import ( + "context" "fmt" + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/store" "testing" + "time" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" @@ -29,9 +33,14 @@ type fakeCS struct { h uint64 tsc *tipSetCache - msgs map[cid.Cid]fakeMsg + msgs map[cid.Cid]fakeMsg + blkMsgs map[cid.Cid]cid.Cid - sub func(rev, app []*types.TipSet) error + sub func(rev, app []*types.TipSet) +} + +func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) { + panic("Not Implemented") } func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { @@ -50,24 +59,44 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { return ts } -func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) { - if fcs.sub != nil { - fcs.t.Fatal("sub should be nil") - } - fcs.sub = f -} +func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) { + out := make(chan []*store.HeadChange, 1) + out <- []*store.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}} -func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet { - return fcs.tsc.best() -} + fcs.sub = func(rev, app []*types.TipSet) { + notif := make([]*store.HeadChange, len(rev)+len(app)) -func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { - ms, ok := fcs.msgs[b.Messages] - if ok { - return ms.bmsgs, ms.smsgs, nil + for i, r := range rev { + notif[i] = &store.HeadChange{ + Type: store.HCRevert, + Val: r, + } + } + for i, r := range app { + notif[i+len(rev)] = &store.HeadChange{ + Type: store.HCApply, + Val: r, + } + } + + out <- notif } - return []*types.Message{}, []*types.SignedMessage{}, nil + return out, nil +} + +func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) { + messages, ok := fcs.blkMsgs[blk] + if !ok { + return &api.BlockMessages{}, nil + } + + ms, ok := fcs.msgs[messages] + if !ok { + return &api.BlockMessages{}, nil + } + return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil + } func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid { @@ -102,32 +131,36 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow for i := 0; i < app; i++ { fcs.h++ - mc, _ := msgs[i] - if mc == cid.Undef { + mc, hasMsgs := msgs[i] + if !hasMsgs { mc = dummyCid } ts := makeTs(fcs.t, fcs.h, mc) require.NoError(fcs.t, fcs.tsc.add(ts)) + if hasMsgs { + fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc + } + apps[app-i-1] = ts } - err := fcs.sub(revs, apps) - require.NoError(fcs.t, err) + fcs.sub(revs, apps) + time.Sleep(100 * time.Millisecond) // TODO: :c } -var _ eventChainStore = &fakeCS{} +var _ eventApi = &fakeCS{} func TestAt(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) var applied bool var reverted bool @@ -178,17 +211,82 @@ func TestAt(t *testing.T) { require.Equal(t, false, reverted) } +func TestAtStart(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + fcs.advance(0, 5, nil) // 6 + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 5, int(ts.Height())) + require.Equal(t, 8, int(curH)) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + fcs.advance(0, 5, nil) // 11 + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + +func TestAtStartConfidence(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + fcs.advance(0, 10, nil) // 11 + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 5, int(ts.Height())) + require.Equal(t, 11, int(curH)) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + func TestCalled(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -380,12 +478,13 @@ func TestCalledTimeout(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -419,12 +518,13 @@ func TestCalledTimeout(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events = NewEvents(fcs) + events = NewEvents(context.Background(), fcs) err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil @@ -452,12 +552,13 @@ func TestCalledOrder(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 630aed0d0..33493a4b8 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -1,24 +1,31 @@ package events import ( + "context" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/types" ) +type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + // tipSetCache implements a simple ring-buffer cache to keep track of recent // tipsets type tipSetCache struct { cache []*types.TipSet start int len int + + storage tsByHFunc } -func newTSCache(cap int) *tipSetCache { +func newTSCache(cap int, storage tsByHFunc) *tipSetCache { return &tipSetCache{ cache: make([]*types.TipSet, cap), start: 0, len: 0, + + storage: storage, } } @@ -66,9 +73,7 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height() if height < tailH { - // TODO: we can try to walk parents, but that shouldn't happen in - // practice, so it's probably not worth implementing - return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH) + return tsc.storage(context.TODO(), height, tsc.cache[tailH]) } return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil diff --git a/chain/store/store.go b/chain/store/store.go index 43c39991d..85b846521 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -54,18 +54,23 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { hcnf := func(rev, app []*types.TipSet) error { cs.pubLk.Lock() defer cs.pubLk.Unlock() - for _, r := range rev { - cs.bestTips.Pub(&HeadChange{ + + notif := make([]*HeadChange, len(rev)+len(app)) + + for i, r := range rev { + notif[i] = &HeadChange{ Type: HCRevert, Val: r, - }, "headchange") + } } - for _, r := range app { - cs.bestTips.Pub(&HeadChange{ + for i, r := range app { + notif[i+len(rev)] = &HeadChange{ Type: HCApply, Val: r, - }, "headchange") + } } + + cs.bestTips.Pub(notif, "headchange") return nil } @@ -112,18 +117,6 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error { return nil } -func (cs *ChainStore) SubNewTips() chan *types.TipSet { - subch := cs.bestTips.Sub("best") - out := make(chan *types.TipSet) - go func() { - defer close(out) - for val := range subch { - out <- val.(*types.TipSet) - } - }() - return out -} - const ( HCRevert = "revert" HCApply = "apply" @@ -135,17 +128,17 @@ type HeadChange struct { Val *types.TipSet } -func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { +func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange { cs.pubLk.Lock() subch := cs.bestTips.Sub("headchange") head := cs.GetHeaviestTipSet() cs.pubLk.Unlock() - out := make(chan *HeadChange, 16) - out <- &HeadChange{ + out := make(chan []*HeadChange, 16) + out <- []*HeadChange{{ Type: HCCurrent, Val: head, - } + }} go func() { defer close(out) @@ -156,8 +149,11 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { log.Warn("chain head sub exit loop") return } + if len(out) > 0 { + log.Warnf("head change sub is slow, has %d buffered entries", len(out)) + } select { - case out <- val.(*HeadChange): + case out <- val.([]*HeadChange): case <-ctx.Done(): } case <-ctx.Done(): @@ -610,20 +606,22 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid for { select { - case val, ok := <-tsub: + case notif, ok := <-tsub: if !ok { return cid.Undef, nil, ctx.Err() } - switch val.Type { - case HCRevert: - continue - case HCApply: - bc, r, err := cs.tipsetContainsMsg(val.Val, mcid) - if err != nil { - return cid.Undef, nil, err - } - if r != nil { - return bc, r, nil + for _, val := range notif { + switch val.Type { + case HCRevert: + continue + case HCApply: + bc, r, err := cs.tipsetContainsMsg(val.Val, mcid) + if err != nil { + return cid.Undef, nil, err + } + if r != nil { + return bc, r, nil + } } } case <-ctx.Done(): diff --git a/chain/sync_test.go b/chain/sync_test.go index 56f98cc7d..47b8f4ec3 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -223,9 +223,11 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) { } // TODO: some sort of timeout? - for c := range hc { - if c.Val.Equals(target) { - return + for n := range hc { + for _, c := range n { + if c.Val.Equals(target) { + return + } } } } diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 2e429dde0..94fb5717e 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -22,7 +22,7 @@ type ChainAPI struct { PubSub *pubsub.PubSub } -func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { +func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return a.Chain.SubHeadChanges(ctx), nil } @@ -109,5 +109,5 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]* } func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { - panic("NYI") + return a.Chain.GetTipsetByHeight(ctx, h, ts) } diff --git a/storage/miner.go b/storage/miner.go index 2d3278c9f..2ffaa0c8b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,11 +2,10 @@ package storage import ( "context" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - host "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/host" "github.com/pkg/errors" "golang.org/x/xerrors" @@ -14,6 +13,7 @@ import ( "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/events" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" @@ -23,8 +23,11 @@ import ( var log = logging.Logger("storageminer") +const PoStConfidence = 0 + type Miner struct { - api storageMinerApi + api storageMinerApi + events *events.Events secst *sector.Store commt *commitment.Tracker @@ -51,10 +54,12 @@ type storageMinerApi interface { MpoolPush(context.Context, *types.SignedMessage) error MpoolGetNonce(context.Context, address.Address) (uint64, error) + ChainHead(context.Context) (*types.TipSet, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) - ChainNotify(context.Context) (<-chan *store.HeadChange, error) + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) @@ -63,7 +68,8 @@ type storageMinerApi interface { func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) { return &Miner{ - api: api, + api: api, + maddr: addr, h: h, ds: ds, @@ -77,8 +83,15 @@ func (m *Miner) Run(ctx context.Context) error { return errors.Wrap(err, "miner preflight checks failed") } + m.events = events.NewEvents(ctx, m.api) + + ts, err := m.api.ChainHead(ctx) + if err != nil { + return err + } + go m.handlePostingSealedSectors(ctx) - go m.runPoSt(ctx) + go m.schedulePoSt(ctx, ts) return nil } @@ -174,68 +187,54 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return nil } -func (m *Miner) runPoSt(ctx context.Context) { - // TODO: most of this method can probably be replaced by the events module once it works on top of the api - notifs, err := m.api.ChainNotify(ctx) +func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs) if err != nil { - // TODO: this is probably 'crash the node' level serious - log.Errorf("POST ROUTINE FAILED: failed to get chain notifications stream: %s", err) + log.Errorf("failed to get proving period end for miner: %s", err) return } - curhead := <-notifs - if curhead.Type != store.HCCurrent { - // TODO: this is probably 'crash the node' level serious - log.Warning("expected to get current best tipset from chain notifications stream") + if ppe == 0 { + log.Errorf("Proving period end == 0") + // TODO: we probably want to call schedulePoSt after the first commitSector call return } - postCtx, cancel := context.WithCancel(ctx) - postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead.Val) + log.Infof("Scheduling post at height %d", ppe) + // TODO: Should we set confidence to randomness lookback? + err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + return nil + }, PoStConfidence, ppe) if err != nil { - log.Errorf("initial 'maybeDoPost' call failed: %s", err) + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) return } +} - for { - select { - case <-ctx.Done(): - case ch, ok := <-notifs: - if !ok { - log.Warning("chain notifications stream terminated") - // TODO: attempt to restart it if the context isnt cancelled - return - } +func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { + postWaitCh, _, err := m.maybeDoPost(context.TODO(), ts) + if err != nil { + return err + } - switch ch.Type { - case store.HCApply: - postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val) - if err != nil { - log.Errorf("maybeDoPost failed: %s", err) - return - } - case store.HCRevert: - if onBlock != nil { - if ch.Val.Contains(onBlock.Cid()) { - // Our post may now be invalid! - cancel() // probably the right thing to do? - } - } - case store.HCCurrent: - log.Warn("got 'current' chain notification in middle of stream") - } - case perr := <-postWaitCh: - if perr != nil { - log.Errorf("got error back from postWaitCh: %s", err) - // TODO: what do we even do here? - return - } - postWaitCh = nil - onBlock = nil - // yay? - log.Infof("post successfully submitted") + if postWaitCh == nil { + return errors.New("PoSt didn't start") + } + + go func() { + err := <-postWaitCh + if err != nil { + log.Errorf("got error back from postWaitCh: %s", err) + return } - } + + log.Infof("post successfully submitted") + + m.schedulePoSt(context.TODO(), ts) + }() + return nil } func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) { @@ -245,6 +244,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error } if ppe < ts.Height() { + log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height()) return nil, nil, nil }