211 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			211 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package storage
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"time"
 | |
| 
 | |
| 	"golang.org/x/xerrors"
 | |
| 
 | |
| 	"github.com/filecoin-project/go-address"
 | |
| 	"github.com/filecoin-project/go-state-types/abi"
 | |
| 	"github.com/filecoin-project/go-state-types/dline"
 | |
| 	"github.com/filecoin-project/specs-storage/storage"
 | |
| 
 | |
| 	"github.com/filecoin-project/lotus/api"
 | |
| 	"github.com/filecoin-project/lotus/build"
 | |
| 	"github.com/filecoin-project/lotus/chain/store"
 | |
| 	"github.com/filecoin-project/lotus/chain/types"
 | |
| 	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
 | |
| 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
 | |
| 	"github.com/filecoin-project/lotus/journal"
 | |
| 	"github.com/filecoin-project/lotus/node/config"
 | |
| 
 | |
| 	"go.opencensus.io/trace"
 | |
| )
 | |
| 
 | |
| // WindowPoStScheduler is the coordinator for WindowPoSt submissions, fault
 | |
| // declaration, and recovery declarations. It watches the chain for reverts and
 | |
| // applies, and schedules/run those processes as partition deadlines arrive.
 | |
| //
 | |
| // WindowPoStScheduler watches the chain though the changeHandler, which in turn
 | |
| // turn calls the scheduler when the time arrives to do work.
 | |
| type WindowPoStScheduler struct {
 | |
| 	api              fullNodeFilteredAPI
 | |
| 	feeCfg           config.MinerFeeConfig
 | |
| 	addrSel          *AddressSelector
 | |
| 	prover           storage.Prover
 | |
| 	verifier         ffiwrapper.Verifier
 | |
| 	faultTracker     sectorstorage.FaultTracker
 | |
| 	proofType        abi.RegisteredPoStProof
 | |
| 	partitionSectors uint64
 | |
| 	ch               *changeHandler
 | |
| 
 | |
| 	actor address.Address
 | |
| 
 | |
| 	evtTypes [4]journal.EventType
 | |
| 	journal  journal.Journal
 | |
| 
 | |
| 	// failed abi.ChainEpoch // eps
 | |
| 	// failLk sync.Mutex
 | |
| }
 | |
| 
 | |
| // NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler.
 | |
| func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
 | |
| 	cfg config.MinerFeeConfig,
 | |
| 	as *AddressSelector,
 | |
| 	sp storage.Prover,
 | |
| 	verif ffiwrapper.Verifier,
 | |
| 	ft sectorstorage.FaultTracker,
 | |
| 	j journal.Journal,
 | |
| 	actor address.Address) (*WindowPoStScheduler, error) {
 | |
| 	mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
 | |
| 	if err != nil {
 | |
| 		return nil, xerrors.Errorf("getting sector size: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return &WindowPoStScheduler{
 | |
| 		api:              api,
 | |
| 		feeCfg:           cfg,
 | |
| 		addrSel:          as,
 | |
| 		prover:           sp,
 | |
| 		verifier:         verif,
 | |
| 		faultTracker:     ft,
 | |
| 		proofType:        mi.WindowPoStProofType,
 | |
| 		partitionSectors: mi.WindowPoStPartitionSectors,
 | |
| 
 | |
| 		actor: actor,
 | |
| 		evtTypes: [...]journal.EventType{
 | |
| 			evtTypeWdPoStScheduler:  j.RegisterEventType("wdpost", "scheduler"),
 | |
| 			evtTypeWdPoStProofs:     j.RegisterEventType("wdpost", "proofs_processed"),
 | |
| 			evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"),
 | |
| 			evtTypeWdPoStFaults:     j.RegisterEventType("wdpost", "faults_processed"),
 | |
| 		},
 | |
| 		journal: j,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (s *WindowPoStScheduler) Run(ctx context.Context) {
 | |
| 	// Initialize change handler.
 | |
| 
 | |
| 	// callbacks is a union of the fullNodeFilteredAPI and ourselves.
 | |
| 	callbacks := struct {
 | |
| 		fullNodeFilteredAPI
 | |
| 		*WindowPoStScheduler
 | |
| 	}{s.api, s}
 | |
| 
 | |
| 	s.ch = newChangeHandler(callbacks, s.actor)
 | |
| 	defer s.ch.shutdown()
 | |
| 	s.ch.start()
 | |
| 
 | |
| 	var (
 | |
| 		notifs <-chan []*api.HeadChange
 | |
| 		err    error
 | |
| 		gotCur bool
 | |
| 	)
 | |
| 
 | |
| 	// not fine to panic after this point
 | |
| 	for {
 | |
| 		if notifs == nil {
 | |
| 			notifs, err = s.api.ChainNotify(ctx)
 | |
| 			if err != nil {
 | |
| 				log.Errorf("ChainNotify error: %+v", err)
 | |
| 
 | |
| 				build.Clock.Sleep(10 * time.Second)
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			gotCur = false
 | |
| 			log.Info("restarting window post scheduler")
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case changes, ok := <-notifs:
 | |
| 			if !ok {
 | |
| 				log.Warn("window post scheduler notifs channel closed")
 | |
| 				notifs = nil
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if !gotCur {
 | |
| 				if len(changes) != 1 {
 | |
| 					log.Errorf("expected first notif to have len = 1")
 | |
| 					continue
 | |
| 				}
 | |
| 				chg := changes[0]
 | |
| 				if chg.Type != store.HCCurrent {
 | |
| 					log.Errorf("expected first notif to tell current ts")
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange")
 | |
| 
 | |
| 				s.update(ctx, nil, chg.Val)
 | |
| 
 | |
| 				span.End()
 | |
| 				gotCur = true
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange")
 | |
| 
 | |
| 			var lowest, highest *types.TipSet = nil, nil
 | |
| 
 | |
| 			for _, change := range changes {
 | |
| 				if change.Val == nil {
 | |
| 					log.Errorf("change.Val was nil")
 | |
| 				}
 | |
| 				switch change.Type {
 | |
| 				case store.HCRevert:
 | |
| 					lowest = change.Val
 | |
| 				case store.HCApply:
 | |
| 					highest = change.Val
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			s.update(ctx, lowest, highest)
 | |
| 
 | |
| 			span.End()
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) {
 | |
| 	if apply == nil {
 | |
| 		log.Error("no new tipset in window post WindowPoStScheduler.update")
 | |
| 		return
 | |
| 	}
 | |
| 	err := s.ch.update(ctx, revert, apply)
 | |
| 	if err != nil {
 | |
| 		log.Errorf("handling head updates in window post sched: %+v", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // onAbort is called when generating proofs or submitting proofs is aborted
 | |
| func (s *WindowPoStScheduler) onAbort(ts *types.TipSet, deadline *dline.Info) {
 | |
| 	s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
 | |
| 		c := evtCommon{}
 | |
| 		if ts != nil {
 | |
| 			c.Deadline = deadline
 | |
| 			c.Height = ts.Height()
 | |
| 			c.TipSet = ts.Cids()
 | |
| 		}
 | |
| 		return WdPoStSchedulerEvt{
 | |
| 			evtCommon: c,
 | |
| 			State:     SchedulerStateAborted,
 | |
| 		}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (s *WindowPoStScheduler) getEvtCommon(err error) evtCommon {
 | |
| 	c := evtCommon{Error: err}
 | |
| 	currentTS, currentDeadline := s.ch.currentTSDI()
 | |
| 	if currentTS != nil {
 | |
| 		c.Deadline = currentDeadline
 | |
| 		c.Height = currentTS.Height()
 | |
| 		c.TipSet = currentTS.Cids()
 | |
| 	}
 | |
| 	return c
 | |
| }
 |