Almost working PoSt submission

This commit is contained in:
Łukasz Magiera 2019-09-19 18:17:49 +02:00
parent d55e872135
commit c40f89f2a9
8 changed files with 123 additions and 48 deletions

View File

@ -1,5 +1,7 @@
package build package build
// Seconds
const BlockDelay = 5 const BlockDelay = 5
// Seconds
const AllowableClockDrift = BlockDelay * 2 const AllowableClockDrift = BlockDelay * 2

View File

@ -7,14 +7,20 @@ const UnixfsLinksPerLevel = 1024
const SectorSize = 1024 const SectorSize = 1024
// Blocks
const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours
// Blocks
const DealVoucherSkewLimit = 10 const DealVoucherSkewLimit = 10
// Blocks
const ForkLengthThreshold = 20 const ForkLengthThreshold = 20
// Blocks
const RandomnessLookback = 20 const RandomnessLookback = 20
const ProvingPeriodDuration = 2 * 60 // an hour, for now // Blocks
const PoSTChallangeTime = 1 * 60 const ProvingPeriodDuration = 20
const PoSTChallangeTime = 10
// TODO: Move other important consts here // TODO: Move other important consts here

View File

@ -82,9 +82,6 @@ type callTuple struct {
} }
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev { for _, ts := range rev {
e.handleReverts(ts) e.handleReverts(ts)
} }

View File

@ -20,9 +20,6 @@ type heightEvents struct {
} }
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
// highest tipset is always the first (see api.ReorgOps) // highest tipset is always the first (see api.ReorgOps)
newH := app[0].Height() newH := app[0].Height()

View File

@ -17,6 +17,14 @@ func NewBitField() BitField {
return BitField{bits: make(map[uint64]struct{})} return BitField{bits: make(map[uint64]struct{})}
} }
func BitFieldFromSet(setBits []uint64) BitField {
res := BitField{bits: make(map[uint64]struct{})}
for _, b := range setBits {
res.bits[b] = struct{}{}
}
return res
}
// Set ...s bit in the BitField // Set ...s bit in the BitField
func (bf BitField) Set(bit uint64) { func (bf BitField) Set(bit uint64) {
bf.bits[bit] = struct{}{} bf.bits[bit] = struct{}{}

View File

@ -59,7 +59,6 @@ class StorageNode extends React.Component {
// this.props.onConnect(client, id) // TODO: dedupe connecting part // this.props.onConnect(client, id) // TODO: dedupe connecting part
this.loadInfo()
let updates = setInterval(this.loadInfo, 1050) let updates = setInterval(this.loadInfo, 1050)
client.on('close', () => clearInterval(updates)) client.on('close', () => clearInterval(updates))
}) })
@ -72,7 +71,10 @@ class StorageNode extends React.Component {
const peers = await this.state.client.call("Filecoin.NetPeers", []) const peers = await this.state.client.call("Filecoin.NetPeers", [])
const [actor] = await this.state.client.call("Filecoin.ActorAddresses", []) const [actor] = await this.state.client.call("Filecoin.ActorAddresses", [])
this.setState({version: version, peers: peers.length, actor: actor}) const stActor = await this.props.fullConn.call('Filecoin.StateGetActor', [actor, null])
const actorState = await this.props.fullConn.call('Filecoin.StateReadState', [stActor, null])
this.setState({version: version, peers: peers.length, actor: actor, actorState: actorState})
await this.stagedList() await this.stagedList()
} }
@ -109,6 +111,7 @@ class StorageNode extends React.Component {
</div> </div>
<div> <div>
<Address client={this.props.fullConn} addr={this.state.actor} mountWindow={this.props.mountWindow}/> <Address client={this.props.fullConn} addr={this.state.actor} mountWindow={this.props.mountWindow}/>
<span>&nbsp;<abbr title="Proving period end">PPE:</abbr> <b>{this.state.actorState.State.ProvingPeriodEnd}</b></span>
</div> </div>
<div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div> <div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div>
<div> <div>

View File

@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"sync"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
@ -23,7 +24,7 @@ import (
var log = logging.Logger("storageminer") var log = logging.Logger("storageminer")
const PoStConfidence = 0 const PoStConfidence = 1
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
@ -39,6 +40,9 @@ type Miner struct {
h host.Host h host.Host
ds datastore.Batching ds datastore.Batching
schedLk sync.Mutex
postSched uint64
} }
type storageMinerApi interface { type storageMinerApi interface {
@ -51,8 +55,7 @@ type storageMinerApi interface {
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
MpoolPush(context.Context, *types.SignedMessage) error MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
ChainHead(context.Context) (*types.TipSet, error) ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
@ -62,7 +65,6 @@ type storageMinerApi interface {
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
@ -144,7 +146,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return errors.Wrap(aerr, "could not serialize commit sector parameters") return errors.Wrap(aerr, "could not serialize commit sector parameters")
} }
msg := types.Message{ msg := &types.Message{
To: m.maddr, To: m.maddr,
From: m.worker, From: m.worker,
Method: actors.MAMethods.CommitSector, Method: actors.MAMethods.CommitSector,
@ -154,36 +156,24 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
GasPrice: types.NewInt(1), GasPrice: types.NewInt(1),
} }
nonce, err := m.api.MpoolGetNonce(ctx, m.worker) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get nonce") return errors.Wrap(err, "pushing message to mpool")
}
msg.Nonce = nonce
data, err := msg.Serialize()
if err != nil {
return errors.Wrap(err, "serializing commit sector message")
}
sig, err := m.api.WalletSign(ctx, m.worker, data)
if err != nil {
return errors.Wrap(err, "signing commit sector message")
}
smsg := &types.SignedMessage{
Message: msg,
Signature: *sig,
}
if err := m.api.MpoolPush(ctx, smsg); err != nil {
return errors.Wrap(err, "pushing commit sector message to mpool")
} }
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil { if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
return errors.Wrap(err, "tracking sector commitment") return errors.Wrap(err, "tracking sector commitment")
} }
go func() {
_, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}
m.schedulePoSt(ctx, nil)
}()
return nil return nil
} }
@ -200,12 +190,21 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
return return
} }
log.Infof("Scheduling post at height %d", ppe) m.schedLk.Lock()
// TODO: Should we set confidence to randomness lookback?
if m.postSched >= ppe {
log.Warn("schedulePoSt already called for proving period >= %d", m.postSched)
m.schedLk.Unlock()
return
}
m.postSched = ppe
m.schedLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime)
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
// TODO: Cancel post // TODO: Cancel post
return nil return nil
}, PoStConfidence, ppe) }, PoStConfidence, ppe-build.PoSTChallangeTime)
if err != nil { if err != nil {
// TODO: This is BAD, figure something out // TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err) log.Errorf("scheduling PoSt failed: %s", err)
@ -214,7 +213,14 @@ func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
} }
func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
postWaitCh, _, err := m.maybeDoPost(context.TODO(), ts) log.Info("starting PoSt computation")
head, err := m.api.ChainHead(context.TODO())
if err != nil {
return err
}
postWaitCh, _, err := m.maybeDoPost(context.TODO(), head)
if err != nil { if err != nil {
return err return err
} }
@ -253,18 +259,19 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err) return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err)
} }
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(ts.Height()-ppe)) r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(ts.Height()-ppe+build.ProvingPeriodDuration)) // TODO: review: check math
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err) return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err)
} }
sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe, ts) sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.ProvingPeriodDuration, ts)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err) return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err)
} }
ret := make(chan error, 1) ret := make(chan error, 1)
go func() { go func() {
log.Info("running PoSt computation")
var faults []uint64 var faults []uint64
proof, err := m.secst.RunPoSt(ctx, sset, r, faults) proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
if err != nil { if err != nil {
@ -272,8 +279,34 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
return return
} }
// TODO: submit post... log.Info("submitting PoSt")
_ = proof
params := &actors.SubmitPoStParams{
Proof: proof,
DoneSet: types.BitFieldFromSet(sectorIdList(sset)),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err)
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(0),
GasLimit: types.NewInt(100000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
_, err = m.api.MpoolPushMessage(ctx, msg)
if err != nil {
ret <- xerrors.Errorf("pushing message to mpool: %w", err)
return
}
// make sure it succeeds... // make sure it succeeds...
// m.api.ChainWaitMsg() // m.api.ChainWaitMsg()
@ -283,6 +316,14 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
return ret, sourceTs.MinTicketBlock(), nil return ret, sourceTs.MinTicketBlock(), nil
} }
func sectorIdList(si []*api.SectorInfo) []uint64 {
out := make([]uint64, len(si))
for i, s := range si {
out[i] = s.SectorID
}
return out
}
func (m *Miner) runPreflightChecks(ctx context.Context) error { func (m *Miner) runPreflightChecks(ctx context.Context) error {
worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil) worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil)
if err != nil { if err != nil {

View File

@ -2,6 +2,7 @@ package sector
import ( import (
"context" "context"
"golang.org/x/xerrors"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
@ -41,7 +42,7 @@ func (s *Store) Service() {
} }
func (s *Store) poll() { func (s *Store) poll() {
log.Info("polling for sealed sectors...") log.Debug("polling for sealed sectors...")
// get a list of sectors to poll // get a list of sectors to poll
s.lk.Lock() s.lk.Lock()
@ -164,7 +165,27 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect
} }
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
panic("TODO") sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
for k, sector := range sectors {
var commR [sectorbuilder.CommLen]byte
if copy(commR[:], sector.CommR) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR))
}
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
ssi := sectorbuilder.NewSortedSectorInfo(sbsi)
var seed [sectorbuilder.CommLen]byte
if copy(seed[:], r) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("random seed too short, %d bytes", len(r))
}
return s.sb.GeneratePoSt(ssi, seed, faults)
} }
func (s *Store) Stop() { func (s *Store) Stop() {