Merge pull request #1373 from filecoin-project/fix/amgr-races

storage: Improve thread safety
This commit is contained in:
Łukasz Magiera 2020-03-09 23:11:17 +01:00 committed by GitHub
commit 5d6d75f546
13 changed files with 103 additions and 73 deletions

View File

@ -17,7 +17,6 @@ const BlockDelay = 6
const PropagationDelay = 3 const PropagationDelay = 3
// SlashablePowerDelay is the number of epochs after ElectionPeriodStart, after // SlashablePowerDelay is the number of epochs after ElectionPeriodStart, after
// which the miner is slashed // which the miner is slashed
// //

View File

@ -205,7 +205,7 @@ func ReqContext(cctx *cli.Context) context.Context {
<-sigChan <-sigChan
done() done()
}() }()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
return ctx return ctx
} }

View File

@ -78,7 +78,7 @@ var runCmd = &cli.Command{
return err return err
} }
log.Info("Remote version: %s", v.Version) log.Infof("Remote version: %s", v.Version)
maxBatch := cctx.Int("max-batch") maxBatch := cctx.Int("max-batch")

View File

@ -175,10 +175,10 @@ create table if not exists messages
create unique index if not exists messages_cid_uindex create unique index if not exists messages_cid_uindex
on messages (cid); on messages (cid);
create index messages_from_index create index if not exists messages_from_index
on messages ("from"); on messages ("from");
create index messages_to_index create index if not exists messages_to_index
on messages ("to"); on messages ("to");
create table if not exists block_messages create table if not exists block_messages
@ -451,51 +451,51 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
/*tx, err := st.db.Begin() /*tx, err := st.db.Begin()
if err != nil { if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`)
if err != nil {
return err
}
for k, i := range miners {
if _, err := stmt.Exec(
k.act.Head.String(),
k.addr.String(),
k.stateroot.String(),
i.state.Sectors.String(),
fmt.Sprint(i.ssize),
i.state.ProvingSet.String(),
fmt.Sprint(i.psize),
i.info.Owner.String(),
i.info.Worker.String(),
i.info.PeerId.String(),
i.info.SectorSize,
i.power.String(), // TODO: SPA
i.state.PoStState.ProvingPeriodStart,
); err != nil {
return err return err
} }
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { if _, err := tx.Exec(`
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()*/ create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`)
if err != nil {
return err
}
for k, i := range miners {
if _, err := stmt.Exec(
k.act.Head.String(),
k.addr.String(),
k.stateroot.String(),
i.state.Sectors.String(),
fmt.Sprint(i.ssize),
i.state.ProvingSet.String(),
fmt.Sprint(i.psize),
i.info.Owner.String(),
i.info.Worker.String(),
i.info.PeerId.String(),
i.info.SectorSize,
i.power.String(), // TODO: SPA
i.state.PoStState.ProvingPeriodStart,
); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()*/
return nil return nil
} }
@ -941,7 +941,7 @@ func (st *storage) storeDeals(deals map[string]api.MarketDeal) error {
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return err return err
} }
*/ */
return nil return nil
} }

View File

@ -170,7 +170,7 @@ var initCmd = &cli.Command{
return err return err
} }
var sc config.StorageConfig var localPaths []config.LocalPath
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 { if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
log.Infof("Setting up storage config with presealed sector", pssb) log.Infof("Setting up storage config with presealed sector", pssb)
@ -180,7 +180,7 @@ var initCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{ localPaths = append(localPaths, config.LocalPath{
Path: psp, Path: psp,
}) })
} }
@ -201,12 +201,14 @@ var initCmd = &cli.Command{
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err) return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
} }
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{ localPaths = append(localPaths, config.LocalPath{
Path: lr.Path(), Path: lr.Path(),
}) })
} }
if err := lr.SetStorage(sc); err != nil { if err := lr.SetStorage(func(sc *config.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err) return xerrors.Errorf("set storage config: %w", err)
} }

View File

@ -65,6 +65,12 @@ var storageAttachCmd = &cli.Command{
} }
if cctx.Bool("init") { if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}
_, err := os.Stat(filepath.Join(p, metaFile)) _, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
if err == nil { if err == nil {

2
go.mod
View File

@ -22,7 +22,7 @@ require (
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200306215829-15085af52c2b github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200309211213-75e9124a1904
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/specs-actors v0.0.0-20200309035311-2490172c2e9f github.com/filecoin-project/specs-actors v0.0.0-20200309035311-2490172c2e9f

4
go.sum
View File

@ -124,8 +124,8 @@ github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.m
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA= github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200306215829-15085af52c2b h1:/08Z7ZXHt1qRGz1Ipd0suMgJOmuind8MDwG9va+OPz0= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200309211213-75e9124a1904 h1:yz9PqcRxTx3WsmbGoNRRvU2HfD5+B5YCxu2c3iFa6+o=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200306215829-15085af52c2b/go.mod h1:NcE+iL0bbYnamGmYQgCPVGbSaf8VF2/CLra/61B3I3I= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200309211213-75e9124a1904/go.mod h1:NcE+iL0bbYnamGmYQgCPVGbSaf8VF2/CLra/61B3I3I=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0= github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=

View File

@ -219,6 +219,8 @@ type fsLockedRepo struct {
ds datastore.Batching ds datastore.Batching
dsErr error dsErr error
dsOnce sync.Once dsOnce sync.Once
storageLk sync.Mutex
} }
func (fsr *fsLockedRepo) Path() string { func (fsr *fsLockedRepo) Path() string {
@ -278,15 +280,32 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) {
} }
func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) { func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil) fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
return fsr.getStorage(nil)
}
func (fsr *fsLockedRepo) getStorage(def *config.StorageConfig) (config.StorageConfig, error) {
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), def)
if err != nil { if err != nil {
return config.StorageConfig{}, err return config.StorageConfig{}, err
} }
return *c, nil return *c, nil
} }
func (fsr *fsLockedRepo) SetStorage(c config.StorageConfig) error { func (fsr *fsLockedRepo) SetStorage(c func(*config.StorageConfig)) error {
return config.WriteStorageFile(fsr.join(fsStorageConfig), c) fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
sc, err := fsr.getStorage(&config.StorageConfig{})
if err != nil {
return xerrors.Errorf("get storage: %w", err)
}
c(&sc)
return config.WriteStorageFile(fsr.join(fsStorageConfig), sc)
} }
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error { func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {

View File

@ -39,7 +39,7 @@ type LockedRepo interface {
Config() (interface{}, error) Config() (interface{}, error)
GetStorage() (config.StorageConfig, error) GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error SetStorage(func(*config.StorageConfig)) error
// SetAPIEndpoint sets the endpoint of the current API // SetAPIEndpoint sets the endpoint of the current API
// so it can be read by API clients // so it can be read by API clients

View File

@ -53,8 +53,10 @@ func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) {
return *lmem.sc, nil return *lmem.sc, nil
} }
func (lmem *lockedMemRepo) SetStorage(sc config.StorageConfig) error { func (lmem *lockedMemRepo) SetStorage(c func(*config.StorageConfig)) error {
lmem.sc = &sc _, _ = lmem.GetStorage()
c(lmem.sc)
return nil return nil
} }

View File

@ -23,7 +23,7 @@ type SectorIDCounter interface {
type LocalStorage interface { type LocalStorage interface {
GetStorage() (config.StorageConfig, error) GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error SetStorage(func(*config.StorageConfig)) error
} }
type Path struct { type Path struct {
@ -96,16 +96,9 @@ func (m *Manager) AddLocalStorage(path string) error {
return xerrors.Errorf("opening local path: %w", err) return xerrors.Errorf("opening local path: %w", err)
} }
// TODO: Locks! if err := m.storage.localStorage.SetStorage(func(sc *config.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
sc, err := m.storage.localStorage.GetStorage() }); err != nil {
if err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
if err := m.storage.localStorage.SetStorage(sc); err != nil {
return xerrors.Errorf("get storage config: %w", err) return xerrors.Errorf("get storage config: %w", err)
} }
return nil return nil

View File

@ -24,10 +24,12 @@ type storage struct {
localLk sync.RWMutex localLk sync.RWMutex
localStorage LocalStorage localStorage LocalStorage
paths []path paths []*path
} }
type path struct { type path struct {
lk sync.Mutex
meta config.StorageMeta meta config.StorageMeta
local string local string
@ -47,7 +49,7 @@ func (st *storage) openPath(p string) error {
// TODO: Check existing / dedupe // TODO: Check existing / dedupe
out := path{ out := &path{
meta: meta, meta: meta,
local: p, local: p,
sectors: map[abi.SectorID]sectorbuilder.SectorFileType{}, sectors: map[abi.SectorID]sectorbuilder.SectorFileType{},
@ -115,10 +117,12 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
} }
for _, p := range st.paths { for _, p := range st.paths {
p.lk.Lock()
s, ok := p.sectors[abi.SectorID{ s, ok := p.sectors[abi.SectorID{
Miner: mid, Miner: mid,
Number: id, Number: id,
}] }]
p.lk.Unlock()
if !ok { if !ok {
continue continue
} }
@ -155,10 +159,13 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
if !sealing && !p.meta.CanStore { if !sealing && !p.meta.CanStore {
continue continue
} }
p.lk.Lock()
p.sectors[abi.SectorID{ p.sectors[abi.SectorID{
Miner: mid, Miner: mid,
Number: id, Number: id,
}] |= fileType }] |= fileType
p.lk.Unlock()
// TODO: Check free space // TODO: Check free space
// TODO: Calc weights // TODO: Calc weights
@ -214,6 +221,7 @@ func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, s
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) { func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
var out []config.StorageMeta var out []config.StorageMeta
for _, p := range st.paths { for _, p := range st.paths {
p.lk.Lock()
t := p.sectors[abi.SectorID{ t := p.sectors[abi.SectorID{
Miner: mid, Miner: mid,
Number: sn, Number: sn,
@ -221,6 +229,7 @@ func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbu
if t|typ == 0 { if t|typ == 0 {
continue continue
} }
p.lk.Unlock()
out = append(out, p.meta) out = append(out, p.meta)
} }
if len(out) == 0 { if len(out) == 0 {