storage: Improve thread safety
This commit is contained in:
parent
946fff39a6
commit
01087e01d2
@ -170,7 +170,7 @@ var initCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var sc config.StorageConfig
|
var localPaths []config.LocalPath
|
||||||
|
|
||||||
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
|
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
|
||||||
log.Infof("Setting up storage config with presealed sector", pssb)
|
log.Infof("Setting up storage config with presealed sector", pssb)
|
||||||
@ -180,7 +180,7 @@ var initCmd = &cli.Command{
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{
|
localPaths = append(localPaths, config.LocalPath{
|
||||||
Path: psp,
|
Path: psp,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -201,12 +201,14 @@ var initCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{
|
localPaths = append(localPaths, config.LocalPath{
|
||||||
Path: lr.Path(),
|
Path: lr.Path(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := lr.SetStorage(sc); err != nil {
|
if err := lr.SetStorage(func(sc *config.StorageConfig) {
|
||||||
|
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
|
||||||
|
}); err != nil {
|
||||||
return xerrors.Errorf("set storage config: %w", err)
|
return xerrors.Errorf("set storage config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,6 +219,8 @@ type fsLockedRepo struct {
|
|||||||
ds datastore.Batching
|
ds datastore.Batching
|
||||||
dsErr error
|
dsErr error
|
||||||
dsOnce sync.Once
|
dsOnce sync.Once
|
||||||
|
|
||||||
|
storageLk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) Path() string {
|
func (fsr *fsLockedRepo) Path() string {
|
||||||
@ -278,6 +280,13 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
|
func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
|
||||||
|
fsr.storageLk.Lock()
|
||||||
|
defer fsr.storageLk.Unlock()
|
||||||
|
|
||||||
|
return fsr.getStorage()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsr *fsLockedRepo) getStorage() (config.StorageConfig, error) {
|
||||||
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil)
|
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return config.StorageConfig{}, err
|
return config.StorageConfig{}, err
|
||||||
@ -285,8 +294,18 @@ func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
|
|||||||
return *c, nil
|
return *c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) SetStorage(c config.StorageConfig) error {
|
func (fsr *fsLockedRepo) SetStorage(c func(*config.StorageConfig)) error {
|
||||||
return config.WriteStorageFile(fsr.join(fsStorageConfig), c)
|
fsr.storageLk.Lock()
|
||||||
|
defer fsr.storageLk.Unlock()
|
||||||
|
|
||||||
|
sc, err := fsr.getStorage()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("get storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c(&sc)
|
||||||
|
|
||||||
|
return config.WriteStorageFile(fsr.join(fsStorageConfig), sc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
|
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
|
||||||
|
@ -39,7 +39,7 @@ type LockedRepo interface {
|
|||||||
Config() (interface{}, error)
|
Config() (interface{}, error)
|
||||||
|
|
||||||
GetStorage() (config.StorageConfig, error)
|
GetStorage() (config.StorageConfig, error)
|
||||||
SetStorage(config.StorageConfig) error
|
SetStorage(func(*config.StorageConfig)) error
|
||||||
|
|
||||||
// SetAPIEndpoint sets the endpoint of the current API
|
// SetAPIEndpoint sets the endpoint of the current API
|
||||||
// so it can be read by API clients
|
// so it can be read by API clients
|
||||||
|
@ -53,8 +53,10 @@ func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) {
|
|||||||
return *lmem.sc, nil
|
return *lmem.sc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lmem *lockedMemRepo) SetStorage(sc config.StorageConfig) error {
|
func (lmem *lockedMemRepo) SetStorage(c func(*config.StorageConfig)) error {
|
||||||
lmem.sc = &sc
|
_, _ = lmem.GetStorage()
|
||||||
|
|
||||||
|
c(lmem.sc)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ type SectorIDCounter interface {
|
|||||||
|
|
||||||
type LocalStorage interface {
|
type LocalStorage interface {
|
||||||
GetStorage() (config.StorageConfig, error)
|
GetStorage() (config.StorageConfig, error)
|
||||||
SetStorage(config.StorageConfig) error
|
SetStorage(func(*config.StorageConfig)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Path struct {
|
type Path struct {
|
||||||
@ -96,16 +96,9 @@ func (m *Manager) AddLocalStorage(path string) error {
|
|||||||
return xerrors.Errorf("opening local path: %w", err)
|
return xerrors.Errorf("opening local path: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Locks!
|
if err := m.storage.localStorage.SetStorage(func(sc *config.StorageConfig) {
|
||||||
|
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
|
||||||
sc, err := m.storage.localStorage.GetStorage()
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("get storage config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
|
|
||||||
|
|
||||||
if err := m.storage.localStorage.SetStorage(sc); err != nil {
|
|
||||||
return xerrors.Errorf("get storage config: %w", err)
|
return xerrors.Errorf("get storage config: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -28,6 +28,8 @@ type storage struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type path struct {
|
type path struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
meta config.StorageMeta
|
meta config.StorageMeta
|
||||||
local string
|
local string
|
||||||
|
|
||||||
@ -115,10 +117,12 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, p := range st.paths {
|
for _, p := range st.paths {
|
||||||
|
p.lk.Lock()
|
||||||
s, ok := p.sectors[abi.SectorID{
|
s, ok := p.sectors[abi.SectorID{
|
||||||
Miner: mid,
|
Miner: mid,
|
||||||
Number: id,
|
Number: id,
|
||||||
}]
|
}]
|
||||||
|
p.lk.Unlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -155,10 +159,13 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
|
|||||||
if !sealing && !p.meta.CanStore {
|
if !sealing && !p.meta.CanStore {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.lk.Lock()
|
||||||
p.sectors[abi.SectorID{
|
p.sectors[abi.SectorID{
|
||||||
Miner: mid,
|
Miner: mid,
|
||||||
Number: id,
|
Number: id,
|
||||||
}] |= fileType
|
}] |= fileType
|
||||||
|
p.lk.Unlock()
|
||||||
|
|
||||||
// TODO: Check free space
|
// TODO: Check free space
|
||||||
// TODO: Calc weights
|
// TODO: Calc weights
|
||||||
@ -214,6 +221,7 @@ func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, s
|
|||||||
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
|
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
|
||||||
var out []config.StorageMeta
|
var out []config.StorageMeta
|
||||||
for _, p := range st.paths {
|
for _, p := range st.paths {
|
||||||
|
p.lk.Lock()
|
||||||
t := p.sectors[abi.SectorID{
|
t := p.sectors[abi.SectorID{
|
||||||
Miner: mid,
|
Miner: mid,
|
||||||
Number: sn,
|
Number: sn,
|
||||||
@ -221,6 +229,7 @@ func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbu
|
|||||||
if t|typ == 0 {
|
if t|typ == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
p.lk.Unlock()
|
||||||
out = append(out, p.meta)
|
out = append(out, p.meta)
|
||||||
}
|
}
|
||||||
if len(out) == 0 {
|
if len(out) == 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user