Fix two races in events
Also race fix: depends on https://github.com/ipfs/go-blockservice/pull/65 Resolves #2092, #2099, #2108, #1930, #2110 Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
7d9f9ba756
commit
008a2969b2
@ -8,6 +8,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -52,11 +53,11 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
|
|||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
mine := true
|
mine := int64(1)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
for mine {
|
for atomic.LoadInt64(&mine) == 1 {
|
||||||
time.Sleep(blocktime)
|
time.Sleep(blocktime)
|
||||||
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
|
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -66,7 +67,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
|
|||||||
|
|
||||||
makeDeal(t, ctx, 6, client, miner, carExport)
|
makeDeal(t, ctx, 6, client, miner, carExport)
|
||||||
|
|
||||||
mine = false
|
atomic.AddInt64(&mine, -1)
|
||||||
fmt.Println("shutting down mining")
|
fmt.Println("shutting down mining")
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
@ -89,12 +90,12 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
|||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
mine := true
|
mine := int64(1)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
for mine {
|
for atomic.LoadInt64(&mine) == 1 {
|
||||||
time.Sleep(blocktime)
|
time.Sleep(blocktime)
|
||||||
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
|
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -105,7 +106,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
|||||||
makeDeal(t, ctx, 6, client, miner, false)
|
makeDeal(t, ctx, 6, client, miner, false)
|
||||||
makeDeal(t, ctx, 7, client, miner, false)
|
makeDeal(t, ctx, 7, client, miner, false)
|
||||||
|
|
||||||
mine = false
|
atomic.AddInt64(&mine, -1)
|
||||||
fmt.Println("shutting down mining")
|
fmt.Println("shutting down mining")
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
@ -126,6 +126,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
|
|||||||
minedTwo := make(chan struct{})
|
minedTwo := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
doneMinedTwo := false
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
|
||||||
prevExpect := 0
|
prevExpect := 0
|
||||||
@ -175,9 +176,9 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
|
|||||||
time.Sleep(blocktime)
|
time.Sleep(blocktime)
|
||||||
}
|
}
|
||||||
|
|
||||||
if prevExpect == 2 && expect == 2 && minedTwo != nil {
|
if prevExpect == 2 && expect == 2 && !doneMinedTwo {
|
||||||
close(minedTwo)
|
close(minedTwo)
|
||||||
minedTwo = nil
|
doneMinedTwo = true
|
||||||
}
|
}
|
||||||
|
|
||||||
prevExpect = expect
|
prevExpect = expect
|
||||||
|
@ -84,6 +84,9 @@ type calledEvents struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
for _, ts := range rev {
|
for _, ts := range rev {
|
||||||
e.handleReverts(ts)
|
e.handleReverts(ts)
|
||||||
e.at = ts.Height()
|
e.at = ts.Height()
|
||||||
@ -134,7 +137,6 @@ func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
|
|||||||
|
|
||||||
e.messagesForTs(pts, func(msg *types.Message) {
|
e.messagesForTs(pts, func(msg *types.Message) {
|
||||||
// TODO: provide receipts
|
// TODO: provide receipts
|
||||||
|
|
||||||
for tid, matchFns := range e.matchers {
|
for tid, matchFns := range e.matchers {
|
||||||
var matched bool
|
var matched bool
|
||||||
for _, matchFn := range matchFns {
|
for _, matchFn := range matchFns {
|
||||||
|
@ -26,12 +26,15 @@ type heightEvents struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||||
|
|
||||||
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
|
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
|
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
|
||||||
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
|
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
|
||||||
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
|
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
|
||||||
|
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
for _, ts := range rev {
|
for _, ts := range rev {
|
||||||
// TODO: log error if h below gcconfidence
|
// TODO: log error if h below gcconfidence
|
||||||
// revert height-based triggers
|
// revert height-based triggers
|
||||||
@ -40,7 +43,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
|||||||
for _, tid := range e.htHeights[h] {
|
for _, tid := range e.htHeights[h] {
|
||||||
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
|
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
|
||||||
|
|
||||||
err := e.heightTriggers[tid].revert(ctx, ts)
|
rev := e.heightTriggers[tid].revert
|
||||||
|
e.lk.Unlock()
|
||||||
|
err := rev(ctx, ts)
|
||||||
|
e.lk.Lock()
|
||||||
e.heightTriggers[tid].called = false
|
e.heightTriggers[tid].called = false
|
||||||
|
|
||||||
span.End()
|
span.End()
|
||||||
@ -98,8 +104,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
|||||||
|
|
||||||
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
|
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
|
||||||
span.AddAttributes(trace.BoolAttribute("immediate", false))
|
span.AddAttributes(trace.BoolAttribute("immediate", false))
|
||||||
|
handle := hnd.handle
|
||||||
err = hnd.handle(ctx, incTs, h)
|
e.lk.Unlock()
|
||||||
|
err = handle(ctx, incTs, h)
|
||||||
|
e.lk.Lock()
|
||||||
span.End()
|
span.End()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
2
go.mod
2
go.mod
@ -44,7 +44,7 @@ require (
|
|||||||
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
|
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
|
||||||
github.com/ipfs/go-bitswap v0.2.8
|
github.com/ipfs/go-bitswap v0.2.8
|
||||||
github.com/ipfs/go-block-format v0.0.2
|
github.com/ipfs/go-block-format v0.0.2
|
||||||
github.com/ipfs/go-blockservice v0.1.3
|
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
|
||||||
github.com/ipfs/go-cid v0.0.6
|
github.com/ipfs/go-cid v0.0.6
|
||||||
github.com/ipfs/go-cidutil v0.0.2
|
github.com/ipfs/go-cidutil v0.0.2
|
||||||
github.com/ipfs/go-datastore v0.4.4
|
github.com/ipfs/go-datastore v0.4.4
|
||||||
|
2
go.sum
2
go.sum
@ -467,6 +467,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
|
|||||||
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
|
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
|
||||||
github.com/ipfs/go-blockservice v0.1.3 h1:9XgsPMwwWJSC9uVr2pMDsW2qFTBSkxpGMhmna8mIjPM=
|
github.com/ipfs/go-blockservice v0.1.3 h1:9XgsPMwwWJSC9uVr2pMDsW2qFTBSkxpGMhmna8mIjPM=
|
||||||
github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
|
github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
|
||||||
|
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 h1:hFJoI1D2a3MqiNkSb4nKwrdkhCngUxUTFNwVwovZX2s=
|
||||||
|
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
|
||||||
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||||
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||||
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||||
|
@ -215,6 +215,9 @@ type MiningBase struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
|
func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
|
||||||
|
m.lk.Lock()
|
||||||
|
defer m.lk.Unlock()
|
||||||
|
|
||||||
bts, err := m.api.ChainHead(ctx)
|
bts, err := m.api.ChainHead(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user