Sector Commitment tracker
This commit is contained in:
		
							parent
							
								
									df20543b78
								
							
						
					
					
						commit
						eda72468ce
					
				| @ -17,6 +17,7 @@ import ( | |||||||
| 	"github.com/filecoin-project/go-lotus/chain/address" | 	"github.com/filecoin-project/go-lotus/chain/address" | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/types" | 	"github.com/filecoin-project/go-lotus/chain/types" | ||||||
| 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" | 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/storage/commitment" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage/sectorblocks" | 	"github.com/filecoin-project/go-lotus/storage/sectorblocks" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -45,6 +46,7 @@ type Handler struct { | |||||||
| 	askLk sync.Mutex | 	askLk sync.Mutex | ||||||
| 
 | 
 | ||||||
| 	secst *sectorblocks.SectorBlocks | 	secst *sectorblocks.SectorBlocks | ||||||
|  | 	commt *commitment.Tracker | ||||||
| 	full  api.FullNode | 	full  api.FullNode | ||||||
| 
 | 
 | ||||||
| 	// TODO: Use a custom protocol or graphsync in the future
 | 	// TODO: Use a custom protocol or graphsync in the future
 | ||||||
| @ -71,7 +73,7 @@ type minerDealUpdate struct { | |||||||
| 	mut      func(*MinerDeal) | 	mut      func(*MinerDeal) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { | func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *commitment.Tracker, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { | ||||||
| 	addr, err := ds.Get(datastore.NewKey("miner-address")) | 	addr, err := ds.Get(datastore.NewKey("miner-address")) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -83,6 +85,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp | |||||||
| 
 | 
 | ||||||
| 	h := &Handler{ | 	h := &Handler{ | ||||||
| 		secst: secst, | 		secst: secst, | ||||||
|  | 		commt: commt, | ||||||
| 		dag:   dag, | 		dag:   dag, | ||||||
| 		full:  fullNode, | 		full:  fullNode, | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -190,8 +190,8 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi | |||||||
| 	return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) | 	return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { | func (h *Handler) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { | ||||||
| 	status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID) | 	status, err := h.secst.WaitSeal(ctx, deal.SectorID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return sectorbuilder.SectorSealingStatus{}, err | 		return sectorbuilder.SectorSealingStatus{}, err | ||||||
| 	} | 	} | ||||||
| @ -212,7 +212,7 @@ func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { | func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { | ||||||
| 	status, err := h.waitSealed(deal) | 	status, err := h.waitSealed(ctx, deal) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -253,3 +253,22 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal) | |||||||
| 
 | 
 | ||||||
| 	return nil, nil | 	return nil, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (h *Handler) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { | ||||||
|  | 	mcid, err := h.commt.WaitCommit(ctx, deal.Proposal.MinerAddress, deal.SectorID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Warnf("Waiting for sector commitment message: %s", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	err = h.sendSignedResponse(StorageDealResponse{ | ||||||
|  | 		State:               api.DealComplete, | ||||||
|  | 		Proposal:            deal.ProposalCid, | ||||||
|  | 
 | ||||||
|  | 		SectorCommitMessage: &mcid, | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Warnf("Sending deal response failed: %s", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil, nil | ||||||
|  | } | ||||||
|  | |||||||
| @ -115,10 +115,10 @@ func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rp | |||||||
| 	h.handle(ctx, req, wf, rpcError, func(bool) {}, nil) | 	h.handle(ctx, req, wf, rpcError, func(bool) {}, nil) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func doCall(f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) { | func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) { | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if i := recover(); i != nil { | 		if i := recover(); i != nil { | ||||||
| 			err = xerrors.Errorf("panic in rpc method: %s", i) | 			err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i) | ||||||
| 			log.Error(err) | 			log.Error(err) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| @ -193,7 +193,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer | |||||||
| 
 | 
 | ||||||
| 	///////////////////
 | 	///////////////////
 | ||||||
| 
 | 
 | ||||||
| 	callResult, err := doCall(handler.handlerFunc, callParams) | 	callResult, err := doCall(req.Method, handler.handlerFunc, callParams) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err)) | 		rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err)) | ||||||
| 		return | 		return | ||||||
|  | |||||||
| @ -39,6 +39,7 @@ import ( | |||||||
| 	"github.com/filecoin-project/go-lotus/retrieval" | 	"github.com/filecoin-project/go-lotus/retrieval" | ||||||
| 	"github.com/filecoin-project/go-lotus/retrieval/discovery" | 	"github.com/filecoin-project/go-lotus/retrieval/discovery" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage" | 	"github.com/filecoin-project/go-lotus/storage" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/storage/commitment" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage/sector" | 	"github.com/filecoin-project/go-lotus/storage/sector" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage/sectorblocks" | 	"github.com/filecoin-project/go-lotus/storage/sectorblocks" | ||||||
| ) | ) | ||||||
| @ -232,8 +233,8 @@ func Online() Option { | |||||||
| 			Override(new(*deals.Client), deals.NewClient), | 			Override(new(*deals.Client), deals.NewClient), | ||||||
| 			Override(RunDealClientKey, modules.RunDealClient), | 			Override(RunDealClientKey, modules.RunDealClient), | ||||||
| 
 | 
 | ||||||
| 			Override(new(*paych.Store), modules.PaychStore), | 			Override(new(*paych.Store), paych.NewStore), | ||||||
| 			Override(new(*paych.Manager), modules.PaymentChannelManager), | 			Override(new(*paych.Manager), paych.NewManager), | ||||||
| 
 | 
 | ||||||
| 			Override(new(*miner.Miner), miner.NewMiner), | 			Override(new(*miner.Miner), miner.NewMiner), | ||||||
| 		), | 		), | ||||||
| @ -243,6 +244,7 @@ func Online() Option { | |||||||
| 			Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), | 			Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), | ||||||
| 			Override(new(*sector.Store), sector.NewStore), | 			Override(new(*sector.Store), sector.NewStore), | ||||||
| 			Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), | 			Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), | ||||||
|  | 			Override(new(*commitment.Tracker), commitment.NewTracker), | ||||||
| 			Override(new(*storage.Miner), modules.StorageMiner), | 			Override(new(*storage.Miner), modules.StorageMiner), | ||||||
| 
 | 
 | ||||||
| 			Override(new(dtypes.StagingDAG), modules.StagingDAG), | 			Override(new(dtypes.StagingDAG), modules.StagingDAG), | ||||||
|  | |||||||
| @ -1,15 +0,0 @@ | |||||||
| package modules |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/stmgr" |  | ||||||
| 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" |  | ||||||
| 	"github.com/filecoin-project/go-lotus/paych" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func PaychStore(ds dtypes.MetadataDS) *paych.Store { |  | ||||||
| 	return paych.NewStore(ds) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func PaymentChannelManager(sm *stmgr.StateManager, store *paych.Store) (*paych.Manager, error) { |  | ||||||
| 	return paych.NewManager(sm, store), nil |  | ||||||
| } |  | ||||||
| @ -25,6 +25,7 @@ import ( | |||||||
| 	"github.com/filecoin-project/go-lotus/node/repo" | 	"github.com/filecoin-project/go-lotus/node/repo" | ||||||
| 	"github.com/filecoin-project/go-lotus/retrieval" | 	"github.com/filecoin-project/go-lotus/retrieval" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage" | 	"github.com/filecoin-project/go-lotus/storage" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/storage/commitment" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage/sector" | 	"github.com/filecoin-project/go-lotus/storage/sector" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -65,13 +66,13 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { | func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store, commt *commitment.Tracker) (*storage.Miner, error) { | ||||||
| 	maddr, err := minerAddrFromDS(ds) | 	maddr, err := minerAddrFromDS(ds) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	sm, err := storage.NewMiner(api, maddr, h, ds, secst) | 	sm, err := storage.NewMiner(api, maddr, h, ds, secst, commt) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -3,33 +3,46 @@ package paych | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"github.com/filecoin-project/go-lotus/node/impl/full" |  | ||||||
| 	"math" | 	"math" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 
 | 
 | ||||||
| 	logging "github.com/ipfs/go-log" | 	logging "github.com/ipfs/go-log" | ||||||
|  | 	"go.uber.org/fx" | ||||||
| 
 | 
 | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/actors" | 	"github.com/filecoin-project/go-lotus/chain/actors" | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/address" | 	"github.com/filecoin-project/go-lotus/chain/address" | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/stmgr" | 	"github.com/filecoin-project/go-lotus/chain/stmgr" | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/types" | 	"github.com/filecoin-project/go-lotus/chain/types" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/node/impl/full" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var log = logging.Logger("paych") | var log = logging.Logger("paych") | ||||||
| 
 | 
 | ||||||
|  | type ManagerApi struct { | ||||||
|  | 	fx.In | ||||||
|  | 
 | ||||||
|  | 	full.MpoolAPI | ||||||
|  | 	full.WalletAPI | ||||||
|  | 	full.ChainAPI | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type Manager struct { | type Manager struct { | ||||||
| 	store *Store | 	store *Store | ||||||
| 	sm    *stmgr.StateManager | 	sm    *stmgr.StateManager | ||||||
| 
 | 
 | ||||||
| 	mpool  *full.MpoolAPI | 	mpool  full.MpoolAPI | ||||||
| 	wallet *full.WalletAPI | 	wallet full.WalletAPI | ||||||
| 	chain  *full.ChainAPI | 	chain  full.ChainAPI | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager { | func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager { | ||||||
| 	return &Manager{ | 	return &Manager{ | ||||||
| 		store: pchstore, | 		store: pchstore, | ||||||
| 		sm:    sm, | 		sm:    sm, | ||||||
|  | 
 | ||||||
|  | 		mpool: api.MpoolAPI, | ||||||
|  | 		wallet: api.WalletAPI, | ||||||
|  | 		chain: api.ChainAPI, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -7,14 +7,15 @@ import ( | |||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/address" |  | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/types" |  | ||||||
| 	"github.com/ipfs/go-datastore" | 	"github.com/ipfs/go-datastore" | ||||||
| 	"github.com/ipfs/go-datastore/namespace" | 	"github.com/ipfs/go-datastore/namespace" | ||||||
| 	dsq "github.com/ipfs/go-datastore/query" | 	dsq "github.com/ipfs/go-datastore/query" | ||||||
| 
 |  | ||||||
| 	cbor "github.com/ipfs/go-ipld-cbor" | 	cbor "github.com/ipfs/go-ipld-cbor" | ||||||
| 	"golang.org/x/xerrors" | 	"golang.org/x/xerrors" | ||||||
|  | 
 | ||||||
|  | 	"github.com/filecoin-project/go-lotus/chain/address" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/chain/types" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ErrChannelNotTracked = errors.New("channel not tracked") | var ErrChannelNotTracked = errors.New("channel not tracked") | ||||||
| @ -30,7 +31,7 @@ type Store struct { | |||||||
| 	ds datastore.Batching | 	ds datastore.Batching | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewStore(ds datastore.Batching) *Store { | func NewStore(ds dtypes.MetadataDS) *Store { | ||||||
| 	ds = namespace.Wrap(ds, datastore.NewKey("/paych/")) | 	ds = namespace.Wrap(ds, datastore.NewKey("/paych/")) | ||||||
| 	return &Store{ | 	return &Store{ | ||||||
| 		ds: ds, | 		ds: ds, | ||||||
|  | |||||||
							
								
								
									
										138
									
								
								storage/commitment/tracker.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										138
									
								
								storage/commitment/tracker.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,138 @@ | |||||||
|  | package commitment | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  | 
 | ||||||
|  | 	"github.com/ipfs/go-cid" | ||||||
|  | 	"github.com/ipfs/go-datastore" | ||||||
|  | 	"github.com/ipfs/go-datastore/namespace" | ||||||
|  | 	cbor "github.com/ipfs/go-ipld-cbor" | ||||||
|  | 	logging "github.com/ipfs/go-log" | ||||||
|  | 	"golang.org/x/xerrors" | ||||||
|  | 
 | ||||||
|  | 	"github.com/filecoin-project/go-lotus/chain/address" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/chain/types" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/node/modules/dtypes" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var log = logging.Logger("commitment") | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	cbor.RegisterCborType(commitment{}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var commitmentDsPrefix = datastore.NewKey("/commitments") | ||||||
|  | 
 | ||||||
|  | type Tracker struct { | ||||||
|  | 	commitDs datastore.Datastore | ||||||
|  | 
 | ||||||
|  | 	lk sync.Mutex | ||||||
|  | 
 | ||||||
|  | 	waits map[datastore.Key]chan struct{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewTracker(ds dtypes.MetadataDS) *Tracker { | ||||||
|  | 	return &Tracker{ | ||||||
|  | 		commitDs: namespace.Wrap(ds, commitmentDsPrefix), | ||||||
|  | 		waits: map[datastore.Key]chan struct{}{}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type commitment struct { | ||||||
|  | 	Msg cid.Cid | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func commitmentKey(miner address.Address, sectorId uint64) datastore.Key { | ||||||
|  | 	return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ct *Tracker) TrackCommitSectorMsg(sectorId uint64, smsg *types.SignedMessage) error { | ||||||
|  | 	mcid := smsg.Cid() | ||||||
|  | 	key := commitmentKey(smsg.Message.From, sectorId) | ||||||
|  | 
 | ||||||
|  | 	ct.lk.Lock() | ||||||
|  | 	defer ct.lk.Unlock() | ||||||
|  | 
 | ||||||
|  | 	tracking, err := ct.commitDs.Get(key) | ||||||
|  | 	if err != datastore.ErrNotFound { | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		var comm commitment | ||||||
|  | 		if err := cbor.DecodeInto(tracking, &comm); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if !comm.Msg.Equals(mcid) { | ||||||
|  | 			return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", smsg.Message.From, sectorId, comm.Msg, mcid) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", smsg.Message.From, sectorId, mcid) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	comm := &commitment{Msg:mcid} | ||||||
|  | 	commB, err := cbor.DumpObject(comm) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := ct.commitDs.Put(key, commB); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	waits, ok := ct.waits[key] | ||||||
|  | 	if ok { | ||||||
|  | 		close(waits) | ||||||
|  | 		delete(ct.waits, key) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sectorId uint64) (cid.Cid, error) { | ||||||
|  | 	key := commitmentKey(miner, sectorId) | ||||||
|  | 
 | ||||||
|  | 	ct.lk.Lock() | ||||||
|  | 
 | ||||||
|  | 	tracking, err := ct.commitDs.Get(key) | ||||||
|  | 	if err != datastore.ErrNotFound { | ||||||
|  | 		ct.lk.Unlock() | ||||||
|  | 
 | ||||||
|  | 		if err != nil { | ||||||
|  | 			return cid.Undef, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		var comm commitment | ||||||
|  | 		if err := cbor.DecodeInto(tracking, &comm); err != nil { | ||||||
|  | 			return cid.Undef, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return comm.Msg, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	wait, ok := ct.waits[key] | ||||||
|  | 	if !ok { | ||||||
|  | 		wait = make(chan struct{}) | ||||||
|  | 		ct.waits[key] = wait | ||||||
|  | 	} | ||||||
|  | 	select { | ||||||
|  | 	case <-wait: | ||||||
|  | 		tracking, err := ct.commitDs.Get(key) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		var comm commitment | ||||||
|  | 		if err := cbor.DecodeInto(tracking, &comm); err != nil { | ||||||
|  | 			return cid.Undef, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return comm.Msg, nil | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		return cid.Undef, ctx.Err() | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @ -17,6 +17,7 @@ import ( | |||||||
| 	"github.com/filecoin-project/go-lotus/chain/store" | 	"github.com/filecoin-project/go-lotus/chain/store" | ||||||
| 	"github.com/filecoin-project/go-lotus/chain/types" | 	"github.com/filecoin-project/go-lotus/chain/types" | ||||||
| 	"github.com/filecoin-project/go-lotus/lib/sectorbuilder" | 	"github.com/filecoin-project/go-lotus/lib/sectorbuilder" | ||||||
|  | 	"github.com/filecoin-project/go-lotus/storage/commitment" | ||||||
| 	"github.com/filecoin-project/go-lotus/storage/sector" | 	"github.com/filecoin-project/go-lotus/storage/sector" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -26,6 +27,7 @@ type Miner struct { | |||||||
| 	api storageMinerApi | 	api storageMinerApi | ||||||
| 
 | 
 | ||||||
| 	secst *sector.Store | 	secst *sector.Store | ||||||
|  | 	commt *commitment.Tracker | ||||||
| 
 | 
 | ||||||
| 	maddr address.Address | 	maddr address.Address | ||||||
| 
 | 
 | ||||||
| @ -54,13 +56,14 @@ type storageMinerApi interface { | |||||||
| 	WalletHas(context.Context, address.Address) (bool, error) | 	WalletHas(context.Context, address.Address) (bool, error) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) { | func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) { | ||||||
| 	return &Miner{ | 	return &Miner{ | ||||||
| 		api:   api, | 		api:   api, | ||||||
| 		maddr: addr, | 		maddr: addr, | ||||||
| 		h:     h, | 		h:     h, | ||||||
| 		ds:    ds, | 		ds:    ds, | ||||||
| 		secst: secst, | 		secst: secst, | ||||||
|  | 		commt: commt, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -159,13 +162,11 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal | |||||||
| 		return errors.Wrap(err, "pushing commit sector message to mpool") | 		return errors.Wrap(err, "pushing commit sector message to mpool") | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	m.trackCommitSectorMessage(smsg) | 	if err := m.commt.TrackCommitSectorMsg(sinfo.SectorID, smsg); err != nil { | ||||||
| 	return nil | 		return errors.Wrap(err, "tracking sector commitment") | ||||||
| } | 	} | ||||||
| 
 | 
 | ||||||
| // make sure the message gets included in the chain successfully
 | 	return nil | ||||||
| func (m *Miner) trackCommitSectorMessage(smsg *types.SignedMessage) { |  | ||||||
| 	log.Warning("not currently tracking commit sector messages") |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (m *Miner) runPoSt(ctx context.Context) { | func (m *Miner) runPoSt(ctx context.Context) { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user