lotus/cmd/lotus-sim/simulation/commit_queue.go

200 lines
5.4 KiB
Go

package simulation
import (
"sort"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
)
// pendingCommitTracker tracks pending commits per-miner for a single epohc.
type pendingCommitTracker map[address.Address]minerPendingCommits
// minerPendingCommits tracks a miner's pending commits during a single epoch (grouped by seal proof type).
type minerPendingCommits map[abi.RegisteredSealProof][]abi.SectorNumber
// finish markes count sectors of the given proof type as "prove-committed".
func (m minerPendingCommits) finish(proof abi.RegisteredSealProof, count int) {
snos := m[proof]
if len(snos) < count {
panic("not enough sector numbers to finish")
} else if len(snos) == count {
delete(m, proof)
} else {
m[proof] = snos[count:]
}
}
// empty returns true if there are no pending commits.
func (m minerPendingCommits) empty() bool {
return len(m) == 0
}
// count returns the number of pending commits.
func (m minerPendingCommits) count() int {
count := 0
for _, snos := range m {
count += len(snos)
}
return count
}
// commitQueue is used to track pending prove-commits.
//
// Miners are processed in round-robin where _all_ commits from a given miner are finished before
// moving on to the next. This is designed to maximize batching.
type commitQueue struct {
minerQueue []address.Address
queue []pendingCommitTracker
offset abi.ChainEpoch
}
// ready returns the number of prove-commits ready to be proven at the current epoch. Useful for logging.
func (q *commitQueue) ready() int {
if len(q.queue) == 0 {
return 0
}
count := 0
for _, pending := range q.queue[0] {
count += pending.count()
}
return count
}
// nextMiner returns the next miner to be proved and the set of pending prove commits for that
// miner. When some number of sectors have successfully been proven, call "finish" so we don't try
// to prove them again.
func (q *commitQueue) nextMiner() (address.Address, minerPendingCommits, bool) {
if len(q.queue) == 0 {
return address.Undef, nil, false
}
next := q.queue[0]
// Go through the queue and find the first non-empty batch.
for len(q.minerQueue) > 0 {
addr := q.minerQueue[0]
q.minerQueue = q.minerQueue[1:]
pending := next[addr]
if !pending.empty() {
return addr, pending, true
}
delete(next, addr)
}
return address.Undef, nil, false
}
// advanceEpoch will advance to the next epoch. If some sectors were left unproven in the current
// epoch, they will be "prepended" into the next epochs sector set.
func (q *commitQueue) advanceEpoch(epoch abi.ChainEpoch) {
if epoch < q.offset {
panic("cannot roll epoch backwards")
}
// Now we "roll forwards", merging each epoch we advance over with the next.
for len(q.queue) > 1 && q.offset < epoch {
curr := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
q.offset++
next := q.queue[0]
// Cleanup empty entries.
for addr, pending := range curr {
if pending.empty() {
delete(curr, addr)
}
}
// If the entire level is actually empty, just skip to the next one.
if len(curr) == 0 {
continue
}
// Otherwise, merge the next into the current.
for addr, nextPending := range next {
currPending := curr[addr]
if currPending.empty() {
curr[addr] = nextPending
continue
}
for ty, nextSnos := range nextPending {
currSnos := currPending[ty]
if len(currSnos) == 0 {
currPending[ty] = nextSnos
continue
}
currPending[ty] = append(currSnos, nextSnos...)
}
}
// Now replace next with the merged curr.
q.queue[0] = curr
}
q.offset = epoch
if len(q.queue) == 0 {
return
}
next := q.queue[0]
seenMiners := make(map[address.Address]struct{}, len(q.minerQueue))
for _, addr := range q.minerQueue {
seenMiners[addr] = struct{}{}
}
// Find the new miners not already in the queue.
offset := len(q.minerQueue)
for addr, pending := range next {
if pending.empty() {
delete(next, addr)
continue
}
if _, ok := seenMiners[addr]; ok {
continue
}
q.minerQueue = append(q.minerQueue, addr)
}
// Sort the new miners only.
newMiners := q.minerQueue[offset:]
sort.Slice(newMiners, func(i, j int) bool {
// eh, escape analysis should be fine here...
return string(newMiners[i].Bytes()) < string(newMiners[j].Bytes())
})
}
// enquueProveCommit enqueues prove-commit for the given pre-commit for the given miner.
func (q *commitQueue) enqueueProveCommit(addr address.Address, preCommitEpoch abi.ChainEpoch, info miner.SectorPreCommitInfo) error {
// Compute the epoch at which we can start trying to commit.
preCommitDelay := policy.GetPreCommitChallengeDelay()
minCommitEpoch := preCommitEpoch + preCommitDelay + 1
// Figure out the offset in the queue.
i := int(minCommitEpoch - q.offset)
if i < 0 {
i = 0
}
// Expand capacity and insert.
if cap(q.queue) <= i {
pc := make([]pendingCommitTracker, i+1, preCommitDelay*2)
copy(pc, q.queue)
q.queue = pc
} else if len(q.queue) <= i {
q.queue = q.queue[:i+1]
}
tracker := q.queue[i]
if tracker == nil {
tracker = make(pendingCommitTracker)
q.queue[i] = tracker
}
minerPending := tracker[addr]
if minerPending == nil {
minerPending = make(minerPendingCommits)
tracker[addr] = minerPending
}
minerPending[info.SealProof] = append(minerPending[info.SealProof], info.SectorNumber)
return nil
}