From 169c285fb3335ed1c66575bff4987bf75031e87e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 20:46:32 +0100 Subject: [PATCH 1/7] storageminer: Handle uncommited sectors on start --- lib/sectorbuilder/sectorbuilder.go | 4 ++++ storage/commitment/tracker.go | 9 +++++++ storage/miner.go | 38 ++++++++++++++++++++++++++++++ storage/sector/store.go | 4 ++++ 4 files changed, 55 insertions(+) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index c9cad6ae5..26123e757 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -110,6 +110,10 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) { return out, nil } +func (sb *SectorBuilder) GetAllSealedSectors() ([]SealedSectorMetadata, error) { + return sectorbuilder.GetAllSealedSectors(sb.handle) +} + func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { // Wait, this is a blocking method with no way of interrupting it? // does it checkpoint itself? diff --git a/storage/commitment/tracker.go b/storage/commitment/tracker.go index aa50c705e..10510fba0 100644 --- a/storage/commitment/tracker.go +++ b/storage/commitment/tracker.go @@ -136,3 +136,12 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector return cid.Undef, ctx.Err() } } + +func (ct *Tracker) CheckCommitment(miner address.Address, sectorId uint64) (bool, error) { + key := commitmentKey(miner, sectorId) + + ct.lk.Lock() + defer ct.lk.Unlock() + + return ct.commitments.Has(key) +} diff --git a/storage/miner.go b/storage/miner.go index 0ab1ce009..81880f86b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,6 +2,7 @@ package storage import ( "context" + "github.com/filecoin-project/go-sectorbuilder/sealing_state" "sync" "github.com/ipfs/go-cid" @@ -93,10 +94,47 @@ func (m *Miner) Run(ctx context.Context) error { return nil } +func (m *Miner) commitUntrackedSectors(ctx context.Context) error { + sealed, err := m.secst.Sealed() + if err != nil { + return err + } + + for _, s := range sealed { + has, err := m.commt.CheckCommitment(m.maddr, s.SectorID) + if err != nil { + log.Error("checking commitment: ", err) + } + + if has { + continue + } + + log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID) + + if err := m.commitSector(ctx, sectorbuilder.SectorSealingStatus{ + SectorID: s.SectorID, + State: sealing_state.Sealed, + CommD: s.CommD, + CommR: s.CommR, + Proof: s.Proof, + Pieces: s.Pieces, + Ticket: s.Ticket, + }); err != nil { + log.Error("Committing uncommitted sector failed: ", err) + } + } + return nil +} + func (m *Miner) handlePostingSealedSectors(ctx context.Context) { incoming := m.secst.Incoming() defer m.secst.CloseIncoming(incoming) + if err := m.commitUntrackedSectors(ctx); err != nil { + log.Error(err) + } + for { select { case sinfo, ok := <-incoming: diff --git a/storage/sector/store.go b/storage/sector/store.go index 5a4b6a9a3..f82505837 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -263,6 +263,10 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect return s.sb.SealStatus(sector) } +func (s *Store) Sealed() ([]sectorbuilder.SealedSectorMetadata, error) { + return s.sb.GetAllSealedSectors() +} + func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { sbsi := make([]sectorbuilder.SectorInfo, len(sectors)) for k, sector := range sectors { From 44f4ee0de1c628c330abc74a8328dedc9b1b7a41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 21:41:30 +0100 Subject: [PATCH 2/7] storageminer: cmd to list sector commitments --- api/api.go | 10 +++++ api/struct.go | 6 +++ cmd/lotus-storage-miner/commitments.go | 41 ++++++++++++++++++ cmd/lotus-storage-miner/main.go | 1 + node/impl/storminer.go | 6 +++ storage/commitment/tracker.go | 57 ++++++++++++++++++++++++++ storage/miner.go | 2 +- 7 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 cmd/lotus-storage-miner/commitments.go diff --git a/api/api.go b/api/api.go index 635901ecb..463e5e9cb 100644 --- a/api/api.go +++ b/api/api.go @@ -168,6 +168,8 @@ type StorageMiner interface { SectorsList(context.Context) ([]uint64, error) SectorsRefs(context.Context) (map[string][]SealedRef, error) + + CommitmentsList(context.Context) ([]SectorCommitment, error) } // Version provides various build-time information @@ -330,6 +332,14 @@ type SyncState struct { Height uint64 } +type SectorCommitment struct { + SectorID uint64 + Miner address.Address + + CommitMsg cid.Cid + DealIDs []uint64 +} + type SyncStateStage int const ( diff --git a/api/struct.go b/api/struct.go index 6eefe41dc..47d88aa69 100644 --- a/api/struct.go +++ b/api/struct.go @@ -133,6 +133,8 @@ type StorageMinerStruct struct { SectorsList func(context.Context) ([]uint64, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"` + + CommitmentsList func(context.Context) ([]SectorCommitment, error) `perm:"read"` } } @@ -479,6 +481,10 @@ func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]Seal return c.Internal.SectorsRefs(ctx) } +func (c *StorageMinerStruct) CommitmentsList(ctx context.Context) ([]SectorCommitment, error) { + return c.Internal.CommitmentsList(ctx) +} + var _ Common = &CommonStruct{} var _ FullNode = &FullNodeStruct{} var _ StorageMiner = &StorageMinerStruct{} diff --git a/cmd/lotus-storage-miner/commitments.go b/cmd/lotus-storage-miner/commitments.go new file mode 100644 index 000000000..a7825deb1 --- /dev/null +++ b/cmd/lotus-storage-miner/commitments.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + lcli "github.com/filecoin-project/lotus/cli" + + "gopkg.in/urfave/cli.v2" +) + +var commitmentsCmd = &cli.Command{ + Name: "commitments", + Usage: "interact with commitment tracker", + Subcommands: []*cli.Command{ + commitmentsListCmd, + }, +} + +var commitmentsListCmd = &cli.Command{ + Name: "list", + Usage: "List tracked sector commitments", + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + comms, err := api.CommitmentsList(ctx) + if err != nil { + return err + } + + for _, comm := range comms { + fmt.Printf("%s:%d msg:%s, deals: %v\n", comm.Miner, comm.SectorID, comm.CommitMsg, comm.DealIDs) + } + + return nil + }, +} diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index c9a904695..c08cd21e6 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -24,6 +24,7 @@ func main() { infoCmd, storeGarbageCmd, sectorsCmd, + commitmentsCmd, } jaeger := tracing.SetupJaegerTracing("lotus") defer func() { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index c763dab5e..8eb363fe2 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/commitment" "github.com/filecoin-project/lotus/storage/sector" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -23,6 +24,7 @@ type StorageMinerAPI struct { SectorBuilder *sectorbuilder.SectorBuilder Sectors *sector.Store SectorBlocks *sectorblocks.SectorBlocks + CommitmentTracker *commitment.Tracker Miner *storage.Miner } @@ -81,4 +83,8 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed return out, nil } +func (sm *StorageMinerAPI) CommitmentsList(ctx context.Context) ([]api.SectorCommitment, error) { + return sm.CommitmentTracker.List() +} + var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/storage/commitment/tracker.go b/storage/commitment/tracker.go index 10510fba0..a0fa57e80 100644 --- a/storage/commitment/tracker.go +++ b/storage/commitment/tracker.go @@ -3,6 +3,8 @@ package commitment import ( "context" "fmt" + "strconv" + "strings" "sync" "github.com/ipfs/go-cid" @@ -12,8 +14,10 @@ import ( logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/node/modules/dtypes" + dsq "github.com/ipfs/go-datastore/query" ) var log = logging.Logger("commitment") @@ -145,3 +149,56 @@ func (ct *Tracker) CheckCommitment(miner address.Address, sectorId uint64) (bool return ct.commitments.Has(key) } + +func (ct *Tracker) List() ([]api.SectorCommitment, error) { + out := make([]api.SectorCommitment, 0) + + ct.lk.Lock() + defer ct.lk.Unlock() + + res, err := ct.commitments.Query(dsq.Query{}) + if err != nil { + return nil, err + } + defer res.Close() + + for { + res, ok := res.NextSync() + if !ok { + break + } + + if res.Error != nil { + return nil, xerrors.Errorf("iterating commitments: %w", err) + } + + parts := strings.Split(res.Key, "/") + if len(parts) != 4 { + return nil, xerrors.Errorf("expected commitment key to be 4 parts, Key %s", res.Key) + } + + miner, err := address.NewFromString(parts[2]) + if err != nil { + return nil, xerrors.Errorf("parsing miner address: %w", err) + } + + sectorID, err := strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return nil, xerrors.Errorf("parsing sector id: %w", err) + } + + var comm commitment + if err := cbor.DecodeInto(res.Value, &comm); err != nil { + return nil, xerrors.Errorf("decoding commitment %s (`% X`): %w", res.Key, res.Value, err) + } + + out = append(out, api.SectorCommitment{ + SectorID: uint64(sectorID), + Miner: miner, + CommitMsg: comm.Msg, + DealIDs: comm.DealIDs, + }) + } + + return out, nil +} diff --git a/storage/miner.go b/storage/miner.go index 81880f86b..7e1757e31 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,9 +2,9 @@ package storage import ( "context" - "github.com/filecoin-project/go-sectorbuilder/sealing_state" "sync" + "github.com/filecoin-project/go-sectorbuilder/sealing_state" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" From 3047c9d4c2367f830b213611f7d359071376b1b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 22:41:28 +0100 Subject: [PATCH 3/7] storageminer: Check on-chain commitments --- storage/commitment/tracker.go | 9 --------- storage/miner.go | 18 ++++++++++++------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/storage/commitment/tracker.go b/storage/commitment/tracker.go index a0fa57e80..4e439c57d 100644 --- a/storage/commitment/tracker.go +++ b/storage/commitment/tracker.go @@ -141,15 +141,6 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector } } -func (ct *Tracker) CheckCommitment(miner address.Address, sectorId uint64) (bool, error) { - key := commitmentKey(miner, sectorId) - - ct.lk.Lock() - defer ct.lk.Unlock() - - return ct.commitments.Has(key) -} - func (ct *Tracker) List() ([]api.SectorCommitment, error) { out := make([]api.SectorCommitment, 0) diff --git a/storage/miner.go b/storage/miner.go index 7e1757e31..ef045eb93 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -54,6 +54,7 @@ type storageMinerApi interface { StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) @@ -100,13 +101,18 @@ func (m *Miner) commitUntrackedSectors(ctx context.Context) error { return err } - for _, s := range sealed { - has, err := m.commt.CheckCommitment(m.maddr, s.SectorID) - if err != nil { - log.Error("checking commitment: ", err) - } + chainSectors, err := m.api.StateMinerSectors(ctx, m.maddr, nil) + if err != nil { + return err + } - if has { + onchain := map[uint64]struct{}{} + for _, chainSector := range chainSectors { + onchain[chainSector.SectorID] = struct{}{} + } + + for _, s := range sealed { + if _, ok := onchain[s.SectorID]; ok { continue } From b7a78670abb7feb52efd151b82a04d8964f43e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 22:58:41 +0100 Subject: [PATCH 4/7] storageminer: Call beginPosting before trying to commit a sector --- storage/miner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/miner.go b/storage/miner.go index ef045eb93..4d16315f4 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -213,10 +213,6 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return errors.Wrap(err, "pushing message to mpool") } - if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil { - return errors.Wrap(err, "tracking sector commitment") - } - go func() { _, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { @@ -226,6 +222,10 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal m.beginPosting(ctx) }() + if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil { + return xerrors.Errorf("tracking sector commitment: %w", err) + } + return nil } From 6c25c5384dc16ae55a083105d1050283cb076087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 23:04:08 +0100 Subject: [PATCH 5/7] commitment tracker: prefer new commitments --- storage/commitment/tracker.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/storage/commitment/tracker.go b/storage/commitment/tracker.go index 4e439c57d..90e2459bf 100644 --- a/storage/commitment/tracker.go +++ b/storage/commitment/tracker.go @@ -60,6 +60,21 @@ func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, tracking, err := ct.commitments.Get(key) switch err { + case nil: + var comm commitment + if err := cbor.DecodeInto(tracking, &comm); err != nil { + return err + } + + if !comm.Msg.Equals(commitMsg) { + log.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg) + } + + log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg) + + // we still want to store it + fallthrough // TODO: ideally we'd keep around both (even though we'll + // usually only need the new one) case datastore.ErrNotFound: comm := &commitment{Msg: commitMsg} commB, err := cbor.DumpObject(comm) @@ -77,18 +92,6 @@ func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, delete(ct.waits, key) } return nil - case nil: - var comm commitment - if err := cbor.DecodeInto(tracking, &comm); err != nil { - return err - } - - if !comm.Msg.Equals(commitMsg) { - return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg) - } - - log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg) - return nil default: return err } From 922d8a90a5f5633a95f6419a386e025c1ffb083d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Oct 2019 23:19:58 +0100 Subject: [PATCH 6/7] storageminer: Restart sealing on restart --- lib/sectorbuilder/sectorbuilder.go | 4 ++++ storage/sector/store.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 26123e757..0f7e2ea26 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -88,6 +88,10 @@ func (sb *SectorBuilder) SealSector(sectorID uint64, ticket SealTicket) (SealedS return sectorbuilder.SealSector(sb.handle, sectorID, ticket) } +func (sb *SectorBuilder) ResumeSealSector(sectorID uint64) (SealedSectorMetadata, error) { + return sectorbuilder.ResumeSealSector(sb.handle, sectorID) +} + func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) { return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector) } diff --git a/storage/sector/store.go b/storage/sector/store.go index f82505837..5692c8649 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -107,9 +107,39 @@ func (s *Store) poll() { s.waitingLk.Unlock() } +func (s *Store) restartSealing() { + sectors, err := s.sb.GetAllStagedSectors() + if err != nil { + return + } + + for _, sid := range sectors { + status, err := s.sb.SealStatus(sid) + if err != nil { + return + } + + if status.State != sealing_state.Paused { + continue + } + + log.Infof("Sector %d is in paused state, resuming sealing", sid) + go func() { + // TODO: when we refactor wait-for-seal below, care about this output too + // (see SealSector below) + _, err := s.sb.ResumeSealSector(sid) + if err != nil { + return + } + }() + } +} + func (s *Store) service() { poll := time.Tick(5 * time.Second) + s.restartSealing() + for { select { case <-poll: From 5a5c66600fff68060b1af3b413df8a0ae2fec611 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 30 Oct 2019 10:55:49 +0100 Subject: [PATCH 7/7] storageminer: More correct listing of sealed sectors --- lib/sectorbuilder/sectorbuilder.go | 4 ---- storage/miner.go | 11 +---------- storage/sector/store.go | 22 ++++++++++++++++++++-- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 0f7e2ea26..a2e4499b7 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -114,10 +114,6 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) { return out, nil } -func (sb *SectorBuilder) GetAllSealedSectors() ([]SealedSectorMetadata, error) { - return sectorbuilder.GetAllSealedSectors(sb.handle) -} - func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { // Wait, this is a blocking method with no way of interrupting it? // does it checkpoint itself? diff --git a/storage/miner.go b/storage/miner.go index 4d16315f4..e563edaa4 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/filecoin-project/go-sectorbuilder/sealing_state" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" @@ -118,15 +117,7 @@ func (m *Miner) commitUntrackedSectors(ctx context.Context) error { log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID) - if err := m.commitSector(ctx, sectorbuilder.SectorSealingStatus{ - SectorID: s.SectorID, - State: sealing_state.Sealed, - CommD: s.CommD, - CommR: s.CommR, - Proof: s.Proof, - Pieces: s.Pieces, - Ticket: s.Ticket, - }); err != nil { + if err := m.commitSector(ctx, s); err != nil { log.Error("Committing uncommitted sector failed: ", err) } } diff --git a/storage/sector/store.go b/storage/sector/store.go index 5692c8649..30363ee67 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -293,8 +293,26 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect return s.sb.SealStatus(sector) } -func (s *Store) Sealed() ([]sectorbuilder.SealedSectorMetadata, error) { - return s.sb.GetAllSealedSectors() +func (s *Store) Sealed() ([]sectorbuilder.SectorSealingStatus, error) { + l, err := s.sb.GetAllStagedSectors() + if err != nil { + return nil, err + } + + out := make([]sectorbuilder.SectorSealingStatus, 0) + for _, sid := range l { + status, err := s.sb.SealStatus(sid) + if err != nil { + return nil, err + } + + if status.State != sealing_state.Sealed { + continue + } + out = append(out, status) + } + + return out, nil } func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {