lotus/markets/retrievaladapter/provider.go

107 lines
3.5 KiB
Go
Raw Normal View History

package retrievaladapter
import (
"context"
"io"
2021-04-05 17:56:53 +00:00
"github.com/filecoin-project/lotus/api/v1api"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2020-09-28 21:25:58 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/types"
2020-08-17 13:39:33 +00:00
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-11-05 06:44:46 +00:00
specstorage "github.com/filecoin-project/specs-storage/storage"
)
var log = logging.Logger("retrievaladapter")
type retrievalProviderNode struct {
2020-03-03 22:19:22 +00:00
miner *storage.Miner
sealer sectorstorage.SectorManager
2021-04-05 17:56:53 +00:00
full v1api.FullNode
}
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
2021-04-05 17:56:53 +00:00
func NewRetrievalProviderNode(miner *storage.Miner, sealer sectorstorage.SectorManager, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
2020-03-03 22:19:22 +00:00
return &retrievalProviderNode{miner, sealer, full}
}
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil {
return address.Undef, err
}
mi, err := rpn.full.StateMinerInfo(ctx, miner, tsk)
return mi.Worker, err
}
2020-07-30 12:31:31 +00:00
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
2020-07-30 12:31:31 +00:00
si, err := rpn.miner.GetSectorInfo(sectorID)
if err != nil {
return nil, err
}
2020-03-17 20:19:52 +00:00
mid, err := address.IDFromAddress(rpn.miner.Address())
if err != nil {
2020-03-22 21:39:06 +00:00
return nil, err
2020-03-17 20:19:52 +00:00
}
2020-11-05 06:44:46 +00:00
ref := specstorage.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: sectorID,
},
ProofType: si.SectorType,
2020-03-17 20:19:52 +00:00
}
// Set up a pipe so that data can be written from the unsealing process
// into the reader returned by this function
r, w := io.Pipe()
go func() {
var commD cid.Cid
if si.CommD != nil {
commD = *si.CommD
}
2021-03-22 13:39:03 +00:00
// Read the piece into the pipe's writer, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
2020-11-05 06:44:46 +00:00
err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
if err != nil {
log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err)
}
// Close the reader with any error that was returned while reading the piece
_ = w.CloseWithError(err)
}()
return r, nil
}
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
// TODO: respect the provided TipSetToken (a serialized TipSetKey) when
// querying the chain
added, err := rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
return added, err
}
func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
head, err := rpn.full.ChainHead(ctx)
if err != nil {
return nil, 0, err
}
return head.Key().Bytes(), head.Height(), nil
}