From 8a9395819cea6af4c1b32bd5446359bbc4e60116 Mon Sep 17 00:00:00 2001 From: Matt K <1036969+mkrump@users.noreply.github.com> Date: Tue, 27 Mar 2018 16:06:12 -0500 Subject: [PATCH] Get transactions (#45) * Make transaction requests in parallel * Update transaction error handling --- .travis.yml | 3 +- Gopkg.lock | 8 +- Makefile | 2 +- integration_test/block_rewards_test.go | 6 +- integration_test/geth_blockchain_test.go | 24 +- pkg/core/blockchain.go | 2 +- pkg/datastore/postgres/postgres_suite_test.go | 7 + pkg/datastore/postgres/postgres_test.go | 7 - pkg/fakes/blockchain.go | 11 +- pkg/geth/block_to_core_block.go | 55 ++- pkg/geth/block_to_core_block_test.go | 106 ++++- pkg/geth/blockchain.go | 13 +- pkg/geth/node/node.go | 7 +- pkg/history/history_suite_test.go | 6 + pkg/history/populate_blocks.go | 6 +- pkg/history/populate_blocks_test.go | 10 + pkg/history/validate_blocks_test.go | 7 - scripts/install-postgres-10.sh | 17 + vendor/golang.org/x/sync/AUTHORS | 3 + vendor/golang.org/x/sync/CONTRIBUTING.md | 26 ++ vendor/golang.org/x/sync/CONTRIBUTORS | 3 + vendor/golang.org/x/sync/LICENSE | 27 ++ vendor/golang.org/x/sync/PATENTS | 22 ++ vendor/golang.org/x/sync/README.md | 18 + vendor/golang.org/x/sync/codereview.cfg | 1 + vendor/golang.org/x/sync/errgroup/errgroup.go | 67 ++++ .../errgroup/errgroup_example_md5all_test.go | 101 +++++ .../x/sync/errgroup/errgroup_test.go | 176 +++++++++ .../golang.org/x/sync/semaphore/semaphore.go | 131 ++++++ .../x/sync/semaphore/semaphore_bench_test.go | 131 ++++++ .../sync/semaphore/semaphore_example_test.go | 84 ++++ .../x/sync/semaphore/semaphore_test.go | 171 ++++++++ .../x/sync/singleflight/singleflight.go | 111 ++++++ .../x/sync/singleflight/singleflight_test.go | 87 ++++ vendor/golang.org/x/sync/syncmap/map.go | 372 ++++++++++++++++++ .../x/sync/syncmap/map_bench_test.go | 216 ++++++++++ .../x/sync/syncmap/map_reference_test.go | 151 +++++++ vendor/golang.org/x/sync/syncmap/map_test.go | 172 ++++++++ 38 files changed, 2303 insertions(+), 64 deletions(-) create mode 100755 scripts/install-postgres-10.sh create mode 100644 vendor/golang.org/x/sync/AUTHORS create mode 100644 vendor/golang.org/x/sync/CONTRIBUTING.md create mode 100644 vendor/golang.org/x/sync/CONTRIBUTORS create mode 100644 vendor/golang.org/x/sync/LICENSE create mode 100644 vendor/golang.org/x/sync/PATENTS create mode 100644 vendor/golang.org/x/sync/README.md create mode 100644 vendor/golang.org/x/sync/codereview.cfg create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup_example_md5all_test.go create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup_test.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore_bench_test.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore_example_test.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore_test.go create mode 100644 vendor/golang.org/x/sync/singleflight/singleflight.go create mode 100644 vendor/golang.org/x/sync/singleflight/singleflight_test.go create mode 100644 vendor/golang.org/x/sync/syncmap/map.go create mode 100644 vendor/golang.org/x/sync/syncmap/map_bench_test.go create mode 100644 vendor/golang.org/x/sync/syncmap/map_reference_test.go create mode 100644 vendor/golang.org/x/sync/syncmap/map_test.go diff --git a/.travis.yml b/.travis.yml index ff84c75c..8d52cda4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,10 +12,11 @@ go_import_path:
github.com/vulcanize/vulcanizedb before_install: # ginkgo golint dep migrate - make installtools + - bash ./scripts/install-postgres-10.sh before_script: - sudo -u postgres createdb vulcanize_private - - make migrate HOST_NAME=localhost NAME=vulcanize_private PORT=5432 + - make migrate NAME=vulcanize_private script: - make test diff --git a/Gopkg.lock b/Gopkg.lock index 121d7124..bde55c82 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -274,6 +274,12 @@ ] revision = "faacc1b5e36e3ff02cbec9661c69ac63dd5a83ad" +[[projects]] + branch = "master" + name = "golang.org/x/sync" + packages = ["errgroup"] + revision = "1d60e4601c6fd243af51cc01ddf169918a5407ca" + [[projects]] branch = "master" name = "golang.org/x/sys" @@ -334,6 +340,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "b35f7b80a7695fb1a7d247be178212e16a0647363affc0071c5a2280817dd906" + inputs-digest = "4dcddfb7fa2db8ba7e3d8e70ec8ba23a7e9b6a1d99c62933ed63624412967aeb" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 4de14e51..886dbe7b 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ $(BIN)/migrate: LINT = $(BIN)/golint $(BIN)/golint: - go get github.com/golang/lint/golint + go get -u golang.org/x/lint/golint METALINT = $(BIN)/gometalinter.v2 $(BIN)/gometalinter.v2: diff --git a/integration_test/block_rewards_test.go b/integration_test/block_rewards_test.go index 02ca5e72..0fd7b65b 100644 --- a/integration_test/block_rewards_test.go +++ b/integration_test/block_rewards_test.go @@ -11,13 +11,15 @@ var _ = Describe("Rewards calculations", func() { It("calculates a block reward for a real block", func() { blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) - block := blockchain.GetBlockByNumber(1071819) + block, err := blockchain.GetBlockByNumber(1071819) + Expect(err).ToNot(HaveOccurred()) Expect(block.Reward).To(Equal(5.31355)) }) It("calculates an uncle reward for a real block", func() { blockchain := geth.NewBlockchain(test_config.InfuraClient.IPCPath) - block := blockchain.GetBlockByNumber(1071819) + block, err := blockchain.GetBlockByNumber(1071819) + Expect(err).ToNot(HaveOccurred()) Expect(block.UnclesReward).To(Equal(6.875)) }) diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index 8eab9382..964f463b 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -3,6 +3,7 @@ package integration_test import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/inmemory" "github.com/vulcanize/vulcanizedb/pkg/geth" "github.com/vulcanize/vulcanizedb/pkg/history" @@ -28,8 +29,10 @@ var _ = Describe("Reading from the Geth blockchain", func() { }, 30) It("retrieves the genesis block and first block", func(done Done) { - genesisBlock := blockchain.GetBlockByNumber(int64(0)) - firstBlock := blockchain.GetBlockByNumber(int64(1)) + genesisBlock, err := blockchain.GetBlockByNumber(int64(0)) + Expect(err).ToNot(HaveOccurred()) + firstBlock, err := blockchain.GetBlockByNumber(int64(1)) + Expect(err).ToNot(HaveOccurred()) lastBlockNumber := blockchain.LastBlock() Expect(genesisBlock.Number).To(Equal(int64(0))) @@ -40,14 +43,27 @@ var _ = Describe("Reading from the Geth blockchain", func() { It("retrieves the node info", func(done Done) { node := blockchain.Node() - devNetworkNodeId := float64(1) + mainnetID := float64(1) Expect(node.GenesisBlock).ToNot(BeNil()) - Expect(node.NetworkID).To(Equal(devNetworkNodeId)) + Expect(node.NetworkID).To(Equal(mainnetID)) Expect(len(node.ID)).ToNot(BeZero()) Expect(node.ClientName).ToNot(BeZero()) close(done) }, 15) + //Benchmarking test: remove skip to test performance of block retrieval + XMeasure("retrieving n blocks", func(b Benchmarker) { + b.Time("runtime", func() { + var blocks []core.Block + n := 10 + for i := 5327459; i > 5327459-n; i-- { + block, err := blockchain.GetBlockByNumber(int64(i)) + Expect(err).ToNot(HaveOccurred()) + blocks = append(blocks, block) + } + Expect(len(blocks)).To(Equal(n)) + }) + }, 10) }) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index b68aee2e..e67af221 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -4,7 +4,7 @@ import "math/big" type Blockchain interface { ContractDataFetcher - GetBlockByNumber(blockNumber int64) Block + GetBlockByNumber(blockNumber int64) (Block, error) GetLogs(contract Contract, startingBlockNumber *big.Int, endingBlockNumber *big.Int) ([]Log, error) LastBlock() *big.Int Node() Node diff --git a/pkg/datastore/postgres/postgres_suite_test.go b/pkg/datastore/postgres/postgres_suite_test.go index 642065b6..d2b9876e 100644 --- a/pkg/datastore/postgres/postgres_suite_test.go +++ b/pkg/datastore/postgres/postgres_suite_test.go @@ -3,10 +3,17 @@ package postgres_test import ( "testing" + "io/ioutil" + "log" + . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" ) +func init() { + log.SetOutput(ioutil.Discard) +} + func TestPostgres(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Postgres Suite") diff --git a/pkg/datastore/postgres/postgres_test.go b/pkg/datastore/postgres/postgres_test.go index 5c512ca1..523b8f7f 100644 --- a/pkg/datastore/postgres/postgres_test.go +++ b/pkg/datastore/postgres/postgres_test.go @@ -4,9 +4,6 @@ import ( "fmt" "strings" - "io/ioutil" - "log" - "math/big" "github.com/jmoiron/sqlx" @@ -20,10 +17,6 @@ import ( "github.com/vulcanize/vulcanizedb/test_config" ) -func init() { - log.SetOutput(ioutil.Discard) -} - var _ = Describe("Postgres DB", func() { var sqlxdb *sqlx.DB diff --git a/pkg/fakes/blockchain.go b/pkg/fakes/blockchain.go index 3c3c3901..ffc8d1ab 100644 --- a/pkg/fakes/blockchain.go +++ b/pkg/fakes/blockchain.go @@ -14,6 +14,7 @@ type Blockchain struct { WasToldToStop bool node core.Node ContractReturnValue []byte + err error } func (blockchain *Blockchain) FetchContractData(abiJSON string, address string, method string, methodArg interface{}, result interface{}, blockNumber int64) error { @@ -42,12 +43,13 @@ func (blockchain *Blockchain) Node() core.Node { return blockchain.node } -func NewBlockchain() *Blockchain { +func NewBlockchain(err error) *Blockchain { return &Blockchain{ blocks: make(map[int64]core.Block), logs: make(map[string][]core.Log), contractAttributes: make(map[string]map[string]string), node: core.Node{GenesisBlock: "GENESIS", NetworkID: 1, ID: "x123", ClientName: "Geth"}, + err: err, } } @@ -61,8 +63,11 @@ func NewBlockchainWithBlocks(blocks []core.Block) *Blockchain { } } -func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block { - return blockchain.blocks[blockNumber] +func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) { + if blockchain.err != nil { + return core.Block{}, blockchain.err + } + return blockchain.blocks[blockNumber], nil } func (blockchain *Blockchain) AddBlock(block core.Block) { diff --git a/pkg/geth/block_to_core_block.go b/pkg/geth/block_to_core_block.go index 905d0656..17a5d355 100644 --- a/pkg/geth/block_to_core_block.go +++ b/pkg/geth/block_to_core_block.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/vulcanize/vulcanizedb/pkg/core" "golang.org/x/net/context" + "golang.org/x/sync/errgroup" ) type Client interface { @@ -17,8 +18,11 @@ type Client interface { TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) } -func ToCoreBlock(gethBlock *types.Block, client Client) core.Block { - transactions := convertTransactionsToCore(gethBlock, client) +func ToCoreBlock(gethBlock *types.Block, client Client) (core.Block, error) { + transactions, err := convertTransactionsToCore(gethBlock, client) + if err != nil { + return core.Block{}, err + } coreBlock := core.Block{ Difficulty: gethBlock.Difficulty().Int64(), ExtraData: hexutil.Encode(gethBlock.Extra()), @@ -36,35 +40,48 @@ func ToCoreBlock(gethBlock *types.Block, client Client) core.Block { } coreBlock.Reward = CalcBlockReward(coreBlock, gethBlock.Uncles()) coreBlock.UnclesReward = CalcUnclesReward(coreBlock, gethBlock.Uncles()) - return coreBlock + return coreBlock, nil } -func convertTransactionsToCore(gethBlock *types.Block, client Client) []core.Transaction { - transactions := make([]core.Transaction, 0) - for i, gethTransaction := range gethBlock.Transactions() { - from, err := client.TransactionSender(context.Background(), gethTransaction, gethBlock.Hash(), 
uint(i)) - if err != nil { - log.Println(err) - } - transaction := transToCoreTrans(gethTransaction, &from) - transaction, err = appendReceiptToTransaction(client, transaction) - if err != nil { - log.Println(err) - } - transactions = append(transactions, transaction) +func convertTransactionsToCore(gethBlock *types.Block, client Client) ([]core.Transaction, error) { + var g errgroup.Group + coreTransactions := make([]core.Transaction, len(gethBlock.Transactions())) + + for gethTransactionIndex, gethTransaction := range gethBlock.Transactions() { + //https://golang.org/doc/faq#closures_and_goroutines + transaction := gethTransaction + transactionIndex := uint(gethTransactionIndex) + g.Go(func() error { + from, err := client.TransactionSender(context.Background(), transaction, gethBlock.Hash(), transactionIndex) + if err != nil { + log.Println("transaction sender: ", err) + return err + } + coreTransaction := transToCoreTrans(transaction, &from) + coreTransaction, err = appendReceiptToTransaction(client, coreTransaction) + if err != nil { + log.Println("receipt: ", err) + return err + } + coreTransactions[transactionIndex] = coreTransaction + return nil + }) } - return transactions + if err := g.Wait(); err != nil { + log.Println("transactions: ", err) + return coreTransactions, err + } + return coreTransactions, nil } func appendReceiptToTransaction(client Client, transaction core.Transaction) (core.Transaction, error) { gethReceipt, err := client.TransactionReceipt(context.Background(), common.HexToHash(transaction.Hash)) if err != nil { - log.Println(err) return transaction, err } receipt := ReceiptToCoreReceipt(gethReceipt) transaction.Receipt = receipt - return transaction, err + return transaction, nil } func transToCoreTrans(transaction *types.Transaction, from *common.Address) core.Transaction { diff --git a/pkg/geth/block_to_core_block_test.go b/pkg/geth/block_to_core_block_test.go index 0b661084..733aeabd 100644 --- a/pkg/geth/block_to_core_block_test.go +++ b/pkg/geth/block_to_core_block_test.go @@ -5,6 +5,13 @@ import ( "context" + "fmt" + + "io/ioutil" + "log" + + "os" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" @@ -15,11 +22,25 @@ import ( type FakeGethClient struct { receipts map[string]*types.Receipt + err error } -func NewFakeClient() *FakeGethClient { +type TransActionReceiptError struct{} + +func (tarErr TransActionReceiptError) Error() string { + return fmt.Sprintf("transaction receipt error") +} + +type TransactionSenderError struct{} + +func (tasErr TransactionSenderError) Error() string { + return fmt.Sprintf("transaction sender error") +} + +func NewFakeClient(err error) *FakeGethClient { return &FakeGethClient{ receipts: make(map[string]*types.Receipt), + err: err, } } @@ -30,6 +51,9 @@ func (client *FakeGethClient) AddReceipts(receipts []*types.Receipt) { } func (client *FakeGethClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + if err, ok := client.err.(TransActionReceiptError); ok { + return &types.Receipt{}, err + } if gasUsed, ok := client.receipts[txHash.Hex()]; ok { return gasUsed, nil } @@ -37,6 +61,9 @@ func (client *FakeGethClient) TransactionReceipt(ctx context.Context, txHash com } func (client *FakeGethClient) TransactionSender(ctx context.Context, tx *types.Transaction, block common.Hash, index uint) (common.Address, error) { + if err, ok := client.err.(TransactionSenderError); ok { + return common.Address{}, err + } 
return common.HexToAddress("0x123"), nil } @@ -66,8 +93,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { } block := types.NewBlock(&header, []*types.Transaction{}, []*types.Header{}, []*types.Receipt{}) client := &FakeGethClient{} - gethBlock := geth.ToCoreBlock(block, client) + gethBlock, err := geth.ToCoreBlock(block, client) + Expect(err).ToNot(HaveOccurred()) Expect(gethBlock.Difficulty).To(Equal(difficulty.Int64())) Expect(gethBlock.GasLimit).To(Equal(gasLimit)) Expect(gethBlock.Miner).To(Equal(miner.Hex())) @@ -104,7 +132,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { } receipts := []*types.Receipt{&receipt} - client := NewFakeClient() + client := NewFakeClient(nil) client.AddReceipts(receipts) number := int64(1071819) @@ -113,8 +141,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { } uncles := []*types.Header{{Number: big.NewInt(1071817)}, {Number: big.NewInt(1071818)}} block := types.NewBlock(&header, transactions, uncles, []*types.Receipt{&receipt}) - coreBlock := geth.ToCoreBlock(block, client) + coreBlock, err := geth.ToCoreBlock(block, client) + Expect(err).ToNot(HaveOccurred()) Expect(geth.CalcBlockReward(coreBlock, block.Uncles())).To(Equal(5.31355)) }) @@ -144,11 +173,12 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { } block := types.NewBlock(&header, transactions, uncles, receipts) - client := NewFakeClient() + client := NewFakeClient(nil) client.AddReceipts(receipts) - coreBlock := geth.ToCoreBlock(block, client) + coreBlock, err := geth.ToCoreBlock(block, client) + Expect(err).ToNot(HaveOccurred()) Expect(geth.CalcUnclesReward(coreBlock, block.Uncles())).To(Equal(6.875)) }) @@ -190,10 +220,11 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { var uncles []*types.Header block := types.NewBlock(&header, transactions, uncles, receipts) - client := NewFakeClient() + client := NewFakeClient(nil) client.AddReceipts(receipts) - coreBlock := geth.ToCoreBlock(block, client) + coreBlock, err := geth.ToCoreBlock(block, client) + Expect(err).ToNot(HaveOccurred()) Expect(geth.CalcBlockReward(coreBlock, block.Uncles())).To(Equal(3.024990672)) }) }) @@ -203,8 +234,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { header := types.Header{} block := types.NewBlock(&header, []*types.Transaction{}, []*types.Header{}, []*types.Receipt{}) client := &FakeGethClient{} - coreBlock := geth.ToCoreBlock(block, client) + coreBlock, err := geth.ToCoreBlock(block, client) + Expect(err).ToNot(HaveOccurred()) Expect(len(coreBlock.Transactions)).To(Equal(0)) }) @@ -227,7 +259,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { TxHash: gethTransaction.Hash(), } - client := NewFakeClient() + client := NewFakeClient(nil) client.AddReceipts([]*types.Receipt{gethReceipt}) header := types.Header{} @@ -237,8 +269,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { []*types.Header{}, []*types.Receipt{gethReceipt}, ) - coreBlock := geth.ToCoreBlock(gethBlock, client) + coreBlock, err := geth.ToCoreBlock(gethBlock, client) + Expect(err).ToNot(HaveOccurred()) Expect(len(coreBlock.Transactions)).To(Equal(1)) coreTransaction := coreBlock.Transactions[0] Expect(coreTransaction.Data).To(Equal("0xf7d8c8830000000000000000000000000000000000000000000000000000000000037788000000000000000000000000000000000000000000000000000000000003bd14")) @@ -271,7 +304,7 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { ContractAddress: 
common.HexToAddress("0x1023342345"), } - client := NewFakeClient() + client := NewFakeClient(nil) client.AddReceipts([]*types.Receipt{gethReceipt}) gethBlock := types.NewBlock( @@ -281,8 +314,9 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { []*types.Receipt{gethReceipt}, ) - coreBlock := geth.ToCoreBlock(gethBlock, client) + coreBlock, err := geth.ToCoreBlock(gethBlock, client) + Expect(err).ToNot(HaveOccurred()) coreTransaction := coreBlock.Transactions[0] Expect(coreTransaction.To).To(Equal("")) @@ -292,4 +326,50 @@ var _ = Describe("Conversion of GethBlock to core.Block", func() { }) }) + Describe("transaction error handling", func() { + var gethTransaction *types.Transaction + var gethReceipt *types.Receipt + var header *types.Header + var gethBlock *types.Block + + BeforeEach(func() { + log.SetOutput(ioutil.Discard) + gethTransaction = types.NewTransaction( + uint64(0), + common.Address{}, + big.NewInt(0), + uint64(0), + big.NewInt(0), + []byte{}, + ) + gethReceipt = &types.Receipt{} + header = &types.Header{} + gethBlock = types.NewBlock( + header, + []*types.Transaction{gethTransaction}, + []*types.Header{}, + []*types.Receipt{gethReceipt}, + ) + + }) + + AfterEach(func() { + defer log.SetOutput(os.Stdout) + }) + + It("returns an error when transaction sender call fails", func() { + client := NewFakeClient(TransactionSenderError{}) + client.AddReceipts([]*types.Receipt{}) + _, err := geth.ToCoreBlock(gethBlock, client) + Expect(err).To(Equal(TransactionSenderError{})) + }) + + It("returns an error when transaction receipt call fails", func() { + client := NewFakeClient(TransActionReceiptError{}) + client.AddReceipts([]*types.Receipt{}) + _, err := geth.ToCoreBlock(gethBlock, client) + Expect(err).To(Equal(TransActionReceiptError{})) + }) + }) + }) diff --git a/pkg/geth/blockchain.go b/pkg/geth/blockchain.go index ab07a41c..a6a12695 100644 --- a/pkg/geth/blockchain.go +++ b/pkg/geth/blockchain.go @@ -58,9 +58,16 @@ func (blockchain *Blockchain) Node() core.Node { return blockchain.node } -func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) core.Block { - gethBlock, _ := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber)) - return ToCoreBlock(gethBlock, blockchain.client) +func (blockchain *Blockchain) GetBlockByNumber(blockNumber int64) (core.Block, error) { + gethBlock, err := blockchain.client.BlockByNumber(context.Background(), big.NewInt(blockNumber)) + if err != nil { + return core.Block{}, err + } + block, err := ToCoreBlock(gethBlock, blockchain.client) + if err != nil { + return core.Block{}, err + } + return block, nil } func (blockchain *Blockchain) LastBlock() *big.Int { diff --git a/pkg/geth/node/node.go b/pkg/geth/node/node.go index f7fc3ba1..daa20e76 100644 --- a/pkg/geth/node/node.go +++ b/pkg/geth/node/node.go @@ -9,6 +9,8 @@ import ( "strings" + "log" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -79,7 +81,10 @@ func MakeNode(wrapper ClientWrapper) core.Node { func (client ClientWrapper) NetworkId() float64 { var version string - client.CallContext(context.Background(), &version, "net_version") + err := client.CallContext(context.Background(), &version, "net_version") + if err != nil { + log.Println(err) + } networkId, _ := strconv.ParseFloat(version, 64) return networkId } diff --git a/pkg/history/history_suite_test.go b/pkg/history/history_suite_test.go index 98ce5532..6bf3894c 100644 --- 
a/pkg/history/history_suite_test.go +++ b/pkg/history/history_suite_test.go @@ -4,9 +4,15 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "io/ioutil" + "log" "testing" ) +func init() { + log.SetOutput(ioutil.Discard) +} + func TestHistory(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "History Suite") diff --git a/pkg/history/populate_blocks.go b/pkg/history/populate_blocks.go index 19d63e72..09d81a8c 100644 --- a/pkg/history/populate_blocks.go +++ b/pkg/history/populate_blocks.go @@ -18,7 +18,11 @@ func PopulateMissingBlocks(blockchain core.Blockchain, blockRepository datastore func RetrieveAndUpdateBlocks(blockchain core.Blockchain, blockRepository datastore.BlockRepository, blockNumbers []int64) int { for _, blockNumber := range blockNumbers { - block := blockchain.GetBlockByNumber(blockNumber) + block, err := blockchain.GetBlockByNumber(blockNumber) + if err != nil { + log.Printf("failed to retrieve block number: %d\n", blockNumber) + return 0 + } blockRepository.CreateOrUpdateBlock(block) } return len(blockNumbers) diff --git a/pkg/history/populate_blocks_test.go b/pkg/history/populate_blocks_test.go index 14e6d61f..a98c76aa 100644 --- a/pkg/history/populate_blocks_test.go +++ b/pkg/history/populate_blocks_test.go @@ -1,6 +1,8 @@ package history_test import ( + "errors" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -100,4 +102,12 @@ var _ = Describe("Populating blocks", func() { Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(3)) }) + It("does not call repository create block when there is an error", func() { + blockchain := fakes.NewBlockchain(errors.New("error getting block")) + blocks := history.MakeRange(1, 10) + history.RetrieveAndUpdateBlocks(blockchain, blockRepository, blocks) + Expect(blockRepository.BlockCount()).To(Equal(0)) + Expect(blockRepository.CreateOrUpdateBlockCallCount).To(Equal(0)) + }) + }) diff --git a/pkg/history/validate_blocks_test.go b/pkg/history/validate_blocks_test.go index 056e84e1..fc92521e 100644 --- a/pkg/history/validate_blocks_test.go +++ b/pkg/history/validate_blocks_test.go @@ -3,9 +3,6 @@ package history_test import ( "bytes" - "io/ioutil" - "log" - . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -14,10 +11,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/history" ) -func init() { - log.SetOutput(ioutil.Discard) -} - var _ = Describe("Blocks validator", func() { It("creates a ValidationWindow equal to (HEAD-windowSize, HEAD)", func() { diff --git a/scripts/install-postgres-10.sh b/scripts/install-postgres-10.sh new file mode 100755 index 00000000..9d7e124c --- /dev/null +++ b/scripts/install-postgres-10.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Travis doesn't support postgres 10 +# https://github.com/travis-ci/travis-ci/issues/8537 + +set -ex + +echo "Installing Postgres 10" +sudo service postgresql stop +sudo apt-get remove -q 'postgresql-*' +sudo apt-get update -q +sudo apt-get install -q postgresql-10 postgresql-client-10 +sudo cp /etc/postgresql/{9.6,10}/main/pg_hba.conf + +echo "Restarting Postgres 10" +sudo service postgresql restart + +sudo psql -c 'CREATE ROLE travis SUPERUSER LOGIN CREATEDB;' -U postgres \ No newline at end of file diff --git a/vendor/golang.org/x/sync/AUTHORS b/vendor/golang.org/x/sync/AUTHORS new file mode 100644 index 00000000..15167cd7 --- /dev/null +++ b/vendor/golang.org/x/sync/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/sync/CONTRIBUTING.md b/vendor/golang.org/x/sync/CONTRIBUTING.md new file mode 100644 index 00000000..d0485e88 --- /dev/null +++ b/vendor/golang.org/x/sync/CONTRIBUTING.md @@ -0,0 +1,26 @@ +# Contributing to Go + +Go is an open source project. + +It is the work of hundreds of contributors. We appreciate your help! + +## Filing issues + +When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions: + +1. What version of Go are you using (`go version`)? +2. What operating system and processor architecture are you using? +3. What did you do? +4. What did you expect to see? +5. What did you see instead? + +General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker. +The gophers there will answer or ask you to file an issue if you've tripped over a bug. + +## Contributing code + +Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html) +before sending patches. + +Unless otherwise noted, the Go source files are distributed under +the BSD-style license found in the LICENSE file. diff --git a/vendor/golang.org/x/sync/CONTRIBUTORS b/vendor/golang.org/x/sync/CONTRIBUTORS new file mode 100644 index 00000000..1c4577e9 --- /dev/null +++ b/vendor/golang.org/x/sync/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. 
+ * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 00000000..73309904 --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/README.md b/vendor/golang.org/x/sync/README.md new file mode 100644 index 00000000..1f8436cc --- /dev/null +++ b/vendor/golang.org/x/sync/README.md @@ -0,0 +1,18 @@ +# Go Sync + +This repository provides Go concurrency primitives in addition to the +ones provided by the language and "sync" and "sync/atomic" packages. + +## Download/Install + +The easiest way to install is to run `go get -u golang.org/x/sync`. You can +also manually git clone the repository to `$GOPATH/src/golang.org/x/sync`. + +## Report Issues / Send Patches + +This repository uses Gerrit for code changes. To learn how to submit changes to +this repository, see https://golang.org/doc/contribute.html. + +The main issue tracker for the sync repository is located at +https://github.com/golang/go/issues. 
Prefix your issue with "x/sync:" in the +subject line, so it is easy to find. diff --git a/vendor/golang.org/x/sync/codereview.cfg b/vendor/golang.org/x/sync/codereview.cfg new file mode 100644 index 00000000..3f8b14b6 --- /dev/null +++ b/vendor/golang.org/x/sync/codereview.cfg @@ -0,0 +1 @@ +issuerepo: golang/go diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000..533438d9 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,67 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "sync" + + "golang.org/x/net/context" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error cancels the group; its error will be +// returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} diff --git a/vendor/golang.org/x/sync/errgroup/errgroup_example_md5all_test.go b/vendor/golang.org/x/sync/errgroup/errgroup_example_md5all_test.go new file mode 100644 index 00000000..714b5aea --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup_example_md5all_test.go @@ -0,0 +1,101 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package errgroup_test + +import ( + "crypto/md5" + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" +) + +// Pipeline demonstrates the use of a Group to implement a multi-stage +// pipeline: a version of the MD5All function with bounded parallelism from +// https://blog.golang.org/pipelines. +func ExampleGroup_pipeline() { + m, err := MD5All(context.Background(), ".") + if err != nil { + log.Fatal(err) + } + + for k, sum := range m { + fmt.Printf("%s:\t%x\n", k, sum) + } +} + +type result struct { + path string + sum [md5.Size]byte +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. 
+func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) { + // ctx is canceled when g.Wait() returns. When this version of MD5All returns + // - even in case of error! - we know that all of the goroutines have finished + // and the memory they were using can be garbage-collected. + g, ctx := errgroup.WithContext(ctx) + paths := make(chan string) + + g.Go(func() error { + defer close(paths) + return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + select { + case paths <- path: + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + }) + + // Start a fixed number of goroutines to read and digest files. + c := make(chan result) + const numDigesters = 20 + for i := 0; i < numDigesters; i++ { + g.Go(func() error { + for path := range paths { + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + select { + case c <- result{path, md5.Sum(data)}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + go func() { + g.Wait() + close(c) + }() + + m := make(map[string][md5.Size]byte) + for r := range c { + m[r.path] = r.sum + } + // Check whether any of the goroutines failed. Since g is accumulating the + // errors, we don't need to send them (or check for them) in the individual + // results sent on the channel. + if err := g.Wait(); err != nil { + return nil, err + } + return m, nil +} diff --git a/vendor/golang.org/x/sync/errgroup/errgroup_test.go b/vendor/golang.org/x/sync/errgroup/errgroup_test.go new file mode 100644 index 00000000..6a9696ef --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup_test.go @@ -0,0 +1,176 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package errgroup_test + +import ( + "errors" + "fmt" + "net/http" + "os" + "testing" + + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" +) + +var ( + Web = fakeSearch("web") + Image = fakeSearch("image") + Video = fakeSearch("video") +) + +type Result string +type Search func(ctx context.Context, query string) (Result, error) + +func fakeSearch(kind string) Search { + return func(_ context.Context, query string) (Result, error) { + return Result(fmt.Sprintf("%s result for %q", kind, query)), nil + } +} + +// JustErrors illustrates the use of a Group in place of a sync.WaitGroup to +// simplify goroutine counting and error handling. This example is derived from +// the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup. +func ExampleGroup_justErrors() { + var g errgroup.Group + var urls = []string{ + "http://www.golang.org/", + "http://www.google.com/", + "http://www.somestupidname.com/", + } + for _, url := range urls { + // Launch a goroutine to fetch the URL. + url := url // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + // Fetch the URL. + resp, err := http.Get(url) + if err == nil { + resp.Body.Close() + } + return err + }) + } + // Wait for all HTTP fetches to complete. + if err := g.Wait(); err == nil { + fmt.Println("Successfully fetched all URLs.") + } +} + +// Parallel illustrates the use of a Group for synchronizing a simple parallel +// task: the "Google Search 2.0" function from +// https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context +// and error-handling. 
+func ExampleGroup_parallel() { + Google := func(ctx context.Context, query string) ([]Result, error) { + g, ctx := errgroup.WithContext(ctx) + + searches := []Search{Web, Image, Video} + results := make([]Result, len(searches)) + for i, search := range searches { + i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + result, err := search(ctx, query) + if err == nil { + results[i] = result + } + return err + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + return results, nil + } + + results, err := Google(context.Background(), "golang") + if err != nil { + fmt.Fprintln(os.Stderr, err) + return + } + for _, result := range results { + fmt.Println(result) + } + + // Output: + // web result for "golang" + // image result for "golang" + // video result for "golang" +} + +func TestZeroGroup(t *testing.T) { + err1 := errors.New("errgroup_test: 1") + err2 := errors.New("errgroup_test: 2") + + cases := []struct { + errs []error + }{ + {errs: []error{}}, + {errs: []error{nil}}, + {errs: []error{err1}}, + {errs: []error{err1, nil}}, + {errs: []error{err1, nil, err2}}, + } + + for _, tc := range cases { + var g errgroup.Group + + var firstErr error + for i, err := range tc.errs { + err := err + g.Go(func() error { return err }) + + if firstErr == nil && err != nil { + firstErr = err + } + + if gErr := g.Wait(); gErr != firstErr { + t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+ + "g.Wait() = %v; want %v", + g, tc.errs[:i+1], err, firstErr) + } + } + } +} + +func TestWithContext(t *testing.T) { + errDoom := errors.New("group_test: doomed") + + cases := []struct { + errs []error + want error + }{ + {want: nil}, + {errs: []error{nil}, want: nil}, + {errs: []error{errDoom}, want: errDoom}, + {errs: []error{errDoom, nil}, want: errDoom}, + } + + for _, tc := range cases { + g, ctx := errgroup.WithContext(context.Background()) + + for _, err := range tc.errs { + err := err + g.Go(func() error { return err }) + } + + if err := g.Wait(); err != tc.want { + t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+ + "g.Wait() = %v; want %v", + g, tc.errs, err, tc.want) + } + + canceled := false + select { + case <-ctx.Done(): + canceled = true + default: + } + if !canceled { + t.Errorf("after %T.Go(func() error { return err }) for err in %v\n"+ + "ctx.Done() was not closed", + g, tc.errs) + } + } +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 00000000..e9d2d79a --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,131 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "sync" + + // Use the old context because packages that depend on this one + // (e.g. cloud.google.com/go/...) must run on Go 1.6. + // TODO(jba): update to "context" when possible. + "golang.org/x/net/context" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. 
+func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking only until ctx +// is done. On success, returns nil. On failure, returns ctx.Err() and leaves +// the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + s.waiters.Remove(elem) + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: bad release") + } + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } + s.mu.Unlock() +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore_bench_test.go b/vendor/golang.org/x/sync/semaphore/semaphore_bench_test.go new file mode 100644 index 00000000..1e3ab75f --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore_bench_test.go @@ -0,0 +1,131 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package semaphore_test + +import ( + "fmt" + "testing" + + "golang.org/x/net/context" + "golang.org/x/sync/semaphore" +) + +// weighted is an interface matching a subset of *Weighted. It allows +// alternate implementations for testing and benchmarking. 
+type weighted interface { + Acquire(context.Context, int64) error + TryAcquire(int64) bool + Release(int64) +} + +// semChan implements Weighted using a channel for +// comparing against the condition variable-based implementation. +type semChan chan struct{} + +func newSemChan(n int64) semChan { + return semChan(make(chan struct{}, n)) +} + +func (s semChan) Acquire(_ context.Context, n int64) error { + for i := int64(0); i < n; i++ { + s <- struct{}{} + } + return nil +} + +func (s semChan) TryAcquire(n int64) bool { + if int64(len(s))+n > int64(cap(s)) { + return false + } + + for i := int64(0); i < n; i++ { + s <- struct{}{} + } + return true +} + +func (s semChan) Release(n int64) { + for i := int64(0); i < n; i++ { + <-s + } +} + +// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times. +func acquireN(b *testing.B, sem weighted, size int64, N int) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < N; j++ { + sem.Acquire(context.Background(), size) + } + for j := 0; j < N; j++ { + sem.Release(size) + } + } +} + +// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times. +func tryAcquireN(b *testing.B, sem weighted, size int64, N int) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < N; j++ { + if !sem.TryAcquire(size) { + b.Fatalf("TryAcquire(%v) = false, want true", size) + } + } + for j := 0; j < N; j++ { + sem.Release(size) + } + } +} + +func BenchmarkNewSeq(b *testing.B) { + for _, cap := range []int64{1, 128} { + b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = semaphore.NewWeighted(cap) + } + }) + b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = newSemChan(cap) + } + }) + } +} + +func BenchmarkAcquireSeq(b *testing.B) { + for _, c := range []struct { + cap, size int64 + N int + }{ + {1, 1, 1}, + {2, 1, 1}, + {16, 1, 1}, + {128, 1, 1}, + {2, 2, 1}, + {16, 2, 8}, + {128, 2, 64}, + {2, 1, 2}, + {16, 8, 2}, + {128, 64, 2}, + } { + for _, w := range []struct { + name string + w weighted + }{ + {"Weighted", semaphore.NewWeighted(c.cap)}, + {"semChan", newSemChan(c.cap)}, + } { + b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { + acquireN(b, w.w, c.size, c.N) + }) + b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) { + tryAcquireN(b, w.w, c.size, c.N) + }) + } + } +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore_example_test.go b/vendor/golang.org/x/sync/semaphore/semaphore_example_test.go new file mode 100644 index 00000000..e75cd79f --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore_example_test.go @@ -0,0 +1,84 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package semaphore_test + +import ( + "context" + "fmt" + "log" + "runtime" + + "golang.org/x/sync/semaphore" +) + +// Example_workerPool demonstrates how to use a semaphore to limit the number of +// goroutines working on parallel tasks. +// +// This use of a semaphore mimics a typical “worker pool” pattern, but without +// the need to explicitly shut down idle workers when the work is done. 
+func Example_workerPool() { + ctx := context.TODO() + + var ( + maxWorkers = runtime.GOMAXPROCS(0) + sem = semaphore.NewWeighted(int64(maxWorkers)) + out = make([]int, 32) + ) + + // Compute the output using up to maxWorkers goroutines at a time. + for i := range out { + // When maxWorkers goroutines are in flight, Acquire blocks until one of the + // workers finishes. + if err := sem.Acquire(ctx, 1); err != nil { + log.Printf("Failed to acquire semaphore: %v", err) + break + } + + go func(i int) { + defer sem.Release(1) + out[i] = collatzSteps(i + 1) + }(i) + } + + // Acquire all of the tokens to wait for any remaining workers to finish. + // + // If you are already waiting for the workers by some other means (such as an + // errgroup.Group), you can omit this final Acquire call. + if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil { + log.Printf("Failed to acquire semaphore: %v", err) + } + + fmt.Println(out) + + // Output: + // [0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5] +} + +// collatzSteps computes the number of steps to reach 1 under the Collatz +// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.) +func collatzSteps(n int) (steps int) { + if n <= 0 { + panic("nonpositive input") + } + + for ; n > 1; steps++ { + if steps < 0 { + panic("too many steps") + } + + if n%2 == 0 { + n /= 2 + continue + } + + const maxInt = int(^uint(0) >> 1) + if n > (maxInt-1)/3 { + panic("overflow") + } + n = 3*n + 1 + } + + return steps +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore_test.go b/vendor/golang.org/x/sync/semaphore/semaphore_test.go new file mode 100644 index 00000000..2541b906 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore_test.go @@ -0,0 +1,171 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package semaphore_test + +import ( + "math/rand" + "runtime" + "sync" + "testing" + "time" + + "golang.org/x/net/context" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" +) + +const maxSleep = 1 * time.Millisecond + +func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) { + for i := 0; i < loops; i++ { + sem.Acquire(context.Background(), n) + time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond) + sem.Release(n) + } +} + +func TestWeighted(t *testing.T) { + t.Parallel() + + n := runtime.GOMAXPROCS(0) + loops := 10000 / n + sem := semaphore.NewWeighted(int64(n)) + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + i := i + go func() { + defer wg.Done() + HammerWeighted(sem, int64(i), loops) + }() + } + wg.Wait() +} + +func TestWeightedPanic(t *testing.T) { + t.Parallel() + + defer func() { + if recover() == nil { + t.Fatal("release of an unacquired weighted semaphore did not panic") + } + }() + w := semaphore.NewWeighted(1) + w.Release(1) +} + +func TestWeightedTryAcquire(t *testing.T) { + t.Parallel() + + ctx := context.Background() + sem := semaphore.NewWeighted(2) + tries := []bool{} + sem.Acquire(ctx, 1) + tries = append(tries, sem.TryAcquire(1)) + tries = append(tries, sem.TryAcquire(1)) + + sem.Release(2) + + tries = append(tries, sem.TryAcquire(1)) + sem.Acquire(ctx, 1) + tries = append(tries, sem.TryAcquire(1)) + + want := []bool{true, false, true, false} + for i := range tries { + if tries[i] != want[i] { + t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) + } + } +} + +func TestWeightedAcquire(t *testing.T) { + t.Parallel() + + ctx := context.Background() + sem := semaphore.NewWeighted(2) + tryAcquire := func(n int64) bool { + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + return sem.Acquire(ctx, n) == nil + } + + tries := []bool{} + sem.Acquire(ctx, 1) + tries = append(tries, tryAcquire(1)) + tries = append(tries, tryAcquire(1)) + + sem.Release(2) + + tries = append(tries, tryAcquire(1)) + sem.Acquire(ctx, 1) + tries = append(tries, tryAcquire(1)) + + want := []bool{true, false, true, false} + for i := range tries { + if tries[i] != want[i] { + t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i]) + } + } +} + +func TestWeightedDoesntBlockIfTooBig(t *testing.T) { + t.Parallel() + + const n = 2 + sem := semaphore.NewWeighted(n) + { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go sem.Acquire(ctx, n+1) + } + + g, ctx := errgroup.WithContext(context.Background()) + for i := n * 3; i > 0; i-- { + g.Go(func() error { + err := sem.Acquire(ctx, 1) + if err == nil { + time.Sleep(1 * time.Millisecond) + sem.Release(1) + } + return err + }) + } + if err := g.Wait(); err != nil { + t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1) + } +} + +// TestLargeAcquireDoesntStarve times out if a large call to Acquire starves. +// Merely returning from the test function indicates success. 
+func TestLargeAcquireDoesntStarve(t *testing.T) { + t.Parallel() + + ctx := context.Background() + n := int64(runtime.GOMAXPROCS(0)) + sem := semaphore.NewWeighted(n) + running := true + + var wg sync.WaitGroup + wg.Add(int(n)) + for i := n; i > 0; i-- { + sem.Acquire(ctx, 1) + go func() { + defer func() { + sem.Release(1) + wg.Done() + }() + for running { + time.Sleep(1 * time.Millisecond) + sem.Release(1) + sem.Acquire(ctx, 1) + } + }() + } + + sem.Acquire(ctx, n) + running = false + sem.Release(n) + wg.Wait() +} diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 00000000..9a4f8d59 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,111 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import "sync" + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. 
+func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + delete(g.m, key) + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + g.mu.Unlock() +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/golang.org/x/sync/singleflight/singleflight_test.go b/vendor/golang.org/x/sync/singleflight/singleflight_test.go new file mode 100644 index 00000000..5e6f1b32 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight_test.go @@ -0,0 +1,87 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package singleflight + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestDo(t *testing.T) { + var g Group + v, err, _ := g.Do("key", func() (interface{}, error) { + return "bar", nil + }) + if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { + t.Errorf("Do = %v; want %v", got, want) + } + if err != nil { + t.Errorf("Do error = %v", err) + } +} + +func TestDoErr(t *testing.T) { + var g Group + someErr := errors.New("Some error") + v, err, _ := g.Do("key", func() (interface{}, error) { + return nil, someErr + }) + if err != someErr { + t.Errorf("Do error = %v; want someErr %v", err, someErr) + } + if v != nil { + t.Errorf("unexpected non-nil value %#v", v) + } +} + +func TestDoDupSuppress(t *testing.T) { + var g Group + var wg1, wg2 sync.WaitGroup + c := make(chan string, 1) + var calls int32 + fn := func() (interface{}, error) { + if atomic.AddInt32(&calls, 1) == 1 { + // First invocation. + wg1.Done() + } + v := <-c + c <- v // pump; make available for any future calls + + time.Sleep(10 * time.Millisecond) // let more goroutines enter Do + + return v, nil + } + + const n = 10 + wg1.Add(1) + for i := 0; i < n; i++ { + wg1.Add(1) + wg2.Add(1) + go func() { + defer wg2.Done() + wg1.Done() + v, err, _ := g.Do("key", fn) + if err != nil { + t.Errorf("Do error: %v", err) + return + } + if s, _ := v.(string); s != "bar" { + t.Errorf("Do = %T %v; want %q", v, v, "bar") + } + }() + } + wg1.Wait() + // At least one goroutine is in fn now and all of them have at + // least reached the line before the Do. + c <- "bar" + wg2.Wait() + if got := atomic.LoadInt32(&calls); got <= 0 || got >= n { + t.Errorf("number of calls = %d; want over 0 and less than %d", got, n) + } +} diff --git a/vendor/golang.org/x/sync/syncmap/map.go b/vendor/golang.org/x/sync/syncmap/map.go new file mode 100644 index 00000000..80e15847 --- /dev/null +++ b/vendor/golang.org/x/sync/syncmap/map.go @@ -0,0 +1,372 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package syncmap provides a concurrent map implementation. +// It is a prototype for a proposed addition to the sync package +// in the standard library. +// (https://golang.org/issue/18177) +package syncmap + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +// Map is a concurrent map with amortized-constant-time loads, stores, and deletes. +// It is safe for multiple goroutines to call a Map's methods concurrently. +// +// The zero Map is valid and empty. 
+// +// A Map must not be copied after first use. +type Map struct { + mu sync.Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored with + // mu held. + // + // Entries stored in read may be updated concurrently without mu, but updating + // a previously-expunged entry requires that the entry be copied to the dirty + // map and unexpunged with mu held. + read atomic.Value // readOnly + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map quickly, + // it also includes all of the non-expunged entries in the read map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in the + // clean map must be unexpunged and added to the dirty map before a new value + // can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[interface{}]*entry + + // misses counts the number of loads since the read map was last updated that + // needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int +} + +// readOnly is an immutable struct stored atomically in the Map.read field. +type readOnly struct { + m map[interface{}]*entry + amended bool // true if the dirty map contains some key not in m. +} + +// expunged is an arbitrary pointer that marks entries which have been deleted +// from the dirty map. +var expunged = unsafe.Pointer(new(interface{})) + +// An entry is a slot in the map corresponding to a particular key. +type entry struct { + // p points to the interface{} value stored for the entry. + // + // If p == nil, the entry has been deleted and m.dirty == nil. + // + // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry + // is missing from m.dirty. + // + // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty + // != nil, in m.dirty[key]. + // + // An entry can be deleted by atomic replacement with nil: when m.dirty is + // next created, it will atomically replace nil with expunged and leave + // m.dirty[key] unset. + // + // An entry's associated value can be updated by atomic replacement, provided + // p != expunged. If p == expunged, an entry's associated value can be updated + // only after first setting m.dirty[key] = e so that lookups using the dirty + // map find the entry. + p unsafe.Pointer // *interface{} +} + +func newEntry(i interface{}) *entry { + return &entry{p: unsafe.Pointer(&i)} +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key interface{}) (value interface{}, ok bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + // Avoid reporting a spurious miss if m.dirty got promoted while we were + // blocked on m.mu. (If further loads of the same key will not miss, it's + // not worth copying the dirty map for this key.) 
+ read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return nil, false + } + return e.load() +} + +func (e *entry) load() (value interface{}, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + return *(*interface{})(p), true +} + +// Store sets the value for a key. +func (m *Map) Store(key, value interface{}) { + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok && e.tryStore(&value) { + return + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + // The entry was previously expunged, which implies that there is a + // non-nil dirty map and this entry is not in it. + m.dirty[key] = e + } + e.storeLocked(&value) + } else if e, ok := m.dirty[key]; ok { + e.storeLocked(&value) + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() +} + +// tryStore stores a value if the entry has not been expunged. +// +// If the entry is expunged, tryStore returns false and leaves the entry +// unchanged. +func (e *entry) tryStore(i *interface{}) bool { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + for { + if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { + return true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + } +} + +// unexpungeLocked ensures that the entry is not marked as expunged. +// +// If the entry was previously expunged, it must be added to the dirty map +// before m.mu is unlocked. +func (e *entry) unexpungeLocked() (wasExpunged bool) { + return atomic.CompareAndSwapPointer(&e.p, expunged, nil) +} + +// storeLocked unconditionally stores a value to the entry. +// +// The entry must be known not to be expunged. +func (e *entry) storeLocked(i *interface{}) { + atomic.StorePointer(&e.p, unsafe.Pointer(i)) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + // Avoid locking if it's a clean hit. + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// tryLoadOrStore atomically loads or stores a value if the entry is not +// expunged. 
+// +// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and +// returns with ok==false. +func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if we hit the "load" path or the entry is expunged, we + // shouldn't bother heap-allocating. + ic := i + for { + if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { + return i, false, true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + } +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key interface{}) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + delete(m.dirty, key) + } + m.mu.Unlock() + } + if ok { + e.delete() + } +} + +func (e *entry) delete() (hadValue bool) { + for { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return false + } + if atomic.CompareAndSwapPointer(&e.p, p, nil) { + return true + } + } +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *Map) Range(f func(key, value interface{}) bool) { + // We need to be able to iterate over all of the keys that were already + // present at the start of the call to Range. + // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read, _ := m.read.Load().(readOnly) + if read.amended { + // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) + // (assuming the caller does not break out early), so a call to Range + // amortizes an entire copy of the map: we can promote the dirty copy + // immediately! 
+ m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if read.amended { + read = readOnly{m: m.dirty} + m.read.Store(read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(readOnly{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map) dirtyLocked() { + if m.dirty != nil { + return + } + + read, _ := m.read.Load().(readOnly) + m.dirty = make(map[interface{}]*entry, len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry) tryExpungeLocked() (isExpunged bool) { + p := atomic.LoadPointer(&e.p) + for p == nil { + if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { + return true + } + p = atomic.LoadPointer(&e.p) + } + return p == expunged +} diff --git a/vendor/golang.org/x/sync/syncmap/map_bench_test.go b/vendor/golang.org/x/sync/syncmap/map_bench_test.go new file mode 100644 index 00000000..b279b4f7 --- /dev/null +++ b/vendor/golang.org/x/sync/syncmap/map_bench_test.go @@ -0,0 +1,216 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package syncmap_test + +import ( + "fmt" + "reflect" + "sync/atomic" + "testing" + + "golang.org/x/sync/syncmap" +) + +type bench struct { + setup func(*testing.B, mapInterface) + perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) +} + +func benchMap(b *testing.B, bench bench) { + for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &syncmap.Map{}} { + b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { + m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) + if bench.setup != nil { + bench.setup(b, m) + } + + b.ResetTimer() + + var i int64 + b.RunParallel(func(pb *testing.PB) { + id := int(atomic.AddInt64(&i, 1) - 1) + bench.perG(b, pb, id*b.N, m) + }) + }) + } +} + +func BenchmarkLoadMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadOrStoreBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. 
+ for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + if _, ok := m.LoadOrStore(j, i); !ok { + b.Fatalf("unexpected miss for %v", j) + } + } else { + if v, loaded := m.LoadOrStore(i, i); loaded { + b.Fatalf("failed to store %v: existing value %v", i, v) + } + } + } + }, + }) +} + +func BenchmarkLoadOrStoreUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(i, i) + } + }, + }) +} + +func BenchmarkLoadOrStoreCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(0, 0) + } + }, + }) +} + +func BenchmarkRange(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Range(func(_, _ interface{}) bool { return true }) + } + }, + }) +} + +// BenchmarkAdversarialAlloc tests performance when we store a new value +// immediately whenever the map is promoted to clean and otherwise load a +// unique, missing key. +// +// This forces the Load calls to always acquire the map's mutex. +func BenchmarkAdversarialAlloc(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + var stores, loadsSinceStore int64 + for ; pb.Next(); i++ { + m.Load(i) + if loadsSinceStore++; loadsSinceStore > stores { + m.LoadOrStore(i, stores) + loadsSinceStore = 0 + stores++ + } + } + }, + }) +} + +// BenchmarkAdversarialDelete tests performance when we periodically delete +// one key and add a different one in a large map. +// +// This forces the Load calls to always acquire the map's mutex and periodically +// makes a full copy of the map despite changing only one entry. +func BenchmarkAdversarialDelete(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i) + + if i%mapSize == 0 { + m.Range(func(k, _ interface{}) bool { + m.Delete(k) + return false + }) + m.Store(i, i) + } + } + }, + }) +} diff --git a/vendor/golang.org/x/sync/syncmap/map_reference_test.go b/vendor/golang.org/x/sync/syncmap/map_reference_test.go new file mode 100644 index 00000000..923c51b7 --- /dev/null +++ b/vendor/golang.org/x/sync/syncmap/map_reference_test.go @@ -0,0 +1,151 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package syncmap_test + +import ( + "sync" + "sync/atomic" +) + +// This file contains reference map implementations for unit-tests. + +// mapInterface is the interface Map implements. 
+type mapInterface interface { + Load(interface{}) (interface{}, bool) + Store(key, value interface{}) + LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) + Delete(interface{}) + Range(func(key, value interface{}) (shouldContinue bool)) +} + +// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. +type RWMutexMap struct { + mu sync.RWMutex + dirty map[interface{}]interface{} +} + +func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { + m.mu.RLock() + value, ok = m.dirty[key] + m.mu.RUnlock() + return +} + +func (m *RWMutexMap) Store(key, value interface{}) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + m.mu.Unlock() +} + +func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + m.mu.Lock() + actual, loaded = m.dirty[key] + if !loaded { + actual = value + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + } + m.mu.Unlock() + return actual, loaded +} + +func (m *RWMutexMap) Delete(key interface{}) { + m.mu.Lock() + delete(m.dirty, key) + m.mu.Unlock() +} + +func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + m.mu.RLock() + keys := make([]interface{}, 0, len(m.dirty)) + for k := range m.dirty { + keys = append(keys, k) + } + m.mu.RUnlock() + + for _, k := range keys { + v, ok := m.Load(k) + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +// DeepCopyMap is an implementation of mapInterface using a Mutex and +// atomic.Value. It makes deep copies of the map on every write to avoid +// acquiring the Mutex in Load. +type DeepCopyMap struct { + mu sync.Mutex + clean atomic.Value +} + +func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + value, ok = clean[key] + return value, ok +} + +func (m *DeepCopyMap) Store(key, value interface{}) { + m.mu.Lock() + dirty := m.dirty() + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + return actual, loaded + } + + m.mu.Lock() + // Reload clean in case it changed while we were waiting on m.mu. + clean, _ = m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if !loaded { + dirty := m.dirty() + dirty[key] = value + actual = value + m.clean.Store(dirty) + } + m.mu.Unlock() + return actual, loaded +} + +func (m *DeepCopyMap) Delete(key interface{}) { + m.mu.Lock() + dirty := m.dirty() + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *DeepCopyMap) dirty() map[interface{}]interface{} { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + dirty := make(map[interface{}]interface{}, len(clean)+1) + for k, v := range clean { + dirty[k] = v + } + return dirty +} diff --git a/vendor/golang.org/x/sync/syncmap/map_test.go b/vendor/golang.org/x/sync/syncmap/map_test.go new file mode 100644 index 00000000..c883f176 --- /dev/null +++ b/vendor/golang.org/x/sync/syncmap/map_test.go @@ -0,0 +1,172 @@ +// Copyright 2016 The Go Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package syncmap_test + +import ( + "math/rand" + "reflect" + "runtime" + "sync" + "testing" + "testing/quick" + + "golang.org/x/sync/syncmap" +) + +type mapOp string + +const ( + opLoad = mapOp("Load") + opStore = mapOp("Store") + opLoadOrStore = mapOp("LoadOrStore") + opDelete = mapOp("Delete") +) + +var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete} + +// mapCall is a quick.Generator for calls on mapInterface. +type mapCall struct { + op mapOp + k, v interface{} +} + +func (c mapCall) apply(m mapInterface) (interface{}, bool) { + switch c.op { + case opLoad: + return m.Load(c.k) + case opStore: + m.Store(c.k, c.v) + return nil, false + case opLoadOrStore: + return m.LoadOrStore(c.k, c.v) + case opDelete: + m.Delete(c.k) + return nil, false + default: + panic("invalid mapOp") + } +} + +type mapResult struct { + value interface{} + ok bool +} + +func randValue(r *rand.Rand) interface{} { + b := make([]byte, r.Intn(4)) + for i := range b { + b[i] = 'a' + byte(rand.Intn(26)) + } + return string(b) +} + +func (mapCall) Generate(r *rand.Rand, size int) reflect.Value { + c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)} + switch c.op { + case opStore, opLoadOrStore: + c.v = randValue(r) + } + return reflect.ValueOf(c) +} + +func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) { + for _, c := range calls { + v, ok := c.apply(m) + results = append(results, mapResult{v, ok}) + } + + final = make(map[interface{}]interface{}) + m.Range(func(k, v interface{}) bool { + final[k] = v + return true + }) + + return results, final +} + +func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(syncmap.Map), calls) +} + +func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(RWMutexMap), calls) +} + +func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(DeepCopyMap), calls) +} + +func TestMapMatchesRWMutex(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil { + t.Error(err) + } +} + +func TestMapMatchesDeepCopy(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil { + t.Error(err) + } +} + +func TestConcurrentRange(t *testing.T) { + const mapSize = 1 << 10 + + m := new(syncmap.Map) + for n := int64(1); n <= mapSize; n++ { + m.Store(n, int64(n)) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + defer func() { + close(done) + wg.Wait() + }() + for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { + r := rand.New(rand.NewSource(g)) + wg.Add(1) + go func(g int64) { + defer wg.Done() + for i := int64(0); ; i++ { + select { + case <-done: + return + default: + } + for n := int64(1); n < mapSize; n++ { + if r.Int63n(mapSize) == 0 { + m.Store(n, n*i*g) + } else { + m.Load(n) + } + } + } + }(g) + } + + iters := 1 << 10 + if testing.Short() { + iters = 16 + } + for n := iters; n > 0; n-- { + seen := make(map[int64]bool, mapSize) + + m.Range(func(ki, vi interface{}) bool { + k, v := ki.(int64), vi.(int64) + if v%k != 0 { + t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) + } + if seen[k] { + t.Fatalf("Range visited key %v twice", k) + } + seen[k] = true + return true + }) + + if len(seen) != mapSize { + t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) + } + 
} +}
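
The errgroup and semaphore packages vendored above are general-purpose concurrency helpers: errgroup collects goroutines and propagates the first error through a shared context, while a weighted semaphore caps how many of them run at once. As a rough, self-contained sketch of the pattern they enable (not code from this patch; fetchTxn and ids below are hypothetical stand-ins for a real per-transaction request and its inputs), a bounded parallel fetch might look like:

// Illustrative sketch only: fetchTxn and ids are hypothetical stand-ins,
// not identifiers from this patch.
package main

import (
	"fmt"

	"golang.org/x/net/context"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

// fetchTxn stands in for a per-transaction RPC call.
func fetchTxn(ctx context.Context, id int) (string, error) {
	return fmt.Sprintf("txn-%d", id), nil
}

func main() {
	ids := []int{1, 2, 3, 4, 5}
	results := make([]string, len(ids))

	g, ctx := errgroup.WithContext(context.Background())
	sem := semaphore.NewWeighted(3) // at most 3 requests in flight

	for i, id := range ids {
		i, id := i, id // capture loop variables for the goroutine
		g.Go(func() error {
			if err := sem.Acquire(ctx, 1); err != nil {
				return err
			}
			defer sem.Release(1)
			txn, err := fetchTxn(ctx, id)
			if err != nil {
				return err // first error cancels ctx for the remaining goroutines
			}
			results[i] = txn // each goroutine writes a distinct index, so no lock is needed
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Println(results)
}

In this arrangement the semaphore bounds fan-out against the node, and errgroup's Wait returns the first non-nil error after all launched goroutines finish.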