Support multiple miners

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2019-11-18 15:59:31 -06:00
parent eeec3c1783
commit bf556e2c99
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA

View File

@ -72,18 +72,20 @@ func (m *Miner) Register(addr address.Address) error {
defer m.lk.Unlock()
if len(m.addresses) > 0 {
if len(m.addresses) > 1 || m.addresses[0] != addr {
return errors.New("mining with more than one storage miner instance not supported yet") // TODO !
for _, a := range m.addresses {
if a == addr {
log.Warnf("miner.Register called more than once for actor '%s'", addr)
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
}
}
log.Warnf("miner.Register called more than once for actor '%s'", addr)
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
}
m.addresses = append(m.addresses, addr)
m.stop = make(chan struct{})
go m.mine(context.TODO())
if len(m.addresses) == 1 {
// TODO: there is probably a race here
m.stop = make(chan struct{})
go m.mine(context.TODO())
}
return nil
}
@ -95,19 +97,24 @@ func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
return xerrors.New("no addresses registered")
}
if len(m.addresses) > 1 {
m.lk.Unlock()
log.Errorf("UNREGISTER NOT IMPLEMENTED FOR MORE THAN ONE ADDRESS!")
return xerrors.New("can't unregister when more than one actor is registered: not implemented")
}
idx := -1
if m.addresses[0] != addr {
for i, a := range m.addresses {
if a == addr {
idx = i
break
}
}
if idx == -1 {
m.lk.Unlock()
return xerrors.New("unregister: address not found")
}
m.addresses[idx] = m.addresses[len(m.addresses)-1]
m.addresses = m.addresses[:len(m.addresses)-1]
// Unregistering last address, stop mining first
if m.stop != nil {
if len(m.addresses) == 0 && m.stop != nil {
if m.stopping == nil {
m.stopping = make(chan struct{})
close(m.stop)
@ -122,8 +129,6 @@ func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
m.lk.Lock()
}
m.addresses = []address.Address{}
m.lk.Unlock()
return nil
}
@ -149,6 +154,10 @@ func (m *Miner) mine(ctx context.Context) {
default:
}
m.lk.Lock()
addrs := m.addresses
m.lk.Unlock()
// Sleep a small amount in order to wait for other blocks to arrive
if err := m.waitFunc(ctx); err != nil {
log.Error(err)
@ -167,16 +176,19 @@ func (m *Miner) mine(ctx context.Context) {
}
lastBase = *base
b, err := m.mineOne(ctx, base)
if err != nil {
log.Errorf("mining block failed: %s", err)
log.Warn("waiting 400ms before attempting to mine a block")
time.Sleep(400 * time.Millisecond)
continue
blks := make([]*types.BlockMsg, 0)
for _, addr := range addrs {
b, err := m.mineOne(ctx, addr, base)
if err != nil {
log.Errorf("mining block failed: %s", err)
continue
}
blks = append(blks, b)
}
if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0)
if len(blks) != 0 {
btime := time.Unix(int64(blks[0].Header.Timestamp), 0)
if time.Now().Before(btime) {
time.Sleep(time.Until(btime))
} else {
@ -184,8 +196,10 @@ func (m *Miner) mine(ctx context.Context) {
"time", time.Now(), "duration", time.Now().Sub(btime))
}
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
for _, b := range blks {
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
}
}
} else {
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*len(base.tickets))), 0)
@ -229,14 +243,14 @@ func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error)
}, nil
}
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningBase) (*types.BlockMsg, error) {
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids()))
ticket, err := m.scratchTicket(ctx, base)
ticket, err := m.scratchTicket(ctx, addr, base)
if err != nil {
return nil, errors.Wrap(err, "scratching ticket failed")
}
win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), m.addresses[0], &m.api)
win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), addr, &m.api)
if err != nil {
return nil, errors.Wrap(err, "failed to check if we win next round")
}
@ -246,7 +260,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
b, err := m.createBlock(base, ticket, proof)
b, err := m.createBlock(base, addr, ticket, proof)
if err != nil {
return nil, errors.Wrap(err, "failed to create block")
}
@ -260,8 +274,8 @@ func (m *Miner) submitNullTicket(base *MiningBase, ticket *types.Ticket) {
m.lastWork = base
}
func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) {
w, err := m.getMinerWorker(ctx, m.addresses[0], nil)
func (m *Miner) computeVRF(ctx context.Context, addr address.Address, input []byte) ([]byte, error) {
w, err := m.getMinerWorker(ctx, addr, nil)
if err != nil {
return nil, err
}
@ -291,7 +305,7 @@ func (m *Miner) getMinerWorker(ctx context.Context, addr address.Address, ts *ty
return w, nil
}
func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Ticket, error) {
func (m *Miner) scratchTicket(ctx context.Context, addr address.Address, base *MiningBase) (*types.Ticket, error) {
var lastTicket *types.Ticket
if len(base.tickets) > 0 {
lastTicket = base.tickets[len(base.tickets)-1]
@ -299,7 +313,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
lastTicket = base.ts.MinTicket()
}
vrfOut, err := m.computeVRF(ctx, lastTicket.VRFProof)
vrfOut, err := m.computeVRF(ctx, addr, lastTicket.VRFProof)
if err != nil {
return nil, err
}
@ -309,7 +323,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
}, nil
}
func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) {
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) {
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
@ -324,7 +338,7 @@ func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.
uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(len(base.tickets)+1))
// why even return this? that api call could just submit it for us
return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts))
return m.api.MinerCreateBlock(context.TODO(), addr, base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts))
}
type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)