From bf556e2c995495e1426937b8651cf8fd228b8aab Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 18 Nov 2019 15:59:31 -0600 Subject: [PATCH] Support multiple miners License: MIT Signed-off-by: Jakub Sztandera --- miner/miner.go | 88 +++++++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 37 deletions(-) diff --git a/miner/miner.go b/miner/miner.go index 72ec090c7..a65ec99b4 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -72,18 +72,20 @@ func (m *Miner) Register(addr address.Address) error { defer m.lk.Unlock() if len(m.addresses) > 0 { - if len(m.addresses) > 1 || m.addresses[0] != addr { - return errors.New("mining with more than one storage miner instance not supported yet") // TODO ! + for _, a := range m.addresses { + if a == addr { + log.Warnf("miner.Register called more than once for actor '%s'", addr) + return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr) + } } - - log.Warnf("miner.Register called more than once for actor '%s'", addr) - return xerrors.Errorf("miner.Register called more than once for actor '%s'", addr) } m.addresses = append(m.addresses, addr) - m.stop = make(chan struct{}) - - go m.mine(context.TODO()) + if len(m.addresses) == 1 { + // TODO: there is probably a race here + m.stop = make(chan struct{}) + go m.mine(context.TODO()) + } return nil } @@ -95,19 +97,24 @@ func (m *Miner) Unregister(ctx context.Context, addr address.Address) error { return xerrors.New("no addresses registered") } - if len(m.addresses) > 1 { - m.lk.Unlock() - log.Errorf("UNREGISTER NOT IMPLEMENTED FOR MORE THAN ONE ADDRESS!") - return xerrors.New("can't unregister when more than one actor is registered: not implemented") - } + idx := -1 - if m.addresses[0] != addr { + for i, a := range m.addresses { + if a == addr { + idx = i + break + } + } + if idx == -1 { m.lk.Unlock() return xerrors.New("unregister: address not found") } + m.addresses[idx] = m.addresses[len(m.addresses)-1] + m.addresses = m.addresses[:len(m.addresses)-1] + // Unregistering last address, stop mining first - if m.stop != nil { + if len(m.addresses) == 0 && m.stop != nil { if m.stopping == nil { m.stopping = make(chan struct{}) close(m.stop) @@ -122,8 +129,6 @@ func (m *Miner) Unregister(ctx context.Context, addr address.Address) error { m.lk.Lock() } - m.addresses = []address.Address{} - m.lk.Unlock() return nil } @@ -149,6 +154,10 @@ func (m *Miner) mine(ctx context.Context) { default: } + m.lk.Lock() + addrs := m.addresses + m.lk.Unlock() + // Sleep a small amount in order to wait for other blocks to arrive if err := m.waitFunc(ctx); err != nil { log.Error(err) @@ -167,16 +176,19 @@ func (m *Miner) mine(ctx context.Context) { } lastBase = *base - b, err := m.mineOne(ctx, base) - if err != nil { - log.Errorf("mining block failed: %s", err) - log.Warn("waiting 400ms before attempting to mine a block") - time.Sleep(400 * time.Millisecond) - continue + blks := make([]*types.BlockMsg, 0) + + for _, addr := range addrs { + b, err := m.mineOne(ctx, addr, base) + if err != nil { + log.Errorf("mining block failed: %s", err) + continue + } + blks = append(blks, b) } - if b != nil { - btime := time.Unix(int64(b.Header.Timestamp), 0) + if len(blks) != 0 { + btime := time.Unix(int64(blks[0].Header.Timestamp), 0) if time.Now().Before(btime) { time.Sleep(time.Until(btime)) } else { @@ -184,8 +196,10 @@ func (m *Miner) mine(ctx context.Context) { "time", time.Now(), "duration", time.Now().Sub(btime)) } - if err := m.api.SyncSubmitBlock(ctx, b); err != nil { - log.Errorf("failed to submit newly mined block: %s", err) + for _, b := range blks { + if err := m.api.SyncSubmitBlock(ctx, b); err != nil { + log.Errorf("failed to submit newly mined block: %s", err) + } } } else { nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*len(base.tickets))), 0) @@ -229,14 +243,14 @@ func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) }, nil } -func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { +func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningBase) (*types.BlockMsg, error) { log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids())) - ticket, err := m.scratchTicket(ctx, base) + ticket, err := m.scratchTicket(ctx, addr, base) if err != nil { return nil, errors.Wrap(err, "scratching ticket failed") } - win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), m.addresses[0], &m.api) + win, proof, err := gen.IsRoundWinner(ctx, base.ts, append(base.tickets, ticket), addr, &m.api) if err != nil { return nil, errors.Wrap(err, "failed to check if we win next round") } @@ -246,7 +260,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, return nil, nil } - b, err := m.createBlock(base, ticket, proof) + b, err := m.createBlock(base, addr, ticket, proof) if err != nil { return nil, errors.Wrap(err, "failed to create block") } @@ -260,8 +274,8 @@ func (m *Miner) submitNullTicket(base *MiningBase, ticket *types.Ticket) { m.lastWork = base } -func (m *Miner) computeVRF(ctx context.Context, input []byte) ([]byte, error) { - w, err := m.getMinerWorker(ctx, m.addresses[0], nil) +func (m *Miner) computeVRF(ctx context.Context, addr address.Address, input []byte) ([]byte, error) { + w, err := m.getMinerWorker(ctx, addr, nil) if err != nil { return nil, err } @@ -291,7 +305,7 @@ func (m *Miner) getMinerWorker(ctx context.Context, addr address.Address, ts *ty return w, nil } -func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Ticket, error) { +func (m *Miner) scratchTicket(ctx context.Context, addr address.Address, base *MiningBase) (*types.Ticket, error) { var lastTicket *types.Ticket if len(base.tickets) > 0 { lastTicket = base.tickets[len(base.tickets)-1] @@ -299,7 +313,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic lastTicket = base.ts.MinTicket() } - vrfOut, err := m.computeVRF(ctx, lastTicket.VRFProof) + vrfOut, err := m.computeVRF(ctx, addr, lastTicket.VRFProof) if err != nil { return nil, err } @@ -309,7 +323,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic }, nil } -func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) { +func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) { pending, err := m.api.MpoolPending(context.TODO(), base.ts) if err != nil { @@ -324,7 +338,7 @@ func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types. uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(len(base.tickets)+1)) // why even return this? that api call could just submit it for us - return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts)) + return m.api.MinerCreateBlock(context.TODO(), addr, base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts)) } type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)