lotus/storagemarketadapter/provider_adapter.go

197 lines
5.8 KiB
Go
Raw Normal View History

package storagemarketadapter
// this file implements storagemarket.StorageProviderNode
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
)
var log = logging.Logger("provideradapter")
type ProviderNodeAdapter struct {
api.FullNode
// this goes away with the data transfer module
dag dtypes.StagingDAG
secb *sectorblocks.SectorBlocks
}
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
return &ProviderNodeAdapter{
FullNode: full,
dag: dag,
secb: secb,
}
}
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (storagemarket.DealID, cid.Cid, error) {
log.Info("publishing deal")
worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, nil)
if err != nil {
return 0, cid.Undef, err
}
params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{
Deals: []actors.StorageDealProposal{deal.Proposal},
})
if err != nil {
return 0, cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err)
}
// TODO: We may want this to happen after fetching data
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: worker,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.PublishStorageDeals,
Params: params,
})
if err != nil {
return 0, cid.Undef, err
}
r, err := n.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return 0, cid.Undef, err
}
if r.Receipt.ExitCode != 0 {
return 0, cid.Undef, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return 0, cid.Undef, err
}
if len(resp.DealIDs) != 1 {
return 0, cid.Undef, xerrors.Errorf("got unexpected number of DealIDs from")
}
return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) {
root, err := n.dag.Get(ctx, deal.Ref)
if err != nil {
return 0, xerrors.Errorf("failed to get file root for deal: %s", err)
}
// TODO: abstract this away into ReadSizeCloser + implement different modes
node, err := unixfile.NewUnixfsFile(ctx, n.dag, root)
if err != nil {
return 0, xerrors.Errorf("cannot open unixfs file: %s", err)
}
uf, ok := node.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
return 0, xerrors.Errorf("unsupported unixfs file type")
}
// TODO: uf.Size() is user input, not trusted
// This won't be useful / here after we migrate to putting CARs into sectors
size, err := uf.Size()
if err != nil {
return 0, xerrors.Errorf("getting unixfs file size: %w", err)
}
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
}
sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
if err != nil {
return 0, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID)
return sectorID, nil
}
func (n *ProviderNodeAdapter) ListProviderDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) {
allDeals, err := n.StateMarketDeals(ctx, nil)
if err != nil {
return nil, err
}
var out []actors.OnChainDeal
for _, deal := range allDeals {
if deal.Provider == addr {
out = append(out, deal)
}
}
return out, nil
}
func (n *ProviderNodeAdapter) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) {
return n.StateMinerWorker(ctx, miner, nil)
}
func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) {
return n.WalletSign(ctx, signer, b)
}
func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amt storagemarket.TokenAmount) error {
return n.MarketEnsureAvailable(ctx, addr, types.BigInt(amt))
}
func (n *ProviderNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) {
return n.ChainHead(ctx)
}
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error {
// (Provider Node API)
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: addr,
Value: types.BigInt(amount),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.AddBalance,
})
if err != nil {
return err
}
r, err := n.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
if r.Receipt.ExitCode != 0 {
return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode)
}
return nil
}
func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) {
bal, err := n.StateMarketBalance(ctx, addr, nil)
if err != nil {
return storagemarket.Balance{}, err
}
return bal, nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}