lotus/storage/miner.go

298 lines
7.9 KiB
Go
Raw Normal View History

2019-07-29 18:57:23 +00:00
package storage
import (
"context"
2019-09-26 23:07:40 +00:00
"sync"
2019-07-29 18:57:23 +00:00
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
2019-07-29 18:57:23 +00:00
"github.com/pkg/errors"
"golang.org/x/xerrors"
2019-07-29 18:57:23 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/storage/sector"
2019-07-29 18:57:23 +00:00
)
// log is the package-level logger, registered under the name "storageminer".
var log = logging.Logger("storageminer")
// PoStConfidence is not referenced anywhere in this file.
// NOTE(review): presumably the number of confirmations used when scheduling
// PoSt work elsewhere in the package — confirm against its callers.
const PoStConfidence = 3
2019-07-29 18:57:23 +00:00
type Miner struct {
api storageMinerApi
events *events.Events
2019-07-29 18:57:23 +00:00
2019-08-14 20:27:10 +00:00
secst *sector.Store
2019-07-29 18:57:23 +00:00
maddr address.Address
worker address.Address
h host.Host
ds datastore.Batching
2019-09-19 16:17:49 +00:00
schedLk sync.Mutex
2019-09-26 20:57:20 +00:00
schedPost uint64
2019-07-29 18:57:23 +00:00
}
type storageMinerApi interface {
// I think I want this... but this is tricky
//ReadState(ctx context.Context, addr address.Address) (????, error)
// Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
2019-07-29 18:57:23 +00:00
2019-09-19 16:17:49 +00:00
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
2019-07-29 18:57:23 +00:00
ChainHead(context.Context) (*types.TipSet, error)
2019-09-18 11:01:52 +00:00
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
2019-08-08 04:24:49 +00:00
WalletBalance(context.Context, address.Address) (types.BigInt, error)
2019-08-08 17:29:23 +00:00
WalletHas(context.Context, address.Address) (bool, error)
2019-07-29 18:57:23 +00:00
}
2019-10-30 17:37:38 +00:00
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) {
2019-07-29 18:57:23 +00:00
return &Miner{
api: api,
2019-07-29 18:57:23 +00:00
maddr: addr,
h: h,
ds: ds,
2019-08-14 20:27:10 +00:00
secst: secst,
2019-07-29 18:57:23 +00:00
}, nil
}
func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil {
return errors.Wrap(err, "miner preflight checks failed")
}
m.events = events.NewEvents(ctx, m.api)
2019-07-29 18:57:23 +00:00
go m.handlePostingSealedSectors(ctx)
2019-09-26 20:57:20 +00:00
go m.beginPosting(ctx)
2019-07-29 18:57:23 +00:00
return nil
}
func (m *Miner) commitUntrackedSectors(ctx context.Context) error {
2019-10-30 18:10:29 +00:00
sealed, err := m.secst.Commited()
if err != nil {
return err
}
chainSectors, err := m.api.StateMinerSectors(ctx, m.maddr, nil)
if err != nil {
return err
}
onchain := map[uint64]struct{}{}
for _, chainSector := range chainSectors {
onchain[chainSector.SectorID] = struct{}{}
}
for _, s := range sealed {
if _, ok := onchain[s.SectorID]; ok {
continue
}
log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID)
if err := m.commitSector(ctx, s); err != nil {
log.Error("Committing uncommitted sector failed: ", err)
}
}
return nil
}
2019-07-29 18:57:23 +00:00
func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
2019-08-14 20:27:10 +00:00
incoming := m.secst.Incoming()
defer m.secst.CloseIncoming(incoming)
if err := m.commitUntrackedSectors(ctx); err != nil {
log.Error(err)
}
2019-07-29 18:57:23 +00:00
for {
select {
2019-08-14 20:27:10 +00:00
case sinfo, ok := <-incoming:
2019-07-29 18:57:23 +00:00
if !ok {
// TODO: set some state variable so that this state can be
// visible via some status command
log.Warn("sealed sector channel closed, aborting process")
2019-07-29 18:57:23 +00:00
return
}
if err := m.commitSector(ctx, sinfo); err != nil {
log.Errorf("failed to commit sector: %s", err)
continue
}
2019-07-29 18:57:23 +00:00
case <-ctx.Done():
log.Warn("exiting seal posting routine")
2019-07-29 18:57:23 +00:00
return
}
}
}
2019-07-29 18:57:23 +00:00
func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error {
2019-08-12 21:48:18 +00:00
log.Info("committing sector")
ssize, err := m.SectorSize(ctx)
if err != nil {
return xerrors.Errorf("failed to check out own sector size: %w", err)
}
_ = ssize
// TODO: 2 stage commit
/*deals, err := m.secst.DealsForCommit(sinfo.SectorID)
if err != nil {
return xerrors.Errorf("getting sector deals failed: %w", err)
}
*/
params := &actors.SectorPreCommitInfo{
2019-10-21 11:58:41 +00:00
CommD: sinfo.CommD[:],
CommR: sinfo.CommR[:],
2019-10-27 08:56:53 +00:00
Epoch: sinfo.Ticket.BlockHeight,
//DealIDs: deals,
SectorNumber: sinfo.SectorID,
2019-07-29 18:57:23 +00:00
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return errors.Wrap(aerr, "could not serialize commit sector parameters")
2019-07-29 18:57:23 +00:00
}
2019-09-19 16:17:49 +00:00
msg := &types.Message{
2019-07-29 18:57:23 +00:00
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
2019-07-29 18:57:23 +00:00
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
2019-07-29 18:57:23 +00:00
GasPrice: types.NewInt(1),
}
2019-09-19 16:17:49 +00:00
smsg, err := m.api.MpoolPushMessage(ctx, msg)
2019-07-29 18:57:23 +00:00
if err != nil {
2019-09-19 16:17:49 +00:00
return errors.Wrap(err, "pushing message to mpool")
2019-07-29 18:57:23 +00:00
}
2019-09-19 16:17:49 +00:00
go func() {
// TODO: maybe just mark this down in the datastore and handle it differently? This feels complicated to restart
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
2019-09-19 16:17:49 +00:00
if err != nil {
return
}
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay
err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error {
go func() {
rand, err := m.api.ChainGetRandomness(ctx, ts, nil, ts.Height()-randHeight)
if err != nil {
log.Error(errors.Errorf("failed to get randomness for computing seal proof: %w", err))
return
}
// TODO: should this get scheduled to preserve proper resource consumption?
proof, err := m.secst.SealComputeProof(ctx, sinfo.SectorID, rand)
if err != nil {
log.Error(errors.Errorf("computing seal proof failed: %w", err))
return
}
params := &actors.SectorProveCommitInfo{
Proof: proof,
SectorID: sinfo.SectorID,
//DealIDs: deals,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
log.Errorf(errors.Wrap(aerr, "could not serialize commit sector parameters"))
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.ProveCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return errors.Wrap(err, "pushing message to mpool")
}
// TODO: now wait for this to get included and handle errors?
_, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
log.Errorf("failed to wait for porep inclusion: %s", err)
return
}
m.beginPosting(ctx)
}()
return nil
}, func(ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil {
return
}
2019-09-19 16:17:49 +00:00
}()
2019-09-16 16:40:26 +00:00
return nil
2019-07-29 18:57:23 +00:00
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {
worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil)
2019-07-29 18:57:23 +00:00
if err != nil {
return err
}
m.worker = worker
2019-08-08 17:29:23 +00:00
has, err := m.api.WalletHas(ctx, worker)
if err != nil {
return errors.Wrap(err, "failed to check wallet for worker key")
}
if !has {
return errors.New("key for worker not found in local wallet")
2019-07-29 18:57:23 +00:00
}
log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker)
2019-07-29 18:57:23 +00:00
return nil
}
// SectorSize asks the node for the sector size of this miner's actor and
// returns the node's answer unchanged. Every call goes to the API.
func (m *Miner) SectorSize(ctx context.Context) (uint64, error) {
	// TODO: cache this
	size, err := m.api.StateMinerSectorSize(ctx, m.maddr, nil)
	return size, err
}