implement storage client and provider node adapters

This commit is contained in:
laser 2020-03-18 11:37:32 -07:00
parent 6622576cf8
commit a8dd6a831c
2 changed files with 43 additions and 29 deletions

View File

@ -6,6 +6,10 @@ import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
samarket "github.com/filecoin-project/specs-actors/actors/builtin/market" samarket "github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -13,10 +17,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/market"
@ -115,10 +115,6 @@ func (n *ClientNodeAdapter) ListClientDeals(ctx context.Context, addr address.Ad
return out, nil return out, nil
} }
func (n *ClientNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) {
return n.ChainHead(ctx)
}
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
func (n *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error { func (n *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error {
// (Provider Node API) // (Provider Node API)
@ -161,7 +157,7 @@ func (n *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address
// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal // ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal
// returns the Deal id if there is no error // returns the Deal id if there is no error
func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (uint64, error) { func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) {
log.Infow("DEAL ACCEPTED!") log.Infow("DEAL ACCEPTED!")
pubmsg, err := c.cs.GetMessage(*deal.PublishMessage) pubmsg, err := c.cs.GetMessage(*deal.PublishMessage)
@ -223,12 +219,12 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
return 0, err return 0, err
} }
return uint64(res.IDs[dealIdx]), nil return res.IDs[dealIdx], nil
} }
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error { func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, abi.DealID(dealId), ts) sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts)
if err != nil { if err != nil {
// TODO: This may be fine for some errors // TODO: This may be fine for some errors
@ -367,4 +363,13 @@ func (n *ClientNodeAdapter) ValidateAskSignature(ask *storagemarket.SignedStorag
} }
func (n *ClientNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
head, err := n.ChainHead(ctx)
if err != nil {
return nil, 0, err
}
return head.Key().Bytes(), head.Height(), nil
}
var _ storagemarket.StorageClientNode = &ClientNodeAdapter{} var _ storagemarket.StorageClientNode = &ClientNodeAdapter{}

View File

@ -7,6 +7,9 @@ import (
"context" "context"
"io" "io"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -16,8 +19,6 @@ import (
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
@ -51,7 +52,7 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc
} }
} }
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (storagemarket.DealID, cid.Cid, error) { func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (abi.DealID, cid.Cid, error) {
log.Info("publishing deal") log.Info("publishing deal")
worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, types.EmptyTSK) worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, types.EmptyTSK)
@ -96,11 +97,11 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark
} }
// TODO: bad types here // TODO: bad types here
return storagemarket.DealID(resp.IDs[0]), smsg.Cid(), nil return resp.IDs[0], smsg.Cid(), nil
} }
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) error { func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) error {
_, err := n.secb.AddPiece(ctx, abi.UnpaddedPieceSize(pieceSize), pieceData, abi.DealID(deal.DealID)) _, err := n.secb.AddPiece(ctx, abi.UnpaddedPieceSize(pieceSize), pieceData, deal.DealID)
if err != nil { if err != nil {
return xerrors.Errorf("AddPiece failed: %s", err) return xerrors.Errorf("AddPiece failed: %s", err)
} }
@ -132,9 +133,13 @@ func (n *ProviderNodeAdapter) ListProviderDeals(ctx context.Context, addr addres
return out, nil return out, nil
} }
func (n *ProviderNodeAdapter) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) { func (n *ProviderNodeAdapter) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
addr, err := n.StateMinerWorker(ctx, miner, types.EmptyTSK) tsk, err := types.TipSetKeyFromBytes(tok)
return addr, err if err != nil {
return address.Undef, err
}
return n.StateMinerWorker(ctx, miner, tsk)
} }
func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*crypto.Signature, error) { func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*crypto.Signature, error) {
@ -149,10 +154,6 @@ func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr, wallet addr
return n.MarketEnsureAvailable(ctx, addr, wallet, amt) return n.MarketEnsureAvailable(ctx, addr, wallet, amt)
} }
func (n *ProviderNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) {
return n.ChainHead(ctx)
}
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error { func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) error {
// (Provider Node API) // (Provider Node API)
@ -189,9 +190,8 @@ func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Addre
return utils.ToSharedBalance(bal), nil return utils.ToSharedBalance(bal), nil
} }
func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID uint64) (sectorID uint64, offset uint64, length uint64, err error) { func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID abi.DealID) (sectorID uint64, offset uint64, length uint64, err error) {
refs, err := n.secb.GetRefs(dealID)
refs, err := n.secb.GetRefs(abi.DealID(dealID))
if err != nil { if err != nil {
return 0, 0, 0, err return 0, 0, 0, err
} }
@ -219,9 +219,9 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context
return uint64(best.SectorID), best.Offset, uint64(best.Size), nil return uint64(best.SectorID), best.Offset, uint64(best.Size), nil
} }
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID uint64, cb storagemarket.DealSectorCommittedCallback) error { func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := n.StateMarketStorageDeal(ctx, abi.DealID(dealID), ts.Key()) sd, err := n.StateMarketStorageDeal(ctx, dealID, ts.Key())
if err != nil { if err != nil {
// TODO: This may be fine for some errors // TODO: This may be fine for some errors
@ -322,4 +322,13 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
return nil return nil
} }
func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
head, err := n.ChainHead(ctx)
if err != nil {
return nil, 0, err
}
return head.Key().Bytes(), head.Height(), nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{} var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}