Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
4 changed files with 95 additions and 30 deletions
Showing only changes of commit 0d076d92db - Show all commits

View File

@ -245,7 +245,7 @@ func makeListDecoder(typ reflect.Type, tag tags) (decoder, error) {
} }
return decodeByteSlice, nil return decodeByteSlice, nil
} }
etypeinfo := cachedTypeInfo1(etype, tags{}) etypeinfo := theTC.infoWhileGenerating(etype, tags{})
if etypeinfo.decoderErr != nil { if etypeinfo.decoderErr != nil {
return nil, etypeinfo.decoderErr return nil, etypeinfo.decoderErr
} }
@ -424,7 +424,7 @@ func zeroFields(structval reflect.Value, fields []field) {
// makePtrDecoder creates a decoder that decodes into the pointer's element type. // makePtrDecoder creates a decoder that decodes into the pointer's element type.
func makePtrDecoder(typ reflect.Type, tag tags) (decoder, error) { func makePtrDecoder(typ reflect.Type, tag tags) (decoder, error) {
etype := typ.Elem() etype := typ.Elem()
etypeinfo := cachedTypeInfo1(etype, tags{}) etypeinfo := theTC.infoWhileGenerating(etype, tags{})
switch { switch {
case etypeinfo.decoderErr != nil: case etypeinfo.decoderErr != nil:
return nil, etypeinfo.decoderErr return nil, etypeinfo.decoderErr

View File

@ -517,7 +517,7 @@ func writeInterface(val reflect.Value, w *encbuf) error {
} }
func makeSliceWriter(typ reflect.Type, ts tags) (writer, error) { func makeSliceWriter(typ reflect.Type, ts tags) (writer, error) {
etypeinfo := cachedTypeInfo1(typ.Elem(), tags{}) etypeinfo := theTC.infoWhileGenerating(typ.Elem(), tags{})
if etypeinfo.writerErr != nil { if etypeinfo.writerErr != nil {
return nil, etypeinfo.writerErr return nil, etypeinfo.writerErr
} }
@ -585,7 +585,7 @@ func makeStructWriter(typ reflect.Type) (writer, error) {
} }
func makePtrWriter(typ reflect.Type, ts tags) (writer, error) { func makePtrWriter(typ reflect.Type, ts tags) (writer, error) {
etypeinfo := cachedTypeInfo1(typ.Elem(), tags{}) etypeinfo := theTC.infoWhileGenerating(typ.Elem(), tags{})
if etypeinfo.writerErr != nil { if etypeinfo.writerErr != nil {
return nil, etypeinfo.writerErr return nil, etypeinfo.writerErr
} }

View File

@ -23,6 +23,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"runtime"
"sync" "sync"
"testing" "testing"
@ -480,3 +481,35 @@ func BenchmarkEncodeBigInts(b *testing.B) {
} }
} }
} }
func BenchmarkEncodeConcurrentInterface(b *testing.B) {
type struct1 struct {
A string
B *big.Int
C [20]byte
}
value := []interface{}{
uint(999),
&struct1{A: "hello", B: big.NewInt(0xFFFFFFFF)},
[10]byte{1, 2, 3, 4, 5, 6},
[]string{"yeah", "yeah", "yeah"},
}
var wg sync.WaitGroup
for cpu := 0; cpu < runtime.NumCPU(); cpu++ {
wg.Add(1)
go func() {
defer wg.Done()
var buffer bytes.Buffer
for i := 0; i < b.N; i++ {
buffer.Reset()
err := Encode(&buffer, value)
if err != nil {
panic(err)
}
}
}()
}
wg.Wait()
}

View File

@ -21,13 +21,10 @@ import (
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )
var ( // typeinfo is an entry in the type cache.
typeCacheMutex sync.RWMutex
typeCache = make(map[typekey]*typeinfo)
)
type typeinfo struct { type typeinfo struct {
decoder decoder decoder decoder
decoderErr error // error from makeDecoder decoderErr error // error from makeDecoder
@ -65,41 +62,76 @@ type decoder func(*Stream, reflect.Value) error
type writer func(reflect.Value, *encbuf) error type writer func(reflect.Value, *encbuf) error
var theTC = newTypeCache()
type typeCache struct {
cur atomic.Value
// This lock synchronizes writers.
mu sync.Mutex
next map[typekey]*typeinfo
}
func newTypeCache() *typeCache {
c := new(typeCache)
c.cur.Store(make(map[typekey]*typeinfo))
return c
}
func cachedDecoder(typ reflect.Type) (decoder, error) { func cachedDecoder(typ reflect.Type) (decoder, error) {
info := cachedTypeInfo(typ, tags{}) info := theTC.info(typ)
return info.decoder, info.decoderErr return info.decoder, info.decoderErr
} }
func cachedWriter(typ reflect.Type) (writer, error) { func cachedWriter(typ reflect.Type) (writer, error) {
info := cachedTypeInfo(typ, tags{}) info := theTC.info(typ)
return info.writer, info.writerErr return info.writer, info.writerErr
} }
func cachedTypeInfo(typ reflect.Type, tags tags) *typeinfo { func (c *typeCache) info(typ reflect.Type) *typeinfo {
typeCacheMutex.RLock() key := typekey{Type: typ}
info := typeCache[typekey{typ, tags}] if info := c.cur.Load().(map[typekey]*typeinfo)[key]; info != nil {
typeCacheMutex.RUnlock()
if info != nil {
return info return info
} }
// not in the cache, need to generate info for this type.
typeCacheMutex.Lock() // Not in the cache, need to generate info for this type.
defer typeCacheMutex.Unlock() return c.generate(typ, tags{})
return cachedTypeInfo1(typ, tags)
} }
func cachedTypeInfo1(typ reflect.Type, tags tags) *typeinfo { func (c *typeCache) generate(typ reflect.Type, tags tags) *typeinfo {
key := typekey{typ, tags} c.mu.Lock()
info := typeCache[key] defer c.mu.Unlock()
if info != nil {
// another goroutine got the write lock first cur := c.cur.Load().(map[typekey]*typeinfo)
if info := cur[typekey{typ, tags}]; info != nil {
return info return info
} }
// put a dummy value into the cache before generating.
// if the generator tries to lookup itself, it will get // Copy cur to next.
c.next = make(map[typekey]*typeinfo, len(cur)+1)
for k, v := range cur {
c.next[k] = v
}
// Generate.
info := c.infoWhileGenerating(typ, tags)
// next -> cur
c.cur.Store(c.next)
c.next = nil
return info
}
func (c *typeCache) infoWhileGenerating(typ reflect.Type, tags tags) *typeinfo {
key := typekey{typ, tags}
if info := c.next[key]; info != nil {
return info
}
// Put a dummy value into the cache before generating.
// If the generator tries to lookup itself, it will get
// the dummy value and won't call itself recursively. // the dummy value and won't call itself recursively.
info = new(typeinfo) info := new(typeinfo)
typeCache[key] = info c.next[key] = info
info.generate(typ, tags) info.generate(typ, tags)
return info return info
} }
@ -133,7 +165,7 @@ func structFields(typ reflect.Type) (fields []field, err error) {
} else if anyOptional { } else if anyOptional {
return nil, fmt.Errorf(`rlp: struct field %v.%s needs "optional" tag`, typ, f.Name) return nil, fmt.Errorf(`rlp: struct field %v.%s needs "optional" tag`, typ, f.Name)
} }
info := cachedTypeInfo1(f.Type, tags) info := theTC.infoWhileGenerating(f.Type, tags)
fields = append(fields, field{i, info, tags.optional}) fields = append(fields, field{i, info, tags.optional})
} }
} }