Merge pull request #240 from filecoin-project/fix/retry-post

Post fixes and cleanup
This commit is contained in:
Łukasz Magiera 2019-09-27 23:14:12 +02:00 committed by GitHub
commit 47de2ffc74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 252 additions and 194 deletions

View File

@ -295,6 +295,14 @@ type SubmitPoStParams struct {
// TODO: once the spec changes finish, we have more work to do here... // TODO: once the spec changes finish, we have more work to do here...
} }
// ProvingPeriodEnd maps a chain height onto the grid of proving-period
// boundaries implied by setPeriodEnd.
//
// Boundaries are the heights offset + k*build.ProvingPeriodDuration, where
// offset is setPeriodEnd modulo build.ProvingPeriodDuration. The function
// returns the smallest boundary that is >= height, together with its index k
// (the proving period number).
//
// For height <= offset the result is (offset, 0): the first boundary on the
// grid.
func ProvingPeriodEnd(setPeriodEnd, height uint64) (uint64, uint64) {
	offset := setPeriodEnd % build.ProvingPeriodDuration

	// Guard against unsigned wrap-around: height - offset - 1 below would
	// underflow for height <= offset and yield a meaningless period/end.
	// Under non-wrapping arithmetic the formula gives period 0 here.
	if height <= offset {
		return offset, 0
	}

	// Equivalent to ceil((height - offset) / ProvingPeriodDuration).
	period := ((height - offset - 1) / build.ProvingPeriodDuration) + 1
	end := (period * build.ProvingPeriodDuration) + offset
	return end, period
}
// TODO: this is a dummy method that allows us to plumb in other parts of the // TODO: this is a dummy method that allows us to plumb in other parts of the
// system for now. // system for now.
func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) { func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) {
@ -312,16 +320,12 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
return nil, aerrors.New(1, "not authorized to submit post for miner") return nil, aerrors.New(1, "not authorized to submit post for miner")
} }
feesRequired := types.NewInt(0) currentProvingPeriodEnd, _ := ProvingPeriodEnd(self.ProvingPeriodEnd, vmctx.BlockHeight())
nextProvingPeriodEnd := self.ProvingPeriodEnd + build.ProvingPeriodDuration
if vmctx.BlockHeight() > nextProvingPeriodEnd {
return nil, aerrors.New(1, "PoSt submited too late")
}
var lateSubmission bool feesRequired := types.NewInt(0)
if vmctx.BlockHeight() > self.ProvingPeriodEnd {
if currentProvingPeriodEnd > self.ProvingPeriodEnd {
//TODO late fee calc //TODO late fee calc
lateSubmission = true
feesRequired = types.BigAdd(feesRequired, types.NewInt(1000)) feesRequired = types.BigAdd(feesRequired, types.NewInt(1000))
} }
@ -342,13 +346,14 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
var seed [sectorbuilder.CommLen]byte var seed [sectorbuilder.CommLen]byte
{ {
var rand []byte randHeight := currentProvingPeriodEnd - build.PoSTChallangeTime
var err ActorError if vmctx.BlockHeight() <= randHeight {
if !lateSubmission { // TODO: spec, retcode
rand, err = vmctx.GetRandomness(self.ProvingPeriodEnd - build.PoSTChallangeTime) return nil, aerrors.New(1, "submit PoSt called outside submission window")
} else {
rand, err = vmctx.GetRandomness(nextProvingPeriodEnd - build.PoSTChallangeTime)
} }
rand, err := vmctx.GetRandomness(randHeight)
if err != nil { if err != nil {
return nil, aerrors.Wrap(err, "could not get randomness for PoST") return nil, aerrors.Wrap(err, "could not get randomness for PoST")
} }
@ -434,7 +439,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
} }
self.ProvingSet = self.Sectors self.ProvingSet = self.Sectors
self.ProvingPeriodEnd = nextProvingPeriodEnd self.ProvingPeriodEnd = currentProvingPeriodEnd + build.ProvingPeriodDuration
self.NextDoneSet = params.DoneSet self.NextDoneSet = params.DoneSet
c, err := vmctx.Storage().Put(self) c, err := vmctx.Storage().Put(self)

View File

@ -107,6 +107,9 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
} }
func (e *Events) listenHeadChangesOnce(ctx context.Context) error { func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := e.api.ChainNotify(ctx) notifs, err := e.api.ChainNotify(ctx)
if err != nil { if err != nil {
// TODO: retry // TODO: retry

View File

@ -64,8 +64,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
} }
if err := hnd.handle(incTs, ts.Height()); err != nil { if err := hnd.handle(incTs, ts.Height()); err != nil {
msgInfo := "" log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err)
} }
} }
} }

View File

@ -36,7 +36,7 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error {
} }
} }
tsc.start = (tsc.start + 1) % len(tsc.cache) tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = ts tsc.cache[tsc.start] = ts
if tsc.len < len(tsc.cache) { if tsc.len < len(tsc.cache) {
tsc.len++ tsc.len++
@ -54,7 +54,7 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error {
} }
tsc.cache[tsc.start] = nil tsc.cache[tsc.start] = nil
tsc.start = (tsc.start - 1) % len(tsc.cache) tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
tsc.len-- tsc.len--
return nil return nil
} }
@ -71,15 +71,20 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
} }
clen := len(tsc.cache) clen := len(tsc.cache)
tailH := tsc.cache[((tsc.start-tsc.len+1)%clen+clen)%clen].Height() tailH := tsc.cache[normalModulo(tsc.start-tsc.len+1, clen)].Height()
if height < tailH { if height < tailH {
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tailH)
return tsc.storage(context.TODO(), height, tsc.cache[tailH]) return tsc.storage(context.TODO(), height, tsc.cache[tailH])
} }
return tsc.cache[(int(height-tailH+1)%clen+clen)%clen], nil return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
} }
func (tsc *tipSetCache) best() *types.TipSet { func (tsc *tipSetCache) best() *types.TipSet {
return tsc.cache[tsc.start] return tsc.cache[tsc.start]
} }
// normalModulo reduces n modulo m, shifting the remainder by m before the
// final reduction so that negative n yields a non-negative result when m is
// positive (Go's % operator keeps the sign of the dividend).
func normalModulo(n, m int) int {
	rem := n % m
	shifted := rem + m
	return shifted % m
}

View File

@ -1,12 +1,14 @@
package jsonrpc package jsonrpc
import ( import (
"container/list"
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"reflect" "reflect"
"sync"
"sync/atomic" "sync/atomic"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -132,6 +134,9 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
ch := reflect.MakeChan(ctyp, 0) // todo: buffer? ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
retVal = ch.Convert(ftyp.Out(valOut)) retVal = ch.Convert(ftyp.Out(valOut))
buf := (&list.List{}).Init()
var bufLk sync.Mutex
return ctx, func(result []byte, ok bool) { return ctx, func(result []byte, ok bool) {
if !ok { if !ok {
// remote channel closed, close ours too // remote channel closed, close ours too
@ -145,7 +150,36 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
return return
} }
ch.Send(val.Elem()) // todo: select on ctx is probably a good idea bufLk.Lock()
if ctx.Err() != nil {
log.Errorf("got rpc message with cancelled context: %s", ctx.Err())
bufLk.Unlock()
return
}
buf.PushBack(val)
if buf.Len() > 1 {
log.Warnf("rpc output channel has %d buffered messages", buf.Len())
bufLk.Unlock()
return
}
go func() {
for buf.Len() > 0 {
front := buf.Front()
bufLk.Unlock()
ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea
bufLk.Lock()
buf.Remove(front)
}
bufLk.Unlock()
}()
} }
} }
@ -177,6 +211,7 @@ loop:
ctxDone = nil ctxDone = nil
c.requests <- clientRequest{ c.requests <- clientRequest{
req: request{ req: request{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Method: wsCancel, Method: wsCancel,

View File

@ -420,7 +420,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
case r, ok := <-c.incoming: case r, ok := <-c.incoming:
if !ok { if !ok {
if c.incomingErr != nil { if c.incomingErr != nil {
log.Debugf("websocket error", "error", c.incomingErr) log.Warnw("websocket error", "error", c.incomingErr)
} }
return // remote closed return // remote closed
} }
@ -443,7 +443,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.sendRequest(req.req) c.sendRequest(req.req)
case <-c.stop: case <-c.stop:
if err := c.conn.Close(); err != nil { if err := c.conn.Close(); err != nil {
log.Debugf("websocket close error", "error", err) log.Warnw("websocket close error", "error", err)
} }
return return
} }

View File

@ -76,7 +76,7 @@ func MakeGenesis(outFile string) func(bs dtypes.ChainBlockstore, w *wallet.Walle
} }
addrs := map[address.Address]types.BigInt{ addrs := map[address.Address]types.BigInt{
minerAddr: types.NewInt(5000000000000000000), minerAddr: types.FromFil(100000),
} }
b, err := gen.MakeGenesisBlock(bs, addrs, gmc, 100000) b, err := gen.MakeGenesisBlock(bs, addrs, gmc, 100000)

View File

@ -2,14 +2,13 @@ package storage
import ( import (
"context" "context"
"encoding/base64" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/xerrors"
"sync"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
@ -43,7 +42,7 @@ type Miner struct {
ds datastore.Batching ds datastore.Batching
schedLk sync.Mutex schedLk sync.Mutex
postSched uint64 schedPost uint64
} }
type storageMinerApi interface { type storageMinerApi interface {
@ -88,13 +87,8 @@ func (m *Miner) Run(ctx context.Context) error {
m.events = events.NewEvents(ctx, m.api) m.events = events.NewEvents(ctx, m.api)
ts, err := m.api.ChainHead(ctx)
if err != nil {
return err
}
go m.handlePostingSealedSectors(ctx) go m.handlePostingSealedSectors(ctx)
go m.schedulePoSt(ctx, ts) go m.beginPosting(ctx)
return nil return nil
} }
@ -172,164 +166,12 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return return
} }
m.schedulePoSt(ctx, nil) m.beginPosting(ctx)
}() }()
return nil return nil
} }
func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs)
if err != nil {
log.Errorf("failed to get proving period end for miner: %s", err)
return
}
if ppe == 0 {
log.Errorf("Proving period end == 0")
// TODO: we probably want to call schedulePoSt after the first commitSector call
return
}
m.schedLk.Lock()
if m.postSched >= ppe {
log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched)
m.schedLk.Unlock()
return
}
m.postSched = ppe
m.schedLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime)
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
// TODO: Cancel post
return nil
}, PoStConfidence, ppe-build.PoSTChallangeTime)
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return
}
}
func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
log.Info("starting PoSt computation")
head, err := m.api.ChainHead(context.TODO())
if err != nil {
return err
}
postWaitCh, _, err := m.maybeDoPost(context.TODO(), head)
if err != nil {
return err
}
if postWaitCh == nil {
return errors.New("PoSt didn't start")
}
go func() {
err := <-postWaitCh
if err != nil {
log.Errorf("got error back from postWaitCh: %s", err)
return
}
log.Infof("post successfully submitted")
m.schedulePoSt(context.TODO(), ts)
}()
return nil
}
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) {
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get proving period end for miner: %w", err)
}
if ts.Height() > ppe {
log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height())
return nil, nil, nil
}
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err)
}
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math
if err != nil {
return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err)
}
sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.PoSTChallangeTime, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err)
}
ret := make(chan error, 1)
go func() {
log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height())
var faults []uint64
proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
if err != nil {
ret <- xerrors.Errorf("running post failed: %w", err)
return
}
log.Infof("submitting PoSt pLen=%d", len(proof))
params := &actors.SubmitPoStParams{
Proof: proof,
DoneSet: types.BitFieldFromSet(sectorIdList(sset)),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err)
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(0),
GasLimit: types.NewInt(100000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
ret <- xerrors.Errorf("pushing message to mpool: %w", err)
return
}
// make sure it succeeds...
_, err = m.api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}
// TODO: check receipt
m.schedulePoSt(ctx, nil)
}()
return ret, sourceTs.MinTicketBlock(), nil
}
func sectorIdList(si []*api.SectorInfo) []uint64 {
out := make([]uint64, len(si))
for i, s := range si {
out[i] = s.SectorID
}
return out
}
func (m *Miner) runPreflightChecks(ctx context.Context) error { func (m *Miner) runPreflightChecks(ctx context.Context) error {
worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil) worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil)
if err != nil { if err != nil {

169
storage/post.go Normal file
View File

@ -0,0 +1,169 @@
package storage
import (
"context"
"encoding/base64"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/types"
)
// beginPosting schedules the first PoSt computation for this miner.
//
// It reads the miner's proving period end from chain state at the current
// head, projects it onto the proving period containing the head via
// actors.ProvingPeriodEnd, records that value in m.schedPost and registers a
// height trigger build.PoSTChallangeTime blocks before it.
//
// It is a no-op (with a log line) when the chain calls fail, when the miner
// has no proving period yet (end == 0), or when a PoSt is already scheduled
// (m.schedPost > 0).
func (m *Miner) beginPosting(ctx context.Context) {
	// Use the caller's context so the lookup is cancelled with the miner.
	ts, err := m.api.ChainHead(ctx)
	if err != nil {
		log.Error(err)
		return
	}

	ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
	if err != nil {
		log.Errorf("failed to get proving period end for miner: %s", err)
		return
	}

	if ppe == 0 {
		log.Errorf("Proving period end == 0")
		return
	}

	m.schedLk.Lock()
	if m.schedPost > 0 {
		log.Warnf("PoSts already running %d", m.schedPost)
		m.schedLk.Unlock()
		return
	}

	// Keep the scheduled value in a local so it is not re-read from the
	// struct after the lock has been released.
	schedPPE, _ := actors.ProvingPeriodEnd(ppe, ts.Height())
	m.schedPost = schedPPE
	m.schedLk.Unlock()

	// The trigger height must be derived from the same proving period end
	// that computePost is given; using the raw on-chain ppe here would
	// disagree with schedPPE whenever the head is already past ppe.
	triggerH := schedPPE - build.PoSTChallangeTime

	log.Infof("Scheduling post at height %d", triggerH)
	err = m.events.ChainAt(m.computePost(schedPPE), func(ts *types.TipSet) error { // Revert
		// TODO: Cancel post
		log.Errorf("TODO: Cancel PoSt, re-run")
		return nil
	}, PoStConfidence, triggerH)
	if err != nil {
		// TODO: This is BAD, figure something out
		log.Errorf("scheduling PoSt failed: %s", err)
		return
	}
}
// scheduleNextPost registers the height trigger for the PoSt whose proving
// period ends at ppe.
//
// If the chain head has already moved into a later proving period, ppe is
// bumped to the end of the period containing the head. The call is rejected
// (with an error log) when m.schedPost is already >= the resulting ppe.
func (m *Miner) scheduleNextPost(ppe uint64) {
	ts, err := m.api.ChainHead(context.TODO())
	if err != nil {
		log.Error(err)
		// TODO: retry
		return
	}

	headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height())
	if headPPE > ppe {
		log.Warn("PoSt computation running behind chain")
		ppe = headPPE
	}

	m.schedLk.Lock()
	if m.schedPost >= ppe {
		// this probably can't happen
		// Errorf, not Error: the message carries format verbs.
		log.Errorf("PoSt already scheduled: %d >= %d", m.schedPost, ppe)
		m.schedLk.Unlock()
		return
	}

	m.schedPost = ppe
	m.schedLk.Unlock()

	log.Infof("Scheduling post at height %d (head=%d; ppe=%d, period=%d)", ppe-build.PoSTChallangeTime, ts.Height(), ppe, provingPeriod)
	err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert
		// TODO: Cancel post
		log.Errorf("TODO: Cancel PoSt, re-run")
		return nil
	}, PoStConfidence, ppe-build.PoSTChallangeTime)
	if err != nil {
		// TODO: This is BAD, figure something out
		log.Errorf("scheduling PoSt failed: %s", err)
		return
	}
}
// computePost returns the chain-height handler that computes and submits the
// PoSt for the proving period ending at ppe.
//
// When invoked with the triggering tipset, the handler:
//  1. fetches the miner's proving set at that tipset,
//  2. fetches chain randomness at an offset derived from the tipset height,
//     ppe and build.PoSTChallangeTime,
//  3. runs the PoSt computation,
//  4. pushes a SubmitPoSt message and waits for it to appear on chain,
//  5. schedules the PoSt for the following proving period.
//
// Any failure before step 5 is returned and the next PoSt is not scheduled.
// A non-zero exit code in the receipt is only logged.
func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error {
	return func(ts *types.TipSet, curH uint64) error {
		ctx := context.TODO()

		sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
		if err != nil {
			return xerrors.Errorf("failed to get proving set for miner: %w", err)
		}

		// Computed once and shared by the randomness call and the log line.
		randOffset := int64(ts.Height()) - int64(ppe) + int64(build.PoSTChallangeTime)

		r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(randOffset)) // TODO: review: check math
		if err != nil {
			return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", ts.Height(), ppe, err)
		}

		log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", randOffset, base64.StdEncoding.EncodeToString(r), ppe, ts.Height())
		var faults []uint64
		proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
		if err != nil {
			return xerrors.Errorf("running post failed: %w", err)
		}

		log.Infof("submitting PoSt pLen=%d", len(proof))

		params := &actors.SubmitPoStParams{
			Proof:   proof,
			DoneSet: types.BitFieldFromSet(sectorIdList(sset)),
		}

		enc, aerr := actors.SerializeParams(params)
		if aerr != nil {
			// Wrap aerr, the actual failure; err is nil at this point.
			return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
		}

		msg := &types.Message{
			To:       m.maddr,
			From:     m.worker,
			Method:   actors.MAMethods.SubmitPoSt,
			Params:   enc,
			Value:    types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late
			GasLimit: types.NewInt(100000 /* i dont know help */),
			GasPrice: types.NewInt(1),
		}

		log.Info("mpush")

		smsg, err := m.api.MpoolPushMessage(ctx, msg)
		if err != nil {
			return xerrors.Errorf("pushing message to mpool: %w", err)
		}

		log.Infof("Waiting for post %s to appear on chain", smsg.Cid())

		// make sure it succeeds...
		rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
		if err != nil {
			return err
		}
		if rec.Receipt.ExitCode != 0 {
			log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
			// TODO: Do something
		}

		m.scheduleNextPost(ppe + build.ProvingPeriodDuration)
		return nil
	}
}
// sectorIdList collects the SectorID of every entry in si, preserving order.
// The result always has len(si) elements.
func sectorIdList(si []*api.SectorInfo) []uint64 {
	ids := make([]uint64, 0, len(si))
	for _, info := range si {
		ids = append(ids, info.SectorID)
	}
	return ids
}