change table names and address other comments
This commit is contained in:
parent
169f953de4
commit
9262eb4fc9
@ -1,4 +1,4 @@
|
||||
create table sectorlocation
|
||||
create table sector_location
|
||||
(
|
||||
miner_id bigint not null,
|
||||
sector_num bigint not null,
|
||||
@ -15,10 +15,10 @@ create table sectorlocation
|
||||
|
||||
|
||||
|
||||
create table StorageLocation
|
||||
create table storage_path
|
||||
(
|
||||
"storage_id" varchar not null
|
||||
constraint "StorageLocation_pkey"
|
||||
constraint "storage_path_pkey"
|
||||
primary key,
|
||||
"urls" varchar, -- comma separated list of urls
|
||||
"weight" bigint,
|
||||
|
@ -50,7 +50,7 @@ func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storifa
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, §orEntries,
|
||||
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storagelocation stor LEFT JOIN sectorlocation sec on stor.storage_id=sec.storage_id")
|
||||
"SELECT stor.storage_id, miner_id, sector_num, sector_filetype, is_primary FROM storage_path stor LEFT JOIN sector_location sec on stor.storage_id=sec.storage_id")
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("StorageList DB query fails: %v", err)
|
||||
}
|
||||
@ -183,7 +183,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
||||
var urls sql.NullString
|
||||
var storageId sql.NullString
|
||||
err = dbi.harmonyDB.QueryRow(ctx,
|
||||
"Select storage_id, urls FROM storagelocation WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
||||
}
|
||||
@ -198,7 +198,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
||||
currUrls = union(currUrls, si.URLs)
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE StorageLocation set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
||||
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
||||
strings.Join(currUrls, ","),
|
||||
si.Weight,
|
||||
si.MaxStorage,
|
||||
@ -218,7 +218,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
||||
|
||||
// Insert storage id
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"INSERT INTO StorageLocation "+
|
||||
"INSERT INTO storage_path "+
|
||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
||||
si.ID,
|
||||
strings.Join(si.URLs, ","),
|
||||
@ -254,7 +254,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
|
||||
// if this is only path url for this storage path, drop storage path and sector decls which have this as a storage path
|
||||
|
||||
var qUrls string
|
||||
err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storagelocation WHERE storage_id=$1", string(id)).Scan(&qUrls)
|
||||
err := dbi.harmonyDB.QueryRow(ctx, "SELECT COALESCE(urls,'') FROM storage_path WHERE storage_id=$1", string(id)).Scan(&qUrls)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -274,7 +274,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
|
||||
|
||||
if len(modUrls) > 0 {
|
||||
newUrls := strings.Join(modUrls, ",")
|
||||
_, err := dbi.harmonyDB.Exec(ctx, "UPDATE storagelocation set urls=$1 WHERE storage_id=$2", newUrls, id)
|
||||
_, err := dbi.harmonyDB.Exec(ctx, "UPDATE storage_path set urls=$1 WHERE storage_id=$2", newUrls, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -284,13 +284,13 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
|
||||
// Single transaction to drop storage path and sector decls which have this as a storage path
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
// Drop storage path completely
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storagelocation WHERE storage_id=$1", id)
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Drop all sectors entries which use this storage path
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sectorlocation WHERE storage_id=$1", id)
|
||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -309,13 +309,13 @@ func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, re
|
||||
|
||||
var canSeal, canStore bool
|
||||
err := dbi.harmonyDB.QueryRow(ctx,
|
||||
"SELECT can_seal, can_store FROM storagelocation WHERE storage_id=$1", id).Scan(&canSeal, &canStore)
|
||||
"SELECT can_seal, can_store FROM storage_path WHERE storage_id=$1", id).Scan(&canSeal, &canStore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("Querying for storage id %s fails with err %v", id, err)
|
||||
}
|
||||
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE storagelocation set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
|
||||
"UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
|
||||
report.Stat.Capacity,
|
||||
report.Stat.Available,
|
||||
report.Stat.FSAvailable,
|
||||
@ -373,7 +373,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
var currPrimary sql.NullBool
|
||||
err = dbi.harmonyDB.QueryRow(ctx,
|
||||
"SELECT is_primary FROM SectorLocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||
return false, xerrors.Errorf("DB SELECT fails: %v", err)
|
||||
@ -383,7 +383,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
||||
if currPrimary.Valid {
|
||||
if !currPrimary.Bool && primary {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"UPDATE SectorLocation set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
s.Miner, s.Number, ft, storageID)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("DB update fails: %v", err)
|
||||
@ -393,7 +393,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
||||
}
|
||||
} else {
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
"INSERT INTO SectorLocation "+
|
||||
"INSERT INTO sector_location "+
|
||||
"values($1, $2, $3, $4, $5)",
|
||||
s.Miner, s.Number, ft, storageID, primary)
|
||||
if err != nil {
|
||||
@ -417,7 +417,7 @@ func (dbi *DBIndex) StorageDropSector(ctx context.Context, storageID storiface.I
|
||||
}
|
||||
|
||||
_, err := dbi.harmonyDB.Exec(ctx,
|
||||
"DELETE FROM sectorlocation WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
"DELETE FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||
int(s.Miner), int(s.Number), int(ft), string(storageID))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("StorageDropSector DELETE query fails: %v", err)
|
||||
@ -464,8 +464,8 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM sectorlocation sec
|
||||
JOIN storagelocation stor ON sec.storage_id = stor.storage_id
|
||||
FROM sector_location sec
|
||||
JOIN storage_path stor ON sec.storage_id = stor.storage_id
|
||||
WHERE sec.miner_id = $1
|
||||
AND sec.sector_num = $2
|
||||
AND sec.sector_filetype = ANY($3)
|
||||
@ -548,7 +548,7 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
|
||||
groups,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM storagelocation
|
||||
FROM storage_path
|
||||
WHERE can_seal=true
|
||||
and available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
@ -629,7 +629,7 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, &qResults,
|
||||
"SELECT urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types "+
|
||||
"FROM StorageLocation WHERE storage_id=$1", string(id))
|
||||
"FROM storage_path WHERE storage_id=$1", string(id))
|
||||
if err != nil {
|
||||
return storiface.StorageInfo{}, xerrors.Errorf("StorageInfo query fails: %v", err)
|
||||
}
|
||||
@ -664,13 +664,6 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
|
||||
return nil, xerrors.Errorf("estimating required space: %w", err)
|
||||
}
|
||||
|
||||
var checkString string
|
||||
if pathType == storiface.PathSealing {
|
||||
checkString = "can_seal = TRUE"
|
||||
} else if pathType == storiface.PathStorage {
|
||||
checkString = "can_store = TRUE"
|
||||
}
|
||||
|
||||
var rows []struct {
|
||||
StorageId string
|
||||
Urls string
|
||||
@ -684,7 +677,8 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
|
||||
DenyTypes string
|
||||
}
|
||||
|
||||
sql := fmt.Sprintf(`SELECT storage_id,
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
`SELECT storage_id,
|
||||
urls,
|
||||
weight,
|
||||
max_storage,
|
||||
@ -694,15 +688,19 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
|
||||
allow_to,
|
||||
allow_types,
|
||||
deny_types
|
||||
FROM storagelocation
|
||||
WHERE %s and available >= $1
|
||||
FROM storage_path
|
||||
WHERE available >= $1
|
||||
and NOW()-last_heartbeat < $2
|
||||
and heartbeat_err is null
|
||||
order by (available::numeric * weight) desc`, checkString)
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
sql, spaceReq, SkippedHeartbeatThresh)
|
||||
and ($3 and can_seal = TRUE or $4 and can_store = TRUE)
|
||||
order by (available::numeric * weight) desc`,
|
||||
spaceReq,
|
||||
SkippedHeartbeatThresh,
|
||||
pathType == storiface.PathSealing,
|
||||
pathType == storiface.PathStorage,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("Querying for best storage sectors fails with sql: %s and err %w: ", sql, err)
|
||||
return nil, xerrors.Errorf("Querying for best storage sectors fails with err %w: ", err)
|
||||
}
|
||||
|
||||
var result []storiface.StorageInfo
|
||||
@ -751,7 +749,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
||||
fts := (read | write).AllSet()
|
||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
||||
`SELECT sector_filetype, read_ts, read_refs, write_ts
|
||||
FROM sectorlocation
|
||||
FROM sector_location
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
AND sector_filetype = ANY($3)`,
|
||||
@ -792,7 +790,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
||||
|
||||
// Acquire write locks
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
`UPDATE sector_location
|
||||
SET write_ts = NOW(), write_lock_owner = $1
|
||||
WHERE miner_id=$2
|
||||
AND sector_num=$3
|
||||
@ -807,7 +805,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
||||
|
||||
// Acquire read locks
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
`UPDATE sector_location
|
||||
SET read_ts = NOW(), read_refs = read_refs + 1
|
||||
WHERE miner_id=$1
|
||||
AND sector_num=$2
|
||||
@ -840,7 +838,7 @@ func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storif
|
||||
|
||||
// Relinquish write locks
|
||||
_, err := dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
`UPDATE sector_location
|
||||
SET write_ts = NULL, write_lock_owner = NULL
|
||||
WHERE miner_id=$2
|
||||
AND sector_num=$3
|
||||
@ -857,7 +855,7 @@ func (dbi *DBIndex) unlock(ctx context.Context, sector abi.SectorID, read storif
|
||||
|
||||
// Relinquish read locks
|
||||
_, err = dbi.harmonyDB.Exec(ctx,
|
||||
`UPDATE sectorlocation
|
||||
`UPDATE sector_location
|
||||
SET read_refs = read_refs-1,
|
||||
read_ts = CASE WHEN read_refs - 1 = 0 THEN NULL ELSE read_ts END
|
||||
WHERE miner_id=$1
|
||||
@ -894,9 +892,10 @@ func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read s
|
||||
return err
|
||||
}
|
||||
|
||||
bgCtx := context.Background()
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
_, err := dbi.unlock(ctx, sector, read, write, lockUuid)
|
||||
_, err := dbi.unlock(bgCtx, sector, read, write, lockUuid)
|
||||
if err != nil {
|
||||
log.Errorf("unlocking sector %v for filetypes: read=%d, write=%d, fails with err: %v", sector, read, write, err)
|
||||
}
|
||||
@ -938,7 +937,7 @@ func (dbi *DBIndex) StorageGetLocks(ctx context.Context) (storiface.SectorLocks,
|
||||
}
|
||||
|
||||
err := dbi.harmonyDB.Select(ctx, &rows,
|
||||
"SELECT miner_id, sector_num, sector_filetype, read_ts, read_refs, write_ts FROM sectorlocation")
|
||||
"SELECT miner_id, sector_num, sector_filetype, read_ts, read_refs, write_ts FROM sector_location")
|
||||
if err != nil {
|
||||
return storiface.SectorLocks{}, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user