diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 19b319662..54076cac8 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -9,10 +9,10 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/aerrors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/go-amt-ipld" "github.com/ipfs/go-cid" diff --git a/chain/actors/actor_miner_test.go b/chain/actors/actor_miner_test.go index 173e7e6e0..a522fe349 100644 --- a/chain/actors/actor_miner_test.go +++ b/chain/actors/actor_miner_test.go @@ -7,12 +7,12 @@ import ( "testing" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/aerrors" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" hamt "github.com/ipfs/go-hamt-ipld" blockstore "github.com/ipfs/go-ipfs-blockstore" cbg "github.com/whyrusleeping/cbor-gen" diff --git a/chain/actors/actor_storagemarket.go b/chain/actors/actor_storagemarket.go index 526099e00..e4ad58cb5 100644 --- a/chain/actors/actor_storagemarket.go +++ b/chain/actors/actor_storagemarket.go @@ -14,10 +14,10 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/aerrors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" ) type StorageMarketActor struct{} diff --git a/chain/deals/client.go b/chain/deals/client.go index afbb317f0..709432f63 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/events" @@ -20,7 +21,6 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/lib/statestore" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/retrieval/discovery" diff --git a/chain/deals/provider.go b/chain/deals/provider.go index a12f9fa07..9e45ef8b2 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -14,11 +14,11 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/datatransfer" - "github.com/filecoin-project/lotus/lib/statestore" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" diff --git a/chain/deals/provider_utils.go b/chain/deals/provider_utils.go index c3e77b760..e1fcfcf32 100644 --- a/chain/deals/provider_utils.go +++ b/chain/deals/provider_utils.go @@ -12,9 +12,9 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statestore" "github.com/ipfs/go-cid" inet "github.com/libp2p/go-libp2p-core/network" diff --git a/chain/deals/request_validation_test.go b/chain/deals/request_validation_test.go index c0bb34e03..6da68d219 100644 --- a/chain/deals/request_validation_test.go +++ b/chain/deals/request_validation_test.go @@ -15,11 +15,11 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statestore" ) var blockGenerator = blocksutil.NewBlockGenerator() diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index b1ff4b838..57b963041 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -23,10 +23,10 @@ import ( "gopkg.in/urfave/cli.v2" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/lib/sectorbuilder" ) var log = logging.Logger("lotus-bench") diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index ec514b0e9..fa1b00a8a 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -7,8 +7,8 @@ import ( paramfetch "github.com/filecoin-project/go-paramfetch" "golang.org/x/xerrors" + "github.com/filecoin-project/go-sectorbuilder" lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/sectorbuilder" ) type worker struct { diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 0f2a83d39..9bf52fea8 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-sectorbuilder" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" @@ -29,7 +30,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" diff --git a/go.mod b/go.mod index 3283f456b..3b47f778b 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,7 @@ require ( github.com/onsi/ginkgo v1.9.0 // indirect github.com/onsi/gomega v1.6.0 // indirect github.com/opentracing/opentracing-go v1.1.0 + github.com/otiai10/copy v1.0.2 github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a github.com/stretchr/testify v1.4.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba diff --git a/go.sum b/go.sum index 3b5e37ae8..34d5667dc 100644 --- a/go.sum +++ b/go.sum @@ -89,19 +89,12 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -<<<<<<< HEAD github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107152336-0cbb2c483013 h1:OGpRq3HRxyrxZJtbNKCOsb5YTmc+RBLLwdAgwZfkRnY= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107152336-0cbb2c483013/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 h1:NZXq90YlfakSmB2/84dGr0AVmKYFA97+yyViBIgTFbk= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= -======= -github.com/filecoin-project/go-sectorbuilder v0.0.0-20191220204520-82965a74eaca h1:qe8gfAQ+WNBh2MCRN9oRN3f+eGC+p8jfOhLhNmcLGxc= -github.com/filecoin-project/go-sectorbuilder v0.0.0-20191220204520-82965a74eaca/go.mod h1:tOSdS//OQy+/03Fe9JDGtfJX/iTQIlYig41ZzKh5cwU= -github.com/filecoin-project/go-statestore v0.0.0-20191220165813-c30fad612418 h1:6qIy+u0HRpRz7iisUTRxYWhTbAES1kt8R/wn7hCybrU= -github.com/filecoin-project/go-statestore v0.0.0-20191220165813-c30fad612418/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= ->>>>>>> go mod tidy github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 h1:EzDjxMg43q1tA2c0MV3tNbaontnHLplHyFF6M5KiVP0= @@ -490,11 +483,6 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= -<<<<<<< HEAD -======= -github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= -github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= ->>>>>>> go mod tidy github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go deleted file mode 100644 index 206ea41b2..000000000 --- a/lib/sectorbuilder/sectorbuilder.go +++ /dev/null @@ -1,799 +0,0 @@ -package sectorbuilder - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "strconv" - "sync" - "sync/atomic" - - sectorbuilder "github.com/filecoin-project/filecoin-ffi" - "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log" - dcopy "github.com/otiai10/copy" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node/modules/dtypes" -) - -const PoStReservedWorkers = 1 -const PoRepProofPartitions = 10 - -var lastSectorIdKey = datastore.NewKey("/sectorbuilder/last") - -var log = logging.Logger("sectorbuilder") - -type SortedPublicSectorInfo = sectorbuilder.SortedPublicSectorInfo -type SortedPrivateSectorInfo = sectorbuilder.SortedPrivateSectorInfo - -type SealTicket = sectorbuilder.SealTicket - -type SealSeed = sectorbuilder.SealSeed - -type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput - -type SealCommitOutput = sectorbuilder.SealCommitOutput - -type PublicPieceInfo = sectorbuilder.PublicPieceInfo - -type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput - -type EPostCandidate = sectorbuilder.Candidate - -const CommLen = sectorbuilder.CommitmentBytesLen - -type WorkerCfg struct { - NoPreCommit bool - NoCommit bool - - // TODO: 'cost' info, probably in terms of sealing + transfer speed -} - -type SectorBuilder struct { - ds dtypes.MetadataDS - idLk sync.Mutex - - ssize uint64 - lastID uint64 - - Miner address.Address - - unsealLk sync.Mutex - - noCommit bool - noPreCommit bool - rateLimit chan struct{} - - precommitTasks chan workerCall - commitTasks chan workerCall - - taskCtr uint64 - remoteLk sync.Mutex - remoteCtr int - remotes map[int]*remote - remoteResults map[uint64]chan<- SealRes - - addPieceWait int32 - preCommitWait int32 - commitWait int32 - unsealWait int32 - - fsLk sync.Mutex - filesystem *fs // TODO: multi-fs support - - stopping chan struct{} -} - -type JsonRSPCO struct { - CommD []byte - CommR []byte -} - -func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { - return JsonRSPCO{ - CommD: rspco.CommD[:], - CommR: rspco.CommR[:], - } -} - -func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { - var out RawSealPreCommitOutput - copy(out.CommD[:], rspco.CommD) - copy(out.CommR[:], rspco.CommR) - return out -} - -type SealRes struct { - Err string - GoErr error `json:"-"` - - Proof []byte - Rspco JsonRSPCO -} - -type remote struct { - lk sync.Mutex - - sealTasks chan<- WorkerTask - busy uint64 // only for metrics -} - -type Config struct { - SectorSize uint64 - Miner address.Address - - WorkerThreads uint8 - FallbackLastID uint64 - NoCommit bool - NoPreCommit bool - - Dir string - _ struct{} // guard against nameless init -} - -func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { - if cfg.WorkerThreads < PoStReservedWorkers { - return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads) - } - - var lastUsedID uint64 - b, err := ds.Get(lastSectorIdKey) - switch err { - case nil: - i, err := strconv.ParseInt(string(b), 10, 64) - if err != nil { - return nil, err - } - lastUsedID = uint64(i) - case datastore.ErrNotFound: - lastUsedID = cfg.FallbackLastID - default: - return nil, err - } - - rlimit := cfg.WorkerThreads - PoStReservedWorkers - - sealLocal := rlimit > 0 - - if rlimit == 0 { - rlimit = 1 - } - - sb := &SectorBuilder{ - ds: ds, - - ssize: cfg.SectorSize, - lastID: lastUsedID, - - filesystem: openFs(cfg.Dir), - - Miner: cfg.Miner, - - noPreCommit: cfg.NoPreCommit || !sealLocal, - noCommit: cfg.NoCommit || !sealLocal, - rateLimit: make(chan struct{}, rlimit), - - taskCtr: 1, - precommitTasks: make(chan workerCall), - commitTasks: make(chan workerCall), - remoteResults: map[uint64]chan<- SealRes{}, - remotes: map[int]*remote{}, - - stopping: make(chan struct{}), - } - - if err := sb.filesystem.init(); err != nil { - return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err) - } - - return sb, nil -} - -func NewStandalone(cfg *Config) (*SectorBuilder, error) { - sb := &SectorBuilder{ - ds: nil, - - ssize: cfg.SectorSize, - - Miner: cfg.Miner, - filesystem: openFs(cfg.Dir), - - taskCtr: 1, - remotes: map[int]*remote{}, - rateLimit: make(chan struct{}, cfg.WorkerThreads), - stopping: make(chan struct{}), - } - - if err := sb.filesystem.init(); err != nil { - return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err) - } - - return sb, nil -} - -func (sb *SectorBuilder) checkRateLimit() { - if cap(sb.rateLimit) == len(sb.rateLimit) { - log.Warn("rate-limiting local sectorbuilder call") - } -} - -func (sb *SectorBuilder) RateLimit() func() { - sb.checkRateLimit() - - sb.rateLimit <- struct{}{} - - return func() { - <-sb.rateLimit - } -} - -type WorkerStats struct { - LocalFree int - LocalReserved int - LocalTotal int - // todo: post in progress - RemotesTotal int - RemotesFree int - - AddPieceWait int - PreCommitWait int - CommitWait int - UnsealWait int -} - -func (sb *SectorBuilder) WorkerStats() WorkerStats { - sb.remoteLk.Lock() - defer sb.remoteLk.Unlock() - - remoteFree := len(sb.remotes) - for _, r := range sb.remotes { - if r.busy > 0 { - remoteFree-- - } - } - - return WorkerStats{ - LocalFree: cap(sb.rateLimit) - len(sb.rateLimit), - LocalReserved: PoStReservedWorkers, - LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers, - RemotesTotal: len(sb.remotes), - RemotesFree: remoteFree, - - AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)), - PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)), - CommitWait: int(atomic.LoadInt32(&sb.commitWait)), - UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)), - } -} - -func addressToProverID(a address.Address) [32]byte { - var proverId [32]byte - copy(proverId[:], a.Payload()) - return proverId -} - -func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { - sb.idLk.Lock() - defer sb.idLk.Unlock() - - sb.lastID++ - id := sb.lastID - - err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))) - if err != nil { - return 0, err - } - return id, nil -} - -func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { - fs := sb.filesystem - - if err := fs.reserve(dataStaging, sb.ssize); err != nil { - return PublicPieceInfo{}, err - } - defer fs.free(dataStaging, sb.ssize) - - atomic.AddInt32(&sb.addPieceWait, 1) - ret := sb.RateLimit() - atomic.AddInt32(&sb.addPieceWait, -1) - defer ret() - - f, werr, err := toReadableFile(file, int64(pieceSize)) - if err != nil { - return PublicPieceInfo{}, err - } - - stagedFile, err := sb.stagedSectorFile(sectorId) - if err != nil { - return PublicPieceInfo{}, err - } - - _, _, commP, err := sectorbuilder.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes) - if err != nil { - return PublicPieceInfo{}, err - } - - if err := stagedFile.Close(); err != nil { - return PublicPieceInfo{}, err - } - - if err := f.Close(); err != nil { - return PublicPieceInfo{}, err - } - - return PublicPieceInfo{ - Size: pieceSize, - CommP: commP, - }, werr() -} - -func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { - fs := sb.filesystem - - if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals - return nil, err - } - defer fs.free(dataUnsealed, sb.ssize) - - atomic.AddInt32(&sb.unsealWait, 1) - // TODO: Don't wait if cached - ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker - defer ret() - atomic.AddInt32(&sb.unsealWait, -1) - - sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel - defer sb.unsealLk.Unlock() - - cacheDir, err := sb.sectorCacheDir(sectorID) - if err != nil { - return nil, err - } - - sealedPath, err := sb.SealedSectorPath(sectorID) - if err != nil { - return nil, err - } - - unsealedPath := sb.unsealedSectorPath(sectorID) - - // TODO: GC for those - // (Probably configurable count of sectors to be kept unsealed, and just - // remove last used one (or use whatever other cache policy makes sense)) - f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - var commd [CommLen]byte - copy(commd[:], commD) - - var tkt [CommLen]byte - copy(tkt[:], ticket) - - err = sectorbuilder.Unseal(sb.ssize, - PoRepProofPartitions, - cacheDir, - sealedPath, - unsealedPath, - sectorID, - addressToProverID(sb.Miner), - tkt, - commd) - if err != nil { - return nil, xerrors.Errorf("unseal failed: %w", err) - } - - f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644) - if err != nil { - return nil, err - } - } - - if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { - return nil, xerrors.Errorf("seek: %w", err) - } - - lr := io.LimitReader(f, int64(size)) - - return &struct { - io.Reader - io.Closer - }{ - Reader: lr, - Closer: f, - }, nil -} - -func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { - atomic.AddInt32(&sb.preCommitWait, -1) - - select { - case ret := <-call.ret: - var err error - if ret.Err != "" { - err = xerrors.New(ret.Err) - } - return ret.Rspco.rspco(), err - case <-sb.stopping: - return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped") - } -} - -func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { - fs := sb.filesystem - - if err := fs.reserve(dataCache, sb.ssize); err != nil { - return RawSealPreCommitOutput{}, err - } - defer fs.free(dataCache, sb.ssize) - - if err := fs.reserve(dataSealed, sb.ssize); err != nil { - return RawSealPreCommitOutput{}, err - } - defer fs.free(dataSealed, sb.ssize) - - call := workerCall{ - task: WorkerTask{ - Type: WorkerPreCommit, - TaskID: atomic.AddUint64(&sb.taskCtr, 1), - SectorID: sectorID, - SealTicket: ticket, - Pieces: pieces, - }, - ret: make(chan SealRes), - } - - atomic.AddInt32(&sb.preCommitWait, 1) - - select { // prefer remote - case sb.precommitTasks <- call: - return sb.sealPreCommitRemote(call) - default: - } - - sb.checkRateLimit() - - rl := sb.rateLimit - if sb.noPreCommit { - rl = make(chan struct{}) - } - - select { // use whichever is available - case sb.precommitTasks <- call: - return sb.sealPreCommitRemote(call) - case rl <- struct{}{}: - } - - atomic.AddInt32(&sb.preCommitWait, -1) - - // local - - defer func() { - <-sb.rateLimit - }() - - cacheDir, err := sb.sectorCacheDir(sectorID) - if err != nil { - return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err) - } - - sealedPath, err := sb.SealedSectorPath(sectorID) - if err != nil { - return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err) - } - - e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err) - } - if err := e.Close(); err != nil { - return RawSealPreCommitOutput{}, err - } - - var sum uint64 - for _, piece := range pieces { - sum += piece.Size - } - ussize := UserBytesForSectorSize(sb.ssize) - if sum != ussize { - return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) - } - - stagedPath := sb.StagedSectorPath(sectorID) - - rspco, err := sectorbuilder.SealPreCommit( - sb.ssize, - PoRepProofPartitions, - cacheDir, - stagedPath, - sealedPath, - sectorID, - addressToProverID(sb.Miner), - ticket.TicketBytes, - pieces, - ) - if err != nil { - return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) - } - - return RawSealPreCommitOutput(rspco), nil -} - -func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { - atomic.AddInt32(&sb.commitWait, -1) - - select { - case ret := <-call.ret: - if ret.Err != "" { - err = xerrors.New(ret.Err) - } - return ret.Proof, err - case <-sb.stopping: - return nil, xerrors.New("sectorbuilder stopped") - } -} - -func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { - atomic.AddInt32(&sb.commitWait, -1) - - defer func() { - <-sb.rateLimit - }() - - cacheDir, err := sb.sectorCacheDir(sectorID) - if err != nil { - return nil, err - } - - proof, err = sectorbuilder.SealCommit( - sb.ssize, - PoRepProofPartitions, - cacheDir, - sectorID, - addressToProverID(sb.Miner), - ticket.TicketBytes, - seed.TicketBytes, - pieces, - sectorbuilder.RawSealPreCommitOutput(rspco), - ) - if err != nil { - log.Warn("StandaloneSealCommit error: ", err) - log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco) - - return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) - } - - return proof, nil -} - -func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { - call := workerCall{ - task: WorkerTask{ - Type: WorkerCommit, - TaskID: atomic.AddUint64(&sb.taskCtr, 1), - SectorID: sectorID, - SealTicket: ticket, - Pieces: pieces, - - SealSeed: seed, - Rspco: rspco, - }, - ret: make(chan SealRes), - } - - atomic.AddInt32(&sb.commitWait, 1) - - select { // prefer remote - case sb.commitTasks <- call: - proof, err = sb.sealCommitRemote(call) - default: - sb.checkRateLimit() - - rl := sb.rateLimit - if sb.noCommit { - rl = make(chan struct{}) - } - - select { // use whichever is available - case sb.commitTasks <- call: - proof, err = sb.sealCommitRemote(call) - case rl <- struct{}{}: - proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) - } - } - if err != nil { - return nil, xerrors.Errorf("commit: %w", err) - } - - return proof, nil -} - -func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) { - if len(challengeSeed) != CommLen { - return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen) - } - var cseed [CommLen]byte - copy(cseed[:], challengeSeed) - - privsects, err := sb.pubSectorToPriv(sectorInfo, nil) // TODO: faults - if err != nil { - return nil, err - } - - proverID := addressToProverID(sb.Miner) - - return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners) -} - -func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) { - privsectors, err := sb.pubSectorToPriv(sectorInfo, faults) - if err != nil { - return nil, err - } - - challengeCount := types.ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), len(faults)) - - proverID := addressToProverID(sb.Miner) - return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) -} - -func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faults []uint64) (SortedPrivateSectorInfo, error) { - fmap := map[uint64]struct{}{} - for _, fault := range faults { - fmap[fault] = struct{}{} - } - - var out []sectorbuilder.PrivateSectorInfo - for _, s := range sectorInfo.Values() { - if _, faulty := fmap[s.SectorID]; faulty { - continue - } - - cachePath, err := sb.sectorCacheDir(s.SectorID) - if err != nil { - return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err) - } - - sealedPath, err := sb.SealedSectorPath(s.SectorID) - if err != nil { - return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err) - } - - out = append(out, sectorbuilder.PrivateSectorInfo{ - SectorID: s.SectorID, - CommR: s.CommR, - CacheDirPath: cachePath, - SealedSectorPath: sealedPath, - }) - } - return NewSortedPrivateSectorInfo(out), nil -} - -func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) { - privsectors, err := sb.pubSectorToPriv(sectorInfo, faults) - if err != nil { - return nil, nil, err - } - - challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), len(faults)) - - proverID := addressToProverID(sb.Miner) - candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) - if err != nil { - return nil, nil, err - } - - proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) - return candidates, proof, err -} - -func (sb *SectorBuilder) Stop() { - close(sb.stopping) -} - -func fallbackPostChallengeCount(sectors uint64, faults int) uint64 { - challengeCount := types.ElectionPostChallengeCount(sectors, faults) - if challengeCount > build.MaxFallbackPostChallengeCount { - return build.MaxFallbackPostChallengeCount - } - return challengeCount -} - -func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error { - if err := migrate(osb.filesystem.pathFor(dataCache), sb.filesystem.pathFor(dataCache), symlink); err != nil { - return err - } - - if err := migrate(osb.filesystem.pathFor(dataStaging), sb.filesystem.pathFor(dataStaging), symlink); err != nil { - return err - } - - if err := migrate(osb.filesystem.pathFor(dataSealed), sb.filesystem.pathFor(dataSealed), symlink); err != nil { - return err - } - - val, err := osb.ds.Get(lastSectorIdKey) - if err != nil { - return err - } - - if err := sb.ds.Put(lastSectorIdKey, val); err != nil { - return err - } - - sb.lastID = osb.lastID - - return nil -} - -func (sb *SectorBuilder) SetLastSectorID(id uint64) error { - if err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))); err != nil { - return err - } - - sb.lastID = id - return nil -} - -func migrate(from, to string, symlink bool) error { - st, err := os.Stat(from) - if err != nil { - return err - } - - if st.IsDir() { - return migrateDir(from, to, symlink) - } - return migrateFile(from, to, symlink) -} - -func migrateDir(from, to string, symlink bool) error { - tost, err := os.Stat(to) - if err != nil { - if !os.IsNotExist(err) { - return err - } - - if err := os.MkdirAll(to, 0755); err != nil { - return err - } - } else if !tost.IsDir() { - return xerrors.Errorf("target %q already exists and is a file (expected directory)") - } - - dirents, err := ioutil.ReadDir(from) - if err != nil { - return err - } - - for _, inf := range dirents { - n := inf.Name() - if inf.IsDir() { - if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil { - return err - } - } else { - if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil { - return err - } - } - } - - return nil -} - -func migrateFile(from, to string, symlink bool) error { - if symlink { - return os.Symlink(from, to) - } - - return dcopy.Copy(from, to) -} diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go deleted file mode 100644 index 7720c59b1..000000000 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ /dev/null @@ -1,365 +0,0 @@ -package sectorbuilder_test - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "math/rand" - "os" - "runtime" - "sync" - "testing" - "time" - - ffi "github.com/filecoin-project/filecoin-ffi" - paramfetch "github.com/filecoin-project/go-paramfetch" - "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/lib/sectorbuilder" -) - -func init() { - logging.SetLogLevel("*", "INFO") -} - -const sectorSize = 1024 - -type seal struct { - sid uint64 - - pco sectorbuilder.RawSealPreCommitOutput - ppi sectorbuilder.PublicPieceInfo - - ticket sectorbuilder.SealTicket -} - -func (s *seal) precommit(t *testing.T, sb *sectorbuilder.SectorBuilder, sid uint64, done func()) { - dlen := sectorbuilder.UserBytesForSectorSize(sectorSize) - - var err error - r := io.LimitReader(rand.New(rand.NewSource(42+int64(sid))), int64(dlen)) - s.ppi, err = sb.AddPiece(dlen, sid, r, []uint64{}) - if err != nil { - t.Fatalf("%+v", err) - } - - s.ticket = sectorbuilder.SealTicket{ - BlockHeight: 5, - TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}, - } - - s.pco, err = sb.SealPreCommit(sid, s.ticket, []sectorbuilder.PublicPieceInfo{s.ppi}) - if err != nil { - t.Fatalf("%+v", err) - } - - done() -} - -func (s *seal) commit(t *testing.T, sb *sectorbuilder.SectorBuilder, done func()) { - seed := sectorbuilder.SealSeed{ - BlockHeight: 15, - TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}, - } - - proof, err := sb.SealCommit(s.sid, s.ticket, seed, []sectorbuilder.PublicPieceInfo{s.ppi}, s.pco) - if err != nil { - t.Fatalf("%+v", err) - } - - ok, err := sectorbuilder.VerifySeal(sectorSize, s.pco.CommR[:], s.pco.CommD[:], sb.Miner, s.ticket.TicketBytes[:], seed.TicketBytes[:], s.sid, proof) - if err != nil { - t.Fatalf("%+v", err) - } - - if !ok { - t.Fatal("proof failed to validate") - } - - done() -} - -func post(t *testing.T, sb *sectorbuilder.SectorBuilder, seals ...seal) time.Time { - cSeed := [32]byte{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9} - - ppi := make([]ffi.PublicSectorInfo, len(seals)) - for i, s := range seals { - ppi[i] = ffi.PublicSectorInfo{ - SectorID: s.sid, - CommR: s.pco.CommR, - } - } - - ssi := sectorbuilder.NewSortedPublicSectorInfo(ppi) - - candndates, err := sb.GenerateEPostCandidates(ssi, cSeed, []uint64{}) - if err != nil { - t.Fatalf("%+v", err) - } - - genCandidates := time.Now() - - if len(candndates) != 1 { - t.Fatal("expected 1 candidate") - } - - postProof, err := sb.ComputeElectionPoSt(ssi, cSeed[:], candndates) - if err != nil { - t.Fatalf("%+v", err) - } - - ok, err := sectorbuilder.VerifyElectionPost(context.TODO(), sb.SectorSize(), ssi, cSeed[:], postProof, candndates, sb.Miner) - if err != nil { - t.Fatalf("%+v", err) - } - if !ok { - t.Fatal("bad post") - } - - return genCandidates -} - -func TestSealAndVerify(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode") - } - if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware - t.Skip("this is slow") - } - _ = os.Setenv("RUST_LOG", "info") - - build.SectorSizes = []uint64{sectorSize} - - if err := paramfetch.GetParams(sectorSize); err != nil { - t.Fatalf("%+v", err) - } - - ds := datastore.NewMapDatastore() - - dir, err := ioutil.TempDir("", "sbtest") - if err != nil { - t.Fatal(err) - } - - sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - cleanup := func() { - if t.Failed() { - fmt.Printf("not removing %s\n", dir) - return - } - if err := os.RemoveAll(dir); err != nil { - t.Error(err) - } - } - defer cleanup() - - si, err := sb.AcquireSectorId() - if err != nil { - t.Fatalf("%+v", err) - } - - s := seal{sid: si} - - start := time.Now() - - s.precommit(t, sb, 1, func() {}) - - precommit := time.Now() - - s.commit(t, sb, func() {}) - - commit := time.Now() - - genCandidiates := post(t, sb, s) - - epost := time.Now() - - // Restart sectorbuilder, re-run post - sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - - post(t, sb, s) - - fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String()) - fmt.Printf("Commit: %s\n", commit.Sub(precommit).String()) - fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String()) - fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String()) -} - -func TestSealPoStNoCommit(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode") - } - if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware - t.Skip("this is slow") - } - _ = os.Setenv("RUST_LOG", "info") - - build.SectorSizes = []uint64{sectorSize} - - if err := paramfetch.GetParams(sectorSize); err != nil { - t.Fatalf("%+v", err) - } - - ds := datastore.NewMapDatastore() - - dir, err := ioutil.TempDir("", "sbtest") - if err != nil { - t.Fatal(err) - } - - sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - cleanup := func() { - if t.Failed() { - fmt.Printf("not removing %s\n", dir) - return - } - if err := os.RemoveAll(dir); err != nil { - t.Error(err) - } - } - defer cleanup() - - si, err := sb.AcquireSectorId() - if err != nil { - t.Fatalf("%+v", err) - } - - s := seal{sid: si} - - start := time.Now() - - s.precommit(t, sb, 1, func() {}) - - precommit := time.Now() - - // Restart sectorbuilder, re-run post - sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - - if err := sb.TrimCache(1); err != nil { - t.Fatal(err) - } - - genCandidiates := post(t, sb, s) - - epost := time.Now() - - fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String()) - fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(precommit).String()) - fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String()) -} - -func TestSealAndVerify2(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode") - } - if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware - t.Skip("this is slow") - } - _ = os.Setenv("RUST_LOG", "info") - - build.SectorSizes = []uint64{sectorSize} - - if err := paramfetch.GetParams(sectorSize); err != nil { - t.Fatalf("%+v", err) - } - - ds := datastore.NewMapDatastore() - - dir, err := ioutil.TempDir("", "sbtest") - if err != nil { - t.Fatal(err) - } - - sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - cleanup := func() { - if err := os.RemoveAll(dir); err != nil { - t.Error(err) - } - } - - defer cleanup() - - var wg sync.WaitGroup - - si1, err := sb.AcquireSectorId() - if err != nil { - t.Fatalf("%+v", err) - } - si2, err := sb.AcquireSectorId() - if err != nil { - t.Fatalf("%+v", err) - } - - s1 := seal{sid: si1} - s2 := seal{sid: si2} - - wg.Add(2) - go s1.precommit(t, sb, 1, wg.Done) - time.Sleep(100 * time.Millisecond) - go s2.precommit(t, sb, 2, wg.Done) - wg.Wait() - - wg.Add(2) - go s1.commit(t, sb, wg.Done) - go s2.commit(t, sb, wg.Done) - wg.Wait() - - post(t, sb, s1, s2) -} - -func TestAcquireID(t *testing.T) { - ds := datastore.NewMapDatastore() - - dir, err := ioutil.TempDir("", "sbtest") - if err != nil { - t.Fatal(err) - } - - sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - - assertAcquire := func(expect uint64) { - id, err := sb.AcquireSectorId() - require.NoError(t, err) - assert.Equal(t, expect, id) - } - - assertAcquire(1) - assertAcquire(2) - assertAcquire(3) - - sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) - if err != nil { - t.Fatalf("%+v", err) - } - - assertAcquire(4) - assertAcquire(5) - assertAcquire(6) - - if err := os.RemoveAll(dir); err != nil { - t.Error(err) - } -} diff --git a/lib/statestore/store.go b/lib/statestore/store.go deleted file mode 100644 index 38ce17b39..000000000 --- a/lib/statestore/store.go +++ /dev/null @@ -1,165 +0,0 @@ -package statestore - -import ( - "bytes" - "fmt" - "reflect" - - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - cbg "github.com/whyrusleeping/cbor-gen" - "go.uber.org/multierr" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-cbor-util" -) - -type StateStore struct { - ds datastore.Datastore -} - -func New(ds datastore.Datastore) *StateStore { - return &StateStore{ds: ds} -} - -func toKey(k interface{}) datastore.Key { - switch t := k.(type) { - case uint64: - return datastore.NewKey(fmt.Sprint(t)) - case fmt.Stringer: - return datastore.NewKey(t.String()) - default: - panic("unexpected key type") - } -} - -func (st *StateStore) Begin(i interface{}, state interface{}) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if has { - return xerrors.Errorf("already tracking state for %v", i) - } - - b, err := cborutil.Dump(state) - if err != nil { - return err - } - - return st.ds.Put(k, b) -} - -func (st *StateStore) End(i interface{}) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if !has { - return xerrors.Errorf("No state for %s", i) - } - return st.ds.Delete(k) -} - -func cborMutator(mutator interface{}) func([]byte) ([]byte, error) { - rmut := reflect.ValueOf(mutator) - - return func(in []byte) ([]byte, error) { - state := reflect.New(rmut.Type().In(0).Elem()) - - err := cborutil.ReadCborRPC(bytes.NewReader(in), state.Interface()) - if err != nil { - return nil, err - } - - out := rmut.Call([]reflect.Value{state}) - - if err := out[0].Interface(); err != nil { - return nil, err.(error) - } - - return cborutil.Dump(state.Interface()) - } -} - -// mutator func(*T) error -func (st *StateStore) Mutate(i interface{}, mutator interface{}) error { - return st.mutate(i, cborMutator(mutator)) -} - -func (st *StateStore) mutate(i interface{}, mutator func([]byte) ([]byte, error)) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if !has { - return xerrors.Errorf("No state for %s", i) - } - - cur, err := st.ds.Get(k) - if err != nil { - return err - } - - mutated, err := mutator(cur) - if err != nil { - return err - } - - return st.ds.Put(k, mutated) -} - -func (st *StateStore) Has(i interface{}) (bool, error) { - return st.ds.Has(toKey(i)) -} - -func (st *StateStore) Get(i interface{}, out cbg.CBORUnmarshaler) error { - k := toKey(i) - val, err := st.ds.Get(k) - if err != nil { - if xerrors.Is(err, datastore.ErrNotFound) { - return xerrors.Errorf("No state for %s: %w", i, err) - } - return err - } - - return out.UnmarshalCBOR(bytes.NewReader(val)) -} - -// out: *[]T -func (st *StateStore) List(out interface{}) error { - res, err := st.ds.Query(query.Query{}) - if err != nil { - return err - } - defer res.Close() - - outT := reflect.TypeOf(out).Elem().Elem() - rout := reflect.ValueOf(out) - - var errs error - - for { - res, ok := res.NextSync() - if !ok { - break - } - if res.Error != nil { - return res.Error - } - - elem := reflect.New(outT) - err := cborutil.ReadCborRPC(bytes.NewReader(res.Value), elem.Interface()) - if err != nil { - errs = multierr.Append(errs, xerrors.Errorf("decoding state for key '%s': %w", res.Key, err)) - continue - } - - rout.Elem().Set(reflect.Append(rout.Elem(), elem.Elem())) - } - - return nil -} diff --git a/lib/statestore/store_test.go b/lib/statestore/store_test.go deleted file mode 100644 index b0aa366d1..000000000 --- a/lib/statestore/store_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package statestore - -import ( - "testing" - - "github.com/ipfs/go-datastore" - - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/chain/types" -) - -func TestList(t *testing.T) { - ds := datastore.NewMapDatastore() - - e, err := cborutil.Dump(types.NewInt(7)) - if err != nil { - t.Fatal(err) - } - - if err := ds.Put(datastore.NewKey("/2"), e); err != nil { - t.Fatal(err) - } - - st := &StateStore{ds: ds} - - var out []types.BigInt - if err := st.List(&out); err != nil { - t.Fatal(err) - } - - if len(out) != 1 { - t.Fatal("wrong len") - } - - if out[0].Int64() != 7 { - t.Fatal("wrong data") - } -} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dd6e6825e..5568ed529 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -13,8 +13,8 @@ import ( files "github.com/ipfs/go-ipfs-files" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 5a5b55cec..23fb35455 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -20,13 +20,13 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/datatransfer" - "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/lib/statestore" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" diff --git a/storage/miner.go b/storage/miner.go index 2a0b267f5..3265aa2ae 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -13,14 +13,14 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/lib/statestore" ) var log = logging.Logger("storageminer")