more working code

This commit is contained in:
Łukasz Magiera 2020-09-14 09:44:55 +02:00
parent 5f08fe7ead
commit 1ebca8f732
9 changed files with 296 additions and 93 deletions

View File

@ -15,6 +15,8 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -23,6 +25,7 @@ import (
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
@ -34,6 +37,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -342,11 +346,13 @@ var runCmd = &cli.Command{
// Create / expose the worker // Create / expose the worker
wsts := statestore.New(namespace.Wrap(datastore.NewMapDatastore(), modules.WorkerCallsPrefix)) // TODO: USE A REAL DATASTORE
workerApi := &worker{ workerApi := &worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{ LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt, SealProof: spt,
TaskTypes: taskTypes, TaskTypes: taskTypes,
}, remote, localStore, nodeApi, nodeApi), }, remote, localStore, nodeApi, nodeApi, wsts),
localStore: localStore, localStore: localStore,
ls: lr, ls: lr,
} }

View File

@ -12,11 +12,10 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"github.com/filecoin-project/go-state-types/big"
"github.com/docker/go-units" "github.com/docker/go-units"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
@ -27,7 +26,9 @@ import (
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
crypto2 "github.com/filecoin-project/go-state-types/crypto" crypto2 "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-statestore"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -441,6 +442,8 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err return err
} }
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{ smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{
SealProofType: spt, SealProofType: spt,
}, sectorstorage.SealerConfig{ }, sectorstorage.SealerConfig{
@ -450,7 +453,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
AllowPreCommit2: true, AllowPreCommit2: true,
AllowCommit: true, AllowCommit: true,
AllowUnseal: true, AllowUnseal: true,
}, nil, sa) }, nil, sa, wsts)
if err != nil { if err != nil {
return err return err
} }

46
extern/sector-storage/calltracker.go vendored Normal file
View File

@ -0,0 +1,46 @@
package sectorstorage
import (
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type callTracker struct {
st *statestore.StateStore // by CallID
}
type CallState uint64
const (
CallStarted CallState = iota
CallDone
// returned -> remove
)
type Call struct {
State CallState
// Params cbg.Deferred // TODO: support once useful
Result []byte
}
func (wt *callTracker) onStart(ci storiface.CallID) error {
return wt.st.Begin(ci, &Call{
State: CallStarted,
})
}
func (wt *callTracker) onDone(ci storiface.CallID, ret []byte) error {
st := wt.st.Get(ci)
return st.Mutate(func(cs *Call) error {
cs.State = CallDone
cs.Result = ret
return nil
})
}
func (wt *callTracker) onReturned(ci storiface.CallID) error {
st := wt.st.Get(ci)
return st.End()
}

119
extern/sector-storage/cbor_gen.go vendored Normal file
View File

@ -0,0 +1,119 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package sectorstorage
import (
"fmt"
"io"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
func (t *Call) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
panic("cbg")
if _, err := w.Write([]byte{162}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.State (sectorstorage.CallState) (uint64)
if len("State") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"State\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("State"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("State")); err != nil {
return err
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.State)); err != nil {
return err
}
// t.Result (typegen.Deferred) (struct)
if len("Result") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Result\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Result"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Result")); err != nil {
return err
}
return nil
}
func (t *Call) UnmarshalCBOR(r io.Reader) error {
*t = Call{}
br := cbg.GetPeeker(r)
scratch := make([]byte, 8)
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("Call: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.State (sectorstorage.CallState) (uint64)
case "State":
{
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = CallState(extra)
}
// t.Result (typegen.Deferred) (struct)
case "Result":
{
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"io" "io"
"os" "os"
"reflect"
"runtime" "runtime"
"github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo"
@ -14,6 +15,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -36,10 +38,11 @@ type LocalWorker struct {
sindex stores.SectorIndex sindex stores.SectorIndex
ret storiface.WorkerReturn ret storiface.WorkerReturn
ct *callTracker
acceptTasks map[sealtasks.TaskType]struct{} acceptTasks map[sealtasks.TaskType]struct{}
} }
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn) *LocalWorker { func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
acceptTasks := map[sealtasks.TaskType]struct{}{} acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes { for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{} acceptTasks[taskType] = struct{}{}
@ -54,6 +57,9 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
sindex: sindex, sindex: sindex,
ret: ret, ret: ret,
ct: &callTracker{
st: cst,
},
acceptTasks: acceptTasks, acceptTasks: acceptTasks,
} }
} }
@ -98,13 +104,62 @@ func (l *LocalWorker) sb() (ffiwrapper.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg) return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
} }
func (l *LocalWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallID)) (storiface.CallID, error) { type returnType string
// in: func(WorkerReturn, context.Context, CallID, err string)
// in: func(WorkerReturn, context.Context, CallID, ret T, err string)
func rfunc(in interface{}) func(context.Context, storiface.WorkerReturn, interface{}, error) error {
rf := reflect.ValueOf(in)
ft := rf.Type()
withRet := ft.NumIn() == 4
return func(ctx context.Context, wr storiface.WorkerReturn, i interface{}, err error) error {
rctx := reflect.ValueOf(ctx)
rwr := reflect.ValueOf(wr)
rerr := reflect.ValueOf(errstr(err))
var ro []reflect.Value
if withRet {
ro = rf.Call([]reflect.Value{rwr, rctx, reflect.ValueOf(i), rerr})
} else {
ro = rf.Call([]reflect.Value{rwr, rctx, rerr})
}
return ro[0].Interface().(error)
}
}
var returnFunc = map[returnType]func(context.Context, storiface.WorkerReturn, interface{}, error) error{
"AddPiece": rfunc(storiface.WorkerReturn.ReturnAddPiece),
"SealPreCommit1": rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
"SealPreCommit2": rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
"SealCommit1": rfunc(storiface.WorkerReturn.ReturnSealCommit1),
"SealCommit2": rfunc(storiface.WorkerReturn.ReturnSealCommit2),
"FinalizeSector": rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
"ReleaseUnsealed": rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
"MoveStorage": rfunc(storiface.WorkerReturn.ReturnMoveStorage),
"UnsealPiece": rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
"ReadPiece": rfunc(storiface.WorkerReturn.ReturnReadPiece),
"Fetch": rfunc(storiface.WorkerReturn.ReturnFetch),
}
func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt returnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
ci := storiface.CallID{ ci := storiface.CallID{
Sector: sector, Sector: sector,
ID: uuid.New(), ID: uuid.New(),
} }
go work(ci) if err := l.ct.onStart(ci); err != nil {
log.Errorf("tracking call (start): %+v", err)
}
go func() {
res, err := work(ci)
if err := returnFunc[rt](ctx, l.ret, res, err); err != nil {
log.Errorf("return error: %s: %+v", rt, err)
}
}()
return ci, nil return ci, nil
} }
@ -132,58 +187,42 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "AddPiece", func(ci storiface.CallID) (interface{}, error) {
pi, err := sb.AddPiece(ctx, sector, epcs, sz, r) return sb.AddPiece(ctx, sector, epcs, sz, r)
if err := l.ret.ReturnAddPiece(ctx, ci, pi, errstr(err)); err != nil {
log.Errorf("ReturnAddPiece: %+v", err)
}
}) })
} }
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "Fetch", func(ci storiface.CallID) (interface{}, error) {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype) _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
if err == nil { if err == nil {
done() done()
} }
if err := l.ret.ReturnFetch(ctx, ci, errstr(err)); err != nil { return nil, err
log.Errorf("ReturnFetch: %+v", err)
}
}) })
} }
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "SealPreCommit1", func(ci storiface.CallID) (interface{}, error) {
var err error
var p1o storage2.PreCommit1Out
defer func() {
if err := l.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil {
log.Errorf("ReturnSealPreCommit1: %+v", err)
}
}()
{ {
// cleanup previous failed attempts if they exist // cleanup previous failed attempts if they exist
if err = l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil { if err := l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil {
err = xerrors.Errorf("cleaning up sealed data: %w", err) return nil, xerrors.Errorf("cleaning up sealed data: %w", err)
return
} }
if err = l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil { if err := l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil {
err = xerrors.Errorf("cleaning up cache data: %w", err) return nil, xerrors.Errorf("cleaning up cache data: %w", err)
return
} }
} }
var sb ffiwrapper.Storage sb, err := l.sb()
sb, err = l.sb()
if err != nil { if err != nil {
return return nil, err
} }
p1o, err = sb.SealPreCommit1(ctx, sector, ticket, pieces) return sb.SealPreCommit1(ctx, sector, ticket, pieces)
}) })
} }
@ -193,12 +232,8 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "SealPreCommit2", func(ci storiface.CallID) (interface{}, error) {
cs, err := sb.SealPreCommit2(ctx, sector, phase1Out) return sb.SealPreCommit2(ctx, sector, phase1Out)
if err := l.ret.ReturnSealPreCommit2(ctx, ci, cs, errstr(err)); err != nil {
log.Errorf("ReturnSealPreCommit2: %+v", err)
}
}) })
} }
@ -208,12 +243,8 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "SealCommit1", func(ci storiface.CallID) (interface{}, error) {
c1o, err := sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids) return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err := l.ret.ReturnSealCommit1(ctx, ci, c1o, errstr(err)); err != nil {
log.Errorf("ReturnSealCommit1: %+v", err)
}
}) })
} }
@ -223,12 +254,8 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "SealCommit2", func(ci storiface.CallID) (interface{}, error) {
proof, err := sb.SealCommit2(ctx, sector, phase1Out) return sb.SealCommit2(ctx, sector, phase1Out)
if err := l.ret.ReturnSealCommit2(ctx, ci, proof, errstr(err)); err != nil {
log.Errorf("ReturnSealCommit2: %+v", err)
}
}) })
} }
@ -238,23 +265,18 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, k
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "FinalizeSector", func(ci storiface.CallID) (interface{}, error) {
if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil { if err := sb.FinalizeSector(ctx, sector, keepUnsealed); err != nil {
if err := l.ret.ReturnFinalizeSector(ctx, ci, errstr(xerrors.Errorf("finalizing sector: %w", err))); err != nil { return nil, xerrors.Errorf("finalizing sector: %w", err)
log.Errorf("ReturnFinalizeSector: %+v", err)
}
} }
if len(keepUnsealed) == 0 { if len(keepUnsealed) == 0 {
err = xerrors.Errorf("removing unsealed data: %w", err) if err := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); err != nil {
if err := l.ret.ReturnFinalizeSector(ctx, ci, errstr(err)); err != nil { return nil, xerrors.Errorf("removing unsealed data: %w", err)
log.Errorf("ReturnFinalizeSector: %+v", err)
} }
} }
if err := l.ret.ReturnFinalizeSector(ctx, ci, errstr(err)); err != nil { return nil, err
log.Errorf("ReturnFinalizeSector: %+v", err)
}
}) })
} }
@ -279,12 +301,8 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
} }
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) { func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "MoveStorage", func(ci storiface.CallID) (interface{}, error) {
err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types) return nil, l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types)
if err := l.ret.ReturnMoveStorage(ctx, ci, errstr(err)); err != nil {
log.Errorf("ReturnMoveStorage: %+v", err)
}
}) })
} }
@ -294,28 +312,20 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "UnsealPiece", func(ci storiface.CallID) (interface{}, error) {
var err error
defer func() {
if err := l.ret.ReturnUnsealPiece(ctx, ci, errstr(err)); err != nil {
log.Errorf("ReturnUnsealPiece: %+v", err)
}
}()
if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil { if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
err = xerrors.Errorf("unsealing sector: %w", err) return nil, xerrors.Errorf("unsealing sector: %w", err)
return
} }
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil { if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil {
err = xerrors.Errorf("removing source data: %w", err) return nil, xerrors.Errorf("removing source data: %w", err)
return
} }
if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil { if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil {
err = xerrors.Errorf("removing source data: %w", err) return nil, xerrors.Errorf("removing source data: %w", err)
return
} }
return nil, nil
}) })
} }
@ -325,12 +335,8 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector ab
return storiface.UndefCall, err return storiface.UndefCall, err
} }
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(ctx, sector, "ReadPiece", func(ci storiface.CallID) (interface{}, error) {
ok, err := sb.ReadPiece(ctx, writer, sector, index, size) return sb.ReadPiece(ctx, writer, sector, index, size)
if err := l.ret.ReturnReadPiece(ctx, ci, ok, errstr(err)); err != nil {
log.Errorf("ReturnReadPiece: %+v", err)
}
}) })
} }

View File

@ -3,6 +3,7 @@ package sectorstorage
import ( import (
"context" "context"
"errors" "errors"
"github.com/filecoin-project/go-statestore"
"io" "io"
"net/http" "net/http"
"sync" "sync"
@ -94,7 +95,9 @@ type SealerConfig struct {
type StorageAuth http.Header type StorageAuth http.Header
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth) (*Manager, error) { type WorkerStateStore *statestore.StateStore
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls) lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil { if err != nil {
return nil, err return nil, err
@ -148,7 +151,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType, SealProof: cfg.SealProofType,
TaskTypes: localTasks, TaskTypes: localTasks,
}, stor, lstor, si, m)) }, stor, lstor, si, m, wss))
if err != nil { if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err) return nil, xerrors.Errorf("adding local worker: %w", err)
} }

View File

@ -2,6 +2,7 @@ package storiface
import ( import (
"context" "context"
"fmt"
"io" "io"
"time" "time"
@ -53,6 +54,12 @@ type CallID struct {
ID uuid.UUID ID uuid.UUID
} }
func (c CallID) String() string {
return fmt.Sprintf("%d-%d-%s", c.Sector.Miner, c.Sector.Number, c.ID)
}
var _ fmt.Stringer = &CallID{}
var UndefCall CallID var UndefCall CallID
type WorkerCalls interface { type WorkerCalls interface {

View File

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"os" "os"
gen "github.com/whyrusleeping/cbor-gen" gen "github.com/whyrusleeping/cbor-gen"
@ -74,4 +75,11 @@ func main() {
os.Exit(1) os.Exit(1)
} }
err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage",
sectorstorage.Call{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/go-statestore"
"net/http" "net/http"
"time" "time"
@ -479,10 +480,14 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
} }
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth) (*sectorstorage.Manager, error) { var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa) wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa, wsts)
if err != nil { if err != nil {
return nil, err return nil, err
} }