feat(datatransfer): request valdiators for storage
Add request validators that will work by default for storage deals
This commit is contained in:
parent
e1be257b28
commit
98cf0769ff
191
chain/deals/request_validation.go
Normal file
191
chain/deals/request_validation.go
Normal file
@ -0,0 +1,191 @@
|
||||
package deals
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/ipld/go-ipld-prime/traversal/selector"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrWrongVoucherType means the voucher was not the correct type can validate against
|
||||
ErrWrongVoucherType = errors.New("cannot validate voucher type")
|
||||
|
||||
// ErrNoPushAccepted just means clients do not accept pushes for storage deals
|
||||
ErrNoPushAccepted = errors.New("Client should not receive data for a storage deal")
|
||||
|
||||
// ErrNoPullAccepted just means providers do not accept pulls for storage deals
|
||||
ErrNoPullAccepted = errors.New("Provider should not send data for a storage deal")
|
||||
|
||||
// ErrNoDeal means no active deal was found for this vouchers proposal cid
|
||||
ErrNoDeal = errors.New("No deal found for this proposal")
|
||||
|
||||
// ErrWrongPeer means that the other peer for this data transfer request does not match
|
||||
// the other peer for the deal
|
||||
ErrWrongPeer = errors.New("Data Transfer peer id and Deal peer id do not match")
|
||||
|
||||
// ErrWrongPiece means that the pieceref for this data transfer request does not match
|
||||
// the one specified in the deal
|
||||
ErrWrongPiece = errors.New("PieceRef for deal does not match piece ref for piece")
|
||||
|
||||
// ErrInacceptableDealState means the deal for this transfer is not in a deal state
|
||||
// where transfer can be performed
|
||||
ErrInacceptableDealState = errors.New("Deal is not a in a state where deals are accepted")
|
||||
|
||||
// AcceptableDealStates are the states in which it would make sense to actually start a data transfer
|
||||
AcceptableDealStates = []api.DealState{api.DealAccepted, api.DealUnknown}
|
||||
)
|
||||
|
||||
type StorageDataTransferVoucher struct {
|
||||
Proposal cid.Cid
|
||||
}
|
||||
|
||||
func (dv StorageDataTransferVoucher) ToBytes() []byte {
|
||||
return dv.Proposal.Bytes()
|
||||
}
|
||||
|
||||
func (dv StorageDataTransferVoucher) FromBytes(raw []byte) (datatransfer.Voucher, error) {
|
||||
c, err := cid.Cast(raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return StorageDataTransferVoucher{c}, nil
|
||||
}
|
||||
|
||||
func (dv StorageDataTransferVoucher) Identifier() string {
|
||||
return "StorageDataTransferVoucher"
|
||||
}
|
||||
|
||||
var _ datatransfer.RequestValidator = &ClientRequestValidator{}
|
||||
|
||||
type ClientRequestValidator struct {
|
||||
deals *statestore.StateStore
|
||||
}
|
||||
|
||||
func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm datatransfer.Manager) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), crv)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator {
|
||||
crv := &ClientRequestValidator{
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
}
|
||||
return crv
|
||||
}
|
||||
|
||||
func (c *ClientRequestValidator) ValidatePush(
|
||||
sender peer.ID,
|
||||
voucher datatransfer.Voucher,
|
||||
PieceRef cid.Cid,
|
||||
Selector selector.Selector) error {
|
||||
return ErrNoPushAccepted
|
||||
}
|
||||
|
||||
func (c *ClientRequestValidator) ValidatePull(
|
||||
receiver peer.ID,
|
||||
voucher datatransfer.Voucher,
|
||||
PieceRef cid.Cid,
|
||||
Selector selector.Selector) error {
|
||||
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
||||
if !ok {
|
||||
return ErrWrongVoucherType
|
||||
}
|
||||
|
||||
var deal ClientDeal
|
||||
err := c.deals.Mutate(dealVoucher.Proposal, func(d *ClientDeal) error {
|
||||
deal = *d
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return ErrNoDeal
|
||||
}
|
||||
if deal.Miner != receiver {
|
||||
return ErrWrongPeer
|
||||
}
|
||||
if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) {
|
||||
return ErrWrongPiece
|
||||
}
|
||||
for _, state := range AcceptableDealStates {
|
||||
if deal.State == state {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrInacceptableDealState
|
||||
}
|
||||
|
||||
var _ datatransfer.RequestValidator = &ProviderRequestValidator{}
|
||||
|
||||
type ProviderRequestValidator struct {
|
||||
deals *statestore.StateStore
|
||||
}
|
||||
|
||||
func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, dtm datatransfer.Manager) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), mrv)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func NewProviderRequestValidator(ds dtypes.MetadataDS) *ProviderRequestValidator {
|
||||
return &ProviderRequestValidator{
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ProviderRequestValidator) ValidatePush(
|
||||
sender peer.ID,
|
||||
voucher datatransfer.Voucher,
|
||||
PieceRef cid.Cid,
|
||||
Selector selector.Selector) error {
|
||||
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
||||
if !ok {
|
||||
return ErrWrongVoucherType
|
||||
}
|
||||
|
||||
var deal MinerDeal
|
||||
err := m.deals.Mutate(dealVoucher.Proposal, func(d *MinerDeal) error {
|
||||
deal = *d
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return ErrNoDeal
|
||||
}
|
||||
if deal.Client != sender {
|
||||
return ErrWrongPeer
|
||||
}
|
||||
|
||||
if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) {
|
||||
return ErrWrongPiece
|
||||
}
|
||||
for _, state := range AcceptableDealStates {
|
||||
if deal.State == state {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return ErrInacceptableDealState
|
||||
}
|
||||
|
||||
func (m *ProviderRequestValidator) ValidatePull(
|
||||
receiver peer.ID,
|
||||
voucher datatransfer.Voucher,
|
||||
PieceRef cid.Cid,
|
||||
Selector selector.Selector) error {
|
||||
return ErrNoPullAccepted
|
||||
}
|
284
chain/deals/request_validation_test.go
Normal file
284
chain/deals/request_validation_test.go
Normal file
@ -0,0 +1,284 @@
|
||||
package deals_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
dss "github.com/ipfs/go-datastore/sync"
|
||||
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/deals"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/cborrpc"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
)
|
||||
|
||||
var blockGenerator = blocksutil.NewBlockGenerator()
|
||||
|
||||
type wrongDTType struct {
|
||||
}
|
||||
|
||||
func (wrongDTType) ToBytes() []byte {
|
||||
return []byte{}
|
||||
}
|
||||
|
||||
func (wrongDTType) FromBytes([]byte) (datatransfer.Voucher, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (wrongDTType) Identifier() string {
|
||||
return "WrongDTTYPE"
|
||||
}
|
||||
|
||||
func uniqueStorageDealProposal() (actors.StorageDealProposal, error) {
|
||||
clientAddr, err := address.NewIDAddress(uint64(rand.Int()))
|
||||
if err != nil {
|
||||
return actors.StorageDealProposal{}, err
|
||||
}
|
||||
providerAddr, err := address.NewIDAddress(uint64(rand.Int()))
|
||||
if err != nil {
|
||||
return actors.StorageDealProposal{}, err
|
||||
}
|
||||
return actors.StorageDealProposal{
|
||||
PieceRef: blockGenerator.Next().Cid().Bytes(),
|
||||
Client: clientAddr,
|
||||
Provider: providerAddr,
|
||||
ProposerSignature: &types.Signature{
|
||||
Data: []byte("foo bar cat dog"),
|
||||
Type: types.KTBLS,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newClientDeal(minerId peer.ID, state api.DealState) (deals.ClientDeal, error) {
|
||||
newProposal, err := uniqueStorageDealProposal()
|
||||
if err != nil {
|
||||
return deals.ClientDeal{}, err
|
||||
}
|
||||
proposalNd, err := cborrpc.AsIpld(&newProposal)
|
||||
if err != nil {
|
||||
return deals.ClientDeal{}, err
|
||||
}
|
||||
return deals.ClientDeal{
|
||||
Proposal: newProposal,
|
||||
ProposalCid: proposalNd.Cid(),
|
||||
Miner: minerId,
|
||||
State: state,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newMinerDeal(clientID peer.ID, state api.DealState) (deals.MinerDeal, error) {
|
||||
newProposal, err := uniqueStorageDealProposal()
|
||||
if err != nil {
|
||||
return deals.MinerDeal{}, err
|
||||
}
|
||||
proposalNd, err := cborrpc.AsIpld(&newProposal)
|
||||
if err != nil {
|
||||
return deals.MinerDeal{}, err
|
||||
}
|
||||
ref, err := cid.Cast(newProposal.PieceRef)
|
||||
if err != nil {
|
||||
return deals.MinerDeal{}, err
|
||||
}
|
||||
return deals.MinerDeal{
|
||||
Proposal: newProposal,
|
||||
ProposalCid: proposalNd.Cid(),
|
||||
Client: clientID,
|
||||
State: state,
|
||||
Ref: ref,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestClientRequestValidation(t *testing.T) {
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
|
||||
crv := deals.NewClientRequestValidator(ds)
|
||||
minerId := peer.ID("fakepeerid")
|
||||
block := blockGenerator.Next()
|
||||
t.Run("ValidatePush fails", func(t *testing.T) {
|
||||
if crv.ValidatePush(minerId, wrongDTType{}, block.Cid(), nil) != deals.ErrNoPushAccepted {
|
||||
t.Fatal("Push should fail for the client request validator for storage deals")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePull fails deal not found", func(t *testing.T) {
|
||||
proposal, err := uniqueStorageDealProposal()
|
||||
if err != nil {
|
||||
t.Fatal("error creating proposal")
|
||||
}
|
||||
proposalNd, err := cborrpc.AsIpld(&proposal)
|
||||
if err != nil {
|
||||
t.Fatal("error serializing proposal")
|
||||
}
|
||||
pieceRef, err := cid.Cast(proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if crv.ValidatePull(minerId, deals.StorageDataTransferVoucher{proposalNd.Cid()}, pieceRef, nil) != deals.ErrNoDeal {
|
||||
t.Fatal("Pull should fail if there is no deal stored")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePull fails wrong client", func(t *testing.T) {
|
||||
otherMiner := peer.ID("otherminer")
|
||||
clientDeal, err := newClientDeal(otherMiner, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if crv.ValidatePull(minerId, deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil) != deals.ErrWrongPeer {
|
||||
t.Fatal("Pull should fail if miner address is incorrect")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePull fails wrong piece ref", func(t *testing.T) {
|
||||
clientDeal, err := newClientDeal(minerId, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
if crv.ValidatePull(minerId, deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) != deals.ErrWrongPiece {
|
||||
t.Fatal("Pull should fail if piece ref is incorrect")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePull fails wrong deal state", func(t *testing.T) {
|
||||
clientDeal, err := newClientDeal(minerId, api.DealComplete)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if crv.ValidatePull(minerId, deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil) != deals.ErrInacceptableDealState {
|
||||
t.Fatal("Pull should fail if deal is in a state that cannot be data transferred")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePull succeeds", func(t *testing.T) {
|
||||
clientDeal, err := newClientDeal(minerId, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if crv.ValidatePull(minerId, deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil) != nil {
|
||||
t.Fatal("Pull should should succeed when all parameters are correct")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestProviderRequestValidation(t *testing.T) {
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
|
||||
mrv := deals.NewProviderRequestValidator(ds)
|
||||
clientID := peer.ID("fakepeerid")
|
||||
block := blockGenerator.Next()
|
||||
t.Run("ValidatePull fails", func(t *testing.T) {
|
||||
if mrv.ValidatePull(clientID, wrongDTType{}, block.Cid(), nil) != deals.ErrNoPullAccepted {
|
||||
t.Fatal("Pull should fail for the provider request validator for storage deals")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ValidatePush fails deal not found", func(t *testing.T) {
|
||||
proposal, err := uniqueStorageDealProposal()
|
||||
if err != nil {
|
||||
t.Fatal("error creating proposal")
|
||||
}
|
||||
proposalNd, err := cborrpc.AsIpld(&proposal)
|
||||
if err != nil {
|
||||
t.Fatal("error serializing proposal")
|
||||
}
|
||||
pieceRef, err := cid.Cast(proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if mrv.ValidatePush(clientID, deals.StorageDataTransferVoucher{proposalNd.Cid()}, pieceRef, nil) != deals.ErrNoDeal {
|
||||
t.Fatal("Push should fail if there is no deal stored")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePush fails wrong miner", func(t *testing.T) {
|
||||
otherClient := peer.ID("otherclient")
|
||||
minerDeal, err := newMinerDeal(otherClient, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if mrv.ValidatePush(clientID, deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil) != deals.ErrWrongPeer {
|
||||
t.Fatal("Push should fail if miner address is incorrect")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePush fails wrong piece ref", func(t *testing.T) {
|
||||
minerDeal, err := newMinerDeal(clientID, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
if mrv.ValidatePush(clientID, deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) != deals.ErrWrongPiece {
|
||||
t.Fatal("Push should fail if piece ref is incorrect")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePush fails wrong deal state", func(t *testing.T) {
|
||||
minerDeal, err := newMinerDeal(clientID, api.DealComplete)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if mrv.ValidatePush(clientID, deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil) != deals.ErrInacceptableDealState {
|
||||
t.Fatal("Push should fail if deal is in a state that cannot be data transferred")
|
||||
}
|
||||
})
|
||||
t.Run("ValidatePush succeeds", func(t *testing.T) {
|
||||
minerDeal, err := newMinerDeal(clientID, api.DealAccepted)
|
||||
if err != nil {
|
||||
t.Fatal("error creating client deal")
|
||||
}
|
||||
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||
t.Fatal("deal tracking failed")
|
||||
}
|
||||
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||
if err != nil {
|
||||
t.Fatal("unable to construct piece cid")
|
||||
}
|
||||
if mrv.ValidatePush(clientID, deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil) != nil {
|
||||
t.Fatal("Push should should succeed when all parameters are correct")
|
||||
}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue
Block a user