initial sectorbuilder FS refactor integration

This commit is contained in:
Łukasz Magiera 2020-01-29 00:08:02 +01:00
parent eb4b85aea5
commit 5af64c53b6
12 changed files with 45 additions and 28 deletions

View File

@ -146,7 +146,7 @@ func main() {
Miner: maddr, Miner: maddr,
SectorSize: sectorSize, SectorSize: sectorSize,
WorkerThreads: 2, WorkerThreads: 2,
Dir: sbdir, Paths: sectorbuilder.SimplePath(sbdir),
} }
if robench == "" { if robench == "" {
@ -174,7 +174,7 @@ func main() {
r := rand.New(rand.NewSource(100 + int64(i))) r := rand.New(rand.NewSource(100 + int64(i)))
pi, err := sb.AddPiece(dataSize, i, r, nil) pi, err := sb.AddPiece(context.TODO(), dataSize, i, r, nil)
if err != nil { if err != nil {
return err return err
} }
@ -225,7 +225,7 @@ func main() {
if !c.Bool("skip-unseal") { if !c.Bool("skip-unseal") {
log.Info("Unsealing sector") log.Info("Unsealing sector")
rc, err := sb.ReadPieceFromSealedSector(1, 0, dataSize, ticket.TicketBytes[:], commD[:]) rc, err := sb.ReadPieceFromSealedSector(context.TODO(), 1, 0, dataSize, ticket.TicketBytes[:], commD[:])
if err != nil { if err != nil {
return err return err
} }

View File

@ -41,7 +41,6 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
go subMpool(ctx, api, st) go subMpool(ctx, api, st)
go subBlocks(ctx, api, st) go subBlocks(ctx, api, st)
} }
} }
} }
}() }()

View File

@ -35,7 +35,7 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut
SectorSize: ssize, SectorSize: ssize,
Miner: act, Miner: act,
WorkerThreads: 1, WorkerThreads: 1,
Dir: repo, Paths: sectorbuilder.SimplePath(repo),
}) })
if err != nil { if err != nil {
return err return err

View File

@ -196,7 +196,7 @@ var aggregateSectorDirsCmd = &cli.Command{
agsb, err := sectorbuilder.New(&sectorbuilder.Config{ agsb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr, Miner: maddr,
SectorSize: ssize, SectorSize: ssize,
Dir: destdir, Paths: sectorbuilder.SimplePath(destdir),
WorkerThreads: 2, WorkerThreads: 2,
}, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder"))) }, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder")))
if err != nil { if err != nil {
@ -257,7 +257,7 @@ var aggregateSectorDirsCmd = &cli.Command{
sb, err := sectorbuilder.New(&sectorbuilder.Config{ sb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr, Miner: maddr,
SectorSize: genm.SectorSize, SectorSize: genm.SectorSize,
Dir: dir, Paths: sectorbuilder.SimplePath(dir),
WorkerThreads: 2, WorkerThreads: 2,
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil { if err != nil {

View File

@ -32,7 +32,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
Miner: maddr, Miner: maddr,
SectorSize: ssize, SectorSize: ssize,
FallbackLastID: offset, FallbackLastID: offset,
Dir: sbroot, Paths: sectorbuilder.SimplePath(sbroot),
WorkerThreads: 2, WorkerThreads: 2,
} }
@ -59,7 +59,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
return nil, err return nil, err
} }
pi, err := sb.AddPiece(size, sid, rand.Reader, nil) pi, err := sb.AddPiece(context.TODO(), size, sid, rand.Reader, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -76,7 +76,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
return nil, xerrors.Errorf("commit: %w", err) return nil, xerrors.Errorf("commit: %w", err)
} }
if err := sb.TrimCache(sid); err != nil { if err := sb.TrimCache(context.TODO(), sid); err != nil {
return nil, xerrors.Errorf("trim cache: %w", err) return nil, xerrors.Errorf("trim cache: %w", err)
} }

View File

@ -177,7 +177,7 @@ var initCmd = &cli.Command{
oldsb, err := sectorbuilder.New(&sectorbuilder.Config{ oldsb, err := sectorbuilder.New(&sectorbuilder.Config{
SectorSize: ssize, SectorSize: ssize,
WorkerThreads: 2, WorkerThreads: 2,
Dir: pssb, Paths: sectorbuilder.SimplePath(pssb),
}, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder"))) }, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder")))
if err != nil { if err != nil {
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err) return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
@ -186,7 +186,7 @@ var initCmd = &cli.Command{
nsb, err := sectorbuilder.New(&sectorbuilder.Config{ nsb, err := sectorbuilder.New(&sectorbuilder.Config{
SectorSize: ssize, SectorSize: ssize,
WorkerThreads: 2, WorkerThreads: 2,
Dir: lr.Path(), Paths: sectorbuilder.SimplePath(lr.Path()),
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil { if err != nil {
return xerrors.Errorf("failed to open up sectorbuilder: %w", err) return xerrors.Errorf("failed to open up sectorbuilder: %w", err)

2
go.mod
View File

@ -113,3 +113,5 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder

View File

@ -7,15 +7,16 @@ import (
"mime" "mime"
"net/http" "net/http"
"os" "os"
"strconv"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/gorilla/mux" "github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
@ -43,8 +44,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
mux := mux.NewRouter() mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{id}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") mux.HandleFunc("/remote/{type}/{id}", sm.remotePutSector).Methods("PUT")
log.Infof("SERVEGETREMOTE %s", r.URL) log.Infof("SERVEGETREMOTE %s", r.URL)
@ -54,14 +55,21 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
stat, err := os.Stat(path) stat, err := os.Stat(string(path))
if err != nil { if err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
@ -70,10 +78,10 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
var rd io.Reader var rd io.Reader
if stat.IsDir() { if stat.IsDir() {
rd, err = tarutil.TarDirectory(path) rd, err = tarutil.TarDirectory(string(path))
w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Type", "application/x-tar")
} else { } else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644) rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Type", "application/octet-stream")
} }
if err != nil { if err != nil {
@ -92,7 +100,14 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
@ -106,7 +121,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
return return
} }
if err := os.RemoveAll(path); err != nil { if err := os.RemoveAll(string(path)); err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
return return
@ -114,13 +129,13 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
switch mediatype { switch mediatype {
case "application/x-tar": case "application/x-tar":
if err := tarutil.ExtractTar(r.Body, path); err != nil { if err := tarutil.ExtractTar(r.Body, string(path)); err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
default: default:
if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil { if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
return return

View File

@ -91,7 +91,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
NoPreCommit: noprecommit, NoPreCommit: noprecommit,
NoCommit: nocommit, NoCommit: nocommit,
Dir: sp, Paths: sectorbuilder.SimplePath(sp),
} }
return sb, nil return sb, nil

View File

@ -81,9 +81,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
out := make([]Piece, len(sizes)) out := make([]Piece, len(sizes))
for i, size := range sizes { for i, size := range sizes {
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) ppi, err := m.sb.AddPiece(ctx, size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("add piece: %w", err)
} }
existingPieceSizes = append(existingPieceSizes, size) existingPieceSizes = append(existingPieceSizes, size)

View File

@ -112,7 +112,7 @@ func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, er
func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
log.Infof("Seal piece for deal %d", dealID) log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []uint64{})
if err != nil { if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err) return xerrors.Errorf("adding piece to sector: %w", err)
} }

View File

@ -96,6 +96,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size) log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size)
r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector( r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector(
context.TODO(),
best.SectorID, best.SectorID,
best.Offset, best.Offset,
best.Size, best.Size,