This commit is contained in:
Ingar Shu 2020-10-15 09:24:48 -07:00
parent 01de4cb2ec
commit 0b7dc6971d
No known key found for this signature in database
GPG Key ID: BE3D9CE79F22E769
4 changed files with 102 additions and 54 deletions

View File

@ -6,35 +6,57 @@ import (
"encoding/json" "encoding/json"
"os/exec" "os/exec"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
func CliDealFilter(cmd string) dtypes.DealFilter { func CliStorageDealFilter(cmd string) dtypes.StorageDealFilter {
// TODO: run some checks on the cmd string
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ") d := struct {
if err != nil { storagemarket.MinerDeal
return false, "", err DealType string
}{
MinerDeal: deal,
DealType: "storage",
} }
return runDealFilter(ctx, cmd, d)
var out bytes.Buffer }
}
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j) func CliRetrievalDealFilter(cmd string) dtypes.RetrievalDealFilter {
c.Stdout = &out return func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) {
c.Stderr = &out d := struct {
retrievalmarket.ProviderDealState
switch err := c.Run().(type) { DealType string
case nil: }{
return true, "", nil ProviderDealState: deal,
case *exec.ExitError: DealType: "retrieval",
return false, out.String(), nil }
default: return runDealFilter(ctx, cmd, d)
return false, "filter cmd run error", err }
} }
func runDealFilter(ctx context.Context, cmd string, deal interface{}) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
}
var out bytes.Buffer
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out
switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
} }
} }

View File

@ -354,7 +354,8 @@ func Online() Option {
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds), Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
@ -484,7 +485,8 @@ func ConfigStorageMiner(c interface{}) Option {
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
If(cfg.Dealmaking.Filter != "", If(cfg.Dealmaking.Filter != "",
Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.Filter))),
), ),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),

View File

@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@ -71,4 +72,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error
// too determine how long sealing is expected to take // too determine how long sealing is expected to take
type GetExpectedSealDurationFunc func() (time.Duration, error) type GetExpectedSealDurationFunc func() (time.Duration, error)
type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error)

View File

@ -412,16 +412,16 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider")) return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
} }
func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter { spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter { spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
b, err := onlineOk() b, err := onlineOk()
@ -497,7 +497,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
pieceStore dtypes.ProviderPieceStore, pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer, dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode, spn storagemarket.StorageProviderNode,
df dtypes.DealFilter, df dtypes.StorageDealFilter,
funds ProviderDealFunds, funds ProviderDealFunds,
) (storagemarket.StorageProvider, error) { ) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h) net := smnet.NewFromLibp2pHost(h)
@ -511,8 +511,52 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt)
} }
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
return func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}
if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}
b, err = offlineOk()
if err != nil {
return false, "miner error", err
}
if !b {
log.Info("offline retrieval has not been implemented yet")
}
if userFilter != nil {
return userFilter(ctx, state)
}
return true, "", nil
}
}
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore // RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) { func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full lapi.FullNode,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer,
onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
@ -521,29 +565,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
} }
netwk := rmnet.NewFromLibp2pHost(h) netwk := rmnet.NewFromLibp2pHost(h)
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := onlineOk()
if err != nil {
return false, "miner error", err
}
if !b {
log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting online retrieval deals", nil
}
b, err = offlineOk()
if err != nil {
return false, "miner error", err
}
if !b {
log.Info("offline retrieval has not been implemented yet")
}
return true, "", nil
})
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
} }