more refactoring for interactive porep scheduling

This commit is contained in:
whyrusleeping 2019-10-31 09:55:35 -07:00
parent 759094198c
commit 4e478330a4
10 changed files with 191 additions and 300 deletions

View File

@ -313,8 +313,6 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC
// TODO: ensure normalization to ID address
maddr := vmctx.Message().To
var pieces []sectorbuilder.PublicPieceInfo // TODO: GET ME FROM DEALS IN STORAGEMARKET
ticket, err := vmctx.GetRandomness(us.TicketEpoch)
if err != nil {
return nil, aerrors.Wrap(err, "failed to get ticket randomness")
@ -325,7 +323,20 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC
return nil, aerrors.Wrap(err, "failed to get randomness for prove sector commitment")
}
if ok, err := ValidatePoRep(maddr, mi.SectorSize, us.CommD, us.CommR, ticket, params.Proof, seed, params.SectorID, pieces); err != nil {
enc, err := SerializeParams(&ComputeDataCommitmentParams{
DealIDs: params.DealIDs,
SectorSize: mi.SectorSize,
})
if err != nil {
return nil, aerrors.Wrap(err, "failed to serialize ComputeDataCommitmentParams")
}
commD, err := vmctx.Send(StorageMarketAddress, SMAMethods.ComputeDataCommitment, types.NewInt(0), enc)
if err != nil {
return nil, aerrors.Wrap(err, "failed to compute data commitment")
}
if ok, err := ValidatePoRep(maddr, mi.SectorSize, commD, us.CommR, ticket, params.Proof, seed, params.SectorID); err != nil {
return nil, err
} else if !ok {
return nil, aerrors.New(2, "bad proof!")
@ -599,8 +610,8 @@ func GetFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, sectorID
return true, comms[0], comms[1], nil
}
func ValidatePoRep(maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64, pieces []sectorbuilder.PublicPieceInfo) (bool, ActorError) {
ok, err := sectorbuilder.VerifySeal(ssize, commR, commD, maddr, ticket, seed, sectorID, proof, pieces)
func ValidatePoRep(maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64) (bool, ActorError) {
ok, err := sectorbuilder.VerifySeal(ssize, commR, commD, maddr, ticket, seed, sectorID, proof)
if err != nil {
return false, aerrors.Absorb(err, 25, "verify seal failed")
}

View File

@ -587,7 +587,7 @@ func transferFunds(from, to, amt types.BigInt) (types.BigInt, types.BigInt) {
return types.BigSub(from, amt), types.BigAdd(to, amt)
}
var ComputeDataCommitmentParams struct {
type ComputeDataCommitmentParams struct {
DealIDs []uint64
SectorSize uint64
}

View File

@ -542,7 +542,7 @@ func (t *SectorPreCommitInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{133}); err != nil {
if _, err := w.Write([]byte{132}); err != nil {
return err
}
@ -567,14 +567,6 @@ func (t *SectorPreCommitInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.Proof ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Proof)))); err != nil {
return err
}
if _, err := w.Write(t.Proof); err != nil {
return err
}
// t.t.SectorNumber (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorNumber)); err != nil {
return err
@ -593,7 +585,7 @@ func (t *SectorPreCommitInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 5 {
if extra != 4 {
return fmt.Errorf("cbor input had wrong number of fields")
}
@ -641,23 +633,6 @@ func (t *SectorPreCommitInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field")
}
t.Epoch = extra
// t.t.Proof ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.Proof: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.Proof = make([]byte, extra)
if _, err := io.ReadFull(br, t.Proof); err != nil {
return err
}
// t.t.SectorNumber (uint64)
maj, extra, err = cbg.CborReadHeader(br)
@ -676,7 +651,7 @@ func (t *UnprovenSector) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
if _, err := w.Write([]byte{132}); err != nil {
return err
}
@ -700,6 +675,11 @@ func (t *UnprovenSector) MarshalCBOR(w io.Writer) error {
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SubmitHeight)); err != nil {
return err
}
// t.t.TicketEpoch (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.TicketEpoch)); err != nil {
return err
}
return nil
}
@ -714,7 +694,7 @@ func (t *UnprovenSector) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
if extra != 4 {
return fmt.Errorf("cbor input had wrong number of fields")
}
@ -762,6 +742,16 @@ func (t *UnprovenSector) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field")
}
t.SubmitHeight = extra
// t.t.TicketEpoch (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.TicketEpoch = extra
return nil
}
@ -3697,3 +3687,87 @@ func (t *OnChainDeal) UnmarshalCBOR(r io.Reader) error {
t.ActivationEpoch = extra
return nil
}
func (t *ComputeDataCommitmentParams) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{130}); err != nil {
return err
}
// t.t.DealIDs ([]uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.DealIDs)))); err != nil {
return err
}
for _, v := range t.DealIDs {
if err := cbg.CborWriteHeader(w, cbg.MajUnsignedInt, v); err != nil {
return err
}
}
// t.t.SectorSize (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorSize)); err != nil {
return err
}
return nil
}
func (t *ComputeDataCommitmentParams) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.DealIDs ([]uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.DealIDs: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.DealIDs = make([]uint64, extra)
}
for i := 0; i < int(extra); i++ {
maj, val, err := cbg.CborReadHeader(br)
if err != nil {
return xerrors.Errorf("failed to read uint64 for t.DealIDs slice: %w", err)
}
if maj != cbg.MajUnsignedInt {
return xerrors.Errorf("value read for array t.DealIDs was not a uint, instead got %d", maj)
}
t.DealIDs[i] = val
}
// t.t.SectorSize (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorSize = extra
return nil
}

View File

@ -17,7 +17,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storage"
)
type MinerDeal struct {
@ -41,7 +41,7 @@ type Provider struct {
ask *types.SignedStorageAsk
askLk sync.Mutex
secst *sectorblocks.SectorBlocks
sminer *storage.Miner
full api.FullNode
// TODO: Use a custom protocol or graphsync in the future
@ -68,7 +68,7 @@ type minerDealUpdate struct {
mut func(*MinerDeal)
}
func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
@ -79,7 +79,7 @@ func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dty
}
h := &Provider{
secst: secst,
sminer: sminer,
dag: dag,
full: fullNode,

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
@ -229,39 +228,48 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
if err != nil {
return nil, err
}
_ = pcid
sectorID, err := p.secst.AddUnixfsPiece(pcid, uf, deal.DealID)
if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
/*
sectorID, err := p.sminer.AddUnixfsPiece(pcid, uf, deal.DealID)
if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d", sectorID)
log.Warnf("New Sector: %d", sectorID)
return func(deal *MinerDeal) {
deal.SectorID = sectorID
}, nil
return func(deal *MinerDeal) {
deal.SectorID = sectorID
}, nil
*/
panic("fixme")
}
// SEALING
func (p *Provider) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
status, err := p.secst.WaitSeal(ctx, deal.SectorID)
if err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
panic("fixme")
switch status.State {
case sealing_state.Sealed:
case sealing_state.Failed:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
case sealing_state.Pending:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
case sealing_state.Sealing:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
default:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
}
/*
status, err := p.sminer.WaitSeal(ctx, deal.SectorID)
if err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
switch status.State {
case sealing_state.Sealed:
case sealing_state.Failed:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
case sealing_state.Pending:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
case sealing_state.Sealing:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
default:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
}
return status, nil
*/
return status, nil
}
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
@ -273,7 +281,7 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
log.Warnf("Sending deal response failed: %s", err)
}
if err := p.secst.SealSector(ctx, deal.SectorID); err != nil {
if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil {
return nil, xerrors.Errorf("sealing sector failed: %w", err)
}

View File

@ -5,7 +5,7 @@ import (
"io"
"math"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)

View File

@ -134,7 +134,7 @@ func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte, pieces []PublicPieceInfo) (bool, error) {
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
@ -142,7 +142,7 @@ func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof, pieces)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof, nil)
}
func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo {

View File

@ -1,19 +1,15 @@
package sectorbuilder_test
import (
"context"
"io"
"io/ioutil"
"math/rand"
"path/filepath"
"testing"
"github.com/ipfs/go-datastore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/storage/sector"
)
const sectorSize = 1024
@ -43,7 +39,7 @@ func TestSealAndVerify(t *testing.T) {
sb, err := sectorbuilder.New(&sectorbuilder.SectorBuilderConfig{
SectorSize: sectorSize,
CacheDir:cache,
CacheDir: cache,
SealedDir: sealed,
StagedDir: staging,
MetadataDir: metadata,
@ -53,31 +49,35 @@ func TestSealAndVerify(t *testing.T) {
t.Fatal(err)
}
// TODO: Consider fixing
store := sector.NewStore(sb, datastore.NewMapDatastore(), func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
return &sectorbuilder.SealTicket{
BlockHeight: 5,
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2},
}, nil
})
store.Service()
dlen := sectorbuilder.UserBytesForSectorSize(sectorSize)
r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen))
sid, err := store.AddPiece("foo", dlen, r)
sid, err := sb.AddPiece("foo", dlen, r)
if err != nil {
t.Fatal(err)
}
if err := store.SealSector(context.TODO(), sid); err != nil {
ticket := sectorbuilder.SealTicket{
BlockHeight: 5,
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2},
}
pco, err := sb.SealPreCommit(sid, ticket)
if err != nil {
t.Fatal(err)
}
ssinfo := <-store.Incoming()
seed := sectorbuilder.SealSeed{
BlockHeight: 15,
TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8},
}
ok, err := sectorbuilder.VerifySeal(sectorSize, ssinfo.CommR[:], ssinfo.CommD[:], addr, ssinfo.Ticket.TicketBytes[:], ssinfo.SectorID, ssinfo.Proof)
sco, err := sb.SealCommit(sid, seed)
if err != nil {
t.Fatal(err)
}
ok, err := sectorbuilder.VerifySeal(sectorSize, pco.CommR[:], pco.CommD[:], addr, ticket.TicketBytes[:], seed.TicketBytes[:], sid, sco.Proof)
if err != nil {
t.Fatal(err)
}

View File

@ -9,11 +9,8 @@ import (
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store"
@ -87,187 +84,12 @@ func (m *Miner) Run(ctx context.Context) error {
m.events = events.NewEvents(ctx, m.api)
go m.handlePostingSealedSectors(ctx)
go m.beginPosting(ctx)
return nil
}
func (m *Miner) commitUntrackedSectors(ctx context.Context) error {
sealed, err := m.secst.Commited()
if err != nil {
return err
}
chainSectors, err := m.api.StateMinerSectors(ctx, m.maddr, nil)
if err != nil {
return err
}
onchain := map[uint64]struct{}{}
for _, chainSector := range chainSectors {
onchain[chainSector.SectorID] = struct{}{}
}
for _, s := range sealed {
if _, ok := onchain[s.SectorID]; ok {
continue
}
log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID)
if err := m.commitSector(ctx, s); err != nil {
log.Error("Committing uncommitted sector failed: ", err)
}
}
return nil
}
func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
incoming := m.secst.Incoming()
defer m.secst.CloseIncoming(incoming)
if err := m.commitUntrackedSectors(ctx); err != nil {
log.Error(err)
}
for {
select {
case sinfo, ok := <-incoming:
if !ok {
// TODO: set some state variable so that this state can be
// visible via some status command
log.Warn("sealed sector channel closed, aborting process")
return
}
if err := m.commitSector(ctx, sinfo); err != nil {
log.Errorf("failed to commit sector: %s", err)
continue
}
case <-ctx.Done():
log.Warn("exiting seal posting routine")
return
}
}
}
func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error {
log.Info("committing sector")
ssize, err := m.SectorSize(ctx)
if err != nil {
return xerrors.Errorf("failed to check out own sector size: %w", err)
}
_ = ssize
// TODO: 2 stage commit
/*deals, err := m.secst.DealsForCommit(sinfo.SectorID)
if err != nil {
return xerrors.Errorf("getting sector deals failed: %w", err)
}
*/
params := &actors.SectorPreCommitInfo{
CommD: sinfo.CommD[:],
CommR: sinfo.CommR[:],
Epoch: sinfo.Ticket.BlockHeight,
//DealIDs: deals,
SectorNumber: sinfo.SectorID,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return errors.Wrap(aerr, "could not serialize commit sector parameters")
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return errors.Wrap(err, "pushing message to mpool")
}
go func() {
// TODO: maybe just mark this down in the datastore and handle it differently? This feels complicated to restart
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay
err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error {
go func() {
rand, err := m.api.ChainGetRandomness(ctx, ts, nil, ts.Height()-randHeight)
if err != nil {
log.Error(errors.Errorf("failed to get randomness for computing seal proof: %w", err))
return
}
// TODO: should this get scheduled to preserve proper resource consumption?
proof, err := m.secst.SealComputeProof(ctx, sinfo.SectorID, rand)
if err != nil {
log.Error(errors.Errorf("computing seal proof failed: %w", err))
return
}
params := &actors.SectorProveCommitInfo{
Proof: proof,
SectorID: sinfo.SectorID,
//DealIDs: deals,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
log.Errorf(errors.Wrap(aerr, "could not serialize commit sector parameters"))
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.ProveCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return errors.Wrap(err, "pushing message to mpool")
}
// TODO: now wait for this to get included and handle errors?
_, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
log.Errorf("failed to wait for porep inclusion: %s", err)
return
}
m.beginPosting(ctx)
}()
return nil
}, func(ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil {
return
}
}()
return nil
return m.SealSector(ctx, sinfo.SectorID)
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {

View File

@ -31,7 +31,7 @@ type dealMapping struct {
Committed bool
}
type TicketFn func(context.Context) (*sectorbuilder.SealSeed, error)
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct {
@ -60,10 +60,6 @@ func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn Ticke
}
}
func (s *Store) Service() {
go s.service()
}
func (s *Store) restartSealing() {
sectors, err := s.sb.GetAllStagedSectors()
if err != nil {
@ -92,6 +88,15 @@ func (s *Store) restartSealing() {
}
}
func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, error) {
status, err := s.sb.SealStatus(sid)
if err != nil {
return nil, err
}
return &status, nil
}
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) {
sectorID, err = s.sb.AddPiece(ref, size, r)
if err != nil {
@ -190,35 +195,6 @@ func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, rand []by
panic("TODO")
}
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
s.waitingLk.Lock()
var at = -1
for i, ch := range s.incoming {
if ch == c {
at = i
}
}
if at == -1 {
s.waitingLk.Unlock()
return
}
if len(s.incoming) > 1 {
last := len(s.incoming) - 1
s.incoming[at] = s.incoming[last]
s.incoming[last] = nil
}
s.incoming = s.incoming[:len(s.incoming)-1]
s.waitingLk.Unlock()
}
func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus {
ch := make(chan sectorbuilder.SectorSealingStatus, 8)
s.waitingLk.Lock()
s.incoming = append(s.incoming, ch)
s.waitingLk.Unlock()
return ch
}
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
s.waitingLk.Lock()
watch, ok := s.waiting[sector]