Support external deal decision logic

This commit is contained in:
Łukasz Magiera 2020-07-30 19:36:31 +02:00
parent 45f85a5f86
commit fc3c91b738
5 changed files with 129 additions and 58 deletions

40
markets/dealfilter/cli.go Normal file
View File

@ -0,0 +1,40 @@
package dealfilter
import (
"bytes"
"context"
"encoding/json"
"os/exec"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func CliDealFilter(cmd string) dtypes.DealFilter {
// TODO: run some checks on the cmd string
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
j, err := json.MarshalIndent(deal, "", " ")
if err != nil {
return false, "", err
}
var out bytes.Buffer
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &out
switch err := c.Run().(type) {
case nil:
return true, "", nil
case *exec.ExitError:
return false, out.String(), nil
default:
return false, "filter cmd run error", err
}
}
}

View File

@ -3,6 +3,7 @@ package node
import ( import (
"context" "context"
"errors" "errors"
"github.com/filecoin-project/lotus/markets/dealfilter"
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -309,6 +310,7 @@ func Online() Option {
Override(new(dtypes.ProviderRequestValidator), modules.NewProviderRequestValidator), Override(new(dtypes.ProviderRequestValidator), modules.NewProviderRequestValidator),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds), Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter),
@ -425,6 +427,10 @@ func ConfigStorageMiner(c interface{}) Option {
return Options( return Options(
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
If(cfg.Dealmaking.Filter != "",
Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))),
),
Override(new(sectorstorage.SealerConfig), cfg.Storage), Override(new(sectorstorage.SealerConfig), cfg.Storage),
) )
} }

View File

@ -41,6 +41,8 @@ type DealmakingConfig struct {
ConsiderOfflineRetrievalDeals bool ConsiderOfflineRetrievalDeals bool
PieceCidBlocklist []cid.Cid PieceCidBlocklist []cid.Cid
ExpectedSealDuration Duration ExpectedSealDuration Duration
Filter string
} }
// API contains configs for API endpoint // API contains configs for API endpoint

View File

@ -1,10 +1,14 @@
package dtypes package dtypes
import ( import (
"github.com/filecoin-project/go-address" "context"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"time" "time"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi"
) )
type MinerAddress address.Address type MinerAddress address.Address
@ -64,3 +68,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error
// GetExpectedSealDurationFunc is a function which reads from miner // GetExpectedSealDurationFunc is a function which reads from miner
// too determine how long sealing is expected to take // too determine how long sealing is expected to take
type GetExpectedSealDurationFunc func() (time.Duration, error) type GetExpectedSealDurationFunc func() (time.Duration, error)
type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)

View File

@ -364,27 +364,18 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider")) return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
} }
func StorageProvider(minerAddress dtypes.MinerAddress, func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
ffiConfig *ffiwrapper.Config,
storedAsk *storedask.StoredAsk,
h host.Host, ds dtypes.MetadataDS,
mds dtypes.StagingMultiDstore,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
funds ProviderDealFunds,
onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc) (storagemarket.StorageProvider, error) { expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
net := smnet.NewFromLibp2pHost(h) spn storagemarket.StorageProviderNode) dtypes.DealFilter {
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
if err != nil { offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
return nil, err blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
} expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
spn storagemarket.StorageProviderNode) dtypes.DealFilter {
opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
b, err := onlineOk() b, err := onlineOk()
if err != nil { if err != nil {
return false, "miner error", err return false, "miner error", err
@ -433,8 +424,34 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil
} }
if user != nil {
return user(ctx, deal)
}
return true, "", nil return true, "", nil
}) }
}
}
func StorageProvider(minerAddress dtypes.MinerAddress,
ffiConfig *ffiwrapper.Config,
storedAsk *storedask.StoredAsk,
h host.Host, ds dtypes.MetadataDS,
mds dtypes.StagingMultiDstore,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.DealFilter,
funds ProviderDealFunds,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
if err != nil {
return nil, err
}
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt)
} }