address moar comments
This commit is contained in:
parent
95709a0583
commit
a51e55390a
@ -88,8 +88,8 @@ Ex:
|
|||||||
pet := "cat"
|
pet := "cat"
|
||||||
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
||||||
*/
|
*/
|
||||||
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql string, arguments ...any) error {
|
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
|
||||||
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, sql, arguments...)
|
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Tx struct {
|
type Tx struct {
|
||||||
|
@ -132,7 +132,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
|
|
||||||
If(cfg.Subsystems.EnableSectorStorage,
|
If(cfg.Subsystems.EnableSectorStorage,
|
||||||
// Sector storage
|
// Sector storage
|
||||||
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(true)),
|
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)),
|
||||||
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
|
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
|
||||||
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
||||||
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
|
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
|
||||||
|
@ -3,6 +3,7 @@ package paths
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
gopath "path"
|
gopath "path"
|
||||||
@ -23,6 +24,8 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errAlreadyLocked = errors.New("already locked")
|
||||||
|
|
||||||
type DBIndex struct {
|
type DBIndex struct {
|
||||||
alerting *alerting.Alerting
|
alerting *alerting.Alerting
|
||||||
pathAlerts map[storiface.ID]alerting.AlertType
|
pathAlerts map[storiface.ID]alerting.AlertType
|
||||||
@ -776,7 +779,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
// Conditions: No write lock or write lock is stale, No read lock or read lock is stale
|
// Conditions: No write lock or write lock is stale, No read lock or read lock is stale
|
||||||
for _, wft := range write.AllSet() {
|
for _, wft := range write.AllSet() {
|
||||||
if isLocked(lockMap[wft].writeTs) || isLocked(lockMap[wft].readTs) {
|
if isLocked(lockMap[wft].writeTs) || isLocked(lockMap[wft].readTs) {
|
||||||
return false, xerrors.Errorf("cannot acquire writelock for sector %v filetype %d already locked", sector, wft)
|
return false, xerrors.Errorf("cannot acquire writelock for sector %v filetype %d already locked: %w", sector, wft, errAlreadyLocked)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -784,7 +787,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
// Conditions: No write lock or write lock is stale
|
// Conditions: No write lock or write lock is stale
|
||||||
for _, rft := range read.AllSet() {
|
for _, rft := range read.AllSet() {
|
||||||
if isLocked(lockMap[rft].writeTs) {
|
if isLocked(lockMap[rft].writeTs) {
|
||||||
return false, xerrors.Errorf("cannot acquire read lock for sector %v filetype %d already locked for writing", sector, rft)
|
return false, xerrors.Errorf("cannot acquire read lock for sector %v filetype %d already locked for writing: %w", sector, rft, errAlreadyLocked)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -812,7 +815,6 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
AND sector_filetype = ANY($3)`,
|
AND sector_filetype = ANY($3)`,
|
||||||
sector.Miner,
|
sector.Miner,
|
||||||
sector.Number,
|
sector.Number,
|
||||||
|
|
||||||
read.AllSet())
|
read.AllSet())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("acquiring read locks for sector %v fails with err: %v", sector, err)
|
return false, xerrors.Errorf("acquiring read locks for sector %v fails with err: %v", sector, err)
|
||||||
@ -886,7 +888,7 @@ func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read s
|
|||||||
for {
|
for {
|
||||||
locked, err := dbi.lock(ctx, sector, read, write, lockUuid)
|
locked, err := dbi.lock(ctx, sector, read, write, lockUuid)
|
||||||
// if err is not nil and is not because we cannot acquire lock, retry
|
// if err is not nil and is not because we cannot acquire lock, retry
|
||||||
if err != nil && !strings.Contains(err.Error(), "cannot acquire") {
|
if err != nil && !errors.As(err, &errAlreadyLocked) {
|
||||||
retries--
|
retries--
|
||||||
if retries == 0 {
|
if retries == 0 {
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user