import sector/deal into state machines on init

This commit is contained in:
Łukasz Magiera 2019-11-29 20:11:01 +01:00
parent 0ca92bdca0
commit ad978949db
9 changed files with 186 additions and 26 deletions

View File

@ -5,7 +5,7 @@ package build
import "os"
// Seconds
const BlockDelay = 2
const BlockDelay = 4
// Blocks
const ProvingPeriodDuration uint64 = 40

View File

@ -3,16 +3,17 @@ package actors
import (
"bytes"
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-amt-ipld"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-hamt-ipld"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
@ -110,6 +111,15 @@ func (sdp *StorageDealProposal) Sign(ctx context.Context, sign SignFunc) error {
return nil
}
func (sdp *StorageDealProposal) Cid() (cid.Cid, error) {
nd, err := cborutil.AsIpld(sdp)
if err != nil {
return cid.Undef, err
}
return nd.Cid(), nil
}
func (sdp *StorageDealProposal) Verify() error {
unsigned := *sdp
unsigned.ProposerSignature = nil

View File

@ -24,6 +24,8 @@ import (
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
var ProviderDsPrefix = "/deals/provider"
type MinerDeal struct {
Client peer.ID
Proposal actors.StorageDealProposal
@ -110,7 +112,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
actor: minerAddress,
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix))),
ds: ds,
}

View File

@ -3,8 +3,12 @@ package main
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger"
@ -14,12 +18,15 @@ import (
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/api"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/modules"
@ -199,7 +206,116 @@ func migratePreSealedSectors(presealsb string, repoPath string, mds dtypes.Metad
return nil
}
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, r repo.Repo) error {
func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir string, maddr address.Address, mds dtypes.MetadataDS) error {
b, err := ioutil.ReadFile(filepath.Join(presealDir, "pre-seal-"+maddr.String()+".json"))
if err != nil {
return xerrors.Errorf("reading preseal metadata: %w", err)
}
preseals := map[string]genesis.GenesisMiner{}
if err := json.Unmarshal(b, &preseals); err != nil {
return xerrors.Errorf("unmarshaling preseal metadata: %w", err)
}
meta, ok := preseals[maddr.String()]
if !ok {
return xerrors.New("got wrong preseal info")
}
for _, sector := range meta.Sectors {
sectorKey := datastore.NewKey(storage.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
dealID, err := findMarketDealID(ctx, api, sector.Deal)
if err != nil {
return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err)
}
info := &storage.SectorInfo{
State: lapi.Proving,
SectorID: sector.SectorID,
Pieces: []storage.Piece{
{
DealID: dealID,
Ref: fmt.Sprintf("preseal-%d", sector.SectorID),
Size: meta.SectorSize,
CommP: sector.CommD[:],
},
},
CommC: nil,
CommD: sector.CommD[:],
CommR: sector.CommR[:],
CommRLast: nil,
Proof: nil,
Ticket: storage.SealTicket{},
PreCommitMessage: nil,
Seed: storage.SealSeed{},
CommitMessage: nil,
}
b, err := cborutil.Dump(info)
if err != nil {
return err
}
if err := mds.Put(sectorKey, b); err != nil {
return err
}
proposalCid, err := sector.Deal.Proposal.Cid()
if err != nil {
return err
}
dealKey := datastore.NewKey(deals.ProviderDsPrefix).ChildString(proposalCid.String())
deal := &deals.MinerDeal{
Proposal: sector.Deal.Proposal,
ProposalCid: proposalCid,
State: lapi.DealComplete,
Ref: proposalCid, // TODO: This is super wrong, but there
// are no params for CommP CIDs, we can't recover unixfs cid easily,
// and this isn't even used after the deal enters Complete state
DealID: dealID,
SectorID: sector.SectorID,
}
b, err = cborutil.Dump(deal)
if err != nil {
return err
}
if err := mds.Put(dealKey, b); err != nil {
return err
}
}
return nil
}
func findMarketDealID(ctx context.Context, api lapi.FullNode, deal actors.StorageDeal) (uint64, error) {
// TODO: find a better way
// (this is only used by genesis miners)
deals, err := api.StateMarketDeals(ctx, nil)
if err != nil {
return 0, xerrors.Errorf("getting market deals: %w", err)
}
for k, v := range deals {
eq, err := cborutil.Equals(&v.Deal, &deal)
if err != nil {
return 0, err
}
if eq {
return strconv.ParseUint(k, 10, 64)
}
}
return 0, xerrors.New("deal not found")
}
func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, r repo.Repo) error {
lr, err := r.Lock(repo.StorageMiner)
if err != nil {
return err
@ -218,6 +334,11 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
return xerrors.Errorf("peer ID from private key: %w", err)
}
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
var addr address.Address
if act := cctx.String("actor"); act != "" {
a, err := address.NewFromString(act)
@ -226,10 +347,6 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
}
if cctx.Bool("genesis-miner") {
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
if err := mds.Put(datastore.NewKey("miner-address"), a.Bytes()); err != nil {
return err
}
@ -261,6 +378,14 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
}
}
if pssb := cctx.String("pre-sealed-sectors"); pssb != "" {
log.Infof("Importing pre-sealed sector metadata for %s", a)
if err := migratePreSealMeta(ctx, api, cctx.String("pre-sealed-sectors"), a, mds); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}
return nil
} else {
if err := configureStorageMiner(ctx, api, a, peerid); err != nil {
@ -279,10 +404,6 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
}
log.Infof("Created new storage miner: %s", addr)
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
if err := mds.Put(datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
return err
}
@ -316,7 +437,7 @@ func makeHostKey(lr repo.LockedRepo) (crypto.PrivKey, error) {
return pk, nil
}
func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.Address, peerid peer.ID) error {
func configureStorageMiner(ctx context.Context, api lapi.FullNode, addr address.Address, peerid peer.ID) error {
// This really just needs to be an api call at this point...
recp, err := api.StateCall(ctx, &types.Message{
To: addr,
@ -369,7 +490,7 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A
return nil
}
func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID, cctx *cli.Context) (addr address.Address, err error) {
func createStorageMiner(ctx context.Context, api lapi.FullNode, peerid peer.ID, cctx *cli.Context) (addr address.Address, err error) {
log.Info("Creating StorageMarket.CreateStorageMiner message")
var owner address.Address

View File

@ -33,6 +33,7 @@ var runCmd = &cli.Command{
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "Enable use of GPU for mining operations",
Value: true,
},
},
Action: func(cctx *cli.Context) error {

View File

@ -40,7 +40,7 @@ var DaemonCmd = &cli.Command{
Hidden: true,
},
&cli.StringFlag{
Name: "genesis-presealed-sectors",
Name: preSealedSectorsFlag,
Hidden: true,
},
&cli.StringFlag{

View File

@ -24,7 +24,7 @@ import (
var log = logging.Logger("storageminer")
const PoStConfidence = 3
const SectorStorePrefix = "/sectors"
type Miner struct {
api storageMinerApi
@ -83,7 +83,7 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
sb: sb,
tktFn: tktFn,
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))),
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix))),
sectorIncoming: make(chan *SectorInfo),
sectorUpdated: make(chan sectorUpdate),

View File

@ -125,13 +125,13 @@ func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
}
func (m *Miner) sectorStateLoop(ctx context.Context) error {
toRestart, err := m.ListSectors()
trackedSectors, err := m.ListSectors()
if err != nil {
return err
}
go func() {
for _, si := range toRestart {
for _, si := range trackedSectors {
select {
case m.sectorUpdated <- sectorUpdate{
newState: si.State,
@ -146,6 +146,32 @@ func (m *Miner) sectorStateLoop(ctx context.Context) error {
}
}()
{
// verify on-chain state
trackedByID := map[uint64]*SectorInfo{}
for _, si := range trackedSectors {
trackedByID[si.SectorID] = &si
}
curTs, err := m.api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
ps, err := m.api.StateMinerProvingSet(ctx, m.maddr, curTs)
if err != nil {
return err
}
for _, ocs := range ps {
if _, ok := trackedByID[ocs.SectorID]; ok {
continue // TODO: check state
}
// TODO: attempt recovery
log.Warnf("untracked sector %d found on chain", ocs.SectorID)
}
}
go func() {
defer log.Warn("quitting deal provider loop")
defer close(m.stopped)
@ -217,15 +243,15 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
switch update.newState {
case api.Packing:
m.handle(ctx, sector, m.finishPacking, api.Unsealed)
m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed)
case api.Unsealed:
m.handle(ctx, sector, m.sealPreCommit, api.PreCommitting)
m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting)
case api.PreCommitting:
m.handle(ctx, sector, m.preCommit, api.PreCommitted)
m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted)
case api.PreCommitted:
m.handle(ctx, sector, m.preCommitted, api.SectorNoUpdate)
m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate)
case api.Committing:
m.handle(ctx, sector, m.committing, api.Proving)
m.handleSectorUpdate(ctx, sector, m.committing, api.Proving)
case api.Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", update.id)

View File

@ -14,7 +14,7 @@ import (
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error)
func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) {
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) {
go func() {
mut, err := cb(ctx, sector)