2022-06-14 18:03:38 +00:00
package sealer
2021-07-27 03:15:53 +00:00
import (
"context"
2023-03-06 13:30:58 +00:00
"errors"
2022-01-18 14:53:13 +00:00
"math/rand"
2021-07-27 03:15:53 +00:00
"sync"
"time"
2023-05-10 19:43:42 +00:00
"github.com/google/uuid"
2023-03-06 13:46:26 +00:00
"github.com/hashicorp/go-multierror"
2022-06-15 10:06:22 +00:00
"golang.org/x/xerrors"
2022-01-18 10:37:15 +00:00
2023-03-06 13:46:26 +00:00
"github.com/filecoin-project/go-jsonrpc"
2022-01-18 14:53:13 +00:00
"github.com/filecoin-project/go-state-types/abi"
2022-06-14 18:25:52 +00:00
"github.com/filecoin-project/lotus/storage/paths"
2022-06-14 18:03:38 +00:00
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
2022-06-15 10:06:22 +00:00
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2021-07-27 03:15:53 +00:00
)
type poStScheduler struct {
2022-01-18 14:53:13 +00:00
lk sync . RWMutex
2022-05-18 13:47:08 +00:00
workers map [ storiface . WorkerID ] * WorkerHandle
2022-01-18 14:53:13 +00:00
cond * sync . Cond
2021-07-27 03:15:53 +00:00
2022-01-18 14:53:13 +00:00
postType sealtasks . TaskType
2021-07-27 03:15:53 +00:00
}
func newPoStScheduler ( t sealtasks . TaskType ) * poStScheduler {
ps := & poStScheduler {
2022-05-18 13:47:08 +00:00
workers : map [ storiface . WorkerID ] * WorkerHandle { } ,
2022-01-18 14:53:13 +00:00
postType : t ,
2021-07-27 03:15:53 +00:00
}
ps . cond = sync . NewCond ( & ps . lk )
return ps
}
2022-05-18 13:47:08 +00:00
func ( ps * poStScheduler ) MaybeAddWorker ( wid storiface . WorkerID , tasks map [ sealtasks . TaskType ] struct { } , w * WorkerHandle ) bool {
2022-01-14 13:11:04 +00:00
if _ , ok := tasks [ ps . postType ] ; ! ok {
2021-07-27 03:15:53 +00:00
return false
}
ps . lk . Lock ( )
defer ps . lk . Unlock ( )
ps . workers [ wid ] = w
go ps . watch ( wid , w )
ps . cond . Broadcast ( )
return true
}
2022-05-18 13:47:08 +00:00
func ( ps * poStScheduler ) delWorker ( wid storiface . WorkerID ) * WorkerHandle {
2021-07-27 03:15:53 +00:00
ps . lk . Lock ( )
defer ps . lk . Unlock ( )
2022-05-18 13:47:08 +00:00
var w * WorkerHandle = nil
2021-07-27 03:15:53 +00:00
if wh , ok := ps . workers [ wid ] ; ok {
w = wh
delete ( ps . workers , wid )
}
return w
}
func ( ps * poStScheduler ) CanSched ( ctx context . Context ) bool {
ps . lk . RLock ( )
defer ps . lk . RUnlock ( )
if len ( ps . workers ) == 0 {
return false
}
for _ , w := range ps . workers {
2022-05-18 13:47:08 +00:00
if w . Enabled {
2021-07-27 03:15:53 +00:00
return true
}
}
return false
}
2022-01-18 14:53:13 +00:00
func ( ps * poStScheduler ) Schedule ( ctx context . Context , primary bool , spt abi . RegisteredSealProof , work WorkerAction ) error {
2021-07-27 03:15:53 +00:00
ps . lk . Lock ( )
defer ps . lk . Unlock ( )
if len ( ps . workers ) == 0 {
2022-01-21 09:02:00 +00:00
return xerrors . Errorf ( "can't find %s post worker" , ps . postType )
2021-07-27 03:15:53 +00:00
}
// Get workers by resource
2022-01-18 14:53:13 +00:00
canDo , candidates := ps . readyWorkers ( spt )
2021-07-27 03:15:53 +00:00
for ! canDo {
//if primary is true, it must be dispatched to a worker
if primary {
ps . cond . Wait ( )
2022-01-18 14:53:13 +00:00
canDo , candidates = ps . readyWorkers ( spt )
2021-07-27 03:15:53 +00:00
} else {
2022-01-18 14:53:13 +00:00
return xerrors . Errorf ( "can't find %s post worker" , ps . postType )
2021-07-27 03:15:53 +00:00
}
}
2022-01-18 14:53:13 +00:00
defer func ( ) {
if ps . cond != nil {
ps . cond . Broadcast ( )
}
} ( )
2023-03-06 13:30:58 +00:00
var rpcErrs error
2022-01-18 14:53:13 +00:00
2023-03-06 13:30:58 +00:00
for i , selected := range candidates {
worker := ps . workers [ selected . id ]
2021-07-27 03:15:53 +00:00
2023-05-10 19:43:42 +00:00
err := worker . active . withResources ( uuid . UUID { } , selected . id , worker . Info , ps . postType . SealTask ( spt ) , selected . res , & ps . lk , func ( ) error {
2023-03-06 13:30:58 +00:00
ps . lk . Unlock ( )
defer ps . lk . Lock ( )
return work ( ctx , worker . workerRpc )
} )
if err == nil {
return nil
}
// if the error is RPCConnectionError, try another worker, if not, return the error
if ! errors . As ( err , new ( * jsonrpc . RPCConnectionError ) ) {
return err
}
log . Warnw ( "worker RPC connection error, will retry with another candidate if possible" , "error" , err , "worker" , selected . id , "candidate" , i , "candidates" , len ( candidates ) )
rpcErrs = multierror . Append ( rpcErrs , err )
}
return xerrors . Errorf ( "got RPC errors from all workers: %w" , rpcErrs )
2021-07-27 03:15:53 +00:00
}
2022-01-18 14:53:13 +00:00
// candidateWorker pairs a worker that readyWorkers accepted with the
// resources computed for the task on that worker.
type candidateWorker struct {
	// id identifies the worker in the scheduler's worker map.
	id storiface.WorkerID
	// res is the resource spec looked up for the proof and post type.
	res storiface.Resources
}
2021-07-27 03:15:53 +00:00
2022-01-18 14:53:13 +00:00
func ( ps * poStScheduler ) readyWorkers ( spt abi . RegisteredSealProof ) ( bool , [ ] candidateWorker ) {
var accepts [ ] candidateWorker
2022-01-21 09:02:00 +00:00
//if the gpus of the worker are insufficient or it's disabled, it cannot be scheduled
2021-07-27 03:15:53 +00:00
for wid , wr := range ps . workers {
2022-05-18 13:47:08 +00:00
needRes := wr . Info . Resources . ResourceSpec ( spt , ps . postType )
2022-01-18 14:53:13 +00:00
2023-03-06 12:56:23 +00:00
if ! wr . Enabled {
log . Debugf ( "sched: not scheduling on PoSt-worker %s, worker disabled" , wid )
continue
}
2023-05-10 19:43:42 +00:00
if ! wr . active . CanHandleRequest ( uuid . UUID { } , ps . postType . SealTask ( spt ) , needRes , wid , "post-readyWorkers" , wr . Info ) {
2021-07-27 03:15:53 +00:00
continue
}
2022-01-18 14:53:13 +00:00
accepts = append ( accepts , candidateWorker {
id : wid ,
res : needRes ,
} )
2021-07-27 03:15:53 +00:00
}
2022-01-18 14:53:13 +00:00
// todo: round robin or something
rand . Shuffle ( len ( accepts ) , func ( i , j int ) {
accepts [ i ] , accepts [ j ] = accepts [ j ] , accepts [ i ]
2021-07-27 03:15:53 +00:00
} )
2022-01-18 14:53:13 +00:00
return len ( accepts ) != 0 , accepts
2021-07-27 03:15:53 +00:00
}
func ( ps * poStScheduler ) disable ( wid storiface . WorkerID ) {
ps . lk . Lock ( )
defer ps . lk . Unlock ( )
2022-05-18 13:47:08 +00:00
ps . workers [ wid ] . Enabled = false
2021-07-27 03:15:53 +00:00
}
func ( ps * poStScheduler ) enable ( wid storiface . WorkerID ) {
ps . lk . Lock ( )
defer ps . lk . Unlock ( )
2022-05-18 13:47:08 +00:00
ps . workers [ wid ] . Enabled = true
2021-07-27 03:15:53 +00:00
}
2022-05-18 13:47:08 +00:00
func ( ps * poStScheduler ) watch ( wid storiface . WorkerID , worker * WorkerHandle ) {
2022-06-14 18:25:52 +00:00
heartbeatTimer := time . NewTicker ( paths . HeartbeatInterval )
2021-07-27 03:15:53 +00:00
defer heartbeatTimer . Stop ( )
ctx , cancel := context . WithCancel ( context . TODO ( ) )
defer cancel ( )
defer close ( worker . closedMgr )
defer func ( ) {
log . Warnw ( "Worker closing" , "WorkerID" , wid )
ps . delWorker ( wid )
} ( )
for {
2022-10-14 13:58:25 +00:00
select {
case <- heartbeatTimer . C :
case <- worker . closingMgr :
return
}
2022-06-14 18:25:52 +00:00
sctx , scancel := context . WithTimeout ( ctx , paths . HeartbeatInterval / 2 )
2021-07-27 03:15:53 +00:00
curSes , err := worker . workerRpc . Session ( sctx )
scancel ( )
if err != nil {
// Likely temporary error
log . Warnw ( "failed to check worker session" , "error" , err )
ps . disable ( wid )
2022-10-14 13:58:25 +00:00
continue
2021-07-27 03:15:53 +00:00
}
if storiface . WorkerID ( curSes ) != wid {
if curSes != ClosedWorkerID {
// worker restarted
log . Warnw ( "worker session changed (worker restarted?)" , "initial" , wid , "current" , curSes )
}
return
}
ps . enable ( wid )
}
}
2022-05-18 13:47:08 +00:00
func ( ps * poStScheduler ) workerCleanup ( wid storiface . WorkerID , w * WorkerHandle ) {
2021-07-27 03:15:53 +00:00
select {
case <- w . closingMgr :
default :
close ( w . closingMgr )
}
ps . lk . Unlock ( )
select {
case <- w . closedMgr :
case <- time . After ( time . Second ) :
2022-01-14 13:11:04 +00:00
log . Errorf ( "timeout closing worker manager goroutine %s" , wid )
2021-07-27 03:15:53 +00:00
}
ps . lk . Lock ( )
}
// schedClose runs workerCleanup for every registered worker while holding
// the scheduler lock (workerCleanup releases it temporarily while waiting).
func (ps *poStScheduler) schedClose() {
	ps.lk.Lock()
	defer ps.lk.Unlock()

	log.Debugf("closing scheduler")

	for wid, handle := range ps.workers {
		ps.workerCleanup(wid, handle)
	}
}
2022-05-18 13:47:08 +00:00
func ( ps * poStScheduler ) WorkerStats ( ctx context . Context , cb func ( ctx context . Context , wid storiface . WorkerID , worker * WorkerHandle ) ) {
2021-07-27 03:15:53 +00:00
ps . lk . RLock ( )
defer ps . lk . RUnlock ( )
for id , w := range ps . workers {
2022-03-18 20:31:15 +00:00
cb ( ctx , id , w )
2021-07-27 03:15:53 +00:00
}
}