lotus/markets/retrievaladapter/provider.go
2021-05-22 22:40:21 +05:30

168 lines
5.6 KiB
Go

package retrievaladapter
import (
"context"
"io"
"github.com/filecoin-project/lotus/api/v1api"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
)
// log is the package-level logger, registered under the name "retrievaladapter".
var log = logging.Logger("retrievaladapter")
// retrievalProviderNode holds the dependencies used by the retrieval provider
// adapter methods defined in this file.
type retrievalProviderNode struct {
// miner supplies sector info (GetSectorInfo) and the miner's address.
miner *storage.Miner
// pp is used to read pieces from sectors and to check whether they are unsealed.
pp sectorstorage.PieceProvider
// full is the full-node API used for chain head, state and payment voucher calls.
full v1api.FullNode
}
// NewRetrievalProviderNode builds the retrieval provider node adapter backed by
// the given miner, piece provider and Lotus full-node API, and returns it as a
// retrievalmarket.RetrievalProviderNode.
func NewRetrievalProviderNode(miner *storage.Miner, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
	node := &retrievalProviderNode{
		miner: miner,
		pp:    pp,
		full:  full,
	}
	return node
}
// GetMinerWorkerAddress looks up the worker address of miner at the tipset
// identified by tok, which is decoded as a serialized TipSetKey.
//
// It returns address.Undef together with the error when tok cannot be decoded
// or when the miner info lookup fails.
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
	tsk, err := types.TipSetKeyFromBytes(tok)
	if err != nil {
		return address.Undef, err
	}

	mi, err := rpn.full.StateMinerInfo(ctx, miner, tsk)
	if err != nil {
		// Do not read fields of mi when the lookup failed; report the same
		// undefined address as the other failure path.
		return address.Undef, err
	}
	return mi.Worker, nil
}
// UnsealSector returns a reader for length unpadded bytes starting at offset
// inside sector sectorID. The read is delegated to the piece provider; a
// failure there is wrapped with the sector number.
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
	log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)

	info, err := rpn.miner.GetSectorInfo(sectorID)
	if err != nil {
		return nil, err
	}

	minerID, err := address.IDFromAddress(rpn.miner.Address())
	if err != nil {
		return nil, err
	}

	sector := specstorage.SectorRef{
		ID:        abi.SectorID{Miner: abi.ActorID(minerID), Number: sectorID},
		ProofType: info.SectorType,
	}

	// The sector info may carry no CommD; in that case the undefined CID is
	// passed through to the piece provider.
	unsealedCID := cid.Undef
	if info.CommD != nil {
		unsealedCID = *info.CommD
	}

	// Ask the piece provider for a reader over the piece, unsealing the piece if necessary.
	log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, minerID)
	// todo: use the second result, which is currently discarded
	reader, _, err := rpn.pp.ReadPiece(ctx, sector, storiface.UnpaddedByteIndex(offset), length, info.TicketValue, unsealedCID)
	if err != nil {
		return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
	}

	return reader, nil
}
// SavePaymentVoucher forwards the voucher, its proof and the expected amount
// to the full node's PaychVoucherAdd for paymentChannel and returns that
// call's results unchanged.
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
	// TODO: tok (a serialized TipSetKey) is currently ignored; it should be
	// respected when querying the chain.
	return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
}
// GetChainHead asks the full node for the current chain head and returns its
// tipset key serialized to bytes along with its height. On failure it returns
// a nil token, epoch 0 and the error.
func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
	ts, err := rpn.full.ChainHead(ctx)
	if err != nil {
		return nil, 0, err
	}

	tok := ts.Key().Bytes()
	epoch := ts.Height()
	return tok, epoch, nil
}
// IsUnsealed asks the piece provider whether the range of length unpadded
// bytes starting at offset inside sector sectorID is unsealed.
//
// It returns false and an error when the sector info cannot be loaded or the
// miner address cannot be converted to an actor ID; otherwise it returns the
// piece provider's answer unchanged.
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
	si, err := rpn.miner.GetSectorInfo(sectorID)
	if err != nil {
		// Wrap with %w (not %s) so callers can still inspect the underlying
		// error with errors.Is / errors.As.
		return false, xerrors.Errorf("failed to get sectorinfo: %w", err)
	}

	mid, err := address.IDFromAddress(rpn.miner.Address())
	if err != nil {
		return false, err
	}

	ref := specstorage.SectorRef{
		ID: abi.SectorID{
			Miner:  abi.ActorID(mid),
			Number: sectorID,
		},
		ProofType: si.SectorType,
	}

	log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
	return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length)
}
// GetRetrievalPricingInput assembles the PricingInput for a retrieval of
// `pieceCID`.
//
// `storageDeals` is the list of storage deals made for the payload CID the
// retrieval client is asking for. `pieceCID` is the CID of the specific piece
// the payload will be retrieved from; the client may mandate a particular
// piece, or one is chosen for it, preferring a piece for which an unsealed
// sector file already exists if possible.
//
// Each deal is looked up on chain at the current head, and the response is
// filled in as follows:
//
//  1. `VerifiedDeal` is true if any of the storage deals is a verified deal.
//  2. `PieceSize` is the unpadded size of the piece from a deal whose piece
//     CID equals `pieceCID`.
//
// An error is returned if the chain head or any deal lookup fails, or if no
// deal yields a non-zero `PieceSize`.
func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
	var out retrievalmarket.PricingInput

	ts, err := rpn.full.ChainHead(ctx)
	if err != nil {
		return out, xerrors.Errorf("failed to get chain head: %w", err)
	}
	key := ts.Key()

	for _, id := range storageDeals {
		deal, err := rpn.full.StateMarketStorageDeal(ctx, id, key)
		if err != nil {
			return out, xerrors.Errorf("failed to look up deal %d on chain: err=%w", id, err)
		}

		// Once set, the verified flag stays set for the remaining deals.
		out.VerifiedDeal = out.VerifiedDeal || deal.Proposal.VerifiedDeal

		if deal.Proposal.PieceCID.Equals(pieceCID) {
			out.PieceSize = deal.Proposal.PieceSize.Unpadded()
		}

		// Keep scanning until both answers are known.
		if !out.VerifiedDeal || out.PieceSize == 0 {
			continue
		}
		break
	}

	if out.PieceSize == 0 {
		return out, xerrors.New("failed to find matching piece, PieceSize is zero")
	}

	return out, nil
}