Merge pull request #138 from filecoin-project/feat/sectorstore
Sector Store
This commit is contained in:
commit
1ed701db18
@ -6,8 +6,8 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
|
||||||
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -35,8 +35,8 @@ type MinerDeal struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
sb *sectorbuilder.SectorBuilder
|
secst *sector.Store
|
||||||
full api.FullNode
|
full api.FullNode
|
||||||
|
|
||||||
// TODO: Use a custom protocol or graphsync in the future
|
// TODO: Use a custom protocol or graphsync in the future
|
||||||
// TODO: GC
|
// TODO: GC
|
||||||
@ -60,7 +60,7 @@ type dealUpdate struct {
|
|||||||
mut func(*MinerDeal)
|
mut func(*MinerDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
|
func NewHandler(ds dtypes.MetadataDS, secst *sector.Store, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
|
||||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -71,9 +71,9 @@ func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtype
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Handler{
|
return &Handler{
|
||||||
sb: sb,
|
secst: secst,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
full: fullNode,
|
full: fullNode,
|
||||||
|
|
||||||
conns: map[cid.Cid]inet.Stream{},
|
conns: map[cid.Cid]inet.Stream{},
|
||||||
|
|
||||||
|
@ -2,8 +2,6 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
|
|
||||||
files "github.com/ipfs/go-ipfs-files"
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
@ -88,11 +86,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
|
|||||||
return nil, xerrors.Errorf("failed to get file size: %s", err)
|
return nil, xerrors.Errorf("failed to get file size: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var sectorID uint64
|
sectorID, err := h.secst.AddPiece(deal.Proposal.PieceRef, uint64(size), uf, deal.Proposal.Duration)
|
||||||
err = withTemp(uf, func(f string) (err error) {
|
|
||||||
sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -117,37 +111,29 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi
|
|||||||
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
|
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error) {
|
func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
|
||||||
loop:
|
status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID)
|
||||||
for {
|
if err != nil {
|
||||||
status, err = h.sb.SealStatus(deal.SectorID)
|
return sectorbuilder.SectorSealingStatus{}, err
|
||||||
if err != nil {
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch status.SealStatusCode {
|
|
||||||
case 0: // sealed
|
|
||||||
break loop
|
|
||||||
case 2: // failed
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
|
|
||||||
case 1: // pending
|
|
||||||
if err := h.sb.SealAllStagedSectors(); err != nil {
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, err
|
|
||||||
}
|
|
||||||
// start seal
|
|
||||||
fallthrough
|
|
||||||
case 3: // sealing
|
|
||||||
// wait
|
|
||||||
default:
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
|
|
||||||
}
|
|
||||||
time.Sleep(3 * time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch status.SealStatusCode {
|
||||||
|
case 0: // sealed
|
||||||
|
case 2: // failed
|
||||||
|
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
|
||||||
|
case 1: // pending
|
||||||
|
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
|
||||||
|
case 3: // sealing
|
||||||
|
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
|
||||||
|
default:
|
||||||
|
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
|
||||||
|
}
|
||||||
|
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||||
status, err := h.pollSectorSealed(deal)
|
status, err := h.waitSealed(deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1,56 +0,0 @@
|
|||||||
package sectorbuilder
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO: really need to get a callbacks API from the rust-sectorbuilder
|
|
||||||
func (sb *SectorBuilder) pollForSealedSectors(ctx context.Context) {
|
|
||||||
watching := make(map[uint64]bool)
|
|
||||||
|
|
||||||
staged, err := sb.GetAllStagedSectors()
|
|
||||||
if err != nil {
|
|
||||||
// TODO: this is probably worth shutting the miner down over until we
|
|
||||||
// have better recovery mechanisms
|
|
||||||
log.Errorf("failed to get staged sectors: %s", err)
|
|
||||||
}
|
|
||||||
for _, s := range staged {
|
|
||||||
watching[s.SectorID] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
tick := time.Tick(time.Second * 5)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-tick:
|
|
||||||
log.Info("polling for sealed sectors...")
|
|
||||||
|
|
||||||
// add new staged sectors to watch list
|
|
||||||
staged, err := sb.GetAllStagedSectors()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("in loop: failed to get staged sectors: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, s := range staged {
|
|
||||||
watching[s.SectorID] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
for s := range watching {
|
|
||||||
status, err := sb.SealStatus(s)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("getting seal status: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
|
|
||||||
delete(watching, s)
|
|
||||||
sb.sschan <- status
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
close(sb.sschan)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,7 +1,6 @@
|
|||||||
package sectorbuilder
|
package sectorbuilder
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
@ -22,8 +21,6 @@ const CommLen = sectorbuilder.CommitmentBytesLen
|
|||||||
|
|
||||||
type SectorBuilder struct {
|
type SectorBuilder struct {
|
||||||
handle unsafe.Pointer
|
handle unsafe.Pointer
|
||||||
|
|
||||||
sschan chan SectorSealingStatus
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorBuilderConfig struct {
|
type SectorBuilderConfig struct {
|
||||||
@ -44,7 +41,6 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) {
|
|||||||
|
|
||||||
return &SectorBuilder{
|
return &SectorBuilder{
|
||||||
handle: sbp,
|
handle: sbp,
|
||||||
sschan: make(chan SectorSealingStatus, 32),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,10 +56,6 @@ func sectorIDtoBytes(sid uint64) [31]byte {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *SectorBuilder) Run(ctx context.Context) {
|
|
||||||
go sb.pollForSealedSectors(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sb *SectorBuilder) Destroy() {
|
func (sb *SectorBuilder) Destroy() {
|
||||||
sectorbuilder.DestroySectorBuilder(sb.handle)
|
sectorbuilder.DestroySectorBuilder(sb.handle)
|
||||||
}
|
}
|
||||||
@ -95,11 +87,6 @@ func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSee
|
|||||||
return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed)
|
return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *SectorBuilder) SealedSectorChan() <-chan SectorSealingStatus {
|
|
||||||
// is this ever going to be multi-consumer? If so, switch to using pubsub/eventbus
|
|
||||||
return sb.sschan
|
|
||||||
}
|
|
||||||
|
|
||||||
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
|
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
|
||||||
|
|
||||||
func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) {
|
func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) {
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
package sectorbuilder
|
package sectorbuilder_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSealAndVerify(t *testing.T) {
|
func TestSealAndVerify(t *testing.T) {
|
||||||
@ -23,7 +23,7 @@ func TestSealAndVerify(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sb, err := New(&SectorBuilderConfig{
|
sb, err := sectorbuilder.New(§orbuilder.SectorBuilderConfig{
|
||||||
SectorSize: 1024,
|
SectorSize: 1024,
|
||||||
SealedDir: dir,
|
SealedDir: dir,
|
||||||
StagedDir: dir,
|
StagedDir: dir,
|
||||||
@ -34,11 +34,6 @@ func TestSealAndVerify(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
sb.Run(ctx)
|
|
||||||
|
|
||||||
fi, err := ioutil.TempFile("", "sbtestfi")
|
fi, err := ioutil.TempFile("", "sbtestfi")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -51,10 +46,11 @@ func TestSealAndVerify(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ssinfo := <-sb.SealedSectorChan()
|
store := sector.NewStore(sb)
|
||||||
fmt.Println("sector sealed...")
|
store.Service()
|
||||||
|
ssinfo := <-store.Incoming()
|
||||||
|
|
||||||
ok, err := VerifySeal(1024, ssinfo.CommR[:], ssinfo.CommD[:], ssinfo.CommRStar[:], addr, ssinfo.SectorID, ssinfo.Proof)
|
ok, err := sectorbuilder.VerifySeal(1024, ssinfo.CommR[:], ssinfo.CommD[:], ssinfo.CommRStar[:], addr, ssinfo.SectorID, ssinfo.Proof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-lotus/node/repo"
|
"github.com/filecoin-project/go-lotus/node/repo"
|
||||||
"github.com/filecoin-project/go-lotus/paych"
|
"github.com/filecoin-project/go-lotus/paych"
|
||||||
"github.com/filecoin-project/go-lotus/storage"
|
"github.com/filecoin-project/go-lotus/storage"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
)
|
)
|
||||||
|
|
||||||
// special is a type used to give keys to modules which
|
// special is a type used to give keys to modules which
|
||||||
@ -74,7 +75,10 @@ const (
|
|||||||
HandleIncomingMessagesKey
|
HandleIncomingMessagesKey
|
||||||
|
|
||||||
RunDealClientKey
|
RunDealClientKey
|
||||||
|
|
||||||
|
// storage miner
|
||||||
HandleDealsKey
|
HandleDealsKey
|
||||||
|
RunSectorServiceKey
|
||||||
|
|
||||||
// daemon
|
// daemon
|
||||||
ExtractApiKey
|
ExtractApiKey
|
||||||
@ -222,13 +226,15 @@ func Online() Option {
|
|||||||
|
|
||||||
// Storage miner
|
// Storage miner
|
||||||
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
|
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
|
||||||
Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder),
|
Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
|
||||||
|
Override(new(*sector.Store), sector.NewStore),
|
||||||
Override(new(*storage.Miner), modules.StorageMiner),
|
Override(new(*storage.Miner), modules.StorageMiner),
|
||||||
|
|
||||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||||
|
|
||||||
Override(new(*deals.Handler), deals.NewHandler),
|
Override(new(*deals.Handler), deals.NewHandler),
|
||||||
Override(HandleDealsKey, modules.HandleDeals),
|
Override(HandleDealsKey, modules.HandleDeals),
|
||||||
|
Override(RunSectorServiceKey, modules.RunSectorService),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -3,13 +3,14 @@ package impl
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
"github.com/filecoin-project/go-lotus/storage"
|
"github.com/filecoin-project/go-lotus/storage"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StorageMinerAPI struct {
|
type StorageMinerAPI struct {
|
||||||
@ -17,6 +18,7 @@ type StorageMinerAPI struct {
|
|||||||
|
|
||||||
SectorBuilderConfig *sectorbuilder.SectorBuilderConfig
|
SectorBuilderConfig *sectorbuilder.SectorBuilderConfig
|
||||||
SectorBuilder *sectorbuilder.SectorBuilder
|
SectorBuilder *sectorbuilder.SectorBuilder
|
||||||
|
Sectors *sector.Store
|
||||||
|
|
||||||
Miner *storage.Miner
|
Miner *storage.Miner
|
||||||
}
|
}
|
||||||
@ -26,20 +28,10 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
||||||
maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector
|
size := uint64(1016) // this is the most data we can fit in a 1024 byte sector
|
||||||
data := make([]byte, maxSize)
|
|
||||||
fi, err := ioutil.TempFile("", "lotus-garbage")
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := fi.Write(data); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
fi.Close()
|
|
||||||
|
|
||||||
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
||||||
sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name())
|
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016), 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package modules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
@ -69,3 +70,16 @@ func RunDealClient(lc fx.Lifecycle, c *deals.Client) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunSectorService(lc fx.Lifecycle, secst *sector.Store) {
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(context.Context) error {
|
||||||
|
secst.Service()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
OnStop: func(context.Context) error {
|
||||||
|
secst.Stop()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package modules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
@ -63,31 +64,13 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.SectorBuilderConfig) (*sectorbuilder.SectorBuilder, error) {
|
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
|
||||||
sb, err := sectorbuilder.New(sbc)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
|
||||||
OnStart: func(context.Context) error {
|
|
||||||
sb.Run(ctx)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return sb, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) (*storage.Miner, error) {
|
|
||||||
maddr, err := minerAddrFromDS(ds)
|
maddr, err := minerAddrFromDS(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm, err := storage.NewMiner(api, maddr, h, ds, sb)
|
sm, err := storage.NewMiner(api, maddr, h, ds, secst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/go-lotus/storage/sector"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
"github.com/filecoin-project/go-lotus/chain/actors"
|
"github.com/filecoin-project/go-lotus/chain/actors"
|
||||||
@ -23,7 +24,7 @@ var log = logging.Logger("storageminer")
|
|||||||
type Miner struct {
|
type Miner struct {
|
||||||
api storageMinerApi
|
api storageMinerApi
|
||||||
|
|
||||||
sb *sectorbuilder.SectorBuilder
|
secst *sector.Store
|
||||||
|
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
|
|
||||||
@ -52,13 +53,13 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder) (*Miner, error) {
|
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) {
|
||||||
return &Miner{
|
return &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
maddr: addr,
|
maddr: addr,
|
||||||
h: h,
|
h: h,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
sb: sb,
|
secst: secst,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,9 +74,12 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
|
func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
|
||||||
|
incoming := m.secst.Incoming()
|
||||||
|
defer m.secst.CloseIncoming(incoming)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case sinfo, ok := <-m.sb.SealedSectorChan():
|
case sinfo, ok := <-incoming:
|
||||||
if !ok {
|
if !ok {
|
||||||
// TODO: set some state variable so that this state can be
|
// TODO: set some state variable so that this state can be
|
||||||
// visible via some status command
|
// visible via some status command
|
||||||
|
190
storage/sector/store.go
Normal file
190
storage/sector/store.go
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
package sector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("sectorstore")
|
||||||
|
|
||||||
|
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
|
||||||
|
type Store struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
sb *sectorbuilder.SectorBuilder
|
||||||
|
|
||||||
|
waiting map[uint64]chan struct{}
|
||||||
|
incoming []chan sectorbuilder.SectorSealingStatus
|
||||||
|
// TODO: outdated chan
|
||||||
|
|
||||||
|
closeCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStore(sb *sectorbuilder.SectorBuilder) *Store {
|
||||||
|
return &Store{
|
||||||
|
sb: sb,
|
||||||
|
waiting: map[uint64]chan struct{}{},
|
||||||
|
closeCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Service() {
|
||||||
|
go s.service()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) poll() {
|
||||||
|
log.Info("polling for sealed sectors...")
|
||||||
|
|
||||||
|
// get a list of sectors to poll
|
||||||
|
s.lk.Lock()
|
||||||
|
toPoll := make([]uint64, 0, len(s.waiting))
|
||||||
|
|
||||||
|
for id := range s.waiting {
|
||||||
|
toPoll = append(toPoll, id)
|
||||||
|
}
|
||||||
|
s.lk.Unlock()
|
||||||
|
|
||||||
|
var done []sectorbuilder.SectorSealingStatus
|
||||||
|
|
||||||
|
// check status of each
|
||||||
|
for _, sec := range toPoll {
|
||||||
|
status, err := s.sb.SealStatus(sec)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("getting seal status: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
|
||||||
|
done = append(done, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// send updates
|
||||||
|
s.lk.Lock()
|
||||||
|
for _, sector := range done {
|
||||||
|
watch, ok := s.waiting[sector.SectorID]
|
||||||
|
if ok {
|
||||||
|
close(watch)
|
||||||
|
delete(s.waiting, sector.SectorID)
|
||||||
|
}
|
||||||
|
for _, c := range s.incoming {
|
||||||
|
c <- sector // TODO: ctx!
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.lk.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) service() {
|
||||||
|
poll := time.Tick(5 * time.Second)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-poll:
|
||||||
|
s.poll()
|
||||||
|
case <-s.closeCh:
|
||||||
|
s.lk.Lock()
|
||||||
|
for _, c := range s.incoming {
|
||||||
|
close(c)
|
||||||
|
}
|
||||||
|
s.lk.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint64) (sectorID uint64, err error) {
|
||||||
|
err = withTemp(r, func(f string) (err error) {
|
||||||
|
sectorID, err = s.sb.AddPiece(ref, size, f)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.lk.Lock()
|
||||||
|
_, exists := s.waiting[sectorID]
|
||||||
|
if !exists { // pieces can share sectors
|
||||||
|
s.waiting[sectorID] = make(chan struct{})
|
||||||
|
}
|
||||||
|
s.lk.Unlock()
|
||||||
|
|
||||||
|
return sectorID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
|
||||||
|
s.lk.Lock()
|
||||||
|
var at = -1
|
||||||
|
for i, ch := range s.incoming {
|
||||||
|
if ch == c {
|
||||||
|
at = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if at == -1 {
|
||||||
|
s.lk.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(s.incoming) > 1 {
|
||||||
|
last := len(s.incoming) - 1
|
||||||
|
s.incoming[at] = s.incoming[last]
|
||||||
|
s.incoming[last] = nil
|
||||||
|
}
|
||||||
|
s.incoming = s.incoming[:len(s.incoming)-1]
|
||||||
|
s.lk.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus {
|
||||||
|
ch := make(chan sectorbuilder.SectorSealingStatus, 8)
|
||||||
|
s.lk.Lock()
|
||||||
|
s.incoming = append(s.incoming, ch)
|
||||||
|
s.lk.Unlock()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
|
||||||
|
s.lk.Lock()
|
||||||
|
watch, ok := s.waiting[sector]
|
||||||
|
s.lk.Unlock()
|
||||||
|
if ok {
|
||||||
|
select {
|
||||||
|
case <-watch:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return sectorbuilder.SectorSealingStatus{}, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.sb.SealStatus(sector)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Stop() {
|
||||||
|
close(s.closeCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func withTemp(r io.Reader, cb func(string) error) error {
|
||||||
|
f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.Copy(f, r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cb(f.Name())
|
||||||
|
if err != nil {
|
||||||
|
if err := os.Remove(f.Name()); err != nil {
|
||||||
|
log.Errorf("couldn't remove temp file '%s'", f.Name())
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.Remove(f.Name())
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user