diff --git a/chain/events/events.go b/chain/events/events.go index 1106c8096..3fdad1cb3 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -31,6 +31,7 @@ type heightHandler struct { type eventApi interface { ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) } type Events struct { @@ -49,7 +50,7 @@ type Events struct { func NewEvents(api eventApi) *Events { gcConfidence := 2 * build.ForkLengthThreshold - tsc := newTSCache(gcConfidence) + tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight) e := &Events{ api: api, diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 4b7eabaf5..2d61edf35 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -39,6 +39,10 @@ type fakeCS struct { sub func(rev, app []*types.TipSet) } +func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) { + panic("Not Implemented") +} + func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { ts, err := types.NewTipSet([]*types.BlockHeader{ { @@ -152,7 +156,7 @@ func TestAt(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -211,7 +215,7 @@ func TestAtStart(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -245,7 +249,7 @@ func TestAtStartConfidence(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -278,7 +282,7 @@ func TestCalled(t *testing.T) { msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -476,7 +480,7 @@ func TestCalledTimeout(t *testing.T) { msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -516,7 +520,7 @@ func TestCalledTimeout(t *testing.T) { msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) @@ -550,7 +554,7 @@ func TestCalledOrder(t *testing.T) { msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 630aed0d0..33493a4b8 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -1,24 +1,31 @@ package events import ( + "context" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/types" ) +type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + // tipSetCache implements a simple ring-buffer cache to keep track of recent // tipsets type tipSetCache struct { cache []*types.TipSet start int len int + + storage tsByHFunc } -func newTSCache(cap int) *tipSetCache { +func newTSCache(cap int, storage tsByHFunc) *tipSetCache { return &tipSetCache{ cache: make([]*types.TipSet, cap), start: 0, len: 0, + + storage: storage, } } @@ -66,9 +73,7 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height() if height < tailH { - // TODO: we can try to walk parents, but that shouldn't happen in - // practice, so it's probably not worth implementing - return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH) + return tsc.storage(context.TODO(), height, tsc.cache[tailH]) } return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 683ba73d7..94fb5717e 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -109,5 +109,5 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]* } func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { - panic("NYI") + return a.Chain.GetTipsetByHeight(ctx, h, ts) } diff --git a/storage/miner.go b/storage/miner.go index 5ae6b3663..5cbb7015d 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,11 +2,10 @@ package storage import ( "context" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - host "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/host" "github.com/pkg/errors" "golang.org/x/xerrors" @@ -24,6 +23,8 @@ import ( var log = logging.Logger("storageminer") +const PoStConfidence = 0 + type Miner struct { api storageMinerApi events *events.Events @@ -53,6 +54,7 @@ type storageMinerApi interface { MpoolPush(context.Context, *types.SignedMessage) error MpoolGetNonce(context.Context, address.Address) (uint64, error) + ChainHead(context.Context) (*types.TipSet, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) @@ -82,8 +84,13 @@ func (m *Miner) Run(ctx context.Context) error { return errors.Wrap(err, "miner preflight checks failed") } + ts, err := m.api.ChainHead(ctx) + if err != nil { + return err + } + go m.handlePostingSealedSectors(ctx) - go m.runPoSt(ctx) + go m.schedulePoSt(ctx, ts) return nil } @@ -179,70 +186,54 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return nil } -func (m *Miner) runPoSt(ctx context.Context) { - // TODO: most of this method can probably be replaced by the events module once it works on top of the api - notifs, err := m.api.ChainNotify(ctx) +func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs) if err != nil { - // TODO: this is probably 'crash the node' level serious - log.Errorf("POST ROUTINE FAILED: failed to get chain notifications stream: %s", err) + log.Errorf("failed to get proving period end for miner: %s", err) return } - curhead := <-notifs - if curhead[0].Type != store.HCCurrent { - // TODO: this is probably 'crash the node' level serious - log.Warning("expected to get current best tipset from chain notifications stream") + if ppe == 0 { + log.Errorf("Proving period end == 0") + // TODO: we probably want to call schedulePoSt after the first commitSector call return } - postCtx, cancel := context.WithCancel(ctx) - postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead[0].Val) + log.Infof("Scheduling post at height %d", ppe) + // TODO: Should we set confidence to randomness lookback? + err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + return nil + }, PoStConfidence, ppe) if err != nil { - log.Errorf("initial 'maybeDoPost' call failed: %s", err) + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) return } +} - for { - select { - case <-ctx.Done(): - case notif, ok := <-notifs: - for _, ch := range notif { - if !ok { - log.Warning("chain notifications stream terminated") - // TODO: attempt to restart it if the context isnt cancelled - return - } +func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { + postWaitCh, _, err := m.maybeDoPost(context.TODO(), ts) + if err != nil { + return err + } - switch ch.Type { - case store.HCApply: - postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val) - if err != nil { - log.Errorf("maybeDoPost failed: %s", err) - return - } - case store.HCRevert: - if onBlock != nil { - if ch.Val.Contains(onBlock.Cid()) { - // Our post may now be invalid! - cancel() // probably the right thing to do? - } - } - case store.HCCurrent: - log.Warn("got 'current' chain notification in middle of stream") - } - } - case perr := <-postWaitCh: - if perr != nil { - log.Errorf("got error back from postWaitCh: %s", err) - // TODO: what do we even do here? - return - } - postWaitCh = nil - onBlock = nil - // yay? - log.Infof("post successfully submitted") + if postWaitCh == nil { + return errors.New("PoSt didn't start") + } + + go func() { + err := <-postWaitCh + if err != nil { + log.Errorf("got error back from postWaitCh: %s", err) + return } - } + + log.Infof("post successfully submitted") + + m.schedulePoSt(context.TODO(), ts) + }() + return nil } func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) { @@ -252,6 +243,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error } if ppe < ts.Height() { + log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height()) return nil, nil, nil }