Merge pull request #494 from filecoin-project/feat/handle-uncommited
storageminer: Handle uncommited sectors on start
This commit is contained in:
commit
413314b44b
10
api/api.go
10
api/api.go
@ -168,6 +168,8 @@ type StorageMiner interface {
|
||||
SectorsList(context.Context) ([]uint64, error)
|
||||
|
||||
SectorsRefs(context.Context) (map[string][]SealedRef, error)
|
||||
|
||||
CommitmentsList(context.Context) ([]SectorCommitment, error)
|
||||
}
|
||||
|
||||
// Version provides various build-time information
|
||||
@ -330,6 +332,14 @@ type SyncState struct {
|
||||
Height uint64
|
||||
}
|
||||
|
||||
type SectorCommitment struct {
|
||||
SectorID uint64
|
||||
Miner address.Address
|
||||
|
||||
CommitMsg cid.Cid
|
||||
DealIDs []uint64
|
||||
}
|
||||
|
||||
type SyncStateStage int
|
||||
|
||||
const (
|
||||
|
@ -133,6 +133,8 @@ type StorageMinerStruct struct {
|
||||
SectorsList func(context.Context) ([]uint64, error) `perm:"read"`
|
||||
|
||||
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
|
||||
|
||||
CommitmentsList func(context.Context) ([]SectorCommitment, error) `perm:"read"`
|
||||
}
|
||||
}
|
||||
|
||||
@ -479,6 +481,10 @@ func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]Seal
|
||||
return c.Internal.SectorsRefs(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) CommitmentsList(ctx context.Context) ([]SectorCommitment, error) {
|
||||
return c.Internal.CommitmentsList(ctx)
|
||||
}
|
||||
|
||||
var _ Common = &CommonStruct{}
|
||||
var _ FullNode = &FullNodeStruct{}
|
||||
var _ StorageMiner = &StorageMinerStruct{}
|
||||
|
41
cmd/lotus-storage-miner/commitments.go
Normal file
41
cmd/lotus-storage-miner/commitments.go
Normal file
@ -0,0 +1,41 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
)
|
||||
|
||||
var commitmentsCmd = &cli.Command{
|
||||
Name: "commitments",
|
||||
Usage: "interact with commitment tracker",
|
||||
Subcommands: []*cli.Command{
|
||||
commitmentsListCmd,
|
||||
},
|
||||
}
|
||||
|
||||
var commitmentsListCmd = &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List tracked sector commitments",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
comms, err := api.CommitmentsList(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, comm := range comms {
|
||||
fmt.Printf("%s:%d msg:%s, deals: %v\n", comm.Miner, comm.SectorID, comm.CommitMsg, comm.DealIDs)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
@ -26,6 +26,7 @@ func main() {
|
||||
infoCmd,
|
||||
storeGarbageCmd,
|
||||
sectorsCmd,
|
||||
commitmentsCmd,
|
||||
}
|
||||
jaeger := tracing.SetupJaegerTracing("lotus")
|
||||
defer func() {
|
||||
|
@ -88,6 +88,10 @@ func (sb *SectorBuilder) SealSector(sectorID uint64, ticket SealTicket) (SealedS
|
||||
return sectorbuilder.SealSector(sb.handle, sectorID, ticket)
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) ResumeSealSector(sectorID uint64) (SealedSectorMetadata, error) {
|
||||
return sectorbuilder.ResumeSealSector(sb.handle, sectorID)
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {
|
||||
return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector)
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
|
||||
@ -23,6 +24,7 @@ type StorageMinerAPI struct {
|
||||
SectorBuilder *sectorbuilder.SectorBuilder
|
||||
Sectors *sector.Store
|
||||
SectorBlocks *sectorblocks.SectorBlocks
|
||||
CommitmentTracker *commitment.Tracker
|
||||
|
||||
Miner *storage.Miner
|
||||
}
|
||||
@ -81,4 +83,8 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) CommitmentsList(ctx context.Context) ([]api.SectorCommitment, error) {
|
||||
return sm.CommitmentTracker.List()
|
||||
}
|
||||
|
||||
var _ api.StorageMiner = &StorageMinerAPI{}
|
||||
|
@ -3,6 +3,8 @@ package commitment
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -12,8 +14,10 @@ import (
|
||||
logging "github.com/ipfs/go-log"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
dsq "github.com/ipfs/go-datastore/query"
|
||||
)
|
||||
|
||||
var log = logging.Logger("commitment")
|
||||
@ -56,6 +60,21 @@ func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64,
|
||||
|
||||
tracking, err := ct.commitments.Get(key)
|
||||
switch err {
|
||||
case nil:
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !comm.Msg.Equals(commitMsg) {
|
||||
log.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg)
|
||||
}
|
||||
|
||||
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg)
|
||||
|
||||
// we still want to store it
|
||||
fallthrough // TODO: ideally we'd keep around both (even though we'll
|
||||
// usually only need the new one)
|
||||
case datastore.ErrNotFound:
|
||||
comm := &commitment{Msg: commitMsg}
|
||||
commB, err := cbor.DumpObject(comm)
|
||||
@ -73,18 +92,6 @@ func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64,
|
||||
delete(ct.waits, key)
|
||||
}
|
||||
return nil
|
||||
case nil:
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !comm.Msg.Equals(commitMsg) {
|
||||
return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg)
|
||||
}
|
||||
|
||||
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg)
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
@ -136,3 +143,56 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector
|
||||
return cid.Undef, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *Tracker) List() ([]api.SectorCommitment, error) {
|
||||
out := make([]api.SectorCommitment, 0)
|
||||
|
||||
ct.lk.Lock()
|
||||
defer ct.lk.Unlock()
|
||||
|
||||
res, err := ct.commitments.Query(dsq.Query{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Close()
|
||||
|
||||
for {
|
||||
res, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if res.Error != nil {
|
||||
return nil, xerrors.Errorf("iterating commitments: %w", err)
|
||||
}
|
||||
|
||||
parts := strings.Split(res.Key, "/")
|
||||
if len(parts) != 4 {
|
||||
return nil, xerrors.Errorf("expected commitment key to be 4 parts, Key %s", res.Key)
|
||||
}
|
||||
|
||||
miner, err := address.NewFromString(parts[2])
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing miner address: %w", err)
|
||||
}
|
||||
|
||||
sectorID, err := strconv.ParseInt(parts[3], 10, 64)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing sector id: %w", err)
|
||||
}
|
||||
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(res.Value, &comm); err != nil {
|
||||
return nil, xerrors.Errorf("decoding commitment %s (`% X`): %w", res.Key, res.Value, err)
|
||||
}
|
||||
|
||||
out = append(out, api.SectorCommitment{
|
||||
SectorID: uint64(sectorID),
|
||||
Miner: miner,
|
||||
CommitMsg: comm.Msg,
|
||||
DealIDs: comm.DealIDs,
|
||||
})
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
@ -53,6 +53,7 @@ type storageMinerApi interface {
|
||||
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
|
||||
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
|
||||
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
|
||||
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
|
||||
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
|
||||
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
|
||||
@ -93,10 +94,44 @@ func (m *Miner) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) commitUntrackedSectors(ctx context.Context) error {
|
||||
sealed, err := m.secst.Sealed()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chainSectors, err := m.api.StateMinerSectors(ctx, m.maddr, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
onchain := map[uint64]struct{}{}
|
||||
for _, chainSector := range chainSectors {
|
||||
onchain[chainSector.SectorID] = struct{}{}
|
||||
}
|
||||
|
||||
for _, s := range sealed {
|
||||
if _, ok := onchain[s.SectorID]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID)
|
||||
|
||||
if err := m.commitSector(ctx, s); err != nil {
|
||||
log.Error("Committing uncommitted sector failed: ", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
|
||||
incoming := m.secst.Incoming()
|
||||
defer m.secst.CloseIncoming(incoming)
|
||||
|
||||
if err := m.commitUntrackedSectors(ctx); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case sinfo, ok := <-incoming:
|
||||
@ -169,10 +204,6 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
return errors.Wrap(err, "pushing message to mpool")
|
||||
}
|
||||
|
||||
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
|
||||
return errors.Wrap(err, "tracking sector commitment")
|
||||
}
|
||||
|
||||
go func() {
|
||||
_, err := m.api.StateWaitMsg(ctx, smsg.Cid())
|
||||
if err != nil {
|
||||
@ -182,6 +213,10 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
m.beginPosting(ctx)
|
||||
}()
|
||||
|
||||
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
|
||||
return xerrors.Errorf("tracking sector commitment: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -107,9 +107,39 @@ func (s *Store) poll() {
|
||||
s.waitingLk.Unlock()
|
||||
}
|
||||
|
||||
func (s *Store) restartSealing() {
|
||||
sectors, err := s.sb.GetAllStagedSectors()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, sid := range sectors {
|
||||
status, err := s.sb.SealStatus(sid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if status.State != sealing_state.Paused {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("Sector %d is in paused state, resuming sealing", sid)
|
||||
go func() {
|
||||
// TODO: when we refactor wait-for-seal below, care about this output too
|
||||
// (see SealSector below)
|
||||
_, err := s.sb.ResumeSealSector(sid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) service() {
|
||||
poll := time.Tick(5 * time.Second)
|
||||
|
||||
s.restartSealing()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-poll:
|
||||
@ -263,6 +293,28 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect
|
||||
return s.sb.SealStatus(sector)
|
||||
}
|
||||
|
||||
func (s *Store) Sealed() ([]sectorbuilder.SectorSealingStatus, error) {
|
||||
l, err := s.sb.GetAllStagedSectors()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]sectorbuilder.SectorSealingStatus, 0)
|
||||
for _, sid := range l {
|
||||
status, err := s.sb.SealStatus(sid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if status.State != sealing_state.Sealed {
|
||||
continue
|
||||
}
|
||||
out = append(out, status)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
|
||||
sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
|
||||
for k, sector := range sectors {
|
||||
|
Loading…
Reference in New Issue
Block a user