dbindex: Fix heartbeat checks
This commit is contained in:
parent
0244c7df50
commit
725460bc45
@ -187,7 +187,7 @@ retryAttachStorage:
|
|||||||
var urls sql.NullString
|
var urls sql.NullString
|
||||||
var storageId sql.NullString
|
var storageId sql.NullString
|
||||||
err = tx.QueryRow(
|
err = tx.QueryRow(
|
||||||
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
"SELECT storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||||
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
||||||
}
|
}
|
||||||
@ -202,7 +202,7 @@ retryAttachStorage:
|
|||||||
currUrls = union(currUrls, si.URLs)
|
currUrls = union(currUrls, si.URLs)
|
||||||
|
|
||||||
_, err = tx.Exec(
|
_, err = tx.Exec(
|
||||||
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9, last_heartbeat=NOW() WHERE storage_id=$10",
|
||||||
strings.Join(currUrls, ","),
|
strings.Join(currUrls, ","),
|
||||||
si.Weight,
|
si.Weight,
|
||||||
si.MaxStorage,
|
si.MaxStorage,
|
||||||
@ -223,7 +223,7 @@ retryAttachStorage:
|
|||||||
// Insert storage id
|
// Insert storage id
|
||||||
_, err = tx.Exec(
|
_, err = tx.Exec(
|
||||||
"INSERT INTO storage_path "+
|
"INSERT INTO storage_path "+
|
||||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW())",
|
||||||
si.ID,
|
si.ID,
|
||||||
strings.Join(si.URLs, ","),
|
strings.Join(si.URLs, ","),
|
||||||
si.Weight,
|
si.Weight,
|
||||||
@ -238,8 +238,7 @@ retryAttachStorage:
|
|||||||
st.Available,
|
st.Available,
|
||||||
st.FSAvailable,
|
st.FSAvailable,
|
||||||
st.Reserved,
|
st.Reserved,
|
||||||
st.Used,
|
st.Used)
|
||||||
time.Now())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("StorageAttach insert fails: %v", err)
|
return false, xerrors.Errorf("StorageAttach insert fails: %v", err)
|
||||||
}
|
}
|
||||||
@ -331,13 +330,12 @@ func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, re
|
|||||||
}
|
}
|
||||||
|
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = dbi.harmonyDB.Exec(ctx,
|
||||||
"UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=$6",
|
"UPDATE storage_path set capacity=$1, available=$2, fs_available=$3, reserved=$4, used=$5, last_heartbeat=NOW()",
|
||||||
report.Stat.Capacity,
|
report.Stat.Capacity,
|
||||||
report.Stat.Available,
|
report.Stat.Available,
|
||||||
report.Stat.FSAvailable,
|
report.Stat.FSAvailable,
|
||||||
report.Stat.Reserved,
|
report.Stat.Reserved,
|
||||||
report.Stat.Used,
|
report.Stat.Used)
|
||||||
time.Now().UTC())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("updating storage health in DB fails with err: %v", err)
|
return xerrors.Errorf("updating storage health in DB fails with err: %v", err)
|
||||||
}
|
}
|
||||||
@ -574,9 +572,9 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, s abi.SectorID, ft st
|
|||||||
FROM storage_path
|
FROM storage_path
|
||||||
WHERE can_seal=true
|
WHERE can_seal=true
|
||||||
and available >= $1
|
and available >= $1
|
||||||
and NOW()-last_heartbeat < $2
|
and NOW()-($2 * INTERVAL '1 second') < last_heartbeat
|
||||||
and heartbeat_err is null`,
|
and heartbeat_err is null`,
|
||||||
spaceReq, SkippedHeartbeatThresh)
|
spaceReq, SkippedHeartbeatThresh.Seconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: %v", err)
|
return nil, xerrors.Errorf("Selecting allowfetch storage paths from DB fails err: %v", err)
|
||||||
}
|
}
|
||||||
@ -713,12 +711,12 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec
|
|||||||
deny_types
|
deny_types
|
||||||
FROM storage_path
|
FROM storage_path
|
||||||
WHERE available >= $1
|
WHERE available >= $1
|
||||||
and NOW()-last_heartbeat < $2
|
and NOW()-($2 * INTERVAL '1 second') < last_heartbeat
|
||||||
and heartbeat_err is null
|
and heartbeat_err is null
|
||||||
and ($3 and can_seal = TRUE or $4 and can_store = TRUE)
|
and (($3 and can_seal = TRUE) or ($4 and can_store = TRUE))
|
||||||
order by (available::numeric * weight) desc`,
|
order by (available::numeric * weight) desc`,
|
||||||
spaceReq,
|
spaceReq,
|
||||||
SkippedHeartbeatThresh,
|
SkippedHeartbeatThresh.Seconds(),
|
||||||
pathType == storiface.PathSealing,
|
pathType == storiface.PathSealing,
|
||||||
pathType == storiface.PathStorage,
|
pathType == storiface.PathStorage,
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user