Merge pull request #637 from filecoin-project/feat/multiple-miners
Support multiple miners
This commit is contained in:
commit
a5f79da05e
121
miner/miner.go
121
miner/miner.go
@ -72,59 +72,58 @@ func (m *Miner) Register(addr address.Address) error {
|
|||||||
defer m.lk.Unlock()
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
if len(m.addresses) > 0 {
|
if len(m.addresses) > 0 {
|
||||||
if len(m.addresses) > 1 || m.addresses[0] != addr {
|
for _, a := range m.addresses {
|
||||||
return errors.New("mining with more than one storage miner instance not supported yet") // TODO !
|
if a == addr {
|
||||||
|
log.Warnf("miner.Register called more than once for actor '%s'", addr)
|
||||||
|
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warnf("miner.Register called more than once for actor '%s'", addr)
|
|
||||||
return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.addresses = append(m.addresses, addr)
|
m.addresses = append(m.addresses, addr)
|
||||||
m.stop = make(chan struct{})
|
if len(m.addresses) == 1 {
|
||||||
|
m.stop = make(chan struct{})
|
||||||
go m.mine(context.TODO())
|
go m.mine(context.TODO())
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
|
func (m *Miner) Unregister(ctx context.Context, addr address.Address) error {
|
||||||
m.lk.Lock()
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
if len(m.addresses) == 0 {
|
if len(m.addresses) == 0 {
|
||||||
m.lk.Unlock()
|
|
||||||
return xerrors.New("no addresses registered")
|
return xerrors.New("no addresses registered")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(m.addresses) > 1 {
|
idx := -1
|
||||||
m.lk.Unlock()
|
|
||||||
log.Errorf("UNREGISTER NOT IMPLEMENTED FOR MORE THAN ONE ADDRESS!")
|
|
||||||
return xerrors.New("can't unregister when more than one actor is registered: not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.addresses[0] != addr {
|
for i, a := range m.addresses {
|
||||||
m.lk.Unlock()
|
if a == addr {
|
||||||
|
idx = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if idx == -1 {
|
||||||
return xerrors.New("unregister: address not found")
|
return xerrors.New("unregister: address not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.addresses[idx] = m.addresses[len(m.addresses)-1]
|
||||||
|
m.addresses = m.addresses[:len(m.addresses)-1]
|
||||||
|
|
||||||
// Unregistering last address, stop mining first
|
// Unregistering last address, stop mining first
|
||||||
if m.stop != nil {
|
if len(m.addresses) == 0 && m.stop != nil {
|
||||||
if m.stopping == nil {
|
m.stopping = make(chan struct{})
|
||||||
m.stopping = make(chan struct{})
|
|
||||||
close(m.stop)
|
|
||||||
}
|
|
||||||
stopping := m.stopping
|
stopping := m.stopping
|
||||||
m.lk.Unlock()
|
close(m.stop)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stopping:
|
case <-stopping:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
m.lk.Lock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.addresses = []address.Address{}
|
|
||||||
|
|
||||||
m.lk.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,21 +133,23 @@ func (m *Miner) mine(ctx context.Context) {
|
|||||||
|
|
||||||
var lastBase MiningBase
|
var lastBase MiningBase
|
||||||
|
|
||||||
|
eventLoop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-m.stop:
|
case <-m.stop:
|
||||||
m.lk.Lock()
|
stopping := m.stopping
|
||||||
|
|
||||||
close(m.stopping)
|
|
||||||
m.stop = nil
|
m.stop = nil
|
||||||
m.stopping = nil
|
m.stopping = nil
|
||||||
|
close(stopping)
|
||||||
m.lk.Unlock()
|
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.lk.Lock()
|
||||||
|
addrs := m.addresses
|
||||||
|
m.lk.Unlock()
|
||||||
|
|
||||||
// Sleep a small amount in order to wait for other blocks to arrive
|
// Sleep a small amount in order to wait for other blocks to arrive
|
||||||
if err := m.waitFunc(ctx); err != nil {
|
if err := m.waitFunc(ctx); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
@ -167,16 +168,21 @@ func (m *Miner) mine(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
lastBase = *base
|
lastBase = *base
|
||||||
|
|
||||||
b, err := m.mineOne(ctx, base)
|
blks := make([]*types.BlockMsg, 0)
|
||||||
if err != nil {
|
|
||||||
log.Errorf("mining block failed: %s", err)
|
for _, addr := range addrs {
|
||||||
log.Warn("waiting 400ms before attempting to mine a block")
|
b, err := m.mineOne(ctx, addr, base)
|
||||||
time.Sleep(400 * time.Millisecond)
|
if err != nil {
|
||||||
continue
|
log.Errorf("mining block failed: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if b != nil {
|
||||||
|
blks = append(blks, b)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if b != nil {
|
if len(blks) != 0 {
|
||||||
btime := time.Unix(int64(b.Header.Timestamp), 0)
|
btime := time.Unix(int64(blks[0].Header.Timestamp), 0)
|
||||||
if time.Now().Before(btime) {
|
if time.Now().Before(btime) {
|
||||||
time.Sleep(time.Until(btime))
|
time.Sleep(time.Until(btime))
|
||||||
} else {
|
} else {
|
||||||
@ -184,8 +190,19 @@ func (m *Miner) mine(ctx context.Context) {
|
|||||||
"time", time.Now(), "duration", time.Now().Sub(btime))
|
"time", time.Now(), "duration", time.Now().Sub(btime))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
|
mWon := make(map[address.Address]struct{})
|
||||||
log.Errorf("failed to submit newly mined block: %s", err)
|
for _, b := range blks {
|
||||||
|
_, notOk := mWon[b.Header.Miner]
|
||||||
|
if notOk {
|
||||||
|
log.Errorw("2 blocks for the same miner. Throwing hands in the air. Report this. It is important.", "bloks", blks)
|
||||||
|
continue eventLoop
|
||||||
|
}
|
||||||
|
mWon[b.Header.Miner] = struct{}{}
|
||||||
|
}
|
||||||
|
for _, b := range blks {
|
||||||
|
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
|
||||||
|
log.Errorf("failed to submit newly mined block: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*len(base.tickets))), 0)
|
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*len(base.tickets))), 0)
|
||||||
@ -229,14 +246,14 @@ func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error)
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
|
func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningBase) (*types.BlockMsg, error) {
|
||||||
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids()))
|
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids()))
|
||||||
ticket, err := m.scratchTicket(ctx, base)
|
ticket, err := m.scratchTicket(ctx, addr, base)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "scratching ticket failed")
|
return nil, errors.Wrap(err, "scratching ticket failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), m.addresses[0], &m.api)
|
win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), addr, &m.api)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to check if we win next round")
|
return nil, errors.Wrap(err, "failed to check if we win next round")
|
||||||
}
|
}
|
||||||
@ -246,7 +263,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := m.createBlock(base, ticket, proof)
|
b, err := m.createBlock(base, addr, ticket, proof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to create block")
|
return nil, errors.Wrap(err, "failed to create block")
|
||||||
}
|
}
|
||||||
@ -260,8 +277,8 @@ func (m *Miner) submitNullTicket(base *MiningBase, ticket *types.Ticket) {
|
|||||||
m.lastWork = base
|
m.lastWork = base
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) {
|
func (m *Miner) computeVRF(ctx context.Context, addr address.Address, input []byte) ([]byte, error) {
|
||||||
w, err := m.getMinerWorker(ctx, m.addresses[0], nil)
|
w, err := m.getMinerWorker(ctx, addr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -291,7 +308,7 @@ func (m *Miner) getMinerWorker(ctx context.Context, addr address.Address, ts *ty
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Ticket, error) {
|
func (m *Miner) scratchTicket(ctx context.Context, addr address.Address, base *MiningBase) (*types.Ticket, error) {
|
||||||
var lastTicket *types.Ticket
|
var lastTicket *types.Ticket
|
||||||
if len(base.tickets) > 0 {
|
if len(base.tickets) > 0 {
|
||||||
lastTicket = base.tickets[len(base.tickets)-1]
|
lastTicket = base.tickets[len(base.tickets)-1]
|
||||||
@ -299,7 +316,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
|
|||||||
lastTicket = base.ts.MinTicket()
|
lastTicket = base.ts.MinTicket()
|
||||||
}
|
}
|
||||||
|
|
||||||
vrfOut, err := m.computeVRF(ctx, lastTicket.VRFProof)
|
vrfOut, err := m.computeVRF(ctx, addr, lastTicket.VRFProof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -309,7 +326,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) {
|
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) {
|
||||||
|
|
||||||
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
|
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -324,7 +341,7 @@ func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.
|
|||||||
uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(len(base.tickets)+1))
|
uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(len(base.tickets)+1))
|
||||||
|
|
||||||
// why even return this? that api call could just submit it for us
|
// why even return this? that api call could just submit it for us
|
||||||
return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts))
|
return m.api.MinerCreateBlock(context.TODO(), addr, base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts))
|
||||||
}
|
}
|
||||||
|
|
||||||
type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
|
type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
|
Loading…
Reference in New Issue
Block a user