Get transactions (#45)

* Make transactions requests in parallel

* Update transaction error handling
This commit is contained in:
Matt K 2018-03-27 16:06:12 -05:00 committed by GitHub
parent 88210e436a
commit 8a9395819c
38 changed files with 2303 additions and 64 deletions

View File

@ -12,10 +12,11 @@ go_import_path: github.com/vulcanize/vulcanizedb
before_install:
# ginkgo golint dep migrate
- make installtools
- bash ./scripts/install-postgres-10.sh
before_script:
- sudo -u postgres createdb vulcanize_private
- make migrate HOST_NAME=localhost NAME=vulcanize_private PORT=5432
- make migrate NAME=vulcanize_private
script:
- make test

8
Gopkg.lock generated
View File

@ -274,6 +274,12 @@
]
revision = "faacc1b5e36e3ff02cbec9661c69ac63dd5a83ad"
[[projects]]
branch = "master"
name = "golang.org/x/sync"
packages = ["errgroup"]
revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
@ -334,6 +340,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b35f7b80a7695fb1a7d247be178212e16a0647363affc0071c5a2280817dd906"
inputs-digest = "4dcddfb7fa2db8ba7e3d8e70ec8ba23a7e9b6a1d99c62933ed63624412967aeb"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -18,7 +18,7 @@ $(BIN)/migrate:
LINT = $(BIN)/golint
$(BIN)/golint:
go get github.com/golang/lint/golint
go get -u golang.org/x/lint/golint
METALINT = $(BIN)/gometalinter.v2
$(BIN)/gometalinter.v2:

View File

@ -11,13 +11,15 @@ var _ = Describe("Rewards calculations", func() {
It("calculates a block reward for a real block", func() {
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
block := blockchain.GetBlockByNumber(1071819)
block, err := blockchain.GetBlockByNumber(1071819)
Expect(err).ToNot(HaveOccurred())
Expect(block.Reward).To(Equal(5.31355))
})
It("calculates an uncle reward for a real block", func() {
blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath)
block := blockchain.GetBlockByNumber(1071819)
block, err := blockchain.GetBlockByNumber(1071819)
Expect(err).ToNot(HaveOccurred())
Expect(block.UnclesReward).To(Equal(6.875))
})

View File

@ -3,6 +3,7 @@ package integration_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory"
"github.com/vulcanize/vulcanizedb/pkg/geth"
"github.com/vulcanize/vulcanizedb/pkg/history"
@ -28,8 +29,10 @@ var _ = Describe("Reading from the Geth blockchain", func() {
}, 30)
It("retrieves the genesis block and first block", func(done Done) {
genesisBlock := blockchain.GetBlockByNumber(int64(0))
firstBlock := blockchain.GetBlockByNumber(int64(1))
genesisBlock, err := blockchain.GetBlockByNumber(int64(0))
Expect(err).ToNot(HaveOccurred())
firstBlock, err := blockchain.GetBlockByNumber(int64(1))
Expect(err).ToNot(HaveOccurred())
lastBlockNumber := blockchain.LastBlock()
Expect(genesisBlock.Number).To(Equal(int64(0)))
@ -40,14 +43,27 @@ var _ = Describe("Reading from the Geth blockchain", func() {
It("retrieves the node info", func(done Done) {
node := blockchain.Node()
devNetworkNodeId := float64(1)
mainnetID := float64(1)
Expect(node.GenesisBlock).ToNot(BeNil())
Expect(node.NetworkID).To(Equal(devNetworkNodeId))
Expect(node.NetworkID).To(Equal(mainnetID))
Expect(len(node.ID)).ToNot(BeZero())
Expect(node.ClientName).ToNot(BeZero())
close(done)
}, 15)
//Benchmarking test: remove skip to test performance of block retrieval
XMeasure("retrieving n blocks", func(b Benchmarker) {
b.Time("runtime", func() {
var blocks []core.Block
n := 10
for i := 5327459; i > 5327459-n; i-- {
block, err := blockchain.GetBlockByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
blocks = append(blocks, block)
}
Expect(len(blocks)).To(Equal(n))
})
}, 10)
})

View File

@ -4,7 +4,7 @@ import "math/big"
type Blockchain interface {
ContractDataFetcher
GetBlockByNumber(blockNumber int64) Block
GetBlockByNumber(blockNumber int64) (Block, error)
GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error)
LastBlock() *big.Int
Node() Node

View File

@ -3,10 +3,17 @@ package postgres_test
import (
"testing"
"io/ioutil"
"log"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func init() {
log.SetOutput(ioutil.Discard)
}
func TestPostgres(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Postgres Suite")

View File

@ -4,9 +4,6 @@ import (
"fmt"
"strings"
"io/ioutil"
"log"
"math/big"
"github.com/jmoiron/sqlx"
@ -20,10 +17,6 @@ import (
"github.com/vulcanize/vulcanizedb/test_config"
)
func init() {
log.SetOutput(ioutil.Discard)
}
var _ = Describe("Postgres DB", func() {
var sqlxdb *sqlx.DB

View File

@ -14,6 +14,7 @@ type Blockchain struct {
WasToldToStop bool
node core.Node
ContractReturnValue []byte
err error
}
func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error {
@ -42,12 +43,13 @@ func (blockchain *Blockchain) Node() core.Node {
return blockchain.node
}
func NewBlockchain() *Blockchain {
func NewBlockchain(err error) *Blockchain {
return &Blockchain{
blocks: make(map[int64]core.Block),
logs: make(map[string][]core.Log),
contractAttributes: make(map[string]map[string]string),
node: core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "Geth"},
err: err,
}
}
@ -61,8 +63,11 @@ func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain {
}
}
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block {
return blockchain.blocks[blockNumber]
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
if blockchain.err != nil {
return core.Block{}, blockchain.err
}
return blockchain.blocks[blockNumber], nil
}
func (blockchain *Blockchain) AddBlock(block core.Block) {

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
type Client interface {
@ -17,8 +18,11 @@ type Client interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
}
func ToCoreBlock(gethBlock *types.Block, client Client) core.Block {
transactions := convertTransactionsToCore(gethBlock, client)
func ToCoreBlock(gethBlock *types.Block, client Client) (core.Block, error) {
transactions, err := convertTransactionsToCore(gethBlock, client)
if err != nil {
return core.Block{}, err
}
coreBlock := core.Block{
Difficulty: gethBlock.Difficulty().Int64(),
ExtraData: hexutil.Encode(gethBlock.Extra()),
@ -36,35 +40,48 @@ func ToCoreBlock(gethBlock *types.Block, client Client) core.Block {
}
coreBlock.Reward = CalcBlockReward(coreBlock, gethBlock.Uncles())
coreBlock.UnclesReward = CalcUnclesReward(coreBlock, gethBlock.Uncles())
return coreBlock
return coreBlock, nil
}
func convertTransactionsToCore(gethBlock *types.Block, client Client) []core.Transaction {
transactions := make([]core.Transaction, 0)
for i, gethTransaction := range gethBlock.Transactions() {
from, err := client.TransactionSender(context.Background(), gethTransaction, gethBlock.Hash(), uint(i))
if err != nil {
log.Println(err)
}
transaction := transToCoreTrans(gethTransaction, &from)
transaction, err = appendReceiptToTransaction(client, transaction)
if err != nil {
log.Println(err)
}
transactions = append(transactions, transaction)
func convertTransactionsToCore(gethBlock *types.Block, client Client) ([]core.Transaction, error) {
var g errgroup.Group
coreTransactions := make([]core.Transaction, len(gethBlock.Transactions()))
for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() {
//https://golang.org/doc/faq#closures_and_goroutines
transaction := gethTransaction
transactionIndex := uint(gethTransactionIndex)
g.Go(func() error {
from, err := client.TransactionSender(context.Background(), transaction, gethBlock.Hash(), transactionIndex)
if err != nil {
log.Println("transaction sender: ", err)
return err
}
coreTransaction := transToCoreTrans(transaction, &from)
coreTransaction, err = appendReceiptToTransaction(client, coreTransaction)
if err != nil {
log.Println("receipt: ", err)
return err
}
coreTransactions[transactionIndex] = coreTransaction
return nil
})
}
return transactions
if err := g.Wait(); err != nil {
log.Println("transactions: ", err)
return coreTransactions, err
}
return coreTransactions, nil
}
func appendReceiptToTransaction(client Client, transaction core.Transaction) (core.Transaction, error) {
gethReceipt, err := client.TransactionReceipt(context.Background(), common.HexToHash(transaction.Hash))
if err != nil {
log.Println(err)
return transaction, err
}
receipt := ReceiptToCoreReceipt(gethReceipt)
transaction.Receipt = receipt
return transaction, err
return transaction, nil
}
func transToCoreTrans(transaction *types.Transaction, from *common.Address) core.Transaction {

View File

@ -5,6 +5,13 @@ import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
@ -15,11 +22,25 @@ import (
type FakeGethClient struct {
receipts map[string]*types.Receipt
err error
}
func NewFakeClient() *FakeGethClient {
type TransActionReceiptError struct{}
func (tarErr TransActionReceiptError) Error() string {
return fmt.Sprintf("transaction receipt error")
}
type TransactionSenderError struct{}
func (tasErr TransactionSenderError) Error() string {
return fmt.Sprintf("transaction sender error")
}
func NewFakeClient(err error) *FakeGethClient {
return &FakeGethClient{
receipts: make(map[string]*types.Receipt),
err: err,
}
}
@ -30,6 +51,9 @@ func (client *FakeGethClient) AddReceipts(receipts []*types.Receipt) {
}
func (client *FakeGethClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
if err, ok := client.err.(TransActionReceiptError); ok {
return &types.Receipt{}, err
}
if gasUsed, ok := client.receipts[txHash.Hex()]; ok {
return gasUsed, nil
}
@ -37,6 +61,9 @@ func (client *FakeGethClient) TransactionReceipt(ctx context.Context, txHash com
}
func (client *FakeGethClient) TransactionSender(ctx context.Context, tx *types.Transaction, block common.Hash, index uint) (common.Address, error) {
if err, ok := client.err.(TransactionSenderError); ok {
return common.Address{}, err
}
return common.HexToAddress("0x123"), nil
}
@ -66,8 +93,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
}
block := types.NewBlock(&header, []*types.Transaction{}, []*types.Header{}, []*types.Receipt{})
client := &FakeGethClient{}
gethBlock := geth.ToCoreBlock(block, client)
gethBlock, err := geth.ToCoreBlock(block, client)
Expect(err).ToNot(HaveOccurred())
Expect(gethBlock.Difficulty).To(Equal(difficulty.Int64()))
Expect(gethBlock.GasLimit).To(Equal(gasLimit))
Expect(gethBlock.Miner).To(Equal(miner.Hex()))
@ -104,7 +132,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
}
receipts := []*types.Receipt{&receipt}
client := NewFakeClient()
client := NewFakeClient(nil)
client.AddReceipts(receipts)
number := int64(1071819)
@ -113,8 +141,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
}
uncles := []*types.Header{{Number: big.NewInt(1071817)}, {Number: big.NewInt(1071818)}}
block := types.NewBlock(&header, transactions, uncles, []*types.Receipt{&receipt})
coreBlock := geth.ToCoreBlock(block, client)
coreBlock, err := geth.ToCoreBlock(block, client)
Expect(err).ToNot(HaveOccurred())
Expect(geth.CalcBlockReward(coreBlock, block.Uncles())).To(Equal(5.31355))
})
@ -144,11 +173,12 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
}
block := types.NewBlock(&header, transactions, uncles, receipts)
client := NewFakeClient()
client := NewFakeClient(nil)
client.AddReceipts(receipts)
coreBlock := geth.ToCoreBlock(block, client)
coreBlock, err := geth.ToCoreBlock(block, client)
Expect(err).ToNot(HaveOccurred())
Expect(geth.CalcUnclesReward(coreBlock, block.Uncles())).To(Equal(6.875))
})
@ -190,10 +220,11 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
var uncles []*types.Header
block := types.NewBlock(&header, transactions, uncles, receipts)
client := NewFakeClient()
client := NewFakeClient(nil)
client.AddReceipts(receipts)
coreBlock := geth.ToCoreBlock(block, client)
coreBlock, err := geth.ToCoreBlock(block, client)
Expect(err).ToNot(HaveOccurred())
Expect(geth.CalcBlockReward(coreBlock, block.Uncles())).To(Equal(3.024990672))
})
})
@ -203,8 +234,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
header := types.Header{}
block := types.NewBlock(&header, []*types.Transaction{}, []*types.Header{}, []*types.Receipt{})
client := &FakeGethClient{}
coreBlock := geth.ToCoreBlock(block, client)
coreBlock, err := geth.ToCoreBlock(block, client)
Expect(err).ToNot(HaveOccurred())
Expect(len(coreBlock.Transactions)).To(Equal(0))
})
@ -227,7 +259,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
TxHash: gethTransaction.Hash(),
}
client := NewFakeClient()
client := NewFakeClient(nil)
client.AddReceipts([]*types.Receipt{gethReceipt})
header := types.Header{}
@ -237,8 +269,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
[]*types.Header{},
[]*types.Receipt{gethReceipt},
)
coreBlock := geth.ToCoreBlock(gethBlock, client)
coreBlock, err := geth.ToCoreBlock(gethBlock, client)
Expect(err).ToNot(HaveOccurred())
Expect(len(coreBlock.Transactions)).To(Equal(1))
coreTransaction := coreBlock.Transactions[0]
Expect(coreTransaction.Data).To(Equal("0xf7d8c8830000000000000000000000000000000000000000000000000000000000037788000000000000000000000000000000000000000000000000000000000003bd14"))
@ -271,7 +304,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
ContractAddress: common.HexToAddress("0x1023342345"),
}
client := NewFakeClient()
client := NewFakeClient(nil)
client.AddReceipts([]*types.Receipt{gethReceipt})
gethBlock := types.NewBlock(
@ -281,8 +314,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
[]*types.Receipt{gethReceipt},
)
coreBlock := geth.ToCoreBlock(gethBlock, client)
coreBlock, err := geth.ToCoreBlock(gethBlock, client)
Expect(err).ToNot(HaveOccurred())
coreTransaction := coreBlock.Transactions[0]
Expect(coreTransaction.To).To(Equal(""))
@ -292,4 +326,50 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() {
})
})
Describe("transaction error handling", func() {
var gethTransaction *types.Transaction
var gethReceipt *types.Receipt
var header *types.Header
var gethBlock *types.Block
BeforeEach(func() {
log.SetOutput(ioutil.Discard)
gethTransaction = types.NewTransaction(
uint64(0),
common.Address{},
big.NewInt(0),
uint64(0),
big.NewInt(0),
[]byte{},
)
gethReceipt = &types.Receipt{}
header = &types.Header{}
gethBlock = types.NewBlock(
header,
[]*types.Transaction{gethTransaction},
[]*types.Header{},
[]*types.Receipt{gethReceipt},
)
})
AfterEach(func() {
defer log.SetOutput(os.Stdout)
})
It("returns an error when transaction sender call fails", func() {
client := NewFakeClient(TransactionSenderError{})
client.AddReceipts([]*types.Receipt{})
_, err := geth.ToCoreBlock(gethBlock, client)
Expect(err).To(Equal(TransactionSenderError{}))
})
It("returns an error when transaction receipt call fails", func() {
client := NewFakeClient(TransActionReceiptError{})
client.AddReceipts([]*types.Receipt{})
_, err := geth.ToCoreBlock(gethBlock, client)
Expect(err).To(Equal(TransActionReceiptError{}))
})
})
})

View File

@ -58,9 +58,16 @@ func (blockchain *Blockchain) Node() core.Node {
return blockchain.node
}
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block {
gethBlock, _ := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber))
return ToCoreBlock(gethBlock, blockchain.client)
func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) {
gethBlock, err := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber))
if err != nil {
return core.Block{}, err
}
block, err := ToCoreBlock(gethBlock, blockchain.client)
if err != nil {
return core.Block{}, err
}
return block, nil
}
func (blockchain *Blockchain) LastBlock() *big.Int {

View File

@ -9,6 +9,8 @@ import (
"strings"
"log"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/vulcanize/vulcanizedb/pkg/core"
@ -79,7 +81,10 @@ func MakeNode(wrapper ClientWrapper) core.Node {
func (client ClientWrapper) NetworkId() float64 {
var version string
client.CallContext(context.Background(), &version, "net_version")
err := client.CallContext(context.Background(), &version, "net_version")
if err != nil {
log.Println(err)
}
networkId, _ := strconv.ParseFloat(version, 64)
return networkId
}

View File

@ -4,9 +4,15 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
"log"
"testing"
)
func init() {
log.SetOutput(ioutil.Discard)
}
func TestHistory(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "History Suite")

View File

@ -18,7 +18,11 @@ func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore
func RetrieveAndUpdateBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, blockNumbers []int64) int {
for _, blockNumber := range blockNumbers {
block := blockchain.GetBlockByNumber(blockNumber)
block, err := blockchain.GetBlockByNumber(blockNumber)
if err != nil {
log.Printf("failed to retrieve block number: %d\n", blockNumber)
return 0
}
blockRepository.CreateOrUpdateBlock(block)
}
return len(blockNumbers)

View File

@ -1,6 +1,8 @@
package history_test
import (
"errors"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
@ -100,4 +102,12 @@ var _ = Describe("Populating blocks", func() {
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3))
})
It("does not call repository create block when there is an error", func() {
blockchain := fakes.NewBlockchain(errors.New("error getting block"))
blocks := history.MakeRange(1, 10)
history.RetrieveAndUpdateBlocks(blockchain, blockRepository, blocks)
Expect(blockRepository.BlockCount()).To(Equal(0))
Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(0))
})
})

View File

@ -3,9 +3,6 @@ package history_test
import (
"bytes"
"io/ioutil"
"log"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
@ -14,10 +11,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/history"
)
func init() {
log.SetOutput(ioutil.Discard)
}
var _ = Describe("Blocks validator", func() {
It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() {

17
scripts/install-postgres-10.sh Executable file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
# Travis doesn't support postgres 10
# https://github.com/travis-ci/travis-ci/issues/8537
set -ex
echo "Installing Postgres 10"
sudo service postgresql stop
sudo apt-get remove -q 'postgresql-*'
sudo apt-get update -q
sudo apt-get install -q postgresql-10 postgresql-client-10
sudo cp /etc/postgresql/{9.6,10}/main/pg_hba.conf
echo "Restarting Postgres 10"
sudo service postgresql restart
sudo psql -c 'CREATE ROLE travis SUPERUSER LOGIN CREATEDB;' -U postgres

3
vendor/golang.org/x/sync/AUTHORS generated vendored Normal file
View File

@ -0,0 +1,3 @@
# This source code refers to The Go Authors for copyright purposes.
# The master list of authors is in the main Go distribution,
# visible at http://tip.golang.org/AUTHORS.

26
vendor/golang.org/x/sync/CONTRIBUTING.md generated vendored Normal file
View File

@ -0,0 +1,26 @@
# Contributing to Go
Go is an open source project.
It is the work of hundreds of contributors. We appreciate your help!
## Filing issues
When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions:
1. What version of Go are you using (`go version`)?
2. What operating system and processor architecture are you using?
3. What did you do?
4. What did you expect to see?
5. What did you see instead?
General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker.
The gophers there will answer or ask you to file an issue if you've tripped over a bug.
## Contributing code
Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html)
before sending patches.
Unless otherwise noted, the Go source files are distributed under
the BSD-style license found in the LICENSE file.

3
vendor/golang.org/x/sync/CONTRIBUTORS generated vendored Normal file
View File

@ -0,0 +1,3 @@
# This source code was written by the Go contributors.
# The master list of contributors is in the main Go distribution,
# visible at http://tip.golang.org/CONTRIBUTORS.

27
vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View File

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View File

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

18
vendor/golang.org/x/sync/README.md generated vendored Normal file
View File

@ -0,0 +1,18 @@
# Go Sync
This repository provides Go concurrency primitives in addition to the
ones provided by the language and "sync" and "sync/atomic" packages.
## Download/Install
The easiest way to install is to run `go get -u golang.org/x/sync`. You can
also manually git clone the repository to `$GOPATH/src/golang.org/x/sync`.
## Report Issues / Send Patches
This repository uses Gerrit for code changes. To learn how to submit changes to
this repository, see https://golang.org/doc/contribute.html.
The main issue tracker for the sync repository is located at
https://github.com/golang/go/issues. Prefix your issue with "x/sync:" in the
subject line, so it is easy to find.

1
vendor/golang.org/x/sync/codereview.cfg generated vendored Normal file
View File

@ -0,0 +1 @@
issuerepo: golang/go

67
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View File

@ -0,0 +1,67 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"sync"
"golang.org/x/net/context"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}

View File

@ -0,0 +1,101 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package errgroup_test
import (
"crypto/md5"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func ExampleGroup_pipeline() {
m, err := MD5All(context.Background(), ".")
if err != nil {
log.Fatal(err)
}
for k, sum := range m {
fmt.Printf("%s:\t%x\n", k, sum)
}
}
type result struct {
path string
sum [md5.Size]byte
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
// ctx is canceled when g.Wait() returns. When this version of MD5All returns
// - even in case of error! - we know that all of the goroutines have finished
// and the memory they were using can be garbage-collected.
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string)
g.Go(func() error {
defer close(paths)
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
})
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
const numDigesters = 20
for i := 0; i < numDigesters; i++ {
g.Go(func() error {
for path := range paths {
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
select {
case c <- result{path, md5.Sum(data)}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
go func() {
g.Wait()
close(c)
}()
m := make(map[string][md5.Size]byte)
for r := range c {
m[r.path] = r.sum
}
// Check whether any of the goroutines failed. Since g is accumulating the
// errors, we don't need to send them (or check for them) in the individual
// results sent on the channel.
if err := g.Wait(); err != nil {
return nil, err
}
return m, nil
}

176
vendor/golang.org/x/sync/errgroup/errgroup_test.go generated vendored Normal file
View File

@ -0,0 +1,176 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package errgroup_test
import (
"errors"
"fmt"
"net/http"
"os"
"testing"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Result string
type Search func(ctx context.Context, query string) (Result, error)
func fakeSearch(kind string) Search {
return func(_ context.Context, query string) (Result, error) {
return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
}
}
// JustErrors illustrates the use of a Group in place of a sync.WaitGroup to
// simplify goroutine counting and error handling. This example is derived from
// the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.
func ExampleGroup_justErrors() {
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
}
// Parallel illustrates the use of a Group for synchronizing a simple parallel
// task: the "Google Search 2.0" function from
// https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context
// and error-handling.
func ExampleGroup_parallel() {
Google := func(ctx context.Context, query string) ([]Result, error) {
g, ctx := errgroup.WithContext(ctx)
searches := []Search{Web, Image, Video}
results := make([]Result, len(searches))
for i, search := range searches {
i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
result, err := search(ctx, query)
if err == nil {
results[i] = result
}
return err
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
results, err := Google(context.Background(), "golang")
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
for _, result := range results {
fmt.Println(result)
}
// Output:
// web result for "golang"
// image result for "golang"
// video result for "golang"
}
func TestZeroGroup(t *testing.T) {
err1 := errors.New("errgroup_test: 1")
err2 := errors.New("errgroup_test: 2")
cases := []struct {
errs []error
}{
{errs: []error{}},
{errs: []error{nil}},
{errs: []error{err1}},
{errs: []error{err1, nil}},
{errs: []error{err1, nil, err2}},
}
for _, tc := range cases {
var g errgroup.Group
var firstErr error
for i, err := range tc.errs {
err := err
g.Go(func() error { return err })
if firstErr == nil && err != nil {
firstErr = err
}
if gErr := g.Wait(); gErr != firstErr {
t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+
"g.Wait() = %v; want %v",
g, tc.errs[:i+1], err, firstErr)
}
}
}
}
func TestWithContext(t *testing.T) {
errDoom := errors.New("group_test: doomed")
cases := []struct {
errs []error
want error
}{
{want: nil},
{errs: []error{nil}, want: nil},
{errs: []error{errDoom}, want: errDoom},
{errs: []error{errDoom, nil}, want: errDoom},
}
for _, tc := range cases {
g, ctx := errgroup.WithContext(context.Background())
for _, err := range tc.errs {
err := err
g.Go(func() error { return err })
}
if err := g.Wait(); err != tc.want {
t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+
"g.Wait() = %v; want %v",
g, tc.errs, err, tc.want)
}
canceled := false
select {
case <-ctx.Done():
canceled = true
default:
}
if !canceled {
t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+
"ctx.Done() was not closed",
g, tc.errs)
}
}
}

131
vendor/golang.org/x/sync/semaphore/semaphore.go generated vendored Normal file
View File

@ -0,0 +1,131 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package semaphore provides a weighted semaphore implementation.
package semaphore // import "golang.org/x/sync/semaphore"
import (
"container/list"
"sync"
// Use the old context because packages that depend on this one
// (e.g. cloud.google.com/go/...) must run on Go 1.6.
// TODO(jba): update to "context" when possible.
"golang.org/x/net/context"
)
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}
// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
// Acquire acquires the semaphore with a weight of n, blocking only until ctx
// is done. On success, returns nil. On failure, returns ctx.Err() and leaves
// the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
s.waiters.Remove(elem)
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: bad release")
}
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
s.mu.Unlock()
}

View File

@ -0,0 +1,131 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
package semaphore_test
import (
"fmt"
"testing"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)
// weighted is an interface matching a subset of *Weighted. It allows
// alternate implementations for testing and benchmarking.
type weighted interface {
Acquire(context.Context, int64) error
TryAcquire(int64) bool
Release(int64)
}
// semChan implements Weighted using a channel for
// comparing against the condition variable-based implementation.
type semChan chan struct{}
func newSemChan(n int64) semChan {
return semChan(make(chan struct{}, n))
}
func (s semChan) Acquire(_ context.Context, n int64) error {
for i := int64(0); i < n; i++ {
s <- struct{}{}
}
return nil
}
func (s semChan) TryAcquire(n int64) bool {
if int64(len(s))+n > int64(cap(s)) {
return false
}
for i := int64(0); i < n; i++ {
s <- struct{}{}
}
return true
}
func (s semChan) Release(n int64) {
for i := int64(0); i < n; i++ {
<-s
}
}
// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times.
func acquireN(b *testing.B, sem weighted, size int64, N int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < N; j++ {
sem.Acquire(context.Background(), size)
}
for j := 0; j < N; j++ {
sem.Release(size)
}
}
}
// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times.
func tryAcquireN(b *testing.B, sem weighted, size int64, N int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < N; j++ {
if !sem.TryAcquire(size) {
b.Fatalf("TryAcquire(%v) = false, want true", size)
}
}
for j := 0; j < N; j++ {
sem.Release(size)
}
}
}
func BenchmarkNewSeq(b *testing.B) {
for _, cap := range []int64{1, 128} {
b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = semaphore.NewWeighted(cap)
}
})
b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = newSemChan(cap)
}
})
}
}
func BenchmarkAcquireSeq(b *testing.B) {
for _, c := range []struct {
cap, size int64
N int
}{
{1, 1, 1},
{2, 1, 1},
{16, 1, 1},
{128, 1, 1},
{2, 2, 1},
{16, 2, 8},
{128, 2, 64},
{2, 1, 2},
{16, 8, 2},
{128, 64, 2},
} {
for _, w := range []struct {
name string
w weighted
}{
{"Weighted", semaphore.NewWeighted(c.cap)},
{"semChan", newSemChan(c.cap)},
} {
b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
acquireN(b, w.w, c.size, c.N)
})
b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
tryAcquireN(b, w.w, c.size, c.N)
})
}
}
}

View File

@ -0,0 +1,84 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package semaphore_test
import (
"context"
"fmt"
"log"
"runtime"
"golang.org/x/sync/semaphore"
)
// Example_workerPool demonstrates how to use a semaphore to limit the number of
// goroutines working on parallel tasks.
//
// This use of a semaphore mimics a typical “worker pool” pattern, but without
// the need to explicitly shut down idle workers when the work is done.
func Example_workerPool() {
ctx := context.TODO()
var (
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
out = make([]int, 32)
)
// Compute the output using up to maxWorkers goroutines at a time.
for i := range out {
// When maxWorkers goroutines are in flight, Acquire blocks until one of the
// workers finishes.
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
}
// Acquire all of the tokens to wait for any remaining workers to finish.
//
// If you are already waiting for the workers by some other means (such as an
// errgroup.Group), you can omit this final Acquire call.
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
// Output:
// [0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5]
}
// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}

171
vendor/golang.org/x/sync/semaphore/semaphore_test.go generated vendored Normal file
View File

@ -0,0 +1,171 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package semaphore_test
import (
"math/rand"
"runtime"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
const maxSleep = 1 * time.Millisecond
func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
for i := 0; i < loops; i++ {
sem.Acquire(context.Background(), n)
time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
sem.Release(n)
}
}
func TestWeighted(t *testing.T) {
t.Parallel()
n := runtime.GOMAXPROCS(0)
loops := 10000 / n
sem := semaphore.NewWeighted(int64(n))
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
HammerWeighted(sem, int64(i), loops)
}()
}
wg.Wait()
}
func TestWeightedPanic(t *testing.T) {
t.Parallel()
defer func() {
if recover() == nil {
t.Fatal("release of an unacquired weighted semaphore did not panic")
}
}()
w := semaphore.NewWeighted(1)
w.Release(1)
}
func TestWeightedTryAcquire(t *testing.T) {
t.Parallel()
ctx := context.Background()
sem := semaphore.NewWeighted(2)
tries := []bool{}
sem.Acquire(ctx, 1)
tries = append(tries, sem.TryAcquire(1))
tries = append(tries, sem.TryAcquire(1))
sem.Release(2)
tries = append(tries, sem.TryAcquire(1))
sem.Acquire(ctx, 1)
tries = append(tries, sem.TryAcquire(1))
want := []bool{true, false, true, false}
for i := range tries {
if tries[i] != want[i] {
t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
}
}
}
func TestWeightedAcquire(t *testing.T) {
t.Parallel()
ctx := context.Background()
sem := semaphore.NewWeighted(2)
tryAcquire := func(n int64) bool {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
return sem.Acquire(ctx, n) == nil
}
tries := []bool{}
sem.Acquire(ctx, 1)
tries = append(tries, tryAcquire(1))
tries = append(tries, tryAcquire(1))
sem.Release(2)
tries = append(tries, tryAcquire(1))
sem.Acquire(ctx, 1)
tries = append(tries, tryAcquire(1))
want := []bool{true, false, true, false}
for i := range tries {
if tries[i] != want[i] {
t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
}
}
}
func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
t.Parallel()
const n = 2
sem := semaphore.NewWeighted(n)
{
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go sem.Acquire(ctx, n+1)
}
g, ctx := errgroup.WithContext(context.Background())
for i := n * 3; i > 0; i-- {
g.Go(func() error {
err := sem.Acquire(ctx, 1)
if err == nil {
time.Sleep(1 * time.Millisecond)
sem.Release(1)
}
return err
})
}
if err := g.Wait(); err != nil {
t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
}
}
// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves.
// Merely returning from the test function indicates success.
func TestLargeAcquireDoesntStarve(t *testing.T) {
t.Parallel()
ctx := context.Background()
n := int64(runtime.GOMAXPROCS(0))
sem := semaphore.NewWeighted(n)
running := true
var wg sync.WaitGroup
wg.Add(int(n))
for i := n; i > 0; i-- {
sem.Acquire(ctx, 1)
go func() {
defer func() {
sem.Release(1)
wg.Done()
}()
for running {
time.Sleep(1 * time.Millisecond)
sem.Release(1)
sem.Acquire(ctx, 1)
}
}()
}
sem.Acquire(ctx, n)
running = false
sem.Release(n)
wg.Wait()
}

111
vendor/golang.org/x/sync/singleflight/singleflight.go generated vendored Normal file
View File

@ -0,0 +1,111 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import "sync"
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

View File

@ -0,0 +1,87 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package singleflight
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestDo(t *testing.T) {
var g Group
v, err, _ := g.Do("key", func() (interface{}, error) {
return "bar", nil
})
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
t.Errorf("Do = %v; want %v", got, want)
}
if err != nil {
t.Errorf("Do error = %v", err)
}
}
func TestDoErr(t *testing.T) {
var g Group
someErr := errors.New("Some error")
v, err, _ := g.Do("key", func() (interface{}, error) {
return nil, someErr
})
if err != someErr {
t.Errorf("Do error = %v; want someErr %v", err, someErr)
}
if v != nil {
t.Errorf("unexpected non-nil value %#v", v)
}
}
func TestDoDupSuppress(t *testing.T) {
var g Group
var wg1, wg2 sync.WaitGroup
c := make(chan string, 1)
var calls int32
fn := func() (interface{}, error) {
if atomic.AddInt32(&calls, 1) == 1 {
// First invocation.
wg1.Done()
}
v := <-c
c <- v // pump; make available for any future calls
time.Sleep(10 * time.Millisecond) // let more goroutines enter Do
return v, nil
}
const n = 10
wg1.Add(1)
for i := 0; i < n; i++ {
wg1.Add(1)
wg2.Add(1)
go func() {
defer wg2.Done()
wg1.Done()
v, err, _ := g.Do("key", fn)
if err != nil {
t.Errorf("Do error: %v", err)
return
}
if s, _ := v.(string); s != "bar" {
t.Errorf("Do = %T %v; want %q", v, v, "bar")
}
}()
}
wg1.Wait()
// At least one goroutine is in fn now and all of them have at
// least reached the line before the Do.
c <- "bar"
wg2.Wait()
if got := atomic.LoadInt32(&calls); got <= 0 || got >= n {
t.Errorf("number of calls = %d; want over 0 and less than %d", got, n)
}
}

372
vendor/golang.org/x/sync/syncmap/map.go generated vendored Normal file
View File

@ -0,0 +1,372 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package syncmap provides a concurrent map implementation.
// It is a prototype for a proposed addition to the sync package
// in the standard library.
// (https://golang.org/issue/18177)
package syncmap
import (
"sync"
"sync/atomic"
"unsafe"
)
// Map is a concurrent map with amortized-constant-time loads, stores, and deletes.
// It is safe for multiple goroutines to call a Map's methods concurrently.
//
// The zero Map is valid and empty.
//
// A Map must not be copied after first use.
type Map struct {
mu sync.Mutex
// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Value // readOnly
// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[interface{}]*entry
// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
// p points to the interface{} value stored for the entry.
//
// If p == nil, the entry has been deleted and m.dirty == nil.
//
// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
// is missing from m.dirty.
//
// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
// != nil, in m.dirty[key].
//
// An entry can be deleted by atomic replacement with nil: when m.dirty is
// next created, it will atomically replace nil with expunged and leave
// m.dirty[key] unset.
//
// An entry's associated value can be updated by atomic replacement, provided
// p != expunged. If p == expunged, an entry's associated value can be updated
// only after first setting m.dirty[key] = e so that lookups using the dirty
// map find the entry.
p unsafe.Pointer // *interface{}
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// Avoid reporting a spurious miss if m.dirty got promoted while we were
// blocked on m.mu. (If further loads of the same key will not miss, it's
// not worth copying the dirty map for this key.)
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// storeLocked unconditionally stores a value to the entry.
//
// The entry must be known not to be expunged.
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
// tryLoadOrStore atomically loads or stores a value if the entry is not
// expunged.
//
// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
// returns with ok==false.
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// Copy the interface after the first load to make this method more amenable
// to escape analysis: if we hit the "load" path or the entry is expunged, we
// shouldn't bother heap-allocating.
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot of the Map's
// contents: no key will be visited more than once, but if the value for any key
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
//
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (m *Map) Range(f func(key, value interface{}) bool) {
// We need to be able to iterate over all of the keys that were already
// present at the start of the call to Range.
// If read.amended is false, then read.m satisfies that property without
// requiring us to hold m.mu for a long time.
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
// (assuming the caller does not break out early), so a call to Range
// amortizes an entire copy of the map: we can promote the dirty copy
// immediately!
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

216
vendor/golang.org/x/sync/syncmap/map_bench_test.go generated vendored Normal file
View File

@ -0,0 +1,216 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncmap_test
import (
"fmt"
"reflect"
"sync/atomic"
"testing"
"golang.org/x/sync/syncmap"
)
type bench struct {
setup func(*testing.B, mapInterface)
perG func(b *testing.B, pb *testing.PB, i int, m mapInterface)
}
func benchMap(b *testing.B, bench bench) {
for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} {
b.Run(fmt.Sprintf("%T", m), func(b *testing.B) {
m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface)
if bench.setup != nil {
bench.setup(b, m)
}
b.ResetTimer()
var i int64
b.RunParallel(func(pb *testing.PB) {
id := int(atomic.AddInt64(&i, 1) - 1)
bench.perG(b, pb, id*b.N, m)
})
})
}
}
func BenchmarkLoadMostlyHits(b *testing.B) {
const hits, misses = 1023, 1
benchMap(b, bench{
setup: func(_ *testing.B, m mapInterface) {
for i := 0; i < hits; i++ {
m.LoadOrStore(i, i)
}
// Prime the map to get it into a steady state.
for i := 0; i < hits*2; i++ {
m.Load(i % hits)
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.Load(i % (hits + misses))
}
},
})
}
func BenchmarkLoadMostlyMisses(b *testing.B) {
const hits, misses = 1, 1023
benchMap(b, bench{
setup: func(_ *testing.B, m mapInterface) {
for i := 0; i < hits; i++ {
m.LoadOrStore(i, i)
}
// Prime the map to get it into a steady state.
for i := 0; i < hits*2; i++ {
m.Load(i % hits)
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.Load(i % (hits + misses))
}
},
})
}
func BenchmarkLoadOrStoreBalanced(b *testing.B) {
const hits, misses = 128, 128
benchMap(b, bench{
setup: func(b *testing.B, m mapInterface) {
if _, ok := m.(*DeepCopyMap); ok {
b.Skip("DeepCopyMap has quadratic running time.")
}
for i := 0; i < hits; i++ {
m.LoadOrStore(i, i)
}
// Prime the map to get it into a steady state.
for i := 0; i < hits*2; i++ {
m.Load(i % hits)
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
j := i % (hits + misses)
if j < hits {
if _, ok := m.LoadOrStore(j, i); !ok {
b.Fatalf("unexpected miss for %v", j)
}
} else {
if v, loaded := m.LoadOrStore(i, i); loaded {
b.Fatalf("failed to store %v: existing value %v", i, v)
}
}
}
},
})
}
func BenchmarkLoadOrStoreUnique(b *testing.B) {
benchMap(b, bench{
setup: func(b *testing.B, m mapInterface) {
if _, ok := m.(*DeepCopyMap); ok {
b.Skip("DeepCopyMap has quadratic running time.")
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.LoadOrStore(i, i)
}
},
})
}
func BenchmarkLoadOrStoreCollision(b *testing.B) {
benchMap(b, bench{
setup: func(_ *testing.B, m mapInterface) {
m.LoadOrStore(0, 0)
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.LoadOrStore(0, 0)
}
},
})
}
func BenchmarkRange(b *testing.B) {
const mapSize = 1 << 10
benchMap(b, bench{
setup: func(_ *testing.B, m mapInterface) {
for i := 0; i < mapSize; i++ {
m.Store(i, i)
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.Range(func(_, _ interface{}) bool { return true })
}
},
})
}
// BenchmarkAdversarialAlloc tests performance when we store a new value
// immediately whenever the map is promoted to clean and otherwise load a
// unique, missing key.
//
// This forces the Load calls to always acquire the map's mutex.
func BenchmarkAdversarialAlloc(b *testing.B) {
benchMap(b, bench{
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
var stores, loadsSinceStore int64
for ; pb.Next(); i++ {
m.Load(i)
if loadsSinceStore++; loadsSinceStore > stores {
m.LoadOrStore(i, stores)
loadsSinceStore = 0
stores++
}
}
},
})
}
// BenchmarkAdversarialDelete tests performance when we periodically delete
// one key and add a different one in a large map.
//
// This forces the Load calls to always acquire the map's mutex and periodically
// makes a full copy of the map despite changing only one entry.
func BenchmarkAdversarialDelete(b *testing.B) {
const mapSize = 1 << 10
benchMap(b, bench{
setup: func(_ *testing.B, m mapInterface) {
for i := 0; i < mapSize; i++ {
m.Store(i, i)
}
},
perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) {
for ; pb.Next(); i++ {
m.Load(i)
if i%mapSize == 0 {
m.Range(func(k, _ interface{}) bool {
m.Delete(k)
return false
})
m.Store(i, i)
}
}
},
})
}

151
vendor/golang.org/x/sync/syncmap/map_reference_test.go generated vendored Normal file
View File

@ -0,0 +1,151 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncmap_test
import (
"sync"
"sync/atomic"
)
// This file contains reference map implementations for unit-tests.
// mapInterface is the interface Map implements.
type mapInterface interface {
Load(interface{}) (interface{}, bool)
Store(key, value interface{})
LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
Delete(interface{})
Range(func(key, value interface{}) (shouldContinue bool))
}
// RWMutexMap is an implementation of mapInterface using a sync.RWMutex.
type RWMutexMap struct {
mu sync.RWMutex
dirty map[interface{}]interface{}
}
func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) {
m.mu.RLock()
value, ok = m.dirty[key]
m.mu.RUnlock()
return
}
func (m *RWMutexMap) Store(key, value interface{}) {
m.mu.Lock()
if m.dirty == nil {
m.dirty = make(map[interface{}]interface{})
}
m.dirty[key] = value
m.mu.Unlock()
}
func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
m.mu.Lock()
actual, loaded = m.dirty[key]
if !loaded {
actual = value
if m.dirty == nil {
m.dirty = make(map[interface{}]interface{})
}
m.dirty[key] = value
}
m.mu.Unlock()
return actual, loaded
}
func (m *RWMutexMap) Delete(key interface{}) {
m.mu.Lock()
delete(m.dirty, key)
m.mu.Unlock()
}
func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) {
m.mu.RLock()
keys := make([]interface{}, 0, len(m.dirty))
for k := range m.dirty {
keys = append(keys, k)
}
m.mu.RUnlock()
for _, k := range keys {
v, ok := m.Load(k)
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
// DeepCopyMap is an implementation of mapInterface using a Mutex and
// atomic.Value. It makes deep copies of the map on every write to avoid
// acquiring the Mutex in Load.
type DeepCopyMap struct {
mu sync.Mutex
clean atomic.Value
}
func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) {
clean, _ := m.clean.Load().(map[interface{}]interface{})
value, ok = clean[key]
return value, ok
}
func (m *DeepCopyMap) Store(key, value interface{}) {
m.mu.Lock()
dirty := m.dirty()
dirty[key] = value
m.clean.Store(dirty)
m.mu.Unlock()
}
func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
clean, _ := m.clean.Load().(map[interface{}]interface{})
actual, loaded = clean[key]
if loaded {
return actual, loaded
}
m.mu.Lock()
// Reload clean in case it changed while we were waiting on m.mu.
clean, _ = m.clean.Load().(map[interface{}]interface{})
actual, loaded = clean[key]
if !loaded {
dirty := m.dirty()
dirty[key] = value
actual = value
m.clean.Store(dirty)
}
m.mu.Unlock()
return actual, loaded
}
func (m *DeepCopyMap) Delete(key interface{}) {
m.mu.Lock()
dirty := m.dirty()
delete(dirty, key)
m.clean.Store(dirty)
m.mu.Unlock()
}
func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) {
clean, _ := m.clean.Load().(map[interface{}]interface{})
for k, v := range clean {
if !f(k, v) {
break
}
}
}
func (m *DeepCopyMap) dirty() map[interface{}]interface{} {
clean, _ := m.clean.Load().(map[interface{}]interface{})
dirty := make(map[interface{}]interface{}, len(clean)+1)
for k, v := range clean {
dirty[k] = v
}
return dirty
}

172
vendor/golang.org/x/sync/syncmap/map_test.go generated vendored Normal file
View File

@ -0,0 +1,172 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syncmap_test
import (
"math/rand"
"reflect"
"runtime"
"sync"
"testing"
"testing/quick"
"golang.org/x/sync/syncmap"
)
type mapOp string
const (
opLoad = mapOp("Load")
opStore = mapOp("Store")
opLoadOrStore = mapOp("LoadOrStore")
opDelete = mapOp("Delete")
)
var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete}
// mapCall is a quick.Generator for calls on mapInterface.
type mapCall struct {
op mapOp
k, v interface{}
}
func (c mapCall) apply(m mapInterface) (interface{}, bool) {
switch c.op {
case opLoad:
return m.Load(c.k)
case opStore:
m.Store(c.k, c.v)
return nil, false
case opLoadOrStore:
return m.LoadOrStore(c.k, c.v)
case opDelete:
m.Delete(c.k)
return nil, false
default:
panic("invalid mapOp")
}
}
type mapResult struct {
value interface{}
ok bool
}
func randValue(r *rand.Rand) interface{} {
b := make([]byte, r.Intn(4))
for i := range b {
b[i] = 'a' + byte(rand.Intn(26))
}
return string(b)
}
func (mapCall) Generate(r *rand.Rand, size int) reflect.Value {
c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)}
switch c.op {
case opStore, opLoadOrStore:
c.v = randValue(r)
}
return reflect.ValueOf(c)
}
func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) {
for _, c := range calls {
v, ok := c.apply(m)
results = append(results, mapResult{v, ok})
}
final = make(map[interface{}]interface{})
m.Range(func(k, v interface{}) bool {
final[k] = v
return true
})
return results, final
}
func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
return applyCalls(new(syncmap.Map), calls)
}
func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
return applyCalls(new(RWMutexMap), calls)
}
func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) {
return applyCalls(new(DeepCopyMap), calls)
}
func TestMapMatchesRWMutex(t *testing.T) {
if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil {
t.Error(err)
}
}
func TestMapMatchesDeepCopy(t *testing.T) {
if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil {
t.Error(err)
}
}
func TestConcurrentRange(t *testing.T) {
const mapSize = 1 << 10
m := new(syncmap.Map)
for n := int64(1); n <= mapSize; n++ {
m.Store(n, int64(n))
}
done := make(chan struct{})
var wg sync.WaitGroup
defer func() {
close(done)
wg.Wait()
}()
for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- {
r := rand.New(rand.NewSource(g))
wg.Add(1)
go func(g int64) {
defer wg.Done()
for i := int64(0); ; i++ {
select {
case <-done:
return
default:
}
for n := int64(1); n < mapSize; n++ {
if r.Int63n(mapSize) == 0 {
m.Store(n, n*i*g)
} else {
m.Load(n)
}
}
}
}(g)
}
iters := 1 << 10
if testing.Short() {
iters = 16
}
for n := iters; n > 0; n-- {
seen := make(map[int64]bool, mapSize)
m.Range(func(ki, vi interface{}) bool {
k, v := ki.(int64), vi.(int64)
if v%k != 0 {
t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v)
}
if seen[k] {
t.Fatalf("Range visited key %v twice", k)
}
seen[k] = true
return true
})
if len(seen) != mapSize {
t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize)
}
}
}