From ebb744f42f39951ceb1f24a98ce02979d26db601 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Sep 2019 17:35:32 +0200 Subject: [PATCH 1/9] storageminer: retry PoSt --- storage/miner.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/storage/miner.go b/storage/miner.go index 00c69ea87..3a8102563 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "golang.org/x/xerrors" "sync" + "time" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" @@ -94,7 +95,7 @@ func (m *Miner) Run(ctx context.Context) error { } go m.handlePostingSealedSectors(ctx) - go m.schedulePoSt(ctx, ts) + go m.schedulePoSt(ctx, ts, false) return nil } @@ -172,13 +173,13 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return } - m.schedulePoSt(ctx, nil) + m.schedulePoSt(ctx, nil, false) }() return nil } -func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { +func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bool) { ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs) if err != nil { log.Errorf("failed to get proving period end for miner: %s", err) @@ -194,9 +195,11 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { m.schedLk.Lock() if m.postSched >= ppe { - log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched) - m.schedLk.Unlock() - return + if !force || m.postSched > ppe { + log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched) + m.schedLk.Unlock() + return + } } m.postSched = ppe m.schedLk.Unlock() @@ -213,33 +216,44 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { } } +func (m *Miner) restartPost(ts *types.TipSet) { + time.Sleep(400 * time.Millisecond) + log.Warn("Restarting PoSt after failure") + m.schedulePoSt(context.TODO(), ts, true) +} + func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { log.Info("starting PoSt computation") head, err := m.api.ChainHead(context.TODO()) if err != nil { + m.restartPost(ts) return err } postWaitCh, _, err := m.maybeDoPost(context.TODO(), head) if err != nil { + m.restartPost(ts) return err } if postWaitCh == nil { - return errors.New("PoSt didn't start") + log.Errorf("PoSt didn't start") + m.restartPost(ts) + return nil } go func() { err := <-postWaitCh if err != nil { log.Errorf("got error back from postWaitCh: %s", err) + m.restartPost(ts) return } log.Infof("post successfully submitted") - m.schedulePoSt(context.TODO(), ts) + m.schedulePoSt(context.TODO(), ts, false) }() return nil } @@ -316,7 +330,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error } // TODO: check receipt - m.schedulePoSt(ctx, nil) + m.schedulePoSt(ctx, nil, true) }() return ret, sourceTs.MinTicketBlock(), nil From a167b9f5db7962fe52cc637ea6fb44b0d3bc10b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Sep 2019 20:54:31 +0200 Subject: [PATCH 2/9] storageminer: Set Value when posting post --- storage/miner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/miner.go b/storage/miner.go index 3a8102563..411baae7b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -312,7 +312,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error From: m.worker, Method: actors.MAMethods.SubmitPoSt, Params: enc, - Value: types.NewInt(0), + Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late GasLimit: types.NewInt(100000 /* i dont know help */), GasPrice: types.NewInt(1), } From 4305824be77af07708e6a049565934893dee9891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Sep 2019 21:23:11 +0200 Subject: [PATCH 3/9] miner actor: Change PPE logic in submitPost --- chain/actors/actor_miner.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 9e1f02619..25ad88ee1 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -312,16 +312,14 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, return nil, aerrors.New(1, "not authorized to submit post for miner") } - feesRequired := types.NewInt(0) - nextProvingPeriodEnd := self.ProvingPeriodEnd + build.ProvingPeriodDuration - if vmctx.BlockHeight() > nextProvingPeriodEnd { - return nil, aerrors.New(1, "PoSt submited too late") - } + provingPeriodOffset := self.ProvingPeriodEnd % build.ProvingPeriodDuration + provingPeriod := (vmctx.BlockHeight() - provingPeriodOffset - 1) / build.ProvingPeriodDuration + 1 + currentProvingPeriodEnd := provingPeriod * build.ProvingPeriodDuration + provingPeriodOffset - var lateSubmission bool - if vmctx.BlockHeight() > self.ProvingPeriodEnd { + feesRequired := types.NewInt(0) + + if currentProvingPeriodEnd > self.ProvingPeriodEnd { //TODO late fee calc - lateSubmission = true feesRequired = types.BigAdd(feesRequired, types.NewInt(1000)) } @@ -342,13 +340,14 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, var seed [sectorbuilder.CommLen]byte { - var rand []byte - var err ActorError - if !lateSubmission { - rand, err = vmctx.GetRandomness(self.ProvingPeriodEnd - build.PoSTChallangeTime) - } else { - rand, err = vmctx.GetRandomness(nextProvingPeriodEnd - build.PoSTChallangeTime) + randHeight := currentProvingPeriodEnd - build.PoSTChallangeTime + if vmctx.BlockHeight() <= randHeight { + // TODO: spec, retcode + return nil, aerrors.New(1, "submit PoSt called outside submission window") } + + rand, err := vmctx.GetRandomness(randHeight) + if err != nil { return nil, aerrors.Wrap(err, "could not get randomness for PoST") } @@ -434,7 +433,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, } self.ProvingSet = self.Sectors - self.ProvingPeriodEnd = nextProvingPeriodEnd + self.ProvingPeriodEnd = currentProvingPeriodEnd + build.ProvingPeriodDuration self.NextDoneSet = params.DoneSet c, err := vmctx.Storage().Put(self) From 10248125594fd50a7727be2849a2b522ea1f2599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 26 Sep 2019 22:57:20 +0200 Subject: [PATCH 4/9] storageminer: Update post scheduling --- chain/actors/actor_miner.go | 4 +- cmd/lotus-storage-miner/init.go | 12 +-- storage/miner.go | 177 +++++++++++++++----------------- 3 files changed, 89 insertions(+), 104 deletions(-) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 25ad88ee1..d406815ea 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -313,8 +313,8 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, } provingPeriodOffset := self.ProvingPeriodEnd % build.ProvingPeriodDuration - provingPeriod := (vmctx.BlockHeight() - provingPeriodOffset - 1) / build.ProvingPeriodDuration + 1 - currentProvingPeriodEnd := provingPeriod * build.ProvingPeriodDuration + provingPeriodOffset + provingPeriod := (vmctx.BlockHeight()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 + currentProvingPeriodEnd := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset feesRequired := types.NewInt(0) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 3d2e113dd..81ca41825 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -26,8 +26,8 @@ var initCmd = &cli.Command{ Usage: "specify the address of an already created miner actor", }, &cli.BoolFlag{ - Name: "genesis-miner", - Usage: "enable genesis mining (DON'T USE ON BOOTSTRAPPED NETWORK)", + Name: "genesis-miner", + Usage: "enable genesis mining (DON'T USE ON BOOTSTRAPPED NETWORK)", Hidden: true, }, &cli.BoolFlag{ @@ -35,14 +35,14 @@ var initCmd = &cli.Command{ Usage: "create separate worker key", }, &cli.StringFlag{ - Name: "worker", + Name: "worker", Aliases: []string{"w"}, - Usage: "worker key to use (overrides --create-worker-key)", + Usage: "worker key to use (overrides --create-worker-key)", }, &cli.StringFlag{ - Name: "owner", + Name: "owner", Aliases: []string{"o"}, - Usage: "owner key to use", + Usage: "owner key to use", }, }, Action: func(cctx *cli.Context) error { diff --git a/storage/miner.go b/storage/miner.go index 411baae7b..0f49d217c 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -10,7 +10,6 @@ import ( "github.com/pkg/errors" "golang.org/x/xerrors" "sync" - "time" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" @@ -44,7 +43,7 @@ type Miner struct { ds datastore.Batching schedLk sync.Mutex - postSched uint64 + schedPost uint64 } type storageMinerApi interface { @@ -89,13 +88,8 @@ func (m *Miner) Run(ctx context.Context) error { m.events = events.NewEvents(ctx, m.api) - ts, err := m.api.ChainHead(ctx) - if err != nil { - return err - } - go m.handlePostingSealedSectors(ctx) - go m.schedulePoSt(ctx, ts, false) + go m.beginPosting(ctx) return nil } @@ -173,14 +167,20 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return } - m.schedulePoSt(ctx, nil, false) + m.beginPosting(ctx) }() return nil } -func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bool) { - ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs) +func (m *Miner) beginPosting(ctx context.Context) { + ts, err := m.api.ChainHead(context.TODO()) + if err != nil { + log.Error(err) + return + } + + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) if err != nil { log.Errorf("failed to get proving period end for miner: %s", err) return @@ -188,25 +188,26 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bo if ppe == 0 { log.Errorf("Proving period end == 0") - // TODO: we probably want to call schedulePoSt after the first commitSector call return } m.schedLk.Lock() - - if m.postSched >= ppe { - if !force || m.postSched > ppe { - log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched) - m.schedLk.Unlock() - return - } + if m.schedPost >= 0 { + log.Warnf("PoSts already running %d", m.schedPost) + m.schedLk.Unlock() + return } - m.postSched = ppe + + provingPeriodOffset := ppe % build.ProvingPeriodDuration + provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 + m.schedPost = provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + m.schedLk.Unlock() log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) - err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert + err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert // TODO: Cancel post + log.Errorf("TODO: Cancel PoSt, re-run") return nil }, PoStConfidence, ppe-build.PoSTChallangeTime) if err != nil { @@ -216,82 +217,66 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet, force bo } } -func (m *Miner) restartPost(ts *types.TipSet) { - time.Sleep(400 * time.Millisecond) - log.Warn("Restarting PoSt after failure") - m.schedulePoSt(context.TODO(), ts, true) +func (m *Miner) scheduleNextPost(ppe uint64) { + ts, err := m.api.ChainHead(context.TODO()) + if err != nil { + log.Error(err) + // TODO: retry + return + } + + provingPeriodOffset := ppe % build.ProvingPeriodDuration + provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 + headPPE := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + if headPPE > ppe { + log.Warn("PoSt computation running behind chain") + ppe = headPPE + } + + m.schedLk.Lock() + if m.schedPost >= ppe { + // this probably can't happen + log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe) + m.schedLk.Unlock() + return + } + + m.schedPost = ppe + m.schedLk.Unlock() + + log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) + err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + log.Errorf("TODO: Cancel PoSt, re-run") + return nil + }, PoStConfidence, ppe-build.PoSTChallangeTime) + if err != nil { + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) + return + } } -func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { - log.Info("starting PoSt computation") +func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error { + return func(ts *types.TipSet, curH uint64) error { - head, err := m.api.ChainHead(context.TODO()) - if err != nil { - m.restartPost(ts) - return err - } + ctx := context.TODO() - postWaitCh, _, err := m.maybeDoPost(context.TODO(), head) - if err != nil { - m.restartPost(ts) - return err - } - - if postWaitCh == nil { - log.Errorf("PoSt didn't start") - m.restartPost(ts) - return nil - } - - go func() { - err := <-postWaitCh + sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) if err != nil { - log.Errorf("got error back from postWaitCh: %s", err) - m.restartPost(ts) - return + return xerrors.Errorf("failed to get proving set for miner: %w", err) } - log.Infof("post successfully submitted") + r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math + if err != nil { + return xerrors.Errorf("failed to get chain randomness for post: %w", err) + } - m.schedulePoSt(context.TODO(), ts, false) - }() - return nil -} - -func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) { - ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) - if err != nil { - return nil, nil, xerrors.Errorf("failed to get proving period end for miner: %w", err) - } - - if ts.Height() > ppe { - log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height()) - return nil, nil, nil - } - - sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) - if err != nil { - return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err) - } - - r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math - if err != nil { - return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err) - } - - sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.PoSTChallangeTime, ts) - if err != nil { - return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err) - } - - ret := make(chan error, 1) - go func() { log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) var faults []uint64 proof, err := m.secst.RunPoSt(ctx, sset, r, faults) if err != nil { - ret <- xerrors.Errorf("running post failed: %w", err) - return + return xerrors.Errorf("running post failed: %w", err) } log.Infof("submitting PoSt pLen=%d", len(proof)) @@ -303,8 +288,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error enc, aerr := actors.SerializeParams(params) if aerr != nil { - ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err) - return + return xerrors.Errorf("could not serialize submit post parameters: %w", err) } msg := &types.Message{ @@ -319,21 +303,22 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - ret <- xerrors.Errorf("pushing message to mpool: %w", err) - return + return xerrors.Errorf("pushing message to mpool: %w", err) } // make sure it succeeds... - _, err = m.api.ChainWaitMsg(ctx, smsg.Cid()) + rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) if err != nil { - return + return err + } + if rec.Receipt.ExitCode != 0 { + log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode) + // TODO: Do something } - // TODO: check receipt - m.schedulePoSt(ctx, nil, true) - }() - - return ret, sourceTs.MinTicketBlock(), nil + m.scheduleNextPost(ppe + build.ProvingPeriodDuration) + return nil + } } func sectorIdList(si []*api.SectorInfo) []uint64 { From fe8e1fe1e4a10c219a6b81f3f866a03a186908b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Sep 2019 01:07:40 +0200 Subject: [PATCH 5/9] Almost working new post code --- build/params.go | 6 +- lib/jsonrpc/websocket.go | 4 +- node/modules/testing/genesis.go | 2 +- storage/miner.go | 161 +---------------------------- storage/post.go | 173 ++++++++++++++++++++++++++++++++ 5 files changed, 181 insertions(+), 165 deletions(-) create mode 100644 storage/post.go diff --git a/build/params.go b/build/params.go index 7c00f1920..26410fbbb 100644 --- a/build/params.go +++ b/build/params.go @@ -28,7 +28,7 @@ const MaxVouchersPerDeal = 768 // roughly one voucher per 10h over a year // Consensus / Network // Seconds -const BlockDelay = 6 +const BlockDelay = 4 // Seconds const AllowableClockDrift = BlockDelay * 2 @@ -43,10 +43,10 @@ const ForkLengthThreshold = 20 const RandomnessLookback = 20 // Blocks -const ProvingPeriodDuration = 40 +const ProvingPeriodDuration = 10 // Blocks -const PoSTChallangeTime = 20 +const PoSTChallangeTime = 5 const PowerCollateralProportion = 20 const PerCapitaCollateralProportion = 5 diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index c707d4a54..79674cc9d 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -420,7 +420,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) { case r, ok := <-c.incoming: if !ok { if c.incomingErr != nil { - log.Debugf("websocket error", "error", c.incomingErr) + log.Warnw("websocket error", "error", c.incomingErr) } return // remote closed } @@ -443,7 +443,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) { c.sendRequest(req.req) case <-c.stop: if err := c.conn.Close(); err != nil { - log.Debugf("websocket close error", "error", err) + log.Warnw("websocket close error", "error", err) } return } diff --git a/node/modules/testing/genesis.go b/node/modules/testing/genesis.go index c6c75d9e7..f4a35fede 100644 --- a/node/modules/testing/genesis.go +++ b/node/modules/testing/genesis.go @@ -76,7 +76,7 @@ func MakeGenesis(outFile string) func(bs dtypes.ChainBlockstore, w *wallet.Walle } addrs := map[address.Address]types.BigInt{ - minerAddr: types.NewInt(5000000000000000000), + minerAddr: types.FromFil(100000), } b, err := gen.MakeGenesisBlock(bs, addrs, gmc, 100000) diff --git a/storage/miner.go b/storage/miner.go index 0f49d217c..1e9a92928 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,14 +2,13 @@ package storage import ( "context" - "encoding/base64" + "sync" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/pkg/errors" - "golang.org/x/xerrors" - "sync" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" @@ -173,162 +172,6 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return nil } -func (m *Miner) beginPosting(ctx context.Context) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - return - } - - ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) - if err != nil { - log.Errorf("failed to get proving period end for miner: %s", err) - return - } - - if ppe == 0 { - log.Errorf("Proving period end == 0") - return - } - - m.schedLk.Lock() - if m.schedPost >= 0 { - log.Warnf("PoSts already running %d", m.schedPost) - m.schedLk.Unlock() - return - } - - provingPeriodOffset := ppe % build.ProvingPeriodDuration - provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 - m.schedPost = provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset - - m.schedLk.Unlock() - - log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) - err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoSTChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %s", err) - return - } -} - -func (m *Miner) scheduleNextPost(ppe uint64) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - // TODO: retry - return - } - - provingPeriodOffset := ppe % build.ProvingPeriodDuration - provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 - headPPE := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset - if headPPE > ppe { - log.Warn("PoSt computation running behind chain") - ppe = headPPE - } - - m.schedLk.Lock() - if m.schedPost >= ppe { - // this probably can't happen - log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe) - m.schedLk.Unlock() - return - } - - m.schedPost = ppe - m.schedLk.Unlock() - - log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) - err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoSTChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %s", err) - return - } -} - -func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error { - return func(ts *types.TipSet, curH uint64) error { - - ctx := context.TODO() - - sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) - if err != nil { - return xerrors.Errorf("failed to get proving set for miner: %w", err) - } - - r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math - if err != nil { - return xerrors.Errorf("failed to get chain randomness for post: %w", err) - } - - log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) - var faults []uint64 - proof, err := m.secst.RunPoSt(ctx, sset, r, faults) - if err != nil { - return xerrors.Errorf("running post failed: %w", err) - } - - log.Infof("submitting PoSt pLen=%d", len(proof)) - - params := &actors.SubmitPoStParams{ - Proof: proof, - DoneSet: types.BitFieldFromSet(sectorIdList(sset)), - } - - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return xerrors.Errorf("could not serialize submit post parameters: %w", err) - } - - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: actors.MAMethods.SubmitPoSt, - Params: enc, - Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late - GasLimit: types.NewInt(100000 /* i dont know help */), - GasPrice: types.NewInt(1), - } - - smsg, err := m.api.MpoolPushMessage(ctx, msg) - if err != nil { - return xerrors.Errorf("pushing message to mpool: %w", err) - } - - // make sure it succeeds... - rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) - if err != nil { - return err - } - if rec.Receipt.ExitCode != 0 { - log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode) - // TODO: Do something - } - - m.scheduleNextPost(ppe + build.ProvingPeriodDuration) - return nil - } -} - -func sectorIdList(si []*api.SectorInfo) []uint64 { - out := make([]uint64, len(si)) - for i, s := range si { - out[i] = s.SectorID - } - return out -} - func (m *Miner) runPreflightChecks(ctx context.Context) error { worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil) if err != nil { diff --git a/storage/post.go b/storage/post.go new file mode 100644 index 000000000..480c05ba0 --- /dev/null +++ b/storage/post.go @@ -0,0 +1,173 @@ +package storage + +import ( + "context" + "encoding/base64" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func (m *Miner) beginPosting(ctx context.Context) { + ts, err := m.api.ChainHead(context.TODO()) + if err != nil { + log.Error(err) + return + } + + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) + if err != nil { + log.Errorf("failed to get proving period end for miner: %s", err) + return + } + + if ppe == 0 { + log.Errorf("Proving period end == 0") + return + } + + m.schedLk.Lock() + if m.schedPost > 0 { + log.Warnf("PoSts already running %d", m.schedPost) + m.schedLk.Unlock() + return + } + + provingPeriodOffset := ppe % build.ProvingPeriodDuration + provingPeriod := (ts.Height()-provingPeriodOffset)/build.ProvingPeriodDuration + 1 + m.schedPost = provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + + m.schedLk.Unlock() + + log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) + err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + log.Errorf("TODO: Cancel PoSt, re-run") + return nil + }, PoStConfidence, ppe-build.PoSTChallangeTime) + if err != nil { + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) + return + } +} + +func (m *Miner) scheduleNextPost(ppe uint64) { + ts, err := m.api.ChainHead(context.TODO()) + if err != nil { + log.Error(err) + // TODO: retry + return + } + + provingPeriodOffset := ppe % build.ProvingPeriodDuration + provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 + headPPE := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + if headPPE > ppe { + log.Warn("PoSt computation running behind chain") + ppe = headPPE + } + + m.schedLk.Lock() + if m.schedPost >= ppe { + // this probably can't happen + log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe) + m.schedLk.Unlock() + return + } + + m.schedPost = ppe + m.schedLk.Unlock() + + log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) + err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + log.Errorf("TODO: Cancel PoSt, re-run") + return nil + }, PoStConfidence, ppe-build.PoSTChallangeTime) + if err != nil { + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) + return + } +} + +func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error { + return func(ts *types.TipSet, curH uint64) error { + + ctx := context.TODO() + + sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) + if err != nil { + return xerrors.Errorf("failed to get proving set for miner: %w", err) + } + + r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math + if err != nil { + return xerrors.Errorf("failed to get chain randomness for post: %w", err) + } + + log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) + var faults []uint64 + proof, err := m.secst.RunPoSt(ctx, sset, r, faults) + if err != nil { + return xerrors.Errorf("running post failed: %w", err) + } + + log.Infof("submitting PoSt pLen=%d", len(proof)) + + params := &actors.SubmitPoStParams{ + Proof: proof, + DoneSet: types.BitFieldFromSet(sectorIdList(sset)), + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return xerrors.Errorf("could not serialize submit post parameters: %w", err) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.SubmitPoSt, + Params: enc, + Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late + GasLimit: types.NewInt(100000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + log.Info("mpush") + + smsg, err := m.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing message to mpool: %w", err) + } + + log.Info("Waiting for post %s to appear on chain", smsg.Cid()) + + // make sure it succeeds... + rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + if rec.Receipt.ExitCode != 0 { + log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode) + // TODO: Do something + } + + m.scheduleNextPost(ppe + build.ProvingPeriodDuration) + return nil + } +} + +func sectorIdList(si []*api.SectorInfo) []uint64 { + out := make([]uint64, len(si)) + for i, s := range si { + out[i] = s.SectorID + } + return out +} From 2874022251cf0a0444906c85d1b10663c29efb7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Sep 2019 13:37:44 +0200 Subject: [PATCH 6/9] jsonrpc: Channel buffeering --- chain/events/events.go | 3 +++ lib/jsonrpc/client.go | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/chain/events/events.go b/chain/events/events.go index c58ab4e81..b7366774e 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -107,6 +107,9 @@ func (e *Events) listenHeadChanges(ctx context.Context) { } func (e *Events) listenHeadChangesOnce(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + notifs, err := e.api.ChainNotify(ctx) if err != nil { // TODO: retry diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 628cbc8ad..5e88d843d 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -1,12 +1,14 @@ package jsonrpc import ( + "container/list" "context" "encoding/base64" "encoding/json" "fmt" "net/http" "reflect" + "sync" "sync/atomic" "github.com/gorilla/websocket" @@ -132,6 +134,9 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) ch := reflect.MakeChan(ctyp, 0) // todo: buffer? retVal = ch.Convert(ftyp.Out(valOut)) + buf := (&list.List{}).Init() + var bufLk sync.Mutex + return ctx, func(result []byte, ok bool) { if !ok { // remote channel closed, close ours too @@ -145,7 +150,36 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) return } - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea + bufLk.Lock() + if ctx.Err() != nil { + log.Errorf("got rpc message with cancelled context: %s", ctx.Err()) + bufLk.Unlock() + return + } + + buf.PushBack(val) + + if buf.Len() > 1 { + log.Warnf("rpc output channel has %d buffered messages", buf.Len()) + bufLk.Unlock() + return + } + + go func() { + for buf.Len() > 0 { + front := buf.Front() + + bufLk.Unlock() + + ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea + + bufLk.Lock() + buf.Remove(front) + } + + bufLk.Unlock() + }() + } } @@ -177,6 +211,7 @@ loop: ctxDone = nil c.requests <- clientRequest{ + req: request{ Jsonrpc: "2.0", Method: wsCancel, From ba0559ae580f570f568c43dd6b025b0f75077020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Sep 2019 14:35:09 +0200 Subject: [PATCH 7/9] events: Fix tipSetCache at higher heights --- chain/events/events_height.go | 3 +-- chain/events/tscache.go | 13 +++++++++---- storage/post.go | 6 +++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 2fb3a4319..6ac06de6b 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -64,8 +64,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { } if err := hnd.handle(incTs, ts.Height()); err != nil { - msgInfo := "" - log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err) + log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err) } } } diff --git a/chain/events/tscache.go b/chain/events/tscache.go index e6d492bfa..37487c180 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -36,7 +36,7 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error { } } - tsc.start = (tsc.start + 1) % len(tsc.cache) + tsc.start = normalModulo(tsc.start+1, len(tsc.cache)) tsc.cache[tsc.start] = ts if tsc.len < len(tsc.cache) { tsc.len++ @@ -54,7 +54,7 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error { } tsc.cache[tsc.start] = nil - tsc.start = (tsc.start - 1) % len(tsc.cache) + tsc.start = normalModulo(tsc.start-1, len(tsc.cache)) tsc.len-- return nil } @@ -71,15 +71,20 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { } clen := len(tsc.cache) - tailH := tsc.cache[((tsc.start-tsc.len+1)%clen+clen)%clen].Height() + tailH := tsc.cache[normalModulo(tsc.start-tsc.len+1, clen)].Height() if height < tailH { + log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tailH) return tsc.storage(context.TODO(), height, tsc.cache[tailH]) } - return tsc.cache[(int(height-tailH+1)%clen+clen)%clen], nil + return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil } func (tsc *tipSetCache) best() *types.TipSet { return tsc.cache[tsc.start] } + +func normalModulo(n, m int) int { + return (n%m + m) % m +} diff --git a/storage/post.go b/storage/post.go index 480c05ba0..1d07065fc 100644 --- a/storage/post.go +++ b/storage/post.go @@ -83,7 +83,7 @@ func (m *Miner) scheduleNextPost(ppe uint64) { m.schedPost = ppe m.schedLk.Unlock() - log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) + log.Infof("Scheduling post at height %d (head=%d; ppe=%d, period=%d)", ppe-build.PoSTChallangeTime, ts.Height(), ppe, provingPeriod) err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert // TODO: Cancel post log.Errorf("TODO: Cancel PoSt, re-run") @@ -108,7 +108,7 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math if err != nil { - return xerrors.Errorf("failed to get chain randomness for post: %w", err) + return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", ts.Height(), ppe, err) } log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) @@ -147,7 +147,7 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro return xerrors.Errorf("pushing message to mpool: %w", err) } - log.Info("Waiting for post %s to appear on chain", smsg.Cid()) + log.Infof("Waiting for post %s to appear on chain", smsg.Cid()) // make sure it succeeds... rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) From 636c97805412c912fa82e56ebec6257b02745454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Sep 2019 14:37:25 +0200 Subject: [PATCH 8/9] Revert build params to devnet --- build/params.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build/params.go b/build/params.go index 26410fbbb..7c00f1920 100644 --- a/build/params.go +++ b/build/params.go @@ -28,7 +28,7 @@ const MaxVouchersPerDeal = 768 // roughly one voucher per 10h over a year // Consensus / Network // Seconds -const BlockDelay = 4 +const BlockDelay = 6 // Seconds const AllowableClockDrift = BlockDelay * 2 @@ -43,10 +43,10 @@ const ForkLengthThreshold = 20 const RandomnessLookback = 20 // Blocks -const ProvingPeriodDuration = 10 +const ProvingPeriodDuration = 40 // Blocks -const PoSTChallangeTime = 5 +const PoSTChallangeTime = 20 const PowerCollateralProportion = 20 const PerCapitaCollateralProportion = 5 From e0632a93ab7d3c0998a5ad1673939e4f6c21c340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Sep 2019 21:47:47 +0200 Subject: [PATCH 9/9] Share ProvingPeriodEnd math --- chain/actors/actor_miner.go | 12 +++++++++--- storage/post.go | 8 ++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index d406815ea..88f72d227 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -295,6 +295,14 @@ type SubmitPoStParams struct { // TODO: once the spec changes finish, we have more work to do here... } +func ProvingPeriodEnd(setPeriodEnd, height uint64) (uint64, uint64) { + offset := setPeriodEnd % build.ProvingPeriodDuration + period := ((height - offset - 1) / build.ProvingPeriodDuration) + 1 + end := (period * build.ProvingPeriodDuration) + offset + + return end, period +} + // TODO: this is a dummy method that allows us to plumb in other parts of the // system for now. func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) { @@ -312,9 +320,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, return nil, aerrors.New(1, "not authorized to submit post for miner") } - provingPeriodOffset := self.ProvingPeriodEnd % build.ProvingPeriodDuration - provingPeriod := (vmctx.BlockHeight()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 - currentProvingPeriodEnd := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + currentProvingPeriodEnd, _ := ProvingPeriodEnd(self.ProvingPeriodEnd, vmctx.BlockHeight()) feesRequired := types.NewInt(0) diff --git a/storage/post.go b/storage/post.go index 1d07065fc..8f6748ba3 100644 --- a/storage/post.go +++ b/storage/post.go @@ -37,9 +37,7 @@ func (m *Miner) beginPosting(ctx context.Context) { return } - provingPeriodOffset := ppe % build.ProvingPeriodDuration - provingPeriod := (ts.Height()-provingPeriodOffset)/build.ProvingPeriodDuration + 1 - m.schedPost = provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + m.schedPost, _ = actors.ProvingPeriodEnd(ppe, ts.Height()) m.schedLk.Unlock() @@ -64,9 +62,7 @@ func (m *Miner) scheduleNextPost(ppe uint64) { return } - provingPeriodOffset := ppe % build.ProvingPeriodDuration - provingPeriod := (ts.Height()-provingPeriodOffset-1)/build.ProvingPeriodDuration + 1 - headPPE := provingPeriod*build.ProvingPeriodDuration + provingPeriodOffset + headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height()) if headPPE > ppe { log.Warn("PoSt computation running behind chain") ppe = headPPE