block producers should have only one address

This commit is contained in:
Jeromy 2020-05-05 12:01:44 -07:00
parent 299a017ef5
commit 8b58f22c2e
4 changed files with 63 additions and 120 deletions

View File

@ -456,15 +456,15 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err return err
} }
m := miner.NewMiner(api, epp, beacon) m := miner.NewMiner(api, epp, beacon, a)
{ {
if err := m.Register(a); err != nil { if err := m.Start(ctx); err != nil {
return xerrors.Errorf("failed to start up genesis miner: %w", err) return xerrors.Errorf("failed to start up genesis miner: %w", err)
} }
cerr := configureStorageMiner(ctx, api, a, peerid, gasPrice) cerr := configureStorageMiner(ctx, api, a, peerid, gasPrice)
if err := m.Unregister(ctx, a); err != nil { if err := m.Stop(ctx); err != nil {
log.Error("failed to shut down storage miner: ", err) log.Error("failed to shut down storage miner: ", err)
} }

View File

@ -29,16 +29,17 @@ var log = logging.Logger("miner")
// returns a callback reporting whether we mined a blocks in this round // returns a callback reporting whether we mined a blocks in this round
type waitFunc func(ctx context.Context, baseTime uint64) (func(bool), error) type waitFunc func(ctx context.Context, baseTime uint64) (func(bool), error)
func NewMiner(api api.FullNode, epp gen.WinningPoStProver, beacon beacon.RandomBeacon) *Miner { func NewMiner(api api.FullNode, epp gen.WinningPoStProver, beacon beacon.RandomBeacon, addr address.Address) *Miner {
arc, err := lru.NewARC(10000) arc, err := lru.NewARC(10000)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &Miner{ return &Miner{
api: api, api: api,
epp: epp, epp: epp,
beacon: beacon, beacon: beacon,
address: addr,
waitFunc: func(ctx context.Context, baseTime uint64) (func(bool), error) { waitFunc: func(ctx context.Context, baseTime uint64) (func(bool), error) {
// Wait around for half the block time in case other parents come in // Wait around for half the block time in case other parents come in
deadline := baseTime + build.PropagationDelay deadline := baseTime + build.PropagationDelay
@ -56,10 +57,10 @@ type Miner struct {
epp gen.WinningPoStProver epp gen.WinningPoStProver
beacon beacon.RandomBeacon beacon beacon.RandomBeacon
lk sync.Mutex lk sync.Mutex
addresses []address.Address address address.Address
stop chan struct{} stop chan struct{}
stopping chan struct{} stopping chan struct{}
waitFunc waitFunc waitFunc waitFunc
@ -68,74 +69,38 @@ type Miner struct {
minedBlockHeights *lru.ARCCache minedBlockHeights *lru.ARCCache
} }
func (m *Miner) Addresses() ([]address.Address, error) { func (m *Miner) Address() address.Address {
m.lk.Lock() m.lk.Lock()
defer m.lk.Unlock() defer m.lk.Unlock()
out := make([]address.Address, len(m.addresses)) return m.address
copy(out, m.addresses)
return out, nil
} }
func (m *Miner) Register(addr address.Address) error { func (m *Miner) Start(ctx context.Context) error {
m.lk.Lock() m.lk.Lock()
defer m.lk.Unlock() defer m.lk.Unlock()
if m.stop != nil {
if len(m.addresses) > 0 { return fmt.Errorf("miner already started")
for _, a := range m.addresses {
if a == addr {
log.Warnf("miner.Register called more than once for actor '%s'", addr)
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
}
}
} }
m.stop = make(chan struct{})
m.addresses = append(m.addresses, addr) go m.mine(context.TODO())
if len(m.addresses) == 1 {
m.stop = make(chan struct{})
go m.mine(context.TODO())
}
return nil return nil
} }
func (m *Miner) Unregister(ctx context.Context, addr address.Address) error { func (m *Miner) Stop(ctx context.Context) error {
m.lk.Lock() m.lk.Lock()
defer m.lk.Unlock() defer m.lk.Unlock()
if len(m.addresses) == 0 {
return xerrors.New("no addresses registered") m.stopping = make(chan struct{})
stopping := m.stopping
close(m.stop)
select {
case <-stopping:
return nil
case <-ctx.Done():
return ctx.Err()
} }
idx := -1
for i, a := range m.addresses {
if a == addr {
idx = i
break
}
}
if idx == -1 {
return xerrors.New("unregister: address not found")
}
m.addresses[idx] = m.addresses[len(m.addresses)-1]
m.addresses = m.addresses[:len(m.addresses)-1]
// Unregistering last address, stop mining first
if len(m.addresses) == 0 && m.stop != nil {
m.stopping = make(chan struct{})
stopping := m.stopping
close(m.stop)
select {
case <-stopping:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
} }
func (m *Miner) niceSleep(d time.Duration) bool { func (m *Miner) niceSleep(d time.Duration) bool {
@ -153,7 +118,6 @@ func (m *Miner) mine(ctx context.Context) {
var lastBase MiningBase var lastBase MiningBase
eventLoop:
for { for {
select { select {
case <-m.stop: case <-m.stop:
@ -166,10 +130,6 @@ eventLoop:
default: default:
} }
m.lk.Lock()
addrs := m.addresses
m.lk.Unlock()
prebase, err := m.GetBestMiningCandidate(ctx) prebase, err := m.GetBestMiningCandidate(ctx)
if err != nil { if err != nil {
log.Errorf("failed to get best mining candidate: %s", err) log.Errorf("failed to get best mining candidate: %s", err)
@ -196,23 +156,16 @@ eventLoop:
} }
lastBase = *base lastBase = *base
blks := make([]*types.BlockMsg, 0) b, err := m.mineOne(ctx, base)
if err != nil {
for _, addr := range addrs { log.Errorf("mining block failed: %+v", err)
b, err := m.mineOne(ctx, addr, base) continue
if err != nil {
log.Errorf("mining block failed: %+v", err)
continue
}
if b != nil {
blks = append(blks, b)
}
} }
onDone(len(blks) != 0) onDone(b != nil)
if len(blks) != 0 { if b != nil {
btime := time.Unix(int64(blks[0].Header.Timestamp), 0) btime := time.Unix(int64(b.Header.Timestamp), 0)
if time.Now().Before(btime) { if time.Now().Before(btime) {
if !m.niceSleep(time.Until(btime)) { if !m.niceSleep(time.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
@ -223,30 +176,19 @@ eventLoop:
"time", time.Now(), "duration", time.Since(btime)) "time", time.Now(), "duration", time.Since(btime))
} }
mWon := make(map[address.Address]struct{}) // TODO: this code was written to handle creating blocks for multiple miners.
for _, b := range blks { // However, we don't use that, and we probably never will. So even though this code will
_, notOk := mWon[b.Header.Miner] // never see different miners, i'm going to handle the caching as if it was going to.
if notOk { // We can clean it up later when we remove all the multiple miner logic.
log.Errorw("2 blocks for the same miner. Throwing hands in the air. Report this. It is important.", "blocks", blks) blkKey := fmt.Sprintf("%s-%d", b.Header.Miner, b.Header.Height)
continue eventLoop if _, ok := m.minedBlockHeights.Get(blkKey); ok {
} log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
mWon[b.Header.Miner] = struct{}{} continue
} }
for _, b := range blks {
// TODO: this code was written to handle creating blocks for multiple miners.
// However, we don't use that, and we probably never will. So even though this code will
// never see different miners, i'm going to handle the caching as if it was going to.
// We can clean it up later when we remove all the multiple miner logic.
blkKey := fmt.Sprintf("%s-%d", b.Header.Miner, b.Header.Height)
if _, ok := m.minedBlockHeights.Get(blkKey); ok {
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
continue
}
m.minedBlockHeights.Add(blkKey, true) m.minedBlockHeights.Add(blkKey, true)
if err := m.api.SyncSubmitBlock(ctx, b); err != nil { if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err) log.Errorf("failed to submit newly mined block: %s", err)
}
} }
} else { } else {
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+uint64(build.BlockDelay*base.NullRounds)), 0) nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+uint64(build.BlockDelay*base.NullRounds)), 0)
@ -307,13 +249,13 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti
return !power.MinerPower.QualityAdjPower.Equals(types.NewInt(0)), nil return !power.MinerPower.QualityAdjPower.Equals(types.NewInt(0)), nil
} }
func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningBase) (*types.BlockMsg, error) { func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
start := time.Now() start := time.Now()
round := base.TipSet.Height() + base.NullRounds + 1 round := base.TipSet.Height() + base.NullRounds + 1
mbi, err := m.api.MinerGetBaseInfo(ctx, addr, round, base.TipSet.Key()) mbi, err := m.api.MinerGetBaseInfo(ctx, m.address, round, base.TipSet.Key())
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to get mining base info: %w", err) return nil, xerrors.Errorf("failed to get mining base info: %w", err)
} }
@ -329,7 +271,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
return nil, xerrors.Errorf("get beacon entries failed: %w", err) return nil, xerrors.Errorf("get beacon entries failed: %w", err)
} }
hasPower, err := m.hasPower(ctx, addr, base.TipSet) hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
if err != nil { if err != nil {
return nil, xerrors.Errorf("checking if miner is slashed: %w", err) return nil, xerrors.Errorf("checking if miner is slashed: %w", err)
} }
@ -346,12 +288,12 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
rbase = bvals[len(bvals)-1] rbase = bvals[len(bvals)-1]
} }
ticket, err := m.computeTicket(ctx, addr, &rbase, base, len(bvals) > 0) ticket, err := m.computeTicket(ctx, &rbase, base, len(bvals) > 0)
if err != nil { if err != nil {
return nil, xerrors.Errorf("scratching ticket failed: %w", err) return nil, xerrors.Errorf("scratching ticket failed: %w", err)
} }
winner, err := gen.IsRoundWinner(ctx, base.TipSet, round, addr, rbase, mbi, m.api) winner, err := gen.IsRoundWinner(ctx, base.TipSet, round, m.address, rbase, mbi, m.api)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to check if we win next round: %w", err) return nil, xerrors.Errorf("failed to check if we win next round: %w", err)
} }
@ -362,7 +304,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := addr.MarshalCBOR(buf); err != nil { if err := m.address.MarshalCBOR(buf); err != nil {
return nil, xerrors.Errorf("failed to marshal miner address: %w", err) return nil, xerrors.Errorf("failed to marshal miner address: %w", err)
} }
@ -385,7 +327,7 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
} }
// TODO: winning post proof // TODO: winning post proof
b, err := m.createBlock(base, addr, ticket, winner, bvals, postProof, pending) b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err) return nil, xerrors.Errorf("failed to create block: %w", err)
} }
@ -399,8 +341,8 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
return b, nil return b, nil
} }
func (m *Miner) computeTicket(ctx context.Context, addr address.Address, brand *types.BeaconEntry, base *MiningBase, haveNewEntries bool) (*types.Ticket, error) { func (m *Miner) computeTicket(ctx context.Context, brand *types.BeaconEntry, base *MiningBase, haveNewEntries bool) (*types.Ticket, error) {
mi, err := m.api.StateMinerInfo(ctx, addr, types.EmptyTSK) mi, err := m.api.StateMinerInfo(ctx, m.address, types.EmptyTSK)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -410,7 +352,7 @@ func (m *Miner) computeTicket(ctx context.Context, addr address.Address, brand *
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := addr.MarshalCBOR(buf); err != nil { if err := m.address.MarshalCBOR(buf); err != nil {
return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err) return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err)
} }

View File

@ -23,9 +23,10 @@ func NewTestMiner(nextCh <-chan func(bool), addr address.Address) func(api.FullN
waitFunc: chanWaiter(nextCh), waitFunc: chanWaiter(nextCh),
epp: epp, epp: epp,
minedBlockHeights: arc, minedBlockHeights: arc,
address: addr,
} }
if err := m.Register(addr); err != nil { if err := m.Start(context.TODO()); err != nil {
panic(err) panic(err)
} }
return m return m

View File

@ -264,17 +264,17 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode
return nil, err return nil, err
} }
m := miner.NewMiner(api, epp, beacon) m := miner.NewMiner(api, epp, beacon, minerAddr)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
if err := m.Register(minerAddr); err != nil { if err := m.Start(ctx); err != nil {
return err return err
} }
return nil return nil
}, },
OnStop: func(ctx context.Context) error { OnStop: func(ctx context.Context) error {
return m.Unregister(ctx, minerAddr) return m.Stop(ctx)
}, },
}) })