From 650a31b050b83e86a0c543ba39dfad76f94afd23 Mon Sep 17 00:00:00 2001 From: laser Date: Mon, 6 Apr 2020 11:07:26 -0700 Subject: [PATCH] remove all lotus types from sealing package in preparation for extraction events adapter implement StateWaitMsg and StateComputeDataCommitment implement StateGetSectorPreCommitOnChainInfo implement ChainHead and SendMsg implement remaining methods --- cmd/lotus-storage-miner/init.go | 15 +- markets/retrievaladapter/provider.go | 6 +- node/impl/storminer.go | 18 +- node/modules/storageminer.go | 17 +- storage/adapter_events.go | 29 +++ storage/adapter_storage_miner.go | 176 ++++++++++++++++ storage/miner.go | 8 +- storage/sealing/cbor_gen.go | 294 ++++++++++++++++----------- storage/sealing/checks.go | 74 ++----- storage/sealing/constants.go | 13 ++ storage/sealing/events.go | 15 ++ storage/sealing/fsm.go | 71 +++---- storage/sealing/fsm_events.go | 17 +- storage/sealing/fsm_test.go | 33 ++- storage/sealing/garbage.go | 2 +- storage/sealing/sealing.go | 80 ++++---- storage/sealing/sector_state.go | 74 +++++++ storage/sealing/states.go | 107 ++++------ storage/sealing/states_failed.go | 31 +-- storage/sealing/types.go | 14 +- storage/sealing/types_test.go | 23 +-- storage/sealing/utils.go | 3 +- 22 files changed, 704 insertions(+), 416 deletions(-) create mode 100644 storage/adapter_events.go create mode 100644 storage/adapter_storage_miner.go create mode 100644 storage/sealing/constants.go create mode 100644 storage/sealing/events.go create mode 100644 storage/sealing/sector_state.go diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 688af479c..bb243111d 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -12,8 +12,6 @@ import ( "path/filepath" "strconv" - "github.com/filecoin-project/lotus/node/modules" - "github.com/docker/go-units" "github.com/google/uuid" "github.com/ipfs/go-datastore" @@ -26,6 +24,9 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" paramfetch "github.com/filecoin-project/go-paramfetch" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -40,13 +41,11 @@ import ( lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/miner" + "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sealing" - sectorstorage "github.com/filecoin-project/sector-storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/sector-storage/stores" ) var initCmd = &cli.Command{ @@ -299,9 +298,11 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string, CommD: &commD, CommR: &commR, Proof: nil, - Ticket: lapi.SealTicket{}, + TicketValue: abi.SealRandomness{}, + TicketEpoch: 0, PreCommitMessage: nil, - Seed: lapi.SealSeed{}, + SeedValue: abi.InteractiveSealRandomness{}, + SeedEpoch: 0, CommitMessage: nil, } diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 04e569cbd..a4133a751 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -7,14 +7,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/sector-storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" ) type retrievalProviderNode struct { @@ -54,7 +54,7 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uin Miner: abi.ActorID(mid), Number: abi.SectorNumber(sectorID), } - return rpn.sealer.ReadPieceFromSealedSector(ctx, sid, ffiwrapper.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD) + return rpn.sealer.ReadPieceFromSealedSector(ctx, sid, ffiwrapper.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.TicketValue, *si.CommD) } func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index fa280071e..6ed251832 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -12,6 +12,9 @@ import ( "github.com/filecoin-project/go-address" storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/lotus/api" @@ -21,9 +24,6 @@ import ( "github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" - "github.com/filecoin-project/sector-storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/sector-storage/stores" ) type StorageMinerAPI struct { @@ -97,9 +97,15 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb CommR: info.CommR, Proof: info.Proof, Deals: deals, - Ticket: info.Ticket, - Seed: info.Seed, - Retries: info.Nonce, + Ticket: api.SealTicket{ + Value: info.TicketValue, + Epoch: info.TicketEpoch, + }, + Seed: api.SealSeed{ + Value: info.SeedValue, + Epoch: info.SeedEpoch, + }, + Retries: info.Nonce, LastErr: info.LastErr, Log: log, diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 9e9d63140..1bab252f5 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -35,6 +35,9 @@ import ( "github.com/filecoin-project/go-fil-markets/storedcounter" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-statestore" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -49,9 +52,6 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sealing" - "github.com/filecoin-project/sector-storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/sector-storage/stores" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -281,21 +281,18 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode } func SealTicketGen(fapi lapi.FullNode) sealing.TicketFn { - return func(ctx context.Context) (*lapi.SealTicket, error) { + return func(ctx context.Context) (abi.SealRandomness, abi.ChainEpoch, error) { ts, err := fapi.ChainHead(ctx) if err != nil { - return nil, xerrors.Errorf("getting head ts for SealTicket failed: %w", err) + return nil, 0, xerrors.Errorf("getting head ts for SealTicket failed: %w", err) } r, err := fapi.ChainGetRandomness(ctx, ts.Key(), crypto.DomainSeparationTag_SealRandomness, ts.Height()-build.SealRandomnessLookback, nil) if err != nil { - return nil, xerrors.Errorf("getting randomness for SealTicket failed: %w", err) + return nil, 0, xerrors.Errorf("getting randomness for SealTicket failed: %w", err) } - return &lapi.SealTicket{ - Epoch: ts.Height() - build.SealRandomnessLookback, - Value: abi.SealRandomness(r), - }, nil + return abi.SealRandomness(r), ts.Height() - build.SealRandomnessLookback, nil } } diff --git a/storage/adapter_events.go b/storage/adapter_events.go new file mode 100644 index 000000000..4a173d4a4 --- /dev/null +++ b/storage/adapter_events.go @@ -0,0 +1,29 @@ +package storage + +import ( + "context" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealing" +) + +var _ sealing.Events = new(EventsAdapter) + +type EventsAdapter struct { + delegate events.Events +} + +func NewEventsAdapter(api events.Events) EventsAdapter { + return EventsAdapter{delegate: api} +} + +func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error { + return e.delegate.ChainAt(func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { + return hnd(ctx, ts.Key().Bytes(), curH) + }, func(ctx context.Context, ts *types.TipSet) error { + return rev(ctx, ts.Key().Bytes()) + }, confidence, h) +} diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go new file mode 100644 index 000000000..b12bbb62c --- /dev/null +++ b/storage/adapter_storage_miner.go @@ -0,0 +1,176 @@ +package storage + +import ( + "bytes" + "context" + + "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealing" +) + +var _ sealing.SealingAPI = new(SealingAPIAdapter) + +type SealingAPIAdapter struct { + delegate storageMinerApi +} + +func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter { + return SealingAPIAdapter{delegate: api} +} + +func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) { + wmsg, err := s.delegate.StateWaitMsg(ctx, mcid) + if err != nil { + return sealing.MsgLookup{}, err + } + + return sealing.MsgLookup{ + Receipt: sealing.MessageReceipt{ + ExitCode: wmsg.Receipt.ExitCode, + Return: wmsg.Receipt.Return, + GasUsed: wmsg.Receipt.GasUsed, + }, + TipSetTok: wmsg.TipSet.Key().Bytes(), + Height: wmsg.TipSet.Height(), + }, nil +} + +func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + ccparams, err := actors.SerializeParams(&market.ComputeDataCommitmentParams{ + DealIDs: deals, + SectorType: sectorType, + }) + if err != nil { + return cid.Undef, xerrors.Errorf("computing params for ComputeDataCommitment: %w", err) + } + + ccmt := &types.Message{ + To: builtin.StorageMarketActorAddr, + From: maddr, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: 9999999999, + Method: builtin.MethodsMarket.ComputeDataCommitment, + Params: ccparams, + } + r, err := s.delegate.StateCall(ctx, ccmt, tsk) + if err != nil { + return cid.Undef, xerrors.Errorf("calling ComputeDataCommitment: %w", err) + } + if r.MsgRct.ExitCode != 0 { + return cid.Undef, xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.MsgRct.ExitCode) + } + + var c cbg.CborCid + if err := c.UnmarshalCBOR(bytes.NewReader(r.MsgRct.Return)); err != nil { + return cid.Undef, xerrors.Errorf("failed to unmarshal CBOR to CborCid: %w", err) + } + + return cid.Cid(c), nil +} + +func (s SealingAPIAdapter) StateGetSectorPreCommitOnChainInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + act, err := s.delegate.StateGetActor(ctx, maddr, tsk) + if err != nil { + return nil, xerrors.Errorf("handleSealFailed(%d): temp error: %+v", sectorNumber, err) + } + + st, err := s.delegate.ChainReadObj(ctx, act.Head) + if err != nil { + return nil, xerrors.Errorf("handleSealFailed(%d): temp error: %+v", sectorNumber, err) + } + + var state miner.State + if err := state.UnmarshalCBOR(bytes.NewReader(st)); err != nil { + return nil, xerrors.Errorf("handleSealFailed(%d): temp error: unmarshaling miner state: %+v", sectorNumber, err) + } + + var pci miner.SectorPreCommitOnChainInfo + precommits := adt.AsMap(store.ActorStore(ctx, apibstore.NewAPIBlockstore(s.delegate)), state.PreCommittedSectors) + if _, err := precommits.Get(adt.UIntKey(uint64(sectorNumber)), &pci); err != nil { + return nil, err + } + + return &pci, nil +} + +func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, market.DealState, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return market.DealProposal{}, market.DealState{}, err + } + + deal, err := s.delegate.StateMarketStorageDeal(ctx, dealID, tsk) + if err != nil { + return market.DealProposal{}, market.DealState{}, err + } + + return deal.Proposal, deal.State, nil +} + +func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error) { + msg := types.Message{ + To: to, + From: from, + Value: value, + GasPrice: gasPrice, + GasLimit: gasLimit, + Method: method, + Params: params, + } + + smsg, err := s.delegate.MpoolPushMessage(ctx, &msg) + if err != nil { + return cid.Undef, err + } + + return smsg.Cid(), nil +} + +func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) { + head, err := s.delegate.ChainHead(ctx) + if err != nil { + return nil, 0, err + } + + return head.Key().Bytes(), head.Height(), nil +} + +func (s SealingAPIAdapter) ChainGetRandomness(ctx context.Context, tok sealing.TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, err + } + + return s.delegate.ChainGetRandomness(ctx, tsk, personalization, randEpoch, entropy) +} + +func (s SealingAPIAdapter) ChainReadObj(ctx context.Context, ocid cid.Cid) ([]byte, error) { + return s.delegate.ChainReadObj(ctx, ocid) +} diff --git a/storage/miner.go b/storage/miner.go index 97780f356..dcf0c9124 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -12,13 +12,13 @@ import ( "github.com/libp2p/go-libp2p-core/host" "golang.org/x/xerrors" - "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/go-address" + sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -100,7 +100,7 @@ func (m *Miner) Run(ctx context.Context) error { } evts := events.NewEvents(ctx, m.api) - m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sealer, m.sc, m.verif, m.tktFn) + m.sealing = sealing.New(NewSealingAPIAdapter(m.api), NewEventsAdapter(*evts), m.maddr, m.worker, m.ds, m.sealer, m.sc, m.verif, m.tktFn) go m.sealing.Run(ctx) diff --git a/storage/sealing/cbor_gen.go b/storage/sealing/cbor_gen.go index ab12aa155..d97cebd74 100644 --- a/storage/sealing/cbor_gen.go +++ b/storage/sealing/cbor_gen.go @@ -6,7 +6,6 @@ import ( "fmt" "io" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/specs-actors/actors/abi" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" @@ -182,7 +181,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } - // t.State (api.SectorState) (string) + // t.State (uint64) (uint64) if len("State") > cbg.MaxLength { return xerrors.Errorf("Value in field \"State\" was too long") } @@ -194,14 +193,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } - if len(t.State) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.State was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.State)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.State)); err != nil { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { return err } @@ -284,45 +276,6 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.Ticket (api.SealTicket) (struct) - if len("Ticket") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Ticket\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Ticket")))); err != nil { - return err - } - if _, err := w.Write([]byte("Ticket")); err != nil { - return err - } - - if err := t.Ticket.MarshalCBOR(w); err != nil { - return err - } - - // t.PreCommit1Out (storage.PreCommit1Out) (slice) - if len("PreCommit1Out") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"PreCommit1Out\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("PreCommit1Out")))); err != nil { - return err - } - if _, err := w.Write([]byte("PreCommit1Out")); err != nil { - return err - } - - if len(t.PreCommit1Out) > cbg.ByteArrayMaxLen { - return xerrors.Errorf("Byte array in field t.PreCommit1Out was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.PreCommit1Out)))); err != nil { - return err - } - if _, err := w.Write(t.PreCommit1Out); err != nil { - return err - } - // t.CommD (cid.Cid) (struct) if len("CommD") > cbg.MaxLength { return xerrors.Errorf("Value in field \"CommD\" was too long") @@ -390,6 +343,51 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } + // t.TicketValue (abi.SealRandomness) (slice) + if len("TicketValue") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TicketValue\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("TicketValue")))); err != nil { + return err + } + if _, err := w.Write([]byte("TicketValue")); err != nil { + return err + } + + if len(t.TicketValue) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.TicketValue was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.TicketValue)))); err != nil { + return err + } + if _, err := w.Write(t.TicketValue); err != nil { + return err + } + + // t.TicketEpoch (abi.ChainEpoch) (int64) + if len("TicketEpoch") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TicketEpoch\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("TicketEpoch")))); err != nil { + return err + } + if _, err := w.Write([]byte("TicketEpoch")); err != nil { + return err + } + + if t.TicketEpoch >= 0 { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.TicketEpoch))); err != nil { + return err + } + } else { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajNegativeInt, uint64(-t.TicketEpoch)-1)); err != nil { + return err + } + } + // t.PreCommitMessage (cid.Cid) (struct) if len("PreCommitMessage") > cbg.MaxLength { return xerrors.Errorf("Value in field \"PreCommitMessage\" was too long") @@ -412,21 +410,50 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.Seed (api.SealSeed) (struct) - if len("Seed") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Seed\" was too long") + // t.SeedValue (abi.InteractiveSealRandomness) (slice) + if len("SeedValue") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"SeedValue\" was too long") } - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Seed")))); err != nil { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("SeedValue")))); err != nil { return err } - if _, err := w.Write([]byte("Seed")); err != nil { + if _, err := w.Write([]byte("SeedValue")); err != nil { return err } - if err := t.Seed.MarshalCBOR(w); err != nil { + if len(t.SeedValue) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.SeedValue was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.SeedValue)))); err != nil { return err } + if _, err := w.Write(t.SeedValue); err != nil { + return err + } + + // t.SeedEpoch (abi.ChainEpoch) (int64) + if len("SeedEpoch") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"SeedEpoch\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("SeedEpoch")))); err != nil { + return err + } + if _, err := w.Write([]byte("SeedEpoch")); err != nil { + return err + } + + if t.SeedEpoch >= 0 { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SeedEpoch))); err != nil { + return err + } + } else { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajNegativeInt, uint64(-t.SeedEpoch)-1)); err != nil { + return err + } + } // t.CommitMessage (cid.Cid) (struct) if len("CommitMessage") > cbg.MaxLength { @@ -450,22 +477,6 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.InvalidProofs (uint64) (uint64) - if len("InvalidProofs") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"InvalidProofs\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("InvalidProofs")))); err != nil { - return err - } - if _, err := w.Write([]byte("InvalidProofs")); err != nil { - return err - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.InvalidProofs))); err != nil { - return err - } - // t.FaultReportMsg (cid.Cid) (struct) if len("FaultReportMsg") > cbg.MaxLength { return xerrors.Errorf("Value in field \"FaultReportMsg\" was too long") @@ -568,16 +579,20 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { } switch name { - // t.State (api.SectorState) (string) + // t.State (uint64) (uint64) case "State": { - sval, err := cbg.ReadString(br) + + maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) - t.State = api.SectorState(sval) } // t.SectorID (abi.SectorNumber) (uint64) case "SectorID": @@ -663,34 +678,6 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Pieces[i] = v } - // t.Ticket (api.SealTicket) (struct) - case "Ticket": - - { - - if err := t.Ticket.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Ticket: %w", err) - } - - } - // t.PreCommit1Out (storage.PreCommit1Out) (slice) - case "PreCommit1Out": - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if extra > cbg.ByteArrayMaxLen { - return fmt.Errorf("t.PreCommit1Out: byte array too large (%d)", extra) - } - if maj != cbg.MajByteString { - return fmt.Errorf("expected byte array") - } - t.PreCommit1Out = make([]byte, extra) - if _, err := io.ReadFull(br, t.PreCommit1Out); err != nil { - return err - } // t.CommD (cid.Cid) (struct) case "CommD": @@ -759,6 +746,50 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { if _, err := io.ReadFull(br, t.Proof); err != nil { return err } + // t.TicketValue (abi.SealRandomness) (slice) + case "TicketValue": + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.TicketValue: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.TicketValue = make([]byte, extra) + if _, err := io.ReadFull(br, t.TicketValue); err != nil { + return err + } + // t.TicketEpoch (abi.ChainEpoch) (int64) + case "TicketEpoch": + { + maj, extra, err := cbg.CborReadHeader(br) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.TicketEpoch = abi.ChainEpoch(extraI) + } // t.PreCommitMessage (cid.Cid) (struct) case "PreCommitMessage": @@ -784,15 +815,49 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { } } - // t.Seed (api.SealSeed) (struct) - case "Seed": + // t.SeedValue (abi.InteractiveSealRandomness) (slice) + case "SeedValue": + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.SeedValue: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.SeedValue = make([]byte, extra) + if _, err := io.ReadFull(br, t.SeedValue); err != nil { + return err + } + // t.SeedEpoch (abi.ChainEpoch) (int64) + case "SeedEpoch": { - - if err := t.Seed.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Seed: %w", err) + maj, extra, err := cbg.CborReadHeader(br) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) } + t.SeedEpoch = abi.ChainEpoch(extraI) } // t.CommitMessage (cid.Cid) (struct) case "CommitMessage": @@ -818,21 +883,6 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.CommitMessage = &c } - } - // t.InvalidProofs (uint64) (uint64) - case "InvalidProofs": - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.InvalidProofs = uint64(extra) - } // t.FaultReportMsg (cid.Cid) (struct) case "FaultReportMsg": diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index cc0619e43..2721c8333 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -1,25 +1,18 @@ package sealing import ( - "bytes" "context" - "github.com/ipfs/go-cid" - cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/zerocomm" "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/sector-storage/zerocomm" + log "github.com/mgutz/logxi/v1" ) // TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting @@ -41,8 +34,8 @@ type ErrInvalidProof struct{ error } // - Piece commitments match with on chain deals // - Piece sizes match // - Deals aren't expired -func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { - head, err := api.ChainHead(ctx) +func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { + tok, height, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } @@ -55,21 +48,21 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { } continue } - deal, err := api.StateMarketStorageDeal(ctx, *piece.DealID, types.EmptyTSK) + proposal, _, err := api.StateMarketStorageDeal(ctx, *piece.DealID, tok) if err != nil { return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)} } - if deal.Proposal.PieceCID != piece.CommP { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, deal.Proposal.PieceCID)} + if proposal.PieceCID != piece.CommP { + return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, proposal.PieceCID)} } - if piece.Size != deal.Proposal.PieceSize.Unpadded() { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorID, piece.DealID, piece.Size, deal.Proposal.PieceSize)} + if piece.Size != proposal.PieceSize.Unpadded() { + return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorID, piece.DealID, piece.Size, proposal.PieceSize)} } - if head.Height() >= deal.Proposal.StartEpoch { - return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, deal.Proposal.StartEpoch, head.Height())} + if height >= proposal.StartEpoch { + return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, proposal.StartEpoch, height)} } } @@ -78,55 +71,30 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { // checkPrecommit checks that data commitment generated in the sealing process // matches pieces, and that the seal ticket isn't expired -func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) { - head, err := api.ChainHead(ctx) +func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) (err error) { + tok, height, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } - ccparams, err := actors.SerializeParams(&market.ComputeDataCommitmentParams{ - DealIDs: si.deals(), - SectorType: si.SectorType, - }) + commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.deals(), tok) if err != nil { - return xerrors.Errorf("computing params for ComputeDataCommitment: %w", err) + return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} } - ccmt := &types.Message{ - To: builtin.StorageMarketActorAddr, - From: maddr, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: 9999999999, - Method: builtin.MethodsMarket.ComputeDataCommitment, - Params: ccparams, - } - r, err := api.StateCall(ctx, ccmt, types.EmptyTSK) - if err != nil { - return &ErrApi{xerrors.Errorf("calling ComputeDataCommitment: %w", err)} - } - if r.MsgRct.ExitCode != 0 { - return &ErrBadCommD{xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.MsgRct.ExitCode)} + if !commD.Equals(*si.CommD) { + return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %s != %s", commD, si.CommD)} } - var c cbg.CborCid - if err := c.UnmarshalCBOR(bytes.NewReader(r.MsgRct.Return)); err != nil { - return err - } - - if cid.Cid(c) != *si.CommD { - return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %s != %s", cid.Cid(c), si.CommD)} - } - - if int64(head.Height())-int64(si.Ticket.Epoch+build.SealRandomnessLookback) > build.SealRandomnessLookbackLimit { - return &ErrExpiredTicket{xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.Ticket.Epoch+build.SealRandomnessLookback, head.Height())} + if int64(height)-int64(si.TicketEpoch+SealRandomnessLookback) > SealRandomnessLookbackLimit { + return &ErrExpiredTicket{xerrors.Errorf("ticket expired: seal height: %d, head: %d", si.TicketEpoch+SealRandomnessLookback, height)} } return nil } func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte) (err error) { - head, err := m.api.ChainHead(ctx) + tok, height, err := m.api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } diff --git a/storage/sealing/constants.go b/storage/sealing/constants.go new file mode 100644 index 000000000..b898443e6 --- /dev/null +++ b/storage/sealing/constants.go @@ -0,0 +1,13 @@ +package sealing + +// Epochs +const Finality = 500 + +// Epochs +const SealRandomnessLookback = Finality + +// Epochs +const SealRandomnessLookbackLimit = SealRandomnessLookback + 2000 + +// Epochs +const InteractivePoRepConfidence = 6 diff --git a/storage/sealing/events.go b/storage/sealing/events.go new file mode 100644 index 000000000..ba6d2a860 --- /dev/null +++ b/storage/sealing/events.go @@ -0,0 +1,15 @@ +package sealing + +import ( + "context" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +// `curH`-`ts.Height` = `confidence` +type HeightHandler func(ctx context.Context, tok TipSetToken, curH abi.ChainEpoch) error +type RevertHandler func(ctx context.Context, tok TipSetToken) error + +type Events interface { + ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error +} diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 2d1691678..88be53eff 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -1,6 +1,7 @@ package sealing import ( + "bytes" "context" "encoding/json" "fmt" @@ -9,10 +10,10 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/go-statemachine" - "github.com/filecoin-project/specs-actors/actors/abi" - + statemachine "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/prometheus/common/log" ) func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { @@ -54,19 +55,19 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S on(SectorSeedReady{}, api.Committing), on(SectorChainPreCommitFailed{}, api.PreCommitFailed), ), - api.Committing: planCommitting, - api.CommitWait: planOne( - on(SectorProving{}, api.FinalizeSector), - on(SectorCommitFailed{}, api.CommitFailed), + Committing: planCommitting, + CommitWait: planOne( + on(SectorProving{}, FinalizeSector), + on(SectorCommitFailed{}, CommitFailed), ), - api.FinalizeSector: planOne( - on(SectorFinalized{}, api.Proving), + FinalizeSector: planOne( + on(SectorFinalized{}, Proving), ), - api.Proving: planOne( - on(SectorFaultReported{}, api.FaultReported), - on(SectorFaulty{}, api.Faulty), + Proving: planOne( + on(SectorFaultReported{}, FaultReported), + on(SectorFaulty{}, Faulty), ), api.SealFailed: planOne( @@ -87,10 +88,10 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S on(SectorRetryInvalidProof{}, api.Committing), ), - api.Faulty: planOne( - on(SectorFaultReported{}, api.FaultReported), + Faulty: planOne( + on(SectorFaultReported{}, FaultReported), ), - api.FaultedFinal: final, + FaultedFinal: final, } func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { @@ -170,7 +171,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta switch state.State { // Happy path - case api.Packing: + case Packing: return m.handlePacking, nil case api.PreCommit1: return m.handlePreCommit1, nil @@ -178,22 +179,22 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handlePreCommit2, nil case api.PreCommitting: return m.handlePreCommitting, nil - case api.WaitSeed: + case WaitSeed: return m.handleWaitSeed, nil - case api.Committing: + case Committing: return m.handleCommitting, nil - case api.CommitWait: + case CommitWait: return m.handleCommitWait, nil - case api.FinalizeSector: + case FinalizeSector: return m.handleFinalizeSector, nil - case api.Proving: + case Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", state.SectorID) // Handled failure modes - case api.SealFailed: + case SealFailed: return m.handleSealFailed, nil - case api.PreCommitFailed: + case PreCommitFailed: return m.handlePreCommitFailed, nil case api.ComputeProofFailed: return m.handleComputeProofFailed, nil @@ -201,15 +202,15 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleCommitFailed, nil // Faults - case api.Faulty: + case Faulty: return m.handleFaulty, nil - case api.FaultReported: + case FaultReported: return m.handleFaultReported, nil // Fatal errors - case api.UndefinedSectorState: + case UndefinedSectorState: log.Error("sector update with undefined state!") - case api.FailedUnrecoverable: + case FailedUnrecoverable: log.Errorf("sector %d failed unrecoverably", state.SectorID) default: log.Errorf("unexpected sector update state: %d", state.State) @@ -227,22 +228,22 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { } case SectorCommitted: // the normal case e.apply(state) - state.State = api.CommitWait + state.State = CommitWait case SectorSeedReady: // seed changed :/ - if e.Seed.Equals(&state.Seed) { + if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) { log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") continue // or it didn't! } log.Warnf("planCommitting: commit Seed changed") e.apply(state) - state.State = api.Committing + state.State = Committing return nil case SectorComputeProofFailed: state.State = api.ComputeProofFailed case SectorSealPreCommitFailed: state.State = api.CommitFailed case SectorCommitFailed: - state.State = api.CommitFailed + state.State = CommitFailed default: return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } @@ -267,7 +268,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error { return nil } -func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state api.SectorState) error { +func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error { return m.sectors.Send(id, SectorForceState{state}) } @@ -275,13 +276,13 @@ func final(events []statemachine.Event, state *SectorInfo) error { return xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) { - return func() (mutator, api.SectorState) { +func on(mut mutator, next SectorState) func() (mutator, SectorState) { + return func() (mutator, SectorState) { return mut, next } } -func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error { +func planOne(ts ...func() (mut mutator, next SectorState)) func(events []statemachine.Event, state *SectorInfo) error { return func(events []statemachine.Event, state *SectorInfo) error { if len(events) != 1 { for _, event := range events { diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index 83d0a5c24..40388ad70 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -4,9 +4,10 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" + "github.com/prometheus/common/log" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" ) type mutator interface { @@ -39,7 +40,7 @@ func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool { } type SectorForceState struct { - State api.SectorState + State SectorState } func (evt SectorForceState) applyGlobal(state *SectorInfo) bool { @@ -73,12 +74,14 @@ func (evt SectorPackingFailed) apply(*SectorInfo) {} type SectorPreCommit1 struct { PreCommit1Out storage.PreCommit1Out - Ticket api.SealTicket + TicketValue abi.SealRandomness + TicketEpoch abi.ChainEpoch } func (evt SectorPreCommit1) apply(state *SectorInfo) { state.PreCommit1Out = evt.PreCommit1Out - state.Ticket = evt.Ticket + state.TicketEpoch = evt.TicketEpoch + state.TicketValue = evt.TicketValue } type SectorPreCommit2 struct { @@ -114,11 +117,13 @@ func (evt SectorPreCommitted) apply(state *SectorInfo) { } type SectorSeedReady struct { - Seed api.SealSeed + SeedValue abi.InteractiveSealRandomness + SeedEpoch abi.ChainEpoch } func (evt SectorSeedReady) apply(state *SectorInfo) { - state.Seed = evt.Seed + state.SeedEpoch = evt.SeedEpoch + state.SeedValue = evt.SeedValue } type SectorComputeProofFailed struct{ error } diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index 5014f46e9..69c2a99fb 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-statemachine" - "github.com/filecoin-project/lotus/api" ) @@ -32,7 +31,7 @@ func TestHappyPath(t *testing.T) { m := test{ s: &Sealing{}, t: t, - state: &SectorInfo{State: api.Packing}, + state: &SectorInfo{State: Packing}, } m.planSingle(SectorPacked{}) @@ -45,26 +44,26 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) - require.Equal(m.t, m.state.State, api.WaitSeed) + require.Equal(m.t, m.state.State, WaitSeed) m.planSingle(SectorSeedReady{}) - require.Equal(m.t, m.state.State, api.Committing) + require.Equal(m.t, m.state.State, Committing) m.planSingle(SectorCommitted{}) - require.Equal(m.t, m.state.State, api.CommitWait) + require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) - require.Equal(m.t, m.state.State, api.FinalizeSector) + require.Equal(m.t, m.state.State, FinalizeSector) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, api.Proving) + require.Equal(m.t, m.state.State, Proving) } func TestSeedRevert(t *testing.T) { m := test{ s: &Sealing{}, t: t, - state: &SectorInfo{State: api.Packing}, + state: &SectorInfo{State: Packing}, } m.planSingle(SectorPacked{}) @@ -77,31 +76,31 @@ func TestSeedRevert(t *testing.T) { require.Equal(m.t, m.state.State, api.PreCommitting) m.planSingle(SectorPreCommitted{}) - require.Equal(m.t, m.state.State, api.WaitSeed) + require.Equal(m.t, m.state.State, WaitSeed) m.planSingle(SectorSeedReady{}) - require.Equal(m.t, m.state.State, api.Committing) + require.Equal(m.t, m.state.State, Committing) - _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) + _, err := m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) require.NoError(t, err) - require.Equal(m.t, m.state.State, api.Committing) + require.Equal(m.t, m.state.State, Committing) // not changing the seed this time - _, err = m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: api.SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) - require.Equal(m.t, m.state.State, api.CommitWait) + _, err = m.s.plan([]statemachine.Event{{SectorSeedReady{Seed: SealSeed{Epoch: 5}}}, {SectorCommitted{}}}, m.state) + require.Equal(m.t, m.state.State, CommitWait) m.planSingle(SectorProving{}) - require.Equal(m.t, m.state.State, api.FinalizeSector) + require.Equal(m.t, m.state.State, FinalizeSector) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, api.Proving) + require.Equal(m.t, m.state.State, Proving) } func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { m := test{ s: &Sealing{}, t: t, - state: &SectorInfo{State: api.Committing}, + state: &SectorInfo{State: Committing}, } events := []statemachine.Event{{SectorCommitFailed{}}} diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index fcc3505c7..7f367d448 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -2,11 +2,11 @@ package sealing import ( "context" - "github.com/filecoin-project/sector-storage/ffiwrapper" "io" "golang.org/x/xerrors" + "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/lotus/lib/nullreader" diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index b6fe1edb1..a843650b4 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -1,72 +1,76 @@ package sealing import ( + "bytes" "context" - "github.com/filecoin-project/sector-storage/ffiwrapper" "io" - "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/go-padreader" - "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/go-address" + padreader "github.com/filecoin-project/go-padreader" + statemachine "github.com/filecoin-project/go-statemachine" + sectorstorage "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/sector-storage" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" ) const SectorStorePrefix = "/sectors" var log = logging.Logger("sectors") -type TicketFn func(context.Context) (*api.SealTicket, error) +type TicketFn func(context.Context) (abi.SealRandomness, abi.ChainEpoch, error) type SectorIDCounter interface { Next() (abi.SectorNumber, error) } -type sealingApi interface { // TODO: trim down - // Call a read only method on actors (no interaction with the chain required) - StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) - StateMinerWorker(context.Context, address.Address, types.TipSetKey) (address.Address, error) - StateMinerPostState(ctx context.Context, actor address.Address, ts types.TipSetKey) (*miner.PoStState, error) - StateMinerSectors(context.Context, address.Address, types.TipSetKey) ([]*api.ChainSectorInfo, error) - StateMinerProvingSet(context.Context, address.Address, types.TipSetKey) ([]*api.ChainSectorInfo, error) - StateMinerSectorSize(context.Context, address.Address, types.TipSetKey) (abi.SectorSize, error) - StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) - StateWaitMsg(context.Context, cid.Cid) (*api.MsgLookup, error) // TODO: removeme eventually - StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) - StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) - StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) +type TipSetToken []byte - MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) +type MsgLookup struct { + Receipt MessageReceipt + TipSetTok TipSetToken + Height abi.ChainEpoch +} - ChainHead(context.Context) (*types.TipSet, error) - ChainNotify(context.Context) (<-chan []*store.HeadChange, error) - ChainGetRandomness(ctx context.Context, tsk types.TipSetKey, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) - ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) - ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) +type MessageReceipt struct { + ExitCode exitcode.ExitCode + Return []byte + GasUsed int64 +} + +func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { + return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed +} + +type MarketDeal struct { + Proposal market.DealProposal + State market.DealState +} + +type SealingAPI interface { + StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) + StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) + StateGetSectorPreCommitOnChainInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) + StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, market.DealState, error) + SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error) + ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) + ChainGetRandomness(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) - ChainHasObj(context.Context, cid.Cid) (bool, error) - - WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error) - WalletBalance(context.Context, address.Address) (types.BigInt, error) - WalletHas(context.Context, address.Address) (bool, error) } type Sealing struct { - api sealingApi - events *events.Events + api SealingAPI + events Events maddr address.Address worker address.Address @@ -78,7 +82,7 @@ type Sealing struct { tktFn TicketFn } -func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn) *Sealing { +func New(api SealingAPI, events Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn) *Sealing { s := &Sealing{ api: api, events: events, diff --git a/storage/sealing/sector_state.go b/storage/sealing/sector_state.go new file mode 100644 index 000000000..5477edff8 --- /dev/null +++ b/storage/sealing/sector_state.go @@ -0,0 +1,74 @@ +package sealing + +// alias because cbor-gen doesn't like non-alias types +type SectorState = uint64 + +const ( + UndefinedSectorState SectorState = iota + + // happy path + Empty + Packing // sector not in sealStore, and not on chain + + Unsealed // sealing / queued + PreCommitting // on chain pre-commit + WaitSeed // waiting for seed + Committing + CommitWait // waiting for message to land on chain + FinalizeSector + Proving + _ // reserved + _ + _ + + // recovery handling + // Reseal + _ + _ + _ + _ + _ + _ + _ + + // error modes + FailedUnrecoverable + + SealFailed + PreCommitFailed + SealCommitFailed + CommitFailed + PackingFailed + _ + _ + _ + + Faulty // sector is corrupted or gone for some reason + FaultReported // sector has been declared as a fault on chain + FaultedFinal // fault declared on chain +) + +var SectorStates = []string{ + UndefinedSectorState: "UndefinedSectorState", + Empty: "Empty", + Packing: "Packing", + Unsealed: "Unsealed", + PreCommitting: "PreCommitting", + WaitSeed: "WaitSeed", + Committing: "Committing", + CommitWait: "CommitWait", + FinalizeSector: "FinalizeSector", + Proving: "Proving", + + SealFailed: "SealFailed", + PreCommitFailed: "PreCommitFailed", + SealCommitFailed: "SealCommitFailed", + CommitFailed: "CommitFailed", + PackingFailed: "PackingFailed", + + FailedUnrecoverable: "FailedUnrecoverable", + + Faulty: "Faulty", + FaultReported: "FaultReported", + FaultedFinal: "FaultedFinal", +} diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 072fc4ed8..0bd4f767d 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -1,21 +1,19 @@ package sealing import ( + "bytes" "context" - "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/specs-actors/actors/crypto" - - "github.com/filecoin-project/go-statemachine" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/miner" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" + statemachine "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-storage/storage" ) func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { @@ -65,19 +63,20 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } log.Infow("performing sector replication...", "sector", sector.SectorID) - ticket, err := m.tktFn(ctx.Context()) + ticketValue, ticketEpoch, err := m.tktFn(ctx.Context()) if err != nil { return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } - pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticketValue, sector.pieceInfos()) if err != nil { return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } return ctx.Send(SectorPreCommit1{ PreCommit1Out: pc1o, - Ticket: *ticket, + TicketValue: ticketValue, + TicketEpoch: ticketEpoch, }) } @@ -94,7 +93,7 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) @@ -114,31 +113,22 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf RegisteredProof: sector.SectorType, SealedCID: *sector.CommR, - SealRandEpoch: sector.Ticket.Epoch, + SealRandEpoch: sector.TicketEpoch, DealIDs: sector.deals(), } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) - } - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: builtin.MethodsMiner.PreCommitSector, - Params: enc, - Value: types.NewInt(0), // TODO: need to ensure sufficient collateral - GasLimit: 1000000, /* i dont know help */ - GasPrice: types.NewInt(1), + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)}) } log.Info("submitting precommit for sector: ", sector.SectorID) - smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) if err != nil { return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } - return ctx.Send(SectorPreCommitted{Message: smsg.Cid()}) + return ctx.Send(SectorPreCommitted{Message: mcid}) } func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { @@ -162,10 +152,9 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er } randHeight := pci.PreCommitEpoch + miner.PreCommitChallengeDelay - log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight) - err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { - rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, nil) + err = m.events.ChainAt(func(ectx context.Context, tok TipSetToken, curH abi.ChainEpoch) error { + rand, err := m.api.ChainGetRandomness(ectx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, nil) if err != nil { err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) @@ -173,13 +162,10 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er return err } - ctx.Send(SectorSeedReady{Seed: api.SealSeed{ - Epoch: randHeight, - Value: abi.InteractiveSealRandomness(rand), - }}) + ctx.Send(SectorSeedReady{SeedValue: abi.InteractiveSealRandomness(rand), SeedEpoch: randHeight}) return nil - }, func(ctx context.Context, ts *types.TipSet) error { + }, func(ctx context.Context, ts TipSetToken) error { log.Warn("revert in interactive commit sector step") // TODO: need to cancel running process and restart... return nil @@ -194,13 +180,13 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { log.Info("scheduling seal proof computation...") - log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD) + log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) cids := storage.SectorCids{ Unsealed: *sector.CommD, Sealed: *sector.CommR, } - c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), cids) + c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } @@ -221,31 +207,20 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) Proof: proof, } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) - } - - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: builtin.MethodsMiner.ProveCommitSector, - Params: enc, - Value: types.NewInt(0), // TODO: need to ensure sufficient collateral - GasLimit: 1000000, /* i dont know help */ - GasPrice: types.NewInt(1), + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) } // TODO: check seed / ticket are up to date - - smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) } return ctx.Send(SectorCommitted{ Proof: proof, - Message: smsg.Cid(), + Message: mcid, }) } @@ -261,7 +236,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) } if mw.Receipt.ExitCode != 0 { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.Value, sector.Seed.Value, sector.Seed.Epoch, sector.Proof)}) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)}) } return ctx.Send(SectorProving{}) @@ -284,30 +259,22 @@ func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) erro bf := abi.NewBitField() bf.Set(uint64(sector.SectorID)) - enc, aerr := actors.SerializeParams(&miner.DeclareTemporaryFaultsParams{ + params := &miner.DeclareTemporaryFaultsParams{ SectorNumbers: bf, Duration: 99999999, // TODO: This is very unlikely to be the correct number - }) - if aerr != nil { - return xerrors.Errorf("failed to serialize declare fault params: %w", aerr) } - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: builtin.MethodsMiner.DeclareTemporaryFaults, - Params: enc, - Value: types.NewInt(0), // TODO: need to ensure sufficient collateral - GasLimit: 1000000, /* i dont know help */ - GasPrice: types.NewInt(1), + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)}) } - smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) if err != nil { return xerrors.Errorf("failed to push declare faults message to network: %w", err) } - return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()}) + return ctx.Send(SectorFaultReported{reportMsg: mcid}) } func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error { diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go index c7b563923..4879b52fd 100644 --- a/storage/sealing/states_failed.go +++ b/storage/sealing/states_failed.go @@ -1,17 +1,12 @@ package sealing import ( - "bytes" "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-actors/actors/builtin/miner" - "github.com/filecoin-project/specs-actors/actors/util/adt" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api/apibstore" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" ) const minRetryTime = 1 * time.Minute @@ -33,36 +28,22 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { } func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitOnChainInfo, bool) { - act, err := m.api.StateGetActor(ctx.Context(), m.maddr, types.EmptyTSK) + tok, _, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) return nil, true } - st, err := m.api.ChainReadObj(ctx.Context(), act.Head) + info, err := m.api.StateGetSectorPreCommitOnChainInfo(ctx.Context(), m.maddr, sector.SectorID, tok) if err != nil { log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) return nil, true } - var state miner.State - if err := state.UnmarshalCBOR(bytes.NewReader(st)); err != nil { - log.Errorf("handleSealFailed(%d): temp error: unmarshaling miner state: %+v", sector.SectorID, err) - return nil, true - } - - var pci miner.SectorPreCommitOnChainInfo - precommits := adt.AsMap(store.ActorStore(ctx.Context(), apibstore.NewAPIBlockstore(m.api)), state.PreCommittedSectors) - if _, err := precommits.Get(adt.UIntKey(uint64(sector.SectorID)), &pci); err != nil { - log.Error(err) - return nil, true - } - - return &pci, false + return info, false } func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error { - if _, is := m.checkPreCommitted(ctx, sector); is { // TODO: Remove this after we can re-precommit return nil // noop, for now @@ -76,7 +57,7 @@ func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) diff --git a/storage/sealing/types.go b/storage/sealing/types.go index 294802ef0..4b2a6396a 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -5,7 +5,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" - "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" ) type Piece struct { @@ -26,9 +26,9 @@ type Log struct { } type SectorInfo struct { - State api.SectorState - SectorID abi.SectorNumber - Nonce uint64 // TODO: remove + State SectorState + SectorID abi.SectorNumber // TODO: this field's name should be changed to SectorNumber + Nonce uint64 // TODO: remove SectorType abi.RegisteredProof @@ -37,7 +37,8 @@ type SectorInfo struct { Pieces []Piece // PreCommit1 - Ticket api.SealTicket + TicketValue abi.SealRandomness + TicketEpoch abi.ChainEpoch PreCommit1Out storage.PreCommit1Out // PreCommit2 @@ -48,7 +49,8 @@ type SectorInfo struct { PreCommitMessage *cid.Cid // WaitSeed - Seed api.SealSeed + SeedValue abi.InteractiveSealRandomness + SeedEpoch abi.ChainEpoch // Committing CommitMessage *cid.Cid diff --git a/storage/sealing/types_test.go b/storage/sealing/types_test.go index 93bffc20c..7270c49a7 100644 --- a/storage/sealing/types_test.go +++ b/storage/sealing/types_test.go @@ -4,12 +4,11 @@ import ( "bytes" "testing" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin" "gotest.tools/assert" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" ) func TestSectorInfoSelialization(t *testing.T) { @@ -26,15 +25,14 @@ func TestSectorInfoSelialization(t *testing.T) { Size: 5, CommP: dummyCid, }}, - CommD: &dummyCid, - CommR: nil, - Proof: nil, - Ticket: api.SealTicket{ - Epoch: 345, - Value: []byte{87, 78, 7, 87}, - }, + CommD: &dummyCid, + CommR: nil, + Proof: nil, + TicketValue: []byte{87, 78, 7, 87}, + TicketEpoch: 345, PreCommitMessage: nil, - Seed: api.SealSeed{}, + SeedValue: []byte{}, + SeedEpoch: 0, CommitMessage: nil, FaultReportMsg: nil, LastErr: "hi", @@ -56,7 +54,8 @@ func TestSectorInfoSelialization(t *testing.T) { assert.Equal(t, si.Pieces, si2.Pieces) assert.Equal(t, si.CommD, si2.CommD) - assert.Equal(t, si.Ticket, si2.Ticket) + assert.Equal(t, si.TicketValue, si2.TicketValue) + assert.Equal(t, si.TicketEpoch, si2.TicketEpoch) assert.Equal(t, si, si2) diff --git a/storage/sealing/utils.go b/storage/sealing/utils.go index cfc17734c..b507907fb 100644 --- a/storage/sealing/utils.go +++ b/storage/sealing/utils.go @@ -1,8 +1,9 @@ package sealing import ( - "github.com/filecoin-project/specs-actors/actors/abi" "math/bits" + + "github.com/filecoin-project/specs-actors/actors/abi" ) func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) {