diff --git a/api/api_storage.go b/api/api_storage.go index e4d5d8a25..13aec9544 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -111,8 +111,8 @@ type StorageMiner interface { // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error WorkerAttachStorage(context.Context, StorageInfo) error - WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error - FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) + StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error + StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index a6b2019b6..1713f8b2d 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -182,8 +182,8 @@ type StorageMinerStruct struct { WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"` - WorkerDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` - FindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` + StorageDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` + StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` @@ -655,12 +655,12 @@ func (c *StorageMinerStruct) WorkerAttachStorage(ctx context.Context, si api.Sto return c.Internal.WorkerAttachStorage(ctx, si) } -func (c *StorageMinerStruct) WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { - return c.Internal.WorkerDeclareSector(ctx, storageId, s) +func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error { + return c.Internal.StorageDeclareSector(ctx, storageId, s) } -func (c *StorageMinerStruct) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { - return c.Internal.FindSector(ctx, si, types) +func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { + return c.Internal.StorageFindSector(ctx, si, types) } func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error { diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index 3420a0aea..88de7bf7b 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -116,13 +116,19 @@ func (st *Local) open() error { } func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { + out, _, done, err := st.acquireSector(ctx, sid, existing, allocate, sealing) + return out, done, err +} + +func (st *Local) acquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { if existing|allocate != existing^allocate { - return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } st.localLk.RLock() var out sectorbuilder.SectorPaths + var storageIDs sectorbuilder.SectorPaths for _, fileType := range pathTypes { if fileType&existing == 0 { @@ -145,6 +151,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) sectorutil.SetPathByType(&out, fileType, spath) + sectorutil.SetPathByType(&storageIDs, fileType, p.meta.ID) existing ^= fileType } @@ -155,7 +162,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s continue } - var best string + var best, bestID string for _, p := range st.paths { if sealing && !p.meta.CanSeal { @@ -173,19 +180,21 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s // TODO: Calc weights best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) + bestID = p.meta.ID break // todo: the first path won't always be the best } if best == "" { st.localLk.RUnlock() - return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") } sectorutil.SetPathByType(&out, fileType, best) + sectorutil.SetPathByType(&storageIDs, fileType, bestID) allocate ^= fileType } - return out, st.localLk.RUnlock, nil + return out, storageIDs, st.localLk.RUnlock, nil } func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) { diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index e278dd701..05b2ce5dc 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -21,7 +21,7 @@ import ( ) type Remote struct { - local Store + local *Local remote SectorIndex auth http.Header @@ -30,7 +30,7 @@ type Remote struct { // (make sure to not fetch the same sector data twice) } -func NewRemote(local Store, remote SectorIndex, auth http.Header) *Remote { +func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote { return &Remote{ local: local, remote: remote, @@ -39,7 +39,8 @@ func NewRemote(local Store, remote SectorIndex, auth http.Header) *Remote { } type SectorIndex interface { - FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) + StorageDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error + StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) } func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { @@ -64,7 +65,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec continue } - ap, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) + ap, storageID, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) if err != nil { done() return sectorbuilder.SectorPaths{}, nil, err @@ -73,26 +74,30 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec done = mergeDone(done, rdone) sectorutil.SetPathByType(&paths, fileType, ap) + if err := r.remote.StorageDeclareSector(ctx, storageID, s); err != nil { + log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) + } } return paths, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, func(), error) { - si, err := r.remote.FindSector(ctx, s, fileType) +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, string, func(), error) { + si, err := r.remote.StorageFindSector(ctx, s, fileType) if err != nil { - return "", nil, err + return "", "", nil, err } sort.Slice(si, func(i, j int) bool { return si[i].Cost < si[j].Cost }) - apaths, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing) + apaths, ids, done, err := r.local.acquireSector(ctx, s, 0, fileType, sealing) if err != nil { - return "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) + return "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } dest := sectorutil.PathByType(apaths, fileType) + storageID := sectorutil.PathByType(ids, fileType) var merr error for _, info := range si { @@ -106,12 +111,12 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType if merr != nil { log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) } - return dest, done, nil + return dest, storageID, done, nil } } done() - return "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr) + return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr) }