From 33dfcf2663ef1b09b2bcde83ef9acf7746db1682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 20 Jan 2022 14:59:57 +0100 Subject: [PATCH] stores: Fix single post proof reading with updated sectors --- extern/sector-storage/faults.go | 8 +++++++- extern/sector-storage/stores/http_handler.go | 9 ++++----- extern/sector-storage/stores/local.go | 4 ++-- extern/sector-storage/stores/remote.go | 7 ++++++- itests/kit/blockminer.go | 21 +++++++++++++++----- storage/wdpost_run.go | 3 ++- 6 files changed, 37 insertions(+), 15 deletions(-) diff --git a/extern/sector-storage/faults.go b/extern/sector-storage/faults.go index 5d6dd26f5..ced09636b 100644 --- a/extern/sector-storage/faults.go +++ b/extern/sector-storage/faults.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "time" "golang.org/x/xerrors" @@ -14,6 +15,8 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) +var PostCheckTimeout = 160 * time.Second + // FaultTracker TODO: Track things more actively type FaultTracker interface { CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) @@ -68,7 +71,10 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, return nil } - _, err = m.storage.GenerateSingleVanillaProof(ctx, sector.ID.Miner, storiface.PostSectorChallenge{ + vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout) + defer cancel2() + + _, err = m.storage.GenerateSingleVanillaProof(vctx, sector.ID.Miner, storiface.PostSectorChallenge{ SealProof: sector.ProofType, SectorNumber: sector.ID.Number, SealedCID: commr, diff --git a/extern/sector-storage/stores/http_handler.go b/extern/sector-storage/stores/http_handler.go index 126888295..e203dccc0 100644 --- a/extern/sector-storage/stores/http_handler.go +++ b/extern/sector-storage/stores/http_handler.go @@ -1,10 +1,12 @@ package stores import ( + "bytes" "encoding/json" "net/http" "os" "strconv" + "time" "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" @@ -306,11 +308,8 @@ func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r return } - w.WriteHeader(200) - _, err = w.Write(vanilla) - if err != nil { - log.Error("response writer: ", err) - } + w.Header().Set("Content-Type", "application/octet-stream") + http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(vanilla)) } func ftFromString(t string) (storiface.SectorFileType, error) { diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 5bac8ca50..4efddca38 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -722,11 +722,11 @@ func (st *Local) GenerateSingleVanillaProof(ctx context.Context, minerID abi.Act var cache string var sealed string if si.Update { - src, _, err := st.AcquireSector(ctx, sr, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + src, _, err := st.AcquireSector(ctx, sr, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { return nil, xerrors.Errorf("acquire sector: %w", err) } - cache, sealed = src.Update, src.UpdateCache + cache, sealed = src.UpdateCache, src.Update } else { src, _, err := st.AcquireSector(ctx, sr, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 6dad1d5c1..91bf24659 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -750,7 +750,12 @@ func (r *Remote) GenerateSingleVanillaProof(ctx context.Context, minerID abi.Act Number: sinfo.SectorNumber, } - si, err := r.index.StorageFindSector(ctx, sid, storiface.FTSealed|storiface.FTCache, 0, false) + ft := storiface.FTSealed | storiface.FTCache + if sinfo.Update { + ft = storiface.FTUpdate | storiface.FTUpdateCache + } + + si, err := r.index.StorageFindSector(ctx, sid, ft, 0, false) if err != nil { return nil, xerrors.Errorf("finding sector %d failed: %w", sid, err) } diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 91ddc2e26..2dff62714 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -63,11 +63,10 @@ func (p *partitionTracker) done(t *testing.T) bool { return uint64(len(p.partitions)) == p.count(t) } -func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) { +func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) { defer func() { ret = p.done(t) }() - msg := smsg.Message if !(msg.To == bm.miner.ActorAddr) { return } @@ -124,7 +123,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur tracker := newPartitionTracker(ctx, dlinfo.Index, bm) if !tracker.done(bm.t) { // need to wait for post - bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) + bm.t.Logf("expect %d partitions proved but only see %d (dl:%d, epoch:%d)", len(tracker.partitions), tracker.count(bm.t), dlinfo.Index, ts.Height()) poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) require.NoError(bm.t, err) @@ -132,7 +131,19 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) require.NoError(bm.t, err) for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) + tracker.recordIfPost(bm.t, bm, &msg.Message) + } + + // Account for included but not yet executed messages + for _, bc := range ts.Cids() { + msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc) + require.NoError(bm.t, err) + for _, msg := range msgs.BlsMessages { + tracker.recordIfPost(bm.t, bm, msg) + } + for _, msg := range msgs.SecpkMessages { + tracker.recordIfPost(bm.t, bm, &msg.Message) + } } // post not yet in mpool, wait for it @@ -148,7 +159,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur bm.t.Logf("pool event: %d", evt.Type) if evt.Type == api.MpoolAdd { bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { + if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) { break POOL } } diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index b7901d0db..a83945f17 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -868,7 +868,7 @@ func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *mine return nil, xerrors.Errorf("pushing message to mpool: %w", err) } - log.Infof("Submitted window post: %s", sm.Cid()) + log.Infof("Submitted window post: %s (deadline %d)", sm.Cid(), proof.Deadline) go func() { rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid(), build.MessageConfidence, api.LookbackNoLimit, true) @@ -878,6 +878,7 @@ func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *mine } if rec.Receipt.ExitCode == 0 { + log.Infow("Window post submission successful", "cid", sm.Cid(), "deadline", proof.Deadline, "epoch", rec.Height, "ts", rec.TipSet.Cids()) return }