From fc3c91b7381be95151eb049238dd8d5f24207329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jul 2020 19:36:31 +0200 Subject: [PATCH] Support external deal decision logic --- markets/dealfilter/cli.go | 40 +++++++++++ node/builder.go | 6 ++ node/config/def.go | 2 + node/modules/dtypes/miner.go | 12 +++- node/modules/storageminer.go | 127 ++++++++++++++++++++--------------- 5 files changed, 129 insertions(+), 58 deletions(-) create mode 100644 markets/dealfilter/cli.go diff --git a/markets/dealfilter/cli.go b/markets/dealfilter/cli.go new file mode 100644 index 000000000..2cb9d6c4f --- /dev/null +++ b/markets/dealfilter/cli.go @@ -0,0 +1,40 @@ +package dealfilter + +import ( + "bytes" + "context" + "encoding/json" + "os/exec" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + + "github.com/filecoin-project/lotus/node/modules/dtypes" +) + +func CliDealFilter(cmd string) dtypes.DealFilter { + // TODO: run some checks on the cmd string + + return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { + j, err := json.MarshalIndent(deal, "", " ") + if err != nil { + return false, "", err + } + + var out bytes.Buffer + + c := exec.Command("sh", "-c", cmd) + c.Stdin = bytes.NewReader(j) + c.Stdout = &out + c.Stderr = &out + + switch err := c.Run().(type) { + case nil: + return true, "", nil + case *exec.ExitError: + return false, out.String(), nil + default: + return false, "filter cmd run error", err + } + + } +} diff --git a/node/builder.go b/node/builder.go index b4f0fa518..38b9b933d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,6 +3,7 @@ package node import ( "context" "errors" + "github.com/filecoin-project/lotus/markets/dealfilter" "time" logging "github.com/ipfs/go-log" @@ -309,6 +310,7 @@ func Online() Option { Override(new(dtypes.ProviderRequestValidator), modules.NewProviderRequestValidator), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(*storedask.StoredAsk), modules.NewStorageAsk), + Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)), Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds), Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter), @@ -425,6 +427,10 @@ func ConfigStorageMiner(c interface{}) Option { return Options( ConfigCommon(&cfg.Common), + If(cfg.Dealmaking.Filter != "", + Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))), + ), + Override(new(sectorstorage.SealerConfig), cfg.Storage), ) } diff --git a/node/config/def.go b/node/config/def.go index 81ffdcd9d..b967ed6c3 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -41,6 +41,8 @@ type DealmakingConfig struct { ConsiderOfflineRetrievalDeals bool PieceCidBlocklist []cid.Cid ExpectedSealDuration Duration + + Filter string } // API contains configs for API endpoint diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index c653ec6eb..34911df5e 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -1,10 +1,14 @@ package dtypes import ( - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/ipfs/go-cid" + "context" "time" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/specs-actors/actors/abi" ) type MinerAddress address.Address @@ -64,3 +68,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error // GetExpectedSealDurationFunc is a function which reads from miner // too determine how long sealing is expected to take type GetExpectedSealDurationFunc func() (time.Duration, error) + +type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 17cfed53f..699a4c81e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -364,6 +364,75 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) { return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider")) } +func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, + offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, + blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, + expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, + spn storagemarket.StorageProviderNode) dtypes.DealFilter { + return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, + offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, + blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, + expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, + spn storagemarket.StorageProviderNode) dtypes.DealFilter { + + return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { + b, err := onlineOk() + if err != nil { + return false, "miner error", err + } + + if deal.Ref != nil && deal.Ref.TransferType != storagemarket.TTManual && !b { + log.Warnf("online storage deal consideration disabled; rejecting storage deal proposal from client: %s", deal.Client.String()) + return false, "miner is not considering online storage deals", nil + } + + b, err = offlineOk() + if err != nil { + return false, "miner error", err + } + + if deal.Ref != nil && deal.Ref.TransferType == storagemarket.TTManual && !b { + log.Warnf("offline storage deal consideration disabled; rejecting storage deal proposal from client: %s", deal.Client.String()) + return false, "miner is not accepting offline storage deals", nil + } + + blocklist, err := blocklistFunc() + if err != nil { + return false, "miner error", err + } + + for idx := range blocklist { + if deal.Proposal.PieceCID.Equals(blocklist[idx]) { + log.Warnf("piece CID in proposal %s is blocklisted; rejecting storage deal proposal from client: %s", deal.Proposal.PieceCID, deal.Client.String()) + return false, fmt.Sprintf("miner has blocklisted piece CID %s", deal.Proposal.PieceCID), nil + } + } + + sealDuration, err := expectedSealTimeFunc() + if err != nil { + return false, "miner error", err + } + + sealEpochs := sealDuration / (time.Duration(build.BlockDelaySecs) * time.Second) + _, ht, err := spn.GetChainHead(ctx) + if err != nil { + return false, "failed to get chain head", err + } + earliest := abi.ChainEpoch(sealEpochs) + ht + if deal.Proposal.StartEpoch < earliest { + log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht) + return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil + } + + if user != nil { + return user(ctx, deal) + } + + return true, "", nil + } + } +} + func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Config, storedAsk *storedask.StoredAsk, @@ -373,68 +442,16 @@ func StorageProvider(minerAddress dtypes.MinerAddress, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, + df dtypes.DealFilter, funds ProviderDealFunds, - onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, - offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, - blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, - expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc) (storagemarket.StorageProvider, error) { +) (storagemarket.StorageProvider, error) { net := smnet.NewFromLibp2pHost(h) store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) if err != nil { return nil, err } - opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { - b, err := onlineOk() - if err != nil { - return false, "miner error", err - } - - if deal.Ref != nil && deal.Ref.TransferType != storagemarket.TTManual && !b { - log.Warnf("online storage deal consideration disabled; rejecting storage deal proposal from client: %s", deal.Client.String()) - return false, "miner is not considering online storage deals", nil - } - - b, err = offlineOk() - if err != nil { - return false, "miner error", err - } - - if deal.Ref != nil && deal.Ref.TransferType == storagemarket.TTManual && !b { - log.Warnf("offline storage deal consideration disabled; rejecting storage deal proposal from client: %s", deal.Client.String()) - return false, "miner is not accepting offline storage deals", nil - } - - blocklist, err := blocklistFunc() - if err != nil { - return false, "miner error", err - } - - for idx := range blocklist { - if deal.Proposal.PieceCID.Equals(blocklist[idx]) { - log.Warnf("piece CID in proposal %s is blocklisted; rejecting storage deal proposal from client: %s", deal.Proposal.PieceCID, deal.Client.String()) - return false, fmt.Sprintf("miner has blocklisted piece CID %s", deal.Proposal.PieceCID), nil - } - } - - sealDuration, err := expectedSealTimeFunc() - if err != nil { - return false, "miner error", err - } - - sealEpochs := sealDuration / (time.Duration(build.BlockDelaySecs) * time.Second) - _, ht, err := spn.GetChainHead(ctx) - if err != nil { - return false, "failed to get chain head", err - } - earliest := abi.ChainEpoch(sealEpochs) + ht - if deal.Proposal.StartEpoch < earliest { - log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht) - return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil - } - - return true, "", nil - }) + opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt) }