workers: Declare sectors after fetching in remote store
This commit is contained in:
parent
bc7fede68b
commit
a75ad492f1
@ -111,8 +111,8 @@ type StorageMiner interface {
|
|||||||
// WorkerConnect tells the node to connect to workers RPC
|
// WorkerConnect tells the node to connect to workers RPC
|
||||||
WorkerConnect(context.Context, string) error
|
WorkerConnect(context.Context, string) error
|
||||||
WorkerAttachStorage(context.Context, StorageInfo) error
|
WorkerAttachStorage(context.Context, StorageInfo) error
|
||||||
WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
|
StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
|
||||||
FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
|
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
|
||||||
|
|
||||||
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
|
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
|
||||||
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||||
|
@ -182,8 +182,8 @@ type StorageMinerStruct struct {
|
|||||||
|
|
||||||
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
|
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
|
||||||
WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"`
|
WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"`
|
||||||
WorkerDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"`
|
StorageDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"`
|
||||||
FindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"`
|
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"`
|
||||||
|
|
||||||
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
||||||
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||||
@ -655,12 +655,12 @@ func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.Sto
|
|||||||
return c.Internal.WorkerAttachStorage(ctx, si)
|
return c.Internal.WorkerAttachStorage(ctx, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error {
|
func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error {
|
||||||
return c.Internal.WorkerDeclareSector(ctx, storageId, s)
|
return c.Internal.StorageDeclareSector(ctx, storageId, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) {
|
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) {
|
||||||
return c.Internal.FindSector(ctx, si, types)
|
return c.Internal.StorageFindSector(ctx, si, types)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
|
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
|
||||||
|
@ -116,13 +116,19 @@ func (st *Local) open() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
||||||
|
out, _, done, err := st.acquireSector(ctx, sid, existing, allocate, sealing)
|
||||||
|
return out, done, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
||||||
}
|
}
|
||||||
|
|
||||||
st.localLk.RLock()
|
st.localLk.RLock()
|
||||||
|
|
||||||
var out sectorbuilder.SectorPaths
|
var out sectorbuilder.SectorPaths
|
||||||
|
var storageIDs sectorbuilder.SectorPaths
|
||||||
|
|
||||||
for _, fileType := range pathTypes {
|
for _, fileType := range pathTypes {
|
||||||
if fileType&existing == 0 {
|
if fileType&existing == 0 {
|
||||||
@ -145,6 +151,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
|
|||||||
|
|
||||||
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
|
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
|
||||||
sectorutil.SetPathByType(&out, fileType, spath)
|
sectorutil.SetPathByType(&out, fileType, spath)
|
||||||
|
sectorutil.SetPathByType(&storageIDs, fileType, p.meta.ID)
|
||||||
|
|
||||||
existing ^= fileType
|
existing ^= fileType
|
||||||
}
|
}
|
||||||
@ -155,7 +162,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var best string
|
var best, bestID string
|
||||||
|
|
||||||
for _, p := range st.paths {
|
for _, p := range st.paths {
|
||||||
if sealing && !p.meta.CanSeal {
|
if sealing && !p.meta.CanSeal {
|
||||||
@ -173,19 +180,21 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
|
|||||||
// TODO: Calc weights
|
// TODO: Calc weights
|
||||||
|
|
||||||
best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
|
best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
|
||||||
|
bestID = p.meta.ID
|
||||||
break // todo: the first path won't always be the best
|
break // todo: the first path won't always be the best
|
||||||
}
|
}
|
||||||
|
|
||||||
if best == "" {
|
if best == "" {
|
||||||
st.localLk.RUnlock()
|
st.localLk.RUnlock()
|
||||||
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
|
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
|
||||||
}
|
}
|
||||||
|
|
||||||
sectorutil.SetPathByType(&out, fileType, best)
|
sectorutil.SetPathByType(&out, fileType, best)
|
||||||
|
sectorutil.SetPathByType(&storageIDs, fileType, bestID)
|
||||||
allocate ^= fileType
|
allocate ^= fileType
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, st.localLk.RUnlock, nil
|
return out, storageIDs, st.localLk.RUnlock, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
|
func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Remote struct {
|
type Remote struct {
|
||||||
local Store
|
local *Local
|
||||||
remote SectorIndex
|
remote SectorIndex
|
||||||
auth http.Header
|
auth http.Header
|
||||||
|
|
||||||
@ -30,7 +30,7 @@ type Remote struct {
|
|||||||
// (make sure to not fetch the same sector data twice)
|
// (make sure to not fetch the same sector data twice)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemote(local Store, remote SectorIndex, auth http.Header) *Remote {
|
func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote {
|
||||||
return &Remote{
|
return &Remote{
|
||||||
local: local,
|
local: local,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
@ -39,7 +39,8 @@ func NewRemote(local Store, remote SectorIndex, auth http.Header) *Remote {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SectorIndex interface {
|
type SectorIndex interface {
|
||||||
FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error)
|
StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error
|
||||||
|
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
||||||
@ -64,7 +65,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ap, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
done()
|
done()
|
||||||
return sectorbuilder.SectorPaths{}, nil, err
|
return sectorbuilder.SectorPaths{}, nil, err
|
||||||
@ -73,26 +74,30 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
|
|||||||
done = mergeDone(done, rdone)
|
done = mergeDone(done, rdone)
|
||||||
sectorutil.SetPathByType(&paths, fileType, ap)
|
sectorutil.SetPathByType(&paths, fileType, ap)
|
||||||
|
|
||||||
|
if err := r.remote.StorageDeclareSector(ctx, storageID, s); err != nil {
|
||||||
|
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return paths, done, nil
|
return paths, done, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, func(), error) {
|
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, string, func(), error) {
|
||||||
si, err := r.remote.FindSector(ctx, s, fileType)
|
si, err := r.remote.StorageFindSector(ctx, s, fileType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return "", "", nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(si, func(i, j int) bool {
|
sort.Slice(si, func(i, j int) bool {
|
||||||
return si[i].Cost < si[j].Cost
|
return si[i].Cost < si[j].Cost
|
||||||
})
|
})
|
||||||
|
|
||||||
apaths, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
|
apaths, ids, done, err := r.local.acquireSector(ctx, s, 0, fileType, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||||
}
|
}
|
||||||
dest := sectorutil.PathByType(apaths, fileType)
|
dest := sectorutil.PathByType(apaths, fileType)
|
||||||
|
storageID := sectorutil.PathByType(ids, fileType)
|
||||||
|
|
||||||
var merr error
|
var merr error
|
||||||
for _, info := range si {
|
for _, info := range si {
|
||||||
@ -106,12 +111,12 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
if merr != nil {
|
if merr != nil {
|
||||||
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
|
||||||
}
|
}
|
||||||
return dest, done, nil
|
return dest, storageID, done, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done()
|
done()
|
||||||
return "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr)
|
return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user