2019-12-10 04:19:59 +00:00
|
|
|
package retrievaladapter
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-01-24 20:19:52 +00:00
|
|
|
"io"
|
2019-12-10 04:19:59 +00:00
|
|
|
|
2021-07-06 13:22:08 +00:00
|
|
|
"github.com/filecoin-project/lotus/api"
|
2021-04-05 17:56:53 +00:00
|
|
|
"github.com/filecoin-project/lotus/api/v1api"
|
2021-05-20 10:49:44 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
|
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
2021-07-05 10:38:51 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2021-05-20 10:49:44 +00:00
|
|
|
"golang.org/x/xerrors"
|
2021-04-05 17:56:53 +00:00
|
|
|
|
2021-03-22 09:23:58 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
|
|
|
|
2020-09-28 21:25:58 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
|
2020-08-16 10:09:58 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
2020-08-17 13:39:33 +00:00
|
|
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
2020-08-17 13:26:18 +00:00
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
2020-08-16 10:09:58 +00:00
|
|
|
|
2019-12-10 04:19:59 +00:00
|
|
|
"github.com/filecoin-project/go-address"
|
2020-01-10 17:13:12 +00:00
|
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
2020-03-18 17:51:25 +00:00
|
|
|
"github.com/filecoin-project/go-fil-markets/shared"
|
2020-09-07 03:49:10 +00:00
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
2020-11-05 06:44:46 +00:00
|
|
|
specstorage "github.com/filecoin-project/specs-storage/storage"
|
2021-06-08 10:25:49 +00:00
|
|
|
|
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2019-12-10 04:19:59 +00:00
|
|
|
)
|
|
|
|
|
2021-03-22 09:23:58 +00:00
|
|
|
// log is the package-level logger, created under the name "retrievaladapter".
var log = logging.Logger("retrievaladapter")
|
|
|
|
|
2019-12-10 04:19:59 +00:00
|
|
|
type retrievalProviderNode struct {
|
2021-05-20 10:49:44 +00:00
|
|
|
maddr address.Address
|
|
|
|
secb sectorblocks.SectorBuilder
|
2021-06-04 13:41:38 +00:00
|
|
|
pp sectorstorage.PieceProvider
|
2021-05-20 10:49:44 +00:00
|
|
|
full v1api.FullNode
|
2019-12-10 04:19:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
|
|
|
|
// Lotus Node
|
2021-06-04 13:41:38 +00:00
|
|
|
func NewRetrievalProviderNode(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
|
2021-05-20 10:49:44 +00:00
|
|
|
return &retrievalProviderNode{address.Address(maddr), secb, pp, full}
|
2019-12-10 04:19:59 +00:00
|
|
|
}
|
|
|
|
|
2020-03-18 17:51:25 +00:00
|
|
|
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
|
|
|
|
tsk, err := types.TipSetKeyFromBytes(tok)
|
|
|
|
if err != nil {
|
|
|
|
return address.Undef, err
|
|
|
|
}
|
|
|
|
|
2020-04-16 17:36:36 +00:00
|
|
|
mi, err := rpn.full.StateMinerInfo(ctx, miner, tsk)
|
|
|
|
return mi.Worker, err
|
2020-02-29 03:23:55 +00:00
|
|
|
}
|
|
|
|
|
2020-07-30 12:31:31 +00:00
|
|
|
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
2021-06-14 09:31:55 +00:00
|
|
|
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
|
2021-07-06 13:22:08 +00:00
|
|
|
si, err := rpn.sectorsStatus(ctx, sectorID, true)
|
2019-12-17 03:17:46 +00:00
|
|
|
if err != nil {
|
2020-01-24 20:19:52 +00:00
|
|
|
return nil, err
|
2019-12-17 03:17:46 +00:00
|
|
|
}
|
2020-03-17 20:19:52 +00:00
|
|
|
|
2021-05-20 10:49:44 +00:00
|
|
|
mid, err := address.IDFromAddress(rpn.maddr)
|
2020-03-17 20:19:52 +00:00
|
|
|
if err != nil {
|
2020-03-22 21:39:06 +00:00
|
|
|
return nil, err
|
2020-03-17 20:19:52 +00:00
|
|
|
}
|
|
|
|
|
2020-11-05 06:44:46 +00:00
|
|
|
ref := specstorage.SectorRef{
|
|
|
|
ID: abi.SectorID{
|
|
|
|
Miner: abi.ActorID(mid),
|
|
|
|
Number: sectorID,
|
|
|
|
},
|
2021-05-20 10:49:44 +00:00
|
|
|
ProofType: si.SealProof,
|
|
|
|
}
|
|
|
|
|
|
|
|
var commD cid.Cid
|
|
|
|
if si.CommD != nil {
|
|
|
|
commD = *si.CommD
|
2020-03-17 20:19:52 +00:00
|
|
|
}
|
2020-05-26 08:20:32 +00:00
|
|
|
|
2021-05-18 07:32:30 +00:00
|
|
|
// Get a reader for the piece, unsealing the piece if necessary
|
|
|
|
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
|
2021-05-20 10:49:44 +00:00
|
|
|
r, unsealed, err := rpn.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.Ticket.Value, commD)
|
|
|
|
if err != nil {
|
|
|
|
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
|
|
|
|
}
|
|
|
|
_ = unsealed // todo: use
|
2020-05-26 08:20:32 +00:00
|
|
|
|
|
|
|
return r, nil
|
2019-12-17 03:17:46 +00:00
|
|
|
}
|
|
|
|
|
2020-03-18 17:51:25 +00:00
|
|
|
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
|
|
|
|
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when
|
|
|
|
// querying the chain
|
2020-02-12 22:32:26 +00:00
|
|
|
added, err := rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
|
|
|
|
return added, err
|
2019-12-10 04:19:59 +00:00
|
|
|
}
|
2020-03-18 17:51:25 +00:00
|
|
|
|
|
|
|
func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
|
|
|
|
head, err := rpn.full.ChainHead(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return head.Key().Bytes(), head.Height(), nil
|
|
|
|
}
|
2021-05-22 17:10:21 +00:00
|
|
|
|
|
|
|
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
2021-07-06 15:40:03 +00:00
|
|
|
si, err := rpn.sectorsStatus(ctx, sectorID, true)
|
2021-05-22 17:10:21 +00:00
|
|
|
if err != nil {
|
2021-06-22 10:05:59 +00:00
|
|
|
return false, xerrors.Errorf("failed to get sector info: %w", err)
|
2021-05-22 17:10:21 +00:00
|
|
|
}
|
|
|
|
|
2021-06-22 10:05:59 +00:00
|
|
|
mid, err := address.IDFromAddress(rpn.maddr)
|
2021-05-22 17:10:21 +00:00
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ref := specstorage.SectorRef{
|
|
|
|
ID: abi.SectorID{
|
|
|
|
Miner: abi.ActorID(mid),
|
|
|
|
Number: sectorID,
|
|
|
|
},
|
2021-07-06 15:33:47 +00:00
|
|
|
ProofType: si.SealProof,
|
2021-05-22 17:10:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
|
|
|
|
return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length)
|
|
|
|
}
|
|
|
|
|
2021-06-14 04:10:34 +00:00
|
|
|
// GetRetrievalPricingInput takes a set of candidate storage deals that can serve a retrieval request,
|
|
|
|
// and returns an minimally populated PricingInput. This PricingInput should be enhanced
|
|
|
|
// with more data, and passed to the pricing function to determine the final quoted price.
|
2021-05-22 17:10:21 +00:00
|
|
|
func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
|
|
|
|
resp := retrievalmarket.PricingInput{}
|
|
|
|
|
|
|
|
head, err := rpn.full.ChainHead(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return resp, xerrors.Errorf("failed to get chain head: %w", err)
|
|
|
|
}
|
|
|
|
tsk := head.Key()
|
|
|
|
|
2021-07-05 10:38:51 +00:00
|
|
|
var mErr error
|
2021-07-05 10:23:41 +00:00
|
|
|
|
2021-05-22 17:10:21 +00:00
|
|
|
for _, dealID := range storageDeals {
|
|
|
|
ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk)
|
|
|
|
if err != nil {
|
2021-07-05 04:04:56 +00:00
|
|
|
log.Warnf("failed to look up deal %d on chain: err=%w", dealID, err)
|
2021-07-05 10:38:51 +00:00
|
|
|
mErr = multierror.Append(mErr, err)
|
2021-07-05 04:04:56 +00:00
|
|
|
continue
|
2021-05-22 17:10:21 +00:00
|
|
|
}
|
|
|
|
if ds.Proposal.VerifiedDeal {
|
|
|
|
resp.VerifiedDeal = true
|
|
|
|
}
|
|
|
|
|
|
|
|
if ds.Proposal.PieceCID.Equals(pieceCID) {
|
|
|
|
resp.PieceSize = ds.Proposal.PieceSize.Unpadded()
|
|
|
|
}
|
|
|
|
|
2021-06-14 04:10:34 +00:00
|
|
|
// If we've discovered a verified deal with the required PieceCID, we don't need
|
|
|
|
// to lookup more deals and we're done.
|
2021-05-22 17:10:21 +00:00
|
|
|
if resp.VerifiedDeal && resp.PieceSize != 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-14 04:10:34 +00:00
|
|
|
// Note: The piece size can never actually be zero. We only use it to here
|
|
|
|
// to assert that we didn't find a matching piece.
|
2021-05-22 17:10:21 +00:00
|
|
|
if resp.PieceSize == 0 {
|
2021-07-05 10:38:51 +00:00
|
|
|
if mErr == nil {
|
2021-07-05 10:23:41 +00:00
|
|
|
return resp, xerrors.New("failed to find matching piece")
|
|
|
|
}
|
2021-07-05 10:38:51 +00:00
|
|
|
|
|
|
|
return resp, xerrors.Errorf("failed to fetch storage deal state: %w", mErr)
|
2021-05-22 17:10:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return resp, nil
|
|
|
|
}
|
2021-07-06 13:22:08 +00:00
|
|
|
|
|
|
|
func (rpn *retrievalProviderNode) sectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
|
|
|
|
sInfo, err := rpn.secb.SectorsStatus(ctx, sid, false)
|
|
|
|
if err != nil {
|
|
|
|
return api.SectorInfo{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if !showOnChainInfo {
|
|
|
|
return sInfo, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
onChainInfo, err := rpn.full.StateSectorGetInfo(ctx, rpn.maddr, sid, types.EmptyTSK)
|
|
|
|
if err != nil {
|
|
|
|
return sInfo, err
|
|
|
|
}
|
|
|
|
if onChainInfo == nil {
|
|
|
|
return sInfo, nil
|
|
|
|
}
|
|
|
|
sInfo.SealProof = onChainInfo.SealProof
|
|
|
|
sInfo.Activation = onChainInfo.Activation
|
|
|
|
sInfo.Expiration = onChainInfo.Expiration
|
|
|
|
sInfo.DealWeight = onChainInfo.DealWeight
|
|
|
|
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
|
|
|
|
sInfo.InitialPledge = onChainInfo.InitialPledge
|
|
|
|
|
|
|
|
ex, err := rpn.full.StateSectorExpiration(ctx, rpn.maddr, sid, types.EmptyTSK)
|
|
|
|
if err != nil {
|
|
|
|
return sInfo, nil
|
|
|
|
}
|
|
|
|
sInfo.OnTime = ex.OnTime
|
|
|
|
sInfo.Early = ex.Early
|
|
|
|
|
|
|
|
return sInfo, nil
|
|
|
|
}
|