Merge pull request #2069 from laser/feat/1920-blocklist-cids
allow storage miners to manage a blocklist of piece CIDs
This commit is contained in:
commit
596ed330dd
@ -56,6 +56,8 @@ type StorageMiner interface {
|
||||
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
|
||||
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||
DealsSetAcceptingStorageDeals(context.Context, bool) error
|
||||
DealsPieceCidBlocklist(context.Context) ([]cid.Cid, error)
|
||||
DealsSetPieceCidBlocklist(context.Context, []cid.Cid) error
|
||||
|
||||
StorageAddLocal(ctx context.Context, path string) error
|
||||
}
|
||||
|
@ -226,6 +226,8 @@ type StorageMinerStruct struct {
|
||||
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
||||
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||
DealsSetAcceptingStorageDeals func(context.Context, bool) error `perm:"admin"`
|
||||
DealsPieceCidBlocklist func(context.Context) ([]cid.Cid, error) `perm:"admin"`
|
||||
DealsSetPieceCidBlocklist func(context.Context, []cid.Cid) error `perm:"read"`
|
||||
|
||||
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
||||
}
|
||||
@ -872,6 +874,14 @@ func (c *StorageMinerStruct) DealsSetAcceptingStorageDeals(ctx context.Context,
|
||||
return c.Internal.DealsSetAcceptingStorageDeals(ctx, b)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) {
|
||||
return c.Internal.DealsPieceCidBlocklist(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) DealsSetPieceCidBlocklist(ctx context.Context, cids []cid.Cid) error {
|
||||
return c.Internal.DealsSetPieceCidBlocklist(ctx, cids)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) error {
|
||||
return c.Internal.StorageAddLocal(ctx, path)
|
||||
}
|
||||
|
@ -1,14 +1,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/docker/go-units"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-cidutil/cidenc"
|
||||
"github.com/multiformats/go-multibase"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -20,6 +24,32 @@ import (
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
)
|
||||
|
||||
var CidBaseFlag = cli.StringFlag{
|
||||
Name: "cid-base",
|
||||
Hidden: true,
|
||||
Value: "base32",
|
||||
Usage: "Multibase encoding used for version 1 CIDs in output.",
|
||||
DefaultText: "base32",
|
||||
}
|
||||
|
||||
// GetCidEncoder returns an encoder using the `cid-base` flag if provided, or
|
||||
// the default (Base32) encoder if not.
|
||||
func GetCidEncoder(cctx *cli.Context) (cidenc.Encoder, error) {
|
||||
val := cctx.String("cid-base")
|
||||
|
||||
e := cidenc.Encoder{Base: multibase.MustNewEncoder(multibase.Base32)}
|
||||
|
||||
if val != "" {
|
||||
var err error
|
||||
e.Base, err = multibase.EncoderByName(val)
|
||||
if err != nil {
|
||||
return e, err
|
||||
}
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
var enableCmd = &cli.Command{
|
||||
Name: "enable",
|
||||
Usage: "Configure the miner to consider storage deal proposals",
|
||||
@ -197,6 +227,9 @@ var dealsCmd = &cli.Command{
|
||||
disableCmd,
|
||||
setAskCmd,
|
||||
getAskCmd,
|
||||
setBlocklistCmd,
|
||||
getBlocklistCmd,
|
||||
resetBlocklistCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -255,3 +288,96 @@ var dealsListCmd = &cli.Command{
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var getBlocklistCmd = &cli.Command{
|
||||
Name: "get-blocklist",
|
||||
Usage: "List the contents of the storage miner's piece CID blocklist",
|
||||
Flags: []cli.Flag{
|
||||
&CidBaseFlag,
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
blocklist, err := api.DealsPieceCidBlocklist(lcli.DaemonContext(cctx))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
encoder, err := GetCidEncoder(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for idx := range blocklist {
|
||||
fmt.Println(encoder.Encode(blocklist[idx]))
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var setBlocklistCmd = &cli.Command{
|
||||
Name: "set-blocklist",
|
||||
Usage: "Set the storage miner's list of blocklisted piece CIDs",
|
||||
ArgsUsage: "[<path-of-file-containing-newline-delimited-piece-CIDs> (optional, will read from stdin if omitted)]",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
if cctx.Args().Present() && cctx.Args().First() != "-" {
|
||||
absPath, err := filepath.Abs(cctx.Args().First())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
file, err := os.Open(absPath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer file.Close() //nolint:errcheck
|
||||
|
||||
scanner = bufio.NewScanner(file)
|
||||
}
|
||||
|
||||
var blocklist []cid.Cid
|
||||
for scanner.Scan() {
|
||||
decoded, err := cid.Decode(scanner.Text())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blocklist = append(blocklist, decoded)
|
||||
}
|
||||
|
||||
err = scanner.Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return api.DealsSetPieceCidBlocklist(lcli.DaemonContext(cctx), blocklist)
|
||||
},
|
||||
}
|
||||
|
||||
var resetBlocklistCmd = &cli.Command{
|
||||
Name: "reset-blocklist",
|
||||
Usage: "Remove all entries from the storage miner's piece CID blocklist",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
return api.DealsSetPieceCidBlocklist(lcli.DaemonContext(cctx), []cid.Cid{})
|
||||
},
|
||||
}
|
||||
|
@ -314,6 +314,8 @@ func Online() Option {
|
||||
|
||||
Override(new(dtypes.AcceptingStorageDealsConfigFunc), modules.NewAcceptingStorageDealsConfigFunc),
|
||||
Override(new(dtypes.SetAcceptingStorageDealsConfigFunc), modules.NewSetAcceptingStorageDealsConfigFunc),
|
||||
Override(new(dtypes.StorageDealPieceCidBlocklistConfigFunc), modules.NewStorageDealPieceCidBlocklistConfigFunc),
|
||||
Override(new(dtypes.SetStorageDealPieceCidBlocklistConfigFunc), modules.NewSetStorageDealPieceCidBlocklistConfigFunc),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
@ -4,6 +4,8 @@ import (
|
||||
"encoding"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
sectorstorage "github.com/filecoin-project/sector-storage"
|
||||
)
|
||||
|
||||
@ -33,6 +35,7 @@ type StorageMiner struct {
|
||||
|
||||
type DealmakingConfig struct {
|
||||
AcceptingStorageDeals bool
|
||||
PieceCidBlocklist []cid.Cid
|
||||
}
|
||||
|
||||
// API contains configs for API endpoint
|
||||
@ -121,6 +124,7 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
|
||||
Dealmaking: DealmakingConfig{
|
||||
AcceptingStorageDeals: true,
|
||||
PieceCidBlocklist: []cid.Cid{},
|
||||
},
|
||||
}
|
||||
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
|
||||
|
@ -43,7 +43,9 @@ type StorageMinerAPI struct {
|
||||
StorageMgr *sectorstorage.Manager `optional:"true"`
|
||||
*stores.Index
|
||||
|
||||
SetAcceptingStorageDealsConfigFunc dtypes.SetAcceptingStorageDealsConfigFunc
|
||||
SetAcceptingStorageDealsConfigFunc dtypes.SetAcceptingStorageDealsConfigFunc
|
||||
StorageDealPieceCidBlocklistConfigFunc dtypes.StorageDealPieceCidBlocklistConfigFunc
|
||||
SetStorageDealPieceCidBlocklistConfigFunc dtypes.SetStorageDealPieceCidBlocklistConfigFunc
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
|
||||
@ -232,6 +234,14 @@ func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fn
|
||||
return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) {
|
||||
return sm.StorageDealPieceCidBlocklistConfigFunc()
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) DealsSetPieceCidBlocklist(ctx context.Context, cids []cid.Cid) error {
|
||||
return sm.SetStorageDealPieceCidBlocklistConfigFunc(cids)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error {
|
||||
if sm.StorageMgr == nil {
|
||||
return xerrors.Errorf("no storage manager")
|
||||
|
@ -1,6 +1,8 @@
|
||||
package dtypes
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
)
|
||||
@ -15,3 +17,12 @@ type AcceptingStorageDealsConfigFunc func() (bool, error)
|
||||
// SetAcceptingStorageDealsFunc is a function which is used to disable or enable
|
||||
// storage deal acceptance.
|
||||
type SetAcceptingStorageDealsConfigFunc func(bool) error
|
||||
|
||||
// StorageDealPieceCidBlocklistConfigFunc is a function which reads from miner config
|
||||
// to obtain a list of CIDs for which the storage miner will not accept storage
|
||||
// proposals.
|
||||
type StorageDealPieceCidBlocklistConfigFunc func() ([]cid.Cid, error)
|
||||
|
||||
// SetStorageDealPieceCidBlocklistConfigFunc is a function which is used to set a
|
||||
// list of CIDs for which the storage miner will reject deal proposals.
|
||||
type SetStorageDealPieceCidBlocklistConfigFunc func([]cid.Cid) error
|
||||
|
@ -3,11 +3,13 @@ package modules
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
graphsync "github.com/ipfs/go-graphsync/impl"
|
||||
@ -307,7 +309,7 @@ func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.Metadat
|
||||
return storedAsk, nil
|
||||
}
|
||||
|
||||
func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Config, storedAsk *storedask.StoredAsk, h host.Host, ds dtypes.MetadataDS, ibs dtypes.StagingBlockstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, isAcceptingFunc dtypes.AcceptingStorageDealsConfigFunc) (storagemarket.StorageProvider, error) {
|
||||
func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Config, storedAsk *storedask.StoredAsk, h host.Host, ds dtypes.MetadataDS, ibs dtypes.StagingBlockstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, isAcceptingFunc dtypes.AcceptingStorageDealsConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc) (storagemarket.StorageProvider, error) {
|
||||
net := smnet.NewFromLibp2pHost(h)
|
||||
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
|
||||
if err != nil {
|
||||
@ -325,6 +327,18 @@ func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Con
|
||||
return false, "miner is not accepting storage deals", nil
|
||||
}
|
||||
|
||||
blocklist, err := blocklistFunc()
|
||||
if err != nil {
|
||||
return false, "miner error", err
|
||||
}
|
||||
|
||||
for idx := range blocklist {
|
||||
if deal.Proposal.PieceCID.Equals(blocklist[idx]) {
|
||||
log.Warnf("piece CID in proposal %s is blocklisted; rejecting storage deal proposal from client: %s", deal.Proposal.PieceCID, deal.Client.String())
|
||||
return false, fmt.Sprintf("miner has blocklisted piece CID %s", deal.Proposal.PieceCID), nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, "", nil
|
||||
})
|
||||
|
||||
@ -380,35 +394,69 @@ func StorageAuth(ctx helpers.MetricsCtx, ca lapi.Common) (sectorstorage.StorageA
|
||||
}
|
||||
|
||||
func NewAcceptingStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.AcceptingStorageDealsConfigFunc, error) {
|
||||
return func() (bool, error) {
|
||||
raw, err := r.Config()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
cfg, ok := raw.(*config.StorageMiner)
|
||||
if !ok {
|
||||
return false, xerrors.New("expected address of config.StorageMiner")
|
||||
}
|
||||
|
||||
return cfg.Dealmaking.AcceptingStorageDeals, nil
|
||||
return func() (out bool, err error) {
|
||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||
out = cfg.Dealmaking.AcceptingStorageDeals
|
||||
})
|
||||
return
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewSetAcceptingStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.SetAcceptingStorageDealsConfigFunc, error) {
|
||||
return func(b bool) error {
|
||||
var typeErr error
|
||||
|
||||
setConfigErr := r.SetConfig(func(raw interface{}) {
|
||||
cfg, ok := raw.(*config.StorageMiner)
|
||||
if !ok {
|
||||
typeErr = errors.New("expected storage miner config")
|
||||
return
|
||||
}
|
||||
|
||||
return func(b bool) (err error) {
|
||||
err = mutateCfg(r, func(cfg *config.StorageMiner) {
|
||||
cfg.Dealmaking.AcceptingStorageDeals = b
|
||||
})
|
||||
|
||||
return multierr.Combine(typeErr, setConfigErr)
|
||||
return
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.StorageDealPieceCidBlocklistConfigFunc, error) {
|
||||
return func() (out []cid.Cid, err error) {
|
||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||
out = cfg.Dealmaking.PieceCidBlocklist
|
||||
})
|
||||
return
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewSetStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.SetStorageDealPieceCidBlocklistConfigFunc, error) {
|
||||
return func(blocklist []cid.Cid) (err error) {
|
||||
err = mutateCfg(r, func(cfg *config.StorageMiner) {
|
||||
cfg.Dealmaking.PieceCidBlocklist = blocklist
|
||||
})
|
||||
return
|
||||
}, nil
|
||||
}
|
||||
|
||||
func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error {
|
||||
raw, err := r.Config()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg, ok := raw.(*config.StorageMiner)
|
||||
if !ok {
|
||||
return xerrors.New("expected address of config.StorageMiner")
|
||||
}
|
||||
|
||||
accessor(cfg)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mutateCfg(r repo.LockedRepo, mutator func(*config.StorageMiner)) error {
|
||||
var typeErr error
|
||||
|
||||
setConfigErr := r.SetConfig(func(raw interface{}) {
|
||||
cfg, ok := raw.(*config.StorageMiner)
|
||||
if !ok {
|
||||
typeErr = errors.New("expected storage miner config")
|
||||
return
|
||||
}
|
||||
|
||||
mutator(cfg)
|
||||
})
|
||||
|
||||
return multierr.Combine(typeErr, setConfigErr)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user