Merge pull request #42 from filecoin-project/feat/allow-commit1-anywhere

Allow Commit1 anywhere
Łukasz Magiera 2020-06-05 21:27:46 +02:00 committed by GitHub
commit 4b9317d1f0
18 changed files with 529 additions and 86 deletions

@@ -19,13 +19,13 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se
var bad []abi.SectorID
// TODO: Better checks
// TODO: Use proper locking
for _, sector := range sectors {
err := func() error {
lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
if err != nil {
return xerrors.Errorf("acquire sector in checkProvable: %w", err)
}
defer done()
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache and/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)

@@ -24,7 +24,7 @@ type Provider struct {
waitSector map[sectorFile]chan struct{}
}
func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, stores.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) {
return stores.SectorPaths{}, nil, err
}

@@ -43,7 +43,7 @@ type Verifier interface {
type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
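
The change threaded through every AcquireSector signature here is the same: the bare `sealing bool` becomes a typed `ptype stores.PathType`. A minimal sketch of the idea; the bool-alias definition is an assumption inferred from the `stores.PathType(sealing)` cast in the old code, not quoted from the repo:

```go
// Sketch only: PathType's exact definition is inferred, not quoted.
package stores

type PathType bool

const (
	PathStorage PathType = false // prefer long-term storage paths
	PathSealing PathType = true  // prefer fast sealing scratch space
)
```

Call sites like `schedFetch(sector, ft, stores.PathSealing, stores.AcquireMove)` now say what they mean, where `true` previously forced readers to remember which bool this was.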

@@ -59,8 +59,8 @@ type localWorkerPathProvider struct {
op stores.AcquireMode
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op)
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
if err != nil {
return stores.SectorPaths{}, nil, err
}
@@ -68,8 +68,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() {
done()
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
@@ -106,8 +104,8 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
return sb.AddPiece(ctx, sector, epcs, sz, r)
}
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool, am stores.AcquireMode) error {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing)
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, ptype)
if err != nil {
return err
}
@@ -176,6 +174,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return xerrors.Errorf("removing unsealed data: %w", err)
}
return nil
}
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error {
if err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, stores.FTSealed|stores.FTCache); err != nil {
return xerrors.Errorf("moving sealed data to storage: %w", err)
}

@@ -29,7 +29,9 @@ type URLs []string
type Worker interface {
ffiwrapper.StorageSealer
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error
MoveStorage(ctx context.Context, sector abi.SectorID) error
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
@@ -88,7 +90,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
return nil, err
}
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@@ -184,13 +186,20 @@ func schedNop(context.Context, Worker) error {
return nil
}
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error {
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error {
return worker.Fetch(ctx, sector, ft, sealing, am)
return worker.Fetch(ctx, sector, ft, ptype, am)
}
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
@@ -198,7 +207,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
var selector WorkerSelector
if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
@@ -233,7 +242,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return xerrors.Errorf("creating readPiece selector: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
return w.ReadPiece(ctx, sink, sector, offset, size)
})
if err != nil {
@@ -249,10 +258,17 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
}
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTUnsealed); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
var selector WorkerSelector
var err error
if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
@@ -274,14 +290,21 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
}
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache); err != nil {
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
// TODO: also consider where the unsealed data sits
selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed)
selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
if err != nil {
return nil, xerrors.Errorf("creating path selector: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil {
return err
@@ -294,12 +317,19 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
}
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil {
return err
@@ -311,16 +341,22 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
}
// NOTE: We set allowFetch to false so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
if err != nil {
return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
}
// TODO: Try very hard to execute on worker with access to the sectors
// (except, don't.. for now at least - we are using this step to bring data
// into 'provable' storage. Optimally we'd do that in commit2, in parallel
// with snark compute)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil {
return err
@@ -347,16 +383,54 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
unsealed := stores.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}
if len(unsealedStores) == 0 { // In some edge cases the unsealed sector may not exist yet; that's fine
unsealed = stores.FTNone
}
}
selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|unsealed, false)
if err != nil {
return xerrors.Errorf("creating path selector: %w", err)
}
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove),
err = m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector)
})
if err != nil {
return err
}
fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
if err != nil {
return xerrors.Errorf("creating fetchSel: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathStorage, stores.AcquireMove),
func(ctx context.Context, w Worker) error {
return w.MoveStorage(ctx, sector)
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
return nil
}
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {

@@ -70,17 +70,6 @@ func (mgr *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error
func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorId, size, mgr.proofType)
mgr.lk.Lock()
ss, ok := mgr.sectors[sectorId]
if !ok {
ss = &sectorState{
state: statePacking,
}
mgr.sectors[sectorId] = ss
}
mgr.lk.Unlock()
ss.lk.Lock()
defer ss.lk.Unlock()
var b bytes.Buffer
tr := io.TeeReader(r, &b)
@@ -92,9 +81,24 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, exist
log.Warn("Generated Piece CID: ", c)
mgr.lk.Lock()
mgr.pieces[c] = b.Bytes()
ss, ok := mgr.sectors[sectorId]
if !ok {
ss = &sectorState{
state: statePacking,
}
mgr.sectors[sectorId] = ss
}
mgr.lk.Unlock()
ss.lk.Lock()
ss.pieces = append(ss.pieces, c)
ss.lk.Unlock()
return abi.PieceInfo{
Size: size.Padded(),
PieceCID: c,
}, nil

@@ -11,16 +11,22 @@ import (
)
type readonlyProvider struct {
stor *stores.Local
spt abi.RegisteredProof
index stores.SectorIndex
stor *stores.Local
spt abi.RegisteredProof
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
if allocate != stores.FTNone {
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
}
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
ctx, cancel := context.WithCancel(ctx)
if err := l.index.StorageLock(ctx, id, existing, stores.FTNone); err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
}
return p, done, err
p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing, stores.AcquireMove)
return p, cancel, err
}
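
Note the idiom this hunk introduces: the context cancel function doubles as the returned `done` callback, because the read lock taken via StorageLock is released when its context is cancelled. A sketch of that shape with a hypothetical helper name (`acquireReadOnly` is not repo API); unlike the hunk above, it also cancels on the error path so the derived context isn't leaked:

```go
// Sketch: cancel doubles as done(); cancelling ctx releases the read lock.
func acquireReadOnly(ctx context.Context, idx stores.SectorIndex, id abi.SectorID, existing stores.SectorFileType) (func(), error) {
	ctx, cancel := context.WithCancel(ctx)
	if err := idx.StorageLock(ctx, id, existing, stores.FTNone); err != nil {
		cancel() // release the derived context if locking fails
		return nil, xerrors.Errorf("acquiring sector lock: %w", err)
	}
	// ... resolve sector paths while the read lock is held ...
	return cancel, nil // the caller's done() unlocks by cancelling ctx
}
```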

@@ -14,12 +14,14 @@ import (
type allocSelector struct {
index stores.SectorIndex
alloc stores.SectorFileType
ptype stores.PathType
}
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) {
return &allocSelector{
index: index,
alloc: alloc,
ptype: ptype,
}, nil
}
@@ -42,7 +44,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
have[path.ID] = struct{}{}
}
best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, true)
best, err := s.index.StorageBestAlloc(ctx, s.alloc, spt, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
}

@@ -11,6 +11,8 @@ const (
FTUnsealed SectorFileType = 1 << iota
FTSealed
FTCache
FileTypes = iota
)
const (
@@ -71,6 +73,16 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredProof) (uint64, error) {
return need, nil
}
func (t SectorFileType) All() [FileTypes]bool {
var out [FileTypes]bool
for i := range out {
out[i] = t&(1<<i) > 0
}
return out
}
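
`FileTypes` picks up the final `iota` value here, 3, so `All()` expands the bitmask into a fixed-size bool array that the new lock code indexes per file type. A self-contained sketch mirroring this hunk:

```go
package main

import "fmt"

// Mirrors the constants above: FTUnsealed=1, FTSealed=2, FTCache=4,
// and FileTypes = iota = 3 after the three shifted values.
type SectorFileType int

const (
	FTUnsealed SectorFileType = 1 << iota
	FTSealed
	FTCache

	FileTypes = iota
)

func (t SectorFileType) All() [FileTypes]bool {
	var out [FileTypes]bool
	for i := range out {
		out[i] = t&(1<<i) > 0
	}
	return out
}

func main() {
	// index 0=unsealed, 1=sealed, 2=cache
	fmt.Println((FTSealed | FTCache).All()) // [false true true]
}
```
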
type SectorPaths struct {
Id abi.SectorID

@@ -69,14 +69,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
return
}
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
defer done()
path := PathByType(paths, ft)
if path == "" {

@@ -59,6 +59,9 @@ type SectorIndex interface { // part of storage-miner api
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
// Atomically acquire locks on all requested sector file types; cancel ctx to unlock.
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
}
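
Every Manager method touched in this PR uses StorageLock the same way: derive a cancellable context, take the lock, and let `defer cancel()` be the unlock. A sketch of that caller-side convention (the helper name is illustrative, not repo API):

```go
// withSectorLock sketches the pattern: read-lock the file types we only
// consume, write-lock the ones we mutate, unlock by cancelling ctx.
func withSectorLock(ctx context.Context, idx stores.SectorIndex, sector abi.SectorID) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // releases the locks when this function returns

	if err := idx.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil {
		return xerrors.Errorf("acquiring sector lock: %w", err)
	}

	// ... work with the sealed (read) and cache (write) files ...
	return nil
}
```
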
type Decl struct {
@@ -80,6 +83,7 @@ type storageEntry struct {
}
type Index struct {
*indexLocks
lk sync.RWMutex
sectors map[Decl][]*declMeta
@@ -88,6 +92,9 @@ type Index struct {
func NewIndex() *Index {
return &Index{
indexLocks: &indexLocks{
locks: map[abi.SectorID]*sectorLock{},
},
sectors: map[Decl][]*declMeta{},
stores: map[ID]*storageEntry{},
}

stores/index_locks.go (new file, 124 lines)

@@ -0,0 +1,124 @@
package stores
import (
"context"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type sectorLock struct {
cond *ctxCond
r [FileTypes]uint
w SectorFileType
refs uint // access with indexLocks.lk
}
func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
// a write lock can't be taken on a file type while any reader holds it
for i, b := range write.All() {
if b && l.r[i] > 0 {
return false
}
}
// and no write lock may already be held on any file type we want, read or write
return l.w&read == 0 && l.w&write == 0
}
func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
if !l.canLock(read, write) {
return false
}
for i, set := range read.All() {
if set {
l.r[i]++
}
}
l.w |= write
return true
}
func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) error {
l.cond.L.Lock()
defer l.cond.L.Unlock()
for !l.tryLock(read, write) {
if err := l.cond.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
l.cond.L.Lock()
defer l.cond.L.Unlock()
for i, set := range read.All() {
if set {
l.r[i]--
}
}
l.w &= ^write
l.cond.Broadcast()
}
type indexLocks struct {
lk sync.Mutex
locks map[abi.SectorID]*sectorLock
}
func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
if read|write == 0 {
return nil
}
if read|write > (1<<FileTypes)-1 {
return xerrors.Errorf("unknown file types specified")
}
i.lk.Lock()
slk, ok := i.locks[sector]
if !ok {
slk = &sectorLock{}
slk.cond = newCtxCond(&sync.Mutex{})
i.locks[sector] = slk
}
slk.refs++
i.lk.Unlock()
if err := slk.lock(ctx, read, write); err != nil {
return err
}
go func() {
// TODO: we can avoid this goroutine with a bit of creativity and reflect
<-ctx.Done()
i.lk.Lock()
slk.unlock(read, write)
slk.refs--
if slk.refs == 0 {
delete(i.locks, sector)
}
i.lk.Unlock()
}()
return nil
}

stores/index_locks_test.go (new file, 170 lines)

@@ -0,0 +1,170 @@
package stores
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi"
)
var aSector = abi.SectorID{
Miner: 2,
Number: 9000,
}
func TestCanLock(t *testing.T) {
lk := sectorLock{
r: [FileTypes]uint{},
w: FTNone,
}
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
ftAll := FTUnsealed | FTSealed | FTCache
require.Equal(t, true, lk.canLock(ftAll, FTNone))
require.Equal(t, true, lk.canLock(FTNone, ftAll))
lk.r[0] = 1 // unsealed read taken
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
require.Equal(t, false, lk.canLock(FTNone, FTUnsealed))
require.Equal(t, true, lk.canLock(ftAll, FTNone))
require.Equal(t, false, lk.canLock(FTNone, ftAll))
require.Equal(t, true, lk.canLock(FTNone, FTSealed|FTCache))
require.Equal(t, true, lk.canLock(FTUnsealed, FTSealed|FTCache))
lk.r[0] = 0
lk.w = FTSealed
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone))
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed))
require.Equal(t, false, lk.canLock(FTSealed, FTNone))
require.Equal(t, false, lk.canLock(FTNone, FTSealed))
require.Equal(t, false, lk.canLock(ftAll, FTNone))
require.Equal(t, false, lk.canLock(FTNone, ftAll))
}
func TestIndexLocksSeq(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ilk := &indexLocks{
locks: map[abi.SectorID]*sectorLock{},
}
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
}
func TestIndexLocksBlockOn(t *testing.T) {
test := func(r1 SectorFileType, w1 SectorFileType, r2 SectorFileType, w2 SectorFileType) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ilk := &indexLocks{
locks: map[abi.SectorID]*sectorLock{},
}
require.NoError(t, ilk.StorageLock(ctx, aSector, r1, w1))
sch := make(chan struct{})
go func() {
ctx, cancel := context.WithCancel(context.Background())
sch <- struct{}{}
require.NoError(t, ilk.StorageLock(ctx, aSector, r2, w2))
cancel()
sch <- struct{}{}
}()
<-sch
select {
case <-sch:
t.Fatal("that shouldn't happen")
case <-time.After(40 * time.Millisecond):
}
cancel()
select {
case <-sch:
case <-time.After(time.Second):
t.Fatal("timed out")
}
}
}
t.Run("readBlocksWrite", test(FTUnsealed, FTNone, FTNone, FTUnsealed))
t.Run("writeBlocksRead", test(FTNone, FTUnsealed, FTUnsealed, FTNone))
t.Run("writeBlocksWrite", test(FTNone, FTUnsealed, FTNone, FTUnsealed))
}
func TestIndexLocksBlockWonR(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ilk := &indexLocks{
locks: map[abi.SectorID]*sectorLock{},
}
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone))
sch := make(chan struct{})
go func() {
ctx, cancel := context.WithCancel(context.Background())
sch <- struct{}{}
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed))
cancel()
sch <- struct{}{}
}()
<-sch
select {
case <-sch:
t.Fatal("that shouldn't happen")
case <-time.After(40 * time.Millisecond):
}
cancel()
select {
case <-sch:
case <-time.After(time.Second):
t.Fatal("timed out")
}
}

@@ -0,0 +1,49 @@
package stores
import (
"context"
"sync"
)
// like sync.Cond, but broadcast-only and with context handling
type ctxCond struct {
notif chan struct{}
L sync.Locker
lk sync.Mutex
}
func newCtxCond(l sync.Locker) *ctxCond {
return &ctxCond{
L: l,
}
}
func (c *ctxCond) Broadcast() {
c.lk.Lock()
if c.notif != nil {
close(c.notif)
c.notif = nil
}
c.lk.Unlock()
}
func (c *ctxCond) Wait(ctx context.Context) error {
c.lk.Lock()
if c.notif == nil {
c.notif = make(chan struct{})
}
wait := c.notif
c.lk.Unlock()
c.L.Unlock()
defer c.L.Lock()
select {
case <-wait:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
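
For reference, a small usage sketch of ctxCond (ctx assumed in scope; names illustrative): the discipline is the same as sync.Cond (hold L, re-check the predicate in a loop), except Wait can bail out with the context's error. sectorLock.lock above drives it exactly this way.

```go
var mu sync.Mutex
ready := false
cond := newCtxCond(&mu)

go func() { // waiter
	mu.Lock()
	defer mu.Unlock()
	for !ready {
		// Wait unlocks mu while blocked and re-locks it before returning,
		// even when it returns ctx.Err() on cancellation.
		if err := cond.Wait(ctx); err != nil {
			return
		}
	}
	// predicate holds and mu is held here
}()

mu.Lock() // signaller
ready = true
mu.Unlock()
cond.Broadcast() // wakes all current waiters; they re-check the predicate
```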

@@ -24,7 +24,7 @@ const (
)
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error)
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last

@@ -197,12 +197,13 @@ func (st *Local) reportHealth(ctx context.Context) {
}
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
st.localLk.RLock()
defer st.localLk.RUnlock()
var out SectorPaths
var storageIDs SectorPaths
@@ -245,7 +246,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
if err != nil {
st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating: %w", err)
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("finding best storage for allocating: %w", err)
}
var best string
@@ -277,7 +278,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
if best == "" {
st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("couldn't find a suitable path for a sector")
}
SetPathByType(&out, fileType, best)
@@ -285,7 +286,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
allocate ^= fileType
}
return out, storageIDs, st.localLk.RUnlock, nil
return out, storageIDs, nil
}
func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
@@ -399,17 +400,15 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
}
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err)
}
defer sdone()
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage: %w", err)
}
defer ddone()
for _, fileType := range PathTypes {
if fileType&types == 0 {

@@ -50,9 +50,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
}
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
for {
@@ -71,7 +71,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
case <-c:
continue
case <-ctx.Done():
return SectorPaths{}, SectorPaths{}, nil, ctx.Err()
return SectorPaths{}, SectorPaths{}, ctx.Err()
}
}
@@ -82,9 +82,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
r.fetchLk.Unlock()
}()
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil {
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
}
for _, fileType := range PathTypes {
@@ -96,13 +96,11 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
continue
}
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
ap, storageID, url, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
if err != nil {
done()
return SectorPaths{}, SectorPaths{}, nil, err
return SectorPaths{}, SectorPaths{}, err
}
done = mergeDone(done, rdone)
SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID))
@@ -118,26 +116,26 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
}
}
return paths, stores, done, nil
return paths, stores, nil
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) {
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", "", nil, err
return "", "", "", err
}
if len(si) == 0 {
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
if err != nil {
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
return "", "", "", xerrors.Errorf("allocate local sector for fetching: %w", err)
}
dest := PathByType(apaths, fileType)
storageID := PathByType(ids, fileType)
@@ -156,12 +154,11 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return dest, ID(storageID), url, done, nil
return dest, ID(storageID), url, nil
}
}
done()
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
return "", "", "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
@@ -215,11 +212,10 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
// Make sure we have the data local
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
_, _, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err)
}
ddone()
return r.local.MoveStorage(ctx, s, spt, types)
}
@@ -336,11 +332,4 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (FsStat, error) {
return out, nil
}
func mergeDone(a func(), b func()) func() {
return func() {
a()
b()
}
}
var _ Store = &Remote{}

@@ -77,7 +77,11 @@ func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) er
panic("implement me")
}
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool, am stores.AcquireMode) error {
func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
return nil
}