lpwinning: adder done

This commit is contained in:
Łukasz Magiera 2023-11-10 18:00:21 +01:00
parent f4d86716ab
commit f30a7db5df
2 changed files with 232 additions and 146 deletions

View File

@ -0,0 +1,24 @@
create table mining_tasks
(
task_id bigint not null
constraint mining_tasks_pk
primary key,
sp_id bigint,
epoch bigint,
constraint mining_tasks_sp_epoch
unique (sp_id, epoch)
);
create table mining_base_block
(
id bigserial not null
constraint mining_base_block_pk
primary key,
task_id bigint not null
constraint mining_base_block_mining_tasks_task_id_fk
references mining_tasks
on delete cascade,
block_cid text not null
constraint mining_base_block_cid_k
unique
);

View File

@ -4,13 +4,17 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/node/modules/dtypes"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"os" "golang.org/x/xerrors"
"time" "time"
) )
@ -22,7 +26,10 @@ type WinPostTask struct {
// lastWork holds the last MiningBase we built upon. // lastWork holds the last MiningBase we built upon.
lastWork *MiningBase lastWork *MiningBase
api WinPostAPI api WinPostAPI
actors []dtypes.MinerAddress
mineTF promise.Promise[harmonytask.AddTaskFunc]
} }
type WinPostAPI interface { type WinPostAPI interface {
@ -68,8 +75,7 @@ func (t *WinPostTask) TypeDetails() harmonytask.TaskTypeDetails {
} }
func (t *WinPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (t *WinPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
//TODO implement me t.mineTF.Set(taskFunc)
panic("implement me")
} }
// MiningBase is the tipset on top of which we plan to construct our next block. // MiningBase is the tipset on top of which we plan to construct our next block.
@ -77,13 +83,18 @@ func (t *WinPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
type MiningBase struct { type MiningBase struct {
TipSet *types.TipSet TipSet *types.TipSet
ComputeTime time.Time ComputeTime time.Time
NullRounds abi.ChainEpoch AddRounds abi.ChainEpoch
}
func (mb MiningBase) epoch() abi.ChainEpoch {
// return the epoch that will result from mining on this base
return mb.TipSet.Height() + mb.AddRounds + 1
} }
func (mb MiningBase) baseTime() time.Time { func (mb MiningBase) baseTime() time.Time {
tsTime := time.Unix(int64(mb.TipSet.MinTimestamp()), 0) tsTime := time.Unix(int64(mb.TipSet.MinTimestamp()), 0)
nullDelay := build.BlockDelaySecs * uint64(mb.NullRounds) roundDelay := build.BlockDelaySecs * uint64(mb.AddRounds+1)
tsTime = tsTime.Add(time.Duration(nullDelay) * time.Second) tsTime = tsTime.Add(time.Duration(roundDelay) * time.Second)
return tsTime return tsTime
} }
@ -108,6 +119,8 @@ func retry1[R any](f func() (R, error)) R {
func (t *WinPostTask) mineBasic(ctx context.Context) { func (t *WinPostTask) mineBasic(ctx context.Context) {
var workBase MiningBase var workBase MiningBase
taskFn := t.mineTF.Val(ctx)
{ {
head := retry1(func() (*types.TipSet, error) { head := retry1(func() (*types.TipSet, error) {
return t.api.ChainHead(ctx) return t.api.ChainHead(ctx)
@ -115,13 +128,26 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
workBase = MiningBase{ workBase = MiningBase{
TipSet: head, TipSet: head,
NullRounds: 0, AddRounds: 0,
ComputeTime: time.Now(), ComputeTime: time.Now(),
} }
} }
/*
/- T+0 == workBase.baseTime
|
>--------*------*--------[wait until next round]----->
|
|- T+PD == workBase.afterPropDelay+(~1s)
|- Here we acquire the new workBase, and start a new round task
\- Then we loop around, and wait for the next head
time -->
*/
for { for {
// wait for propagation delay // wait for *NEXT* propagation delay
time.Sleep(time.Until(workBase.afterPropDelay())) time.Sleep(time.Until(workBase.afterPropDelay()))
// check current best candidate // check current best candidate
@ -130,7 +156,9 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
}) })
if workBase.TipSet.Equals(maybeBase) { if workBase.TipSet.Equals(maybeBase) {
workBase.NullRounds++ // workbase didn't change in the new round so we have a null round here
workBase.AddRounds++
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "same-tipset")
} else { } else {
btsw := retry1(func() (types.BigInt, error) { btsw := retry1(func() (types.BigInt, error) {
return t.api.ChainTipSetWeight(ctx, maybeBase.Key()) return t.api.ChainTipSetWeight(ctx, maybeBase.Key())
@ -141,149 +169,182 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
}) })
if types.BigCmp(btsw, ltsw) <= 0 { if types.BigCmp(btsw, ltsw) <= 0 {
workBase.NullRounds++ // new tipset for some reason has less weight than the old one, assume null round here
// NOTE: the backing node may have reorged, or manually changed head
workBase.AddRounds++
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-local-weight")
} else { } else {
// new tipset has more weight, so we should mine on it, no null round here
log.Debugw("workbase update", "tipset", workBase.TipSet.Cids(), "nulls", workBase.AddRounds, "lastUpdate", time.Since(workBase.ComputeTime), "type", "prefer-new-tipset")
workBase = MiningBase{ workBase = MiningBase{
TipSet: maybeBase, TipSet: maybeBase,
NullRounds: 0, AddRounds: 0,
ComputeTime: time.Now(), ComputeTime: time.Now(),
} }
} }
} }
} // dispatch mining task
} // (note equivocation prevention is handled by the mining code)
func (t *WinPostTask) mine2(ctx context.Context) { for _, act := range t.actors {
var lastBase MiningBase spID, err := address.IDFromAddress(address.Address(act))
// Start the main mining loop.
for {
// todo handle stop signals?
var base *MiningBase
// Look for the best mining candidate.
for {
prebase, err := t.GetBestMiningCandidate(ctx)
if err != nil { if err != nil {
log.Errorf("failed to get best mining candidate: %s", err) log.Errorf("failed to get spID from address %s: %s", act, err)
time.Sleep(5 * time.Second)
continue continue
} }
// Check if we have a new base or if the current base is still valid. taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
if base != nil && base.TipSet.Height() == prebase.TipSet.Height() && base.NullRounds == prebase.NullRounds { _, err := tx.Exec(`INSERT INTO mining_tasks (task_id, sp_id, epoch) VALUES ($1, $2, $3)`, id, spID, workBase.epoch())
// We have a valid base. if err != nil {
return false, xerrors.Errorf("inserting mining_tasks: %w", err)
}
for _, c := range workBase.TipSet.Cids() {
_, err = tx.Exec(`INSERT INTO mining_base_block (task_id, block_cid) VALUES ($1, $2)`, id, c)
if err != nil {
return false, xerrors.Errorf("inserting mining base blocks: %w", err)
}
}
return true, nil // no errors, commit the transaction
})
}
}
}
/*
func (t *WinPostTask) mine2(ctx context.Context) {
var lastBase MiningBase
// Start the main mining loop.
for {
// todo handle stop signals?
var base *MiningBase
// Look for the best mining candidate.
for {
prebase, err := t.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
time.Sleep(5 * time.Second)
continue
}
// Check if we have a new base or if the current base is still valid.
if base != nil && base.TipSet.Height() == prebase.TipSet.Height() && base.AddRounds == prebase.AddRounds {
// We have a valid base.
base = prebase
break
}
// TODO: need to change the orchestration here. the problem is that
// we are waiting *after* we enter this loop and selecta mining
// candidate, which is almost certain to change in multiminer
// tests. Instead, we should block before entering the loop, so
// that when the test 'MineOne' function is triggered, we pull our
// best mining candidate at that time.
// Wait until propagation delay period after block we plan to mine on
{
// if we're mining a block in the past via catch-up/rush mining,
// such as when recovering from a network halt, this sleep will be
// for a negative duration, and therefore **will return
// immediately**.
//
// the result is that we WILL NOT wait, therefore fast-forwarding
// and thus healing the chain by backfilling it with null rounds
// rapidly.
baseTs := prebase.TipSet.MinTimestamp() + build.PropagationDelaySecs
baseT := time.Unix(int64(baseTs), 0)
baseT = baseT.Add(randTimeOffset(time.Second))
time.Sleep(time.Until(baseT))
}
// Ensure the beacon entry is available before finalizing the mining base.
_, err = t.api.StateGetBeaconEntry(ctx, prebase.TipSet.Height()+prebase.AddRounds+1)
if err != nil {
log.Errorf("failed getting beacon entry: %s", err)
time.Sleep(time.Second)
continue
}
base = prebase base = prebase
break
} }
// TODO: need to change the orchestration here. the problem is that // Check for repeated mining candidates and handle sleep for the next round.
// we are waiting *after* we enter this loop and selecta mining if base.TipSet.Equals(lastBase.TipSet) && lastBase.AddRounds == base.AddRounds {
// candidate, which is almost certain to change in multiminer log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.AddRounds)
// tests. Instead, we should block before entering the loop, so time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second)
// that when the test 'MineOne' function is triggered, we pull our continue
// best mining candidate at that time.
// Wait until propagation delay period after block we plan to mine on
{
// if we're mining a block in the past via catch-up/rush mining,
// such as when recovering from a network halt, this sleep will be
// for a negative duration, and therefore **will return
// immediately**.
//
// the result is that we WILL NOT wait, therefore fast-forwarding
// and thus healing the chain by backfilling it with null rounds
// rapidly.
baseTs := prebase.TipSet.MinTimestamp() + build.PropagationDelaySecs
baseT := time.Unix(int64(baseTs), 0)
baseT = baseT.Add(randTimeOffset(time.Second))
time.Sleep(time.Until(baseT))
} }
// Ensure the beacon entry is available before finalizing the mining base. // Attempt to mine a block.
_, err = t.api.StateGetBeaconEntry(ctx, prebase.TipSet.Height()+prebase.NullRounds+1) b, err := m.mineOne(ctx, base)
if err != nil { if err != nil {
log.Errorf("failed getting beacon entry: %s", err) log.Errorf("mining block failed: %+v", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
lastBase = *base
base = prebase // todo figure out this whole bottom section
} // we won't know if we've mined a block here, we just submit a task
// making attempts to mine one
// Check for repeated mining candidates and handle sleep for the next round. // Process the mined block.
if base.TipSet.Equals(lastBase.TipSet) && lastBase.NullRounds == base.NullRounds { if b != nil {
log.Warnf("BestMiningCandidate from the previous round: %s (nulls:%d)", lastBase.TipSet.Cids(), lastBase.NullRounds) btime := time.Unix(int64(b.Header.Timestamp), 0)
time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second) now := build.Clock.Now()
continue // Handle timing for broadcasting the block.
} switch {
case btime == now:
// Attempt to mine a block. // block timestamp is perfectly aligned with time.
b, err := m.mineOne(ctx, base) case btime.After(now):
if err != nil { // Wait until it's time to broadcast the block.
log.Errorf("mining block failed: %+v", err) if !m.niceSleep(build.Clock.Until(btime)) {
time.Sleep(time.Second) log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
continue build.Clock.Sleep(build.Clock.Until(btime))
} }
lastBase = *base default:
// Log if the block was mined in the past.
// todo figure out this whole bottom section log.Warnw("mined block in the past",
// we won't know if we've mined a block here, we just submit a task "block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime))
// making attempts to mine one
// Process the mined block.
if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0)
now := build.Clock.Now()
// Handle timing for broadcasting the block.
switch {
case btime == now:
// block timestamp is perfectly aligned with time.
case btime.After(now):
// Wait until it's time to broadcast the block.
if !m.niceSleep(build.Clock.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
build.Clock.Sleep(build.Clock.Until(btime))
}
default:
// Log if the block was mined in the past.
log.Warnw("mined block in the past",
"block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime))
}
// Check for slash filter conditions.
if os.Getenv("LOTUS_MINER_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" && !build.IsNearUpgrade(base.TipSet.Height(), build.UpgradeWatermelonFixHeight) {
witness, fault, err := m.sf.MinedBlock(ctx, b.Header, base.TipSet.Height()+base.NullRounds)
if err != nil {
log.Errorf("<!!> SLASH FILTER ERRORED: %s", err)
// Continue here, because it's _probably_ wiser to not submit this block
continue
} }
if fault { // Check for slash filter conditions.
log.Errorf("<!!> SLASH FILTER DETECTED FAULT due to blocks %s and %s", b.Header.Cid(), witness) if os.Getenv("LOTUS_MINER_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" && !build.IsNearUpgrade(base.TipSet.Height(), build.UpgradeWatermelonFixHeight) {
continue witness, fault, err := m.sf.MinedBlock(ctx, b.Header, base.TipSet.Height()+base.AddRounds)
if err != nil {
log.Errorf("<!!> SLASH FILTER ERRORED: %s", err)
// Continue here, because it's _probably_ wiser to not submit this block
continue
}
if fault {
log.Errorf("<!!> SLASH FILTER DETECTED FAULT due to blocks %s and %s", b.Header.Cid(), witness)
continue
}
} }
// Submit the newly mined block.
if err := t.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %+v", err)
}
} else {
// If no block was mined, increase the null rounds and wait for the next epoch.
base.AddRounds++
// Calculate the time for the next round.
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.AddRounds))+int64(build.PropagationDelaySecs), 0)
// Wait for the next round.
time.Sleep(time.Until(nextRound))
} }
// Submit the newly mined block.
if err := t.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %+v", err)
}
} else {
// If no block was mined, increase the null rounds and wait for the next epoch.
base.NullRounds++
// Calculate the time for the next round.
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0)
// Wait for the next round.
time.Sleep(time.Until(nextRound))
} }
} }
}
// GetBestMiningCandidate implements the fork choice rule from a miner's // GetBestMiningCandidate implements the fork choice rule from a miner's
// perspective. // perspective.
@ -291,36 +352,37 @@ func (t *WinPostTask) mine2(ctx context.Context) {
// It obtains the current chain head (HEAD), and compares it to the last tipset // It obtains the current chain head (HEAD), and compares it to the last tipset
// we selected as our mining base (LAST). If HEAD's weight is larger than // we selected as our mining base (LAST). If HEAD's weight is larger than
// LAST's weight, it selects HEAD to build on. Else, it selects LAST. // LAST's weight, it selects HEAD to build on. Else, it selects LAST.
func (t *WinPostTask) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
bts, err := t.api.ChainHead(ctx)
if err != nil {
return nil, err
}
if t.lastWork != nil { func (t *WinPostTask) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
if t.lastWork.TipSet.Equals(bts) { bts, err := t.api.ChainHead(ctx)
return t.lastWork, nil
}
btsw, err := t.api.ChainTipSetWeight(ctx, bts.Key())
if err != nil { if err != nil {
return nil, err return nil, err
} }
ltsw, err := t.api.ChainTipSetWeight(ctx, t.lastWork.TipSet.Key())
if err != nil { if t.lastWork != nil {
t.lastWork = nil if t.lastWork.TipSet.Equals(bts) {
return nil, err return t.lastWork, nil
}
btsw, err := t.api.ChainTipSetWeight(ctx, bts.Key())
if err != nil {
return nil, err
}
ltsw, err := t.api.ChainTipSetWeight(ctx, t.lastWork.TipSet.Key())
if err != nil {
t.lastWork = nil
return nil, err
}
if types.BigCmp(btsw, ltsw) <= 0 {
return t.lastWork, nil
}
} }
if types.BigCmp(btsw, ltsw) <= 0 { t.lastWork = &MiningBase{TipSet: bts, ComputeTime: time.Now()}
return t.lastWork, nil return t.lastWork, nil
}
} }
*/
t.lastWork = &MiningBase{TipSet: bts, ComputeTime: time.Now()}
return t.lastWork, nil
}
func randTimeOffset(width time.Duration) time.Duration { func randTimeOffset(width time.Duration) time.Duration {
buf := make([]byte, 8) buf := make([]byte, 8)
rand.Reader.Read(buf) //nolint:errcheck rand.Reader.Read(buf) //nolint:errcheck