Merge branch 'diBreakout' of https://github.com/filecoin-project/lotus into diBreakout
This commit is contained in:
commit
d0cfcd99f5
4
Makefile
4
Makefile
@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen
|
|||||||
docsgen-openrpc-bin: api-gen actors-gen
|
docsgen-openrpc-bin: api-gen actors-gen
|
||||||
$(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd
|
$(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd
|
||||||
|
|
||||||
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker
|
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider
|
||||||
|
|
||||||
docsgen-md-full: docsgen-md-bin
|
docsgen-md-full: docsgen-md-bin
|
||||||
./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md
|
./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md
|
||||||
@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin
|
|||||||
./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md
|
./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md
|
||||||
docsgen-md-worker: docsgen-md-bin
|
docsgen-md-worker: docsgen-md-bin
|
||||||
./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md
|
./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md
|
||||||
|
docsgen-md-provider: docsgen-md-bin
|
||||||
|
./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md
|
||||||
|
|
||||||
docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway
|
docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway
|
||||||
|
|
||||||
|
@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r
|
|||||||
i = &api.GatewayStruct{}
|
i = &api.GatewayStruct{}
|
||||||
t = reflect.TypeOf(new(struct{ api.Gateway })).Elem()
|
t = reflect.TypeOf(new(struct{ api.Gateway })).Elem()
|
||||||
permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal))
|
permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal))
|
||||||
|
case "Provider":
|
||||||
|
i = &api.LotusProviderStruct{}
|
||||||
|
t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem()
|
||||||
|
permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal))
|
||||||
default:
|
default:
|
||||||
panic("unknown type")
|
panic("unknown type")
|
||||||
}
|
}
|
||||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -37,7 +37,7 @@ func BuildTypeString() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// BuildVersion is the local build version
|
// BuildVersion is the local build version
|
||||||
const BuildVersion = "1.25.1-dev"
|
const BuildVersion = "1.25.2-dev"
|
||||||
|
|
||||||
func UserVersion() string {
|
func UserVersion() string {
|
||||||
if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {
|
if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {
|
||||||
|
@ -306,9 +306,9 @@ func (t *Response) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var lengthBufCompactedMessages = []byte{132}
|
var lengthBufCompactedMessagesCBOR = []byte{132}
|
||||||
|
|
||||||
func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
func (t *CompactedMessagesCBOR) MarshalCBOR(w io.Writer) error {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
@ -316,12 +316,12 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
|||||||
|
|
||||||
cw := cbg.NewCborWriter(w)
|
cw := cbg.NewCborWriter(w)
|
||||||
|
|
||||||
if _, err := cw.Write(lengthBufCompactedMessages); err != nil {
|
if _, err := cw.Write(lengthBufCompactedMessagesCBOR); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.Bls ([]*types.Message) (slice)
|
// t.Bls ([]*types.Message) (slice)
|
||||||
if len(t.Bls) > cbg.MaxLength {
|
if len(t.Bls) > 150000 {
|
||||||
return xerrors.Errorf("Slice value in field t.Bls was too long")
|
return xerrors.Errorf("Slice value in field t.Bls was too long")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -334,7 +334,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.BlsIncludes ([][]uint64) (slice)
|
// t.BlsIncludes ([]exchange.messageIndices) (slice)
|
||||||
if len(t.BlsIncludes) > cbg.MaxLength {
|
if len(t.BlsIncludes) > cbg.MaxLength {
|
||||||
return xerrors.Errorf("Slice value in field t.BlsIncludes was too long")
|
return xerrors.Errorf("Slice value in field t.BlsIncludes was too long")
|
||||||
}
|
}
|
||||||
@ -343,24 +343,13 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, v := range t.BlsIncludes {
|
for _, v := range t.BlsIncludes {
|
||||||
if len(v) > cbg.MaxLength {
|
if err := v.MarshalCBOR(cw); err != nil {
|
||||||
return xerrors.Errorf("Slice value in field v was too long")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(v))); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, v := range v {
|
|
||||||
|
|
||||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(v)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.Secpk ([]*types.SignedMessage) (slice)
|
// t.Secpk ([]*types.SignedMessage) (slice)
|
||||||
if len(t.Secpk) > cbg.MaxLength {
|
if len(t.Secpk) > 150000 {
|
||||||
return xerrors.Errorf("Slice value in field t.Secpk was too long")
|
return xerrors.Errorf("Slice value in field t.Secpk was too long")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -373,7 +362,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.SecpkIncludes ([][]uint64) (slice)
|
// t.SecpkIncludes ([]exchange.messageIndices) (slice)
|
||||||
if len(t.SecpkIncludes) > cbg.MaxLength {
|
if len(t.SecpkIncludes) > cbg.MaxLength {
|
||||||
return xerrors.Errorf("Slice value in field t.SecpkIncludes was too long")
|
return xerrors.Errorf("Slice value in field t.SecpkIncludes was too long")
|
||||||
}
|
}
|
||||||
@ -382,26 +371,15 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, v := range t.SecpkIncludes {
|
for _, v := range t.SecpkIncludes {
|
||||||
if len(v) > cbg.MaxLength {
|
if err := v.MarshalCBOR(cw); err != nil {
|
||||||
return xerrors.Errorf("Slice value in field v was too long")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(v))); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, v := range v {
|
|
||||||
|
|
||||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(v)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
func (t *CompactedMessagesCBOR) UnmarshalCBOR(r io.Reader) (err error) {
|
||||||
*t = CompactedMessages{}
|
*t = CompactedMessagesCBOR{}
|
||||||
|
|
||||||
cr := cbg.NewCborReader(r)
|
cr := cbg.NewCborReader(r)
|
||||||
|
|
||||||
@ -430,7 +408,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if extra > cbg.MaxLength {
|
if extra > 150000 {
|
||||||
return fmt.Errorf("t.Bls: array too large (%d)", extra)
|
return fmt.Errorf("t.Bls: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -471,7 +449,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.BlsIncludes ([][]uint64) (slice)
|
// t.BlsIncludes ([]exchange.messageIndices) (slice)
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
maj, extra, err = cr.ReadHeader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -487,7 +465,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if extra > 0 {
|
if extra > 0 {
|
||||||
t.BlsIncludes = make([][]uint64, extra)
|
t.BlsIncludes = make([]messageIndices, extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
@ -499,47 +477,13 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
_ = extra
|
_ = extra
|
||||||
_ = err
|
_ = err
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
{
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if extra > cbg.MaxLength {
|
if err := t.BlsIncludes[i].UnmarshalCBOR(cr); err != nil {
|
||||||
return fmt.Errorf("t.BlsIncludes[i]: array too large (%d)", extra)
|
return xerrors.Errorf("unmarshaling t.BlsIncludes[i]: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
|
||||||
return fmt.Errorf("expected cbor array")
|
|
||||||
}
|
|
||||||
|
|
||||||
if extra > 0 {
|
|
||||||
t.BlsIncludes[i] = make([]uint64, extra)
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := 0; j < int(extra); j++ {
|
|
||||||
{
|
|
||||||
var maj byte
|
|
||||||
var extra uint64
|
|
||||||
var err error
|
|
||||||
_ = maj
|
|
||||||
_ = extra
|
|
||||||
_ = err
|
|
||||||
|
|
||||||
{
|
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if maj != cbg.MajUnsignedInt {
|
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
|
||||||
}
|
|
||||||
t.BlsIncludes[i][j] = uint64(extra)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -550,7 +494,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if extra > cbg.MaxLength {
|
if extra > 150000 {
|
||||||
return fmt.Errorf("t.Secpk: array too large (%d)", extra)
|
return fmt.Errorf("t.Secpk: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -591,7 +535,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.SecpkIncludes ([][]uint64) (slice)
|
// t.SecpkIncludes ([]exchange.messageIndices) (slice)
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
maj, extra, err = cr.ReadHeader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -607,7 +551,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if extra > 0 {
|
if extra > 0 {
|
||||||
t.SecpkIncludes = make([][]uint64, extra)
|
t.SecpkIncludes = make([]messageIndices, extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
@ -619,47 +563,13 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
|||||||
_ = extra
|
_ = extra
|
||||||
_ = err
|
_ = err
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
{
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if extra > cbg.MaxLength {
|
if err := t.SecpkIncludes[i].UnmarshalCBOR(cr); err != nil {
|
||||||
return fmt.Errorf("t.SecpkIncludes[i]: array too large (%d)", extra)
|
return xerrors.Errorf("unmarshaling t.SecpkIncludes[i]: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
|
||||||
return fmt.Errorf("expected cbor array")
|
|
||||||
}
|
|
||||||
|
|
||||||
if extra > 0 {
|
|
||||||
t.SecpkIncludes[i] = make([]uint64, extra)
|
|
||||||
}
|
|
||||||
|
|
||||||
for j := 0; j < int(extra); j++ {
|
|
||||||
{
|
|
||||||
var maj byte
|
|
||||||
var extra uint64
|
|
||||||
var err error
|
|
||||||
_ = maj
|
|
||||||
_ = extra
|
|
||||||
_ = err
|
|
||||||
|
|
||||||
{
|
|
||||||
|
|
||||||
maj, extra, err = cr.ReadHeader()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if maj != cbg.MajUnsignedInt {
|
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
|
||||||
}
|
|
||||||
t.SecpkIncludes[i][j] = uint64(extra)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -23,6 +24,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Set the max exchange message size to 120MiB. Purely based on gas numbers, we can include ~8MiB of
|
||||||
|
// messages per block, so I've set this to 120MiB to be _very_ safe.
|
||||||
|
const maxExchangeMessageSize = (15 * 8) << 20
|
||||||
|
|
||||||
// client implements exchange.Client, using the libp2p ChainExchange protocol
|
// client implements exchange.Client, using the libp2p ChainExchange protocol
|
||||||
// as the fetching mechanism.
|
// as the fetching mechanism.
|
||||||
type client struct {
|
type client struct {
|
||||||
@ -434,10 +439,11 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
|
|||||||
log.Warnw("CloseWrite err", "error", err)
|
log.Warnw("CloseWrite err", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read response.
|
// Read response, limiting the size of the response to maxExchangeMessageSize as we allow a
|
||||||
|
// lot of messages (10k+) but they'll mostly be quite small.
|
||||||
var res Response
|
var res Response
|
||||||
err = cborutil.ReadCborRPC(
|
err = cborutil.ReadCborRPC(
|
||||||
bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)),
|
bufio.NewReader(io.LimitReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline), maxExchangeMessageSize)),
|
||||||
&res)
|
&res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
|
c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
|
||||||
|
@ -154,6 +154,8 @@ type BSTipSet struct {
|
|||||||
// FIXME: The logic to decompress this structure should belong
|
// FIXME: The logic to decompress this structure should belong
|
||||||
//
|
//
|
||||||
// to itself, not to the consumer.
|
// to itself, not to the consumer.
|
||||||
|
//
|
||||||
|
// NOTE: Max messages is: BlockMessageLimit (10k) * MaxTipsetSize (15) = 150k
|
||||||
type CompactedMessages struct {
|
type CompactedMessages struct {
|
||||||
Bls []*types.Message
|
Bls []*types.Message
|
||||||
BlsIncludes [][]uint64
|
BlsIncludes [][]uint64
|
||||||
|
125
chain/exchange/protocol_encoding.go
Normal file
125
chain/exchange/protocol_encoding.go
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
package exchange
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
xerrors "golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
types "github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Type used for encoding/decoding compacted messages. This is a ustom type as we need custom limits.
|
||||||
|
// - Max messages is 150,000 as that's 15 times the max block size (in messages). It needs to be
|
||||||
|
// large enough to cover a full tipset full of full blocks.
|
||||||
|
type CompactedMessagesCBOR struct {
|
||||||
|
Bls []*types.Message `cborgen:"maxlen=150000"`
|
||||||
|
BlsIncludes []messageIndices
|
||||||
|
|
||||||
|
Secpk []*types.SignedMessage `cborgen:"maxlen=150000"`
|
||||||
|
SecpkIncludes []messageIndices
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmarshal into the "decoding" struct, then copy into the actual struct.
|
||||||
|
func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) {
|
||||||
|
var c CompactedMessagesCBOR
|
||||||
|
if err := c.UnmarshalCBOR(r); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.Bls = c.Bls
|
||||||
|
t.BlsIncludes = make([][]uint64, len(c.BlsIncludes))
|
||||||
|
for i, v := range c.BlsIncludes {
|
||||||
|
t.BlsIncludes[i] = v.v
|
||||||
|
}
|
||||||
|
t.Secpk = c.Secpk
|
||||||
|
t.SecpkIncludes = make([][]uint64, len(c.SecpkIncludes))
|
||||||
|
for i, v := range c.SecpkIncludes {
|
||||||
|
t.SecpkIncludes[i] = v.v
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy into the encoding struct, then marshal.
|
||||||
|
func (t *CompactedMessages) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var c CompactedMessagesCBOR
|
||||||
|
c.Bls = t.Bls
|
||||||
|
c.BlsIncludes = make([]messageIndices, len(t.BlsIncludes))
|
||||||
|
for i, v := range t.BlsIncludes {
|
||||||
|
c.BlsIncludes[i].v = v
|
||||||
|
}
|
||||||
|
c.Secpk = t.Secpk
|
||||||
|
c.SecpkIncludes = make([]messageIndices, len(t.SecpkIncludes))
|
||||||
|
for i, v := range t.SecpkIncludes {
|
||||||
|
c.SecpkIncludes[i].v = v
|
||||||
|
}
|
||||||
|
return c.MarshalCBOR(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// this needs to be a struct or cborgen will peak into it and ignore the Unmarshal/Marshal functions
|
||||||
|
type messageIndices struct {
|
||||||
|
v []uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *messageIndices) UnmarshalCBOR(r io.Reader) (err error) {
|
||||||
|
cr := cbg.NewCborReader(r)
|
||||||
|
|
||||||
|
maj, extra, err := cr.ReadHeader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if maj != cbg.MajArray {
|
||||||
|
return fmt.Errorf("cbor input should be of type array")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > uint64(build.BlockMessageLimit) {
|
||||||
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > 0 {
|
||||||
|
t.v = make([]uint64, extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < int(extra); i++ {
|
||||||
|
maj, extra, err := cr.ReadHeader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajUnsignedInt {
|
||||||
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
|
}
|
||||||
|
t.v[i] = extra
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *messageIndices) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cw := cbg.NewCborWriter(w)
|
||||||
|
|
||||||
|
if len(t.v) > build.BlockMessageLimit {
|
||||||
|
return xerrors.Errorf("Slice value in field v was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.v))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, v := range t.v {
|
||||||
|
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) {
|
|||||||
dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"`
|
dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var layerMaybe string
|
||||||
|
if name != "base" {
|
||||||
|
layerMaybe = "--layer=" + name
|
||||||
|
}
|
||||||
|
|
||||||
msg += `
|
msg += `
|
||||||
To work with the config:
|
To work with the config:
|
||||||
` + cliCommandColor(`lotus-provider `+dbSettings+` config help `)
|
` + cliCommandColor(`lotus-provider `+dbSettings+` config help `)
|
||||||
msg += `
|
msg += `
|
||||||
To run Lotus Provider: in its own machine or cgroup without other files, use the command:
|
To run Lotus Provider: in its own machine or cgroup without other files, use the command:
|
||||||
` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`)
|
` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe)
|
||||||
fmt.Println(msg)
|
fmt.Println(msg)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,10 @@ var wdPostTaskCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("cannot get miner id %w", err)
|
return xerrors.Errorf("cannot get miner id %w", err)
|
||||||
}
|
}
|
||||||
var id int64
|
var id int64
|
||||||
_, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
|
||||||
|
retryDelay := time.Millisecond * 10
|
||||||
|
retryAddTask:
|
||||||
|
_, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
|
err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("inserting harmony_task: ", err)
|
log.Error("inserting harmony_task: ", err)
|
||||||
@ -103,6 +106,11 @@ var wdPostTaskCmd = &cli.Command{
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
retryDelay *= 2
|
||||||
|
goto retryAddTask
|
||||||
|
}
|
||||||
return xerrors.Errorf("writing SQL transaction: %w", err)
|
return xerrors.Errorf("writing SQL transaction: %w", err)
|
||||||
}
|
}
|
||||||
fmt.Printf("Inserted task %v. Waiting for success ", id)
|
fmt.Printf("Inserted task %v. Waiting for success ", id)
|
||||||
@ -118,7 +126,7 @@ var wdPostTaskCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
fmt.Print(".")
|
fmt.Print(".")
|
||||||
}
|
}
|
||||||
log.Infof("Result:", result.String)
|
log.Infof("Result: %s", result.String)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -120,6 +120,7 @@ var runCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
taskEngine, err := tasks.StartTasks(ctx, dependencies)
|
taskEngine, err := tasks.StartTasks(ctx, dependencies)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -135,4 +136,4 @@ var runCmd = &cli.Command{
|
|||||||
<-finishCh
|
<-finishCh
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
25
documentation/en/api-v0-methods-provider.md
Normal file
25
documentation/en/api-v0-methods-provider.md
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# Groups
|
||||||
|
* [](#)
|
||||||
|
* [Shutdown](#Shutdown)
|
||||||
|
* [Version](#Version)
|
||||||
|
##
|
||||||
|
|
||||||
|
|
||||||
|
### Shutdown
|
||||||
|
|
||||||
|
|
||||||
|
Perms: admin
|
||||||
|
|
||||||
|
Inputs: `null`
|
||||||
|
|
||||||
|
Response: `{}`
|
||||||
|
|
||||||
|
### Version
|
||||||
|
|
||||||
|
|
||||||
|
Perms: admin
|
||||||
|
|
||||||
|
Inputs: `null`
|
||||||
|
|
||||||
|
Response: `131840`
|
||||||
|
|
@ -7,7 +7,7 @@ USAGE:
|
|||||||
lotus-miner [global options] command [command options] [arguments...]
|
lotus-miner [global options] command [command options] [arguments...]
|
||||||
|
|
||||||
VERSION:
|
VERSION:
|
||||||
1.25.1-dev
|
1.25.2-dev
|
||||||
|
|
||||||
COMMANDS:
|
COMMANDS:
|
||||||
init Initialize a lotus miner repo
|
init Initialize a lotus miner repo
|
||||||
|
@ -7,7 +7,7 @@ USAGE:
|
|||||||
lotus-worker [global options] command [command options] [arguments...]
|
lotus-worker [global options] command [command options] [arguments...]
|
||||||
|
|
||||||
VERSION:
|
VERSION:
|
||||||
1.25.1-dev
|
1.25.2-dev
|
||||||
|
|
||||||
COMMANDS:
|
COMMANDS:
|
||||||
run Start lotus worker
|
run Start lotus worker
|
||||||
|
@ -7,7 +7,7 @@ USAGE:
|
|||||||
lotus [global options] command [command options] [arguments...]
|
lotus [global options] command [command options] [arguments...]
|
||||||
|
|
||||||
VERSION:
|
VERSION:
|
||||||
1.25.1-dev
|
1.25.2-dev
|
||||||
|
|
||||||
COMMANDS:
|
COMMANDS:
|
||||||
daemon Start a lotus daemon process
|
daemon Start a lotus daemon process
|
||||||
|
@ -92,7 +92,7 @@ func main() {
|
|||||||
err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange",
|
err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange",
|
||||||
exchange.Request{},
|
exchange.Request{},
|
||||||
exchange.Response{},
|
exchange.Response{},
|
||||||
exchange.CompactedMessages{},
|
exchange.CompactedMessagesCBOR{},
|
||||||
exchange.BSTipSet{},
|
exchange.BSTipSet{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
@ -33,6 +35,8 @@ type DB struct {
|
|||||||
cfg *pgxpool.Config
|
cfg *pgxpool.Config
|
||||||
schema string
|
schema string
|
||||||
hostnames []string
|
hostnames []string
|
||||||
|
BTFPOnce sync.Once
|
||||||
|
BTFP atomic.Uintptr
|
||||||
}
|
}
|
||||||
|
|
||||||
var logger = logging.Logger("harmonydb")
|
var logger = logging.Logger("harmonydb")
|
||||||
|
@ -3,13 +3,17 @@ package harmonydb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"github.com/georgysavva/scany/v2/pgxscan"
|
"github.com/georgysavva/scany/v2/pgxscan"
|
||||||
"github.com/jackc/pgerrcode"
|
"github.com/jackc/pgerrcode"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
"github.com/samber/lo"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errTx = errors.New("Cannot use a non-transaction func in a transaction")
|
||||||
|
|
||||||
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
|
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
|
||||||
// In any package, raw strings will satisfy compilation. Ex:
|
// In any package, raw strings will satisfy compilation. Ex:
|
||||||
//
|
//
|
||||||
@ -22,6 +26,9 @@ type rawStringOnly string
|
|||||||
// Note, for CREATE & DROP please keep these permanent and express
|
// Note, for CREATE & DROP please keep these permanent and express
|
||||||
// them in the ./sql/ files (next number).
|
// them in the ./sql/ files (next number).
|
||||||
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
|
func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return 0, errTx
|
||||||
|
}
|
||||||
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
|
res, err := db.pgx.Exec(ctx, string(sql), arguments...)
|
||||||
return int(res.RowsAffected()), err
|
return int(res.RowsAffected()), err
|
||||||
}
|
}
|
||||||
@ -55,6 +62,9 @@ type Query struct {
|
|||||||
// fmt.Println(id, name)
|
// fmt.Println(id, name)
|
||||||
// }
|
// }
|
||||||
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
|
func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return &Query{}, errTx
|
||||||
|
}
|
||||||
q, err := db.pgx.Query(ctx, string(sql), arguments...)
|
q, err := db.pgx.Query(ctx, string(sql), arguments...)
|
||||||
return &Query{q}, err
|
return &Query{q}, err
|
||||||
}
|
}
|
||||||
@ -66,6 +76,10 @@ type Row interface {
|
|||||||
Scan(...any) error
|
Scan(...any) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rowErr struct{}
|
||||||
|
|
||||||
|
func (rowErr) Scan(_ ...any) error { return errTx }
|
||||||
|
|
||||||
// QueryRow gets 1 row using column order matching.
|
// QueryRow gets 1 row using column order matching.
|
||||||
// This is a timesaver for the special case of wanting the first row returned only.
|
// This is a timesaver for the special case of wanting the first row returned only.
|
||||||
// EX:
|
// EX:
|
||||||
@ -74,6 +88,9 @@ type Row interface {
|
|||||||
// var ID = 123
|
// var ID = 123
|
||||||
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
|
// err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet)
|
||||||
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
|
func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return rowErr{}
|
||||||
|
}
|
||||||
return db.pgx.QueryRow(ctx, string(sql), arguments...)
|
return db.pgx.QueryRow(ctx, string(sql), arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,6 +109,9 @@ Ex:
|
|||||||
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet)
|
||||||
*/
|
*/
|
||||||
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
|
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return errTx
|
||||||
|
}
|
||||||
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
|
return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,10 +120,32 @@ type Tx struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// usedInTransaction is a helper to prevent nesting transactions
|
||||||
|
// & non-transaction calls in transactions. It only checks 20 frames.
|
||||||
|
// Fast: This memory should all be in CPU Caches.
|
||||||
|
func (db *DB) usedInTransaction() bool {
|
||||||
|
var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc)
|
||||||
|
framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller.
|
||||||
|
return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there.
|
||||||
|
}
|
||||||
|
|
||||||
// BeginTransaction is how you can access transactions using this library.
|
// BeginTransaction is how you can access transactions using this library.
|
||||||
// The entire transaction happens in the function passed in.
|
// The entire transaction happens in the function passed in.
|
||||||
// The return must be true or a rollback will occur.
|
// The return must be true or a rollback will occur.
|
||||||
|
// Be sure to test the error for IsErrSerialization() if you want to retry
|
||||||
|
//
|
||||||
|
// when there is a DB serialization error.
|
||||||
|
//
|
||||||
|
//go:noinline
|
||||||
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
||||||
|
db.BTFPOnce.Do(func() {
|
||||||
|
fp := make([]uintptr, 20)
|
||||||
|
runtime.Callers(1, fp)
|
||||||
|
db.BTFP.Store(fp[0])
|
||||||
|
})
|
||||||
|
if db.usedInTransaction() {
|
||||||
|
return false, errTx
|
||||||
|
}
|
||||||
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@ -156,3 +198,8 @@ func IsErrUniqueContraint(err error) bool {
|
|||||||
var e2 *pgconn.PgError
|
var e2 *pgconn.PgError
|
||||||
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
|
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsErrSerialization(err error) bool {
|
||||||
|
var e2 *pgconn.PgError
|
||||||
|
return errors.As(err, &e2) && e2.Code == pgerrcode.SerializationFailure
|
||||||
|
}
|
||||||
|
@ -146,6 +146,11 @@ func New(
|
|||||||
TaskTypeDetails: c.TypeDetails(),
|
TaskTypeDetails: c.TypeDetails(),
|
||||||
TaskEngine: e,
|
TaskEngine: e,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(h.Name) > 16 {
|
||||||
|
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
|
||||||
|
}
|
||||||
|
|
||||||
e.handlers = append(e.handlers, &h)
|
e.handlers = append(e.handlers, &h)
|
||||||
e.taskMap[h.TaskTypeDetails.Name] = &h
|
e.taskMap[h.TaskTypeDetails.Name] = &h
|
||||||
}
|
}
|
||||||
@ -171,7 +176,7 @@ func New(
|
|||||||
continue // not really fatal, but not great
|
continue // not really fatal, but not great
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
|
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
|
||||||
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
|
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(unownedTasks) > 0 {
|
if len(unownedTasks) > 0 {
|
||||||
accepted := v.considerWork("poller", unownedTasks)
|
accepted := v.considerWork(workSourcePoller, unownedTasks)
|
||||||
if accepted {
|
if accepted {
|
||||||
return // accept new work slowly and in priority order
|
return // accept new work slowly and in priority order
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,8 @@ type taskTypeHandler struct {
|
|||||||
|
|
||||||
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
|
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
|
||||||
var tID TaskID
|
var tID TaskID
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryAddTask:
|
||||||
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
_, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||||
// create taskID (from DB)
|
// create taskID (from DB)
|
||||||
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
|
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
|
||||||
@ -44,11 +46,21 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
|
|||||||
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
|
log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryAddTask
|
||||||
|
}
|
||||||
log.Error("Could not add task. AddTasFunc failed: %v", err)
|
log.Error("Could not add task. AddTasFunc failed: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
workSourcePoller = "poller"
|
||||||
|
workSourceRecover = "recovered"
|
||||||
|
)
|
||||||
|
|
||||||
// considerWork is called to attempt to start work on a task-id of this task type.
|
// considerWork is called to attempt to start work on a task-id of this task type.
|
||||||
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
|
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
|
||||||
// The only caller should be the one work poller thread. This does spin off other threads,
|
// The only caller should be the one work poller thread. This does spin off other threads,
|
||||||
@ -87,22 +99,25 @@ top:
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Can we claim the work for our hostname?
|
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
|
||||||
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
|
if from != workSourceRecover {
|
||||||
if err != nil {
|
// 4. Can we claim the work for our hostname?
|
||||||
log.Error(err)
|
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
|
||||||
return false
|
if err != nil {
|
||||||
}
|
log.Error(err)
|
||||||
if ct == 0 {
|
return false
|
||||||
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
|
}
|
||||||
var tryAgain = make([]TaskID, 0, len(ids)-1)
|
if ct == 0 {
|
||||||
for _, id := range ids {
|
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name)
|
||||||
if id != *tID {
|
var tryAgain = make([]TaskID, 0, len(ids)-1)
|
||||||
tryAgain = append(tryAgain, id)
|
for _, id := range ids {
|
||||||
}
|
if id != *tID {
|
||||||
|
tryAgain = append(tryAgain, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ids = tryAgain
|
||||||
|
goto top
|
||||||
}
|
}
|
||||||
ids = tryAgain
|
|
||||||
goto top
|
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Count.Add(1)
|
h.Count.Add(1)
|
||||||
@ -153,7 +168,8 @@ top:
|
|||||||
|
|
||||||
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
|
func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) {
|
||||||
workEnd := time.Now()
|
workEnd := time.Now()
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryRecordCompletion:
|
||||||
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
|
||||||
var postedTime time.Time
|
var postedTime time.Time
|
||||||
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
|
err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime)
|
||||||
@ -206,6 +222,11 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryRecordCompletion
|
||||||
|
}
|
||||||
log.Error("Could not record transaction: ", err)
|
log.Error("Could not record transaction: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
SignedData []byte `db:"signed_data"`
|
SignedData []byte `db:"signed_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.db.QueryRow(ctx, `select from_key, nonce, to_addr, unsigned_data, unsigned_cid from message_sends where id = $1`, taskID).Scan(&dbMsg)
|
err = s.db.QueryRow(ctx, `
|
||||||
|
SELECT from_key, nonce, to_addr, unsigned_data, unsigned_cid
|
||||||
|
FROM message_sends
|
||||||
|
WHERE send_task_id = $1`, taskID).Scan(
|
||||||
|
&dbMsg.FromKey, &dbMsg.Nonce, &dbMsg.ToAddr, &dbMsg.UnsignedData, &dbMsg.UnsignedCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting message from db: %w", err)
|
return false, xerrors.Errorf("getting message from db: %w", err)
|
||||||
}
|
}
|
||||||
@ -96,8 +100,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
}
|
}
|
||||||
|
|
||||||
// try to acquire lock
|
// try to acquire lock
|
||||||
cn, err := s.db.Exec(ctx, `INSERT INTO message_send_locks (from_key, task_id, claimed_at) VALUES ($1, $2, CURRENT_TIMESTAMP)
|
cn, err := s.db.Exec(ctx, `
|
||||||
ON CONFLICT (from_key) DO UPDATE SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID)
|
INSERT INTO message_send_locks (from_key, task_id, claimed_at)
|
||||||
|
VALUES ($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (from_key) DO UPDATE
|
||||||
|
SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("acquiring send lock: %w", err)
|
return false, xerrors.Errorf("acquiring send lock: %w", err)
|
||||||
}
|
}
|
||||||
@ -114,7 +121,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// defer release db send lock
|
// defer release db send lock
|
||||||
defer func() {
|
defer func() {
|
||||||
_, err2 := s.db.Exec(ctx, `delete from message_send_locks where from_key = $1 and task_id = $2`, dbMsg.FromKey, taskID)
|
_, err2 := s.db.Exec(ctx, `
|
||||||
|
DELETE from message_send_locks WHERE from_key = $1 AND task_id = $2`, dbMsg.FromKey, taskID)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err2)
|
log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err2)
|
||||||
|
|
||||||
@ -135,7 +143,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// get nonce from db
|
// get nonce from db
|
||||||
var dbNonce *uint64
|
var dbNonce *uint64
|
||||||
r := s.db.QueryRow(ctx, `select max(nonce) from message_sends where from_key = $1 and send_success = true`, msg.From.String())
|
r := s.db.QueryRow(ctx, `
|
||||||
|
SELECT MAX(nonce) FROM message_sends WHERE from_key = $1 AND send_success = true`, msg.From.String())
|
||||||
if err := r.Scan(&dbNonce); err != nil {
|
if err := r.Scan(&dbNonce); err != nil {
|
||||||
return false, xerrors.Errorf("getting nonce from db: %w", err)
|
return false, xerrors.Errorf("getting nonce from db: %w", err)
|
||||||
}
|
}
|
||||||
@ -164,7 +173,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// write to db
|
// write to db
|
||||||
|
|
||||||
n, err := s.db.Exec(ctx, `update message_sends set nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 where send_task_id = $5`,
|
n, err := s.db.Exec(ctx, `
|
||||||
|
UPDATE message_sends SET nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4
|
||||||
|
WHERE send_task_id = $5`,
|
||||||
msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID)
|
msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("updating db record: %w", err)
|
return false, xerrors.Errorf("updating db record: %w", err)
|
||||||
@ -198,7 +209,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
sendError = err.Error()
|
sendError = err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.db.Exec(ctx, `update message_sends set send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP where send_task_id = $3`, sendSuccess, sendError, taskID)
|
_, err = s.db.Exec(ctx, `
|
||||||
|
UPDATE message_sends SET send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP
|
||||||
|
WHERE send_task_id = $3`, sendSuccess, sendError, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("updating db record: %w", err)
|
return false, xerrors.Errorf("updating db record: %w", err)
|
||||||
}
|
}
|
||||||
@ -311,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
|
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var sendTaskID *harmonytask.TaskID
|
||||||
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
||||||
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
|
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
|
||||||
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
|
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
|
||||||
@ -318,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
return false, xerrors.Errorf("inserting message into db: %w", err)
|
return false, xerrors.Errorf("inserting message into db: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendTaskID = &id
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if sendTaskID == nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("failed to add task")
|
||||||
|
}
|
||||||
|
|
||||||
// wait for exec
|
// wait for exec
|
||||||
var (
|
var (
|
||||||
pollInterval = 50 * time.Millisecond
|
pollInterval = 50 * time.Millisecond
|
||||||
@ -334,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
var err error
|
var err error
|
||||||
var sigCidStr, sendError string
|
var sigCidStr, sendError *string
|
||||||
var sendSuccess *bool
|
var sendSuccess *bool
|
||||||
|
|
||||||
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
|
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
|
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
|
||||||
}
|
}
|
||||||
@ -353,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sigCidStr == nil || sendError == nil {
|
||||||
|
// should never happen because sendSuccess is already not null here
|
||||||
|
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
|
||||||
|
}
|
||||||
|
|
||||||
if !*sendSuccess {
|
if !*sendSuccess {
|
||||||
sendErr = xerrors.Errorf("send error: %s", sendError)
|
sendErr = xerrors.Errorf("send error: %s", *sendError)
|
||||||
} else {
|
} else {
|
||||||
sigCid, err = cid.Parse(sigCidStr)
|
sigCid, err = cid.Parse(*sigCidStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
|
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
|
|||||||
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
|
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||||
return harmonytask.TaskTypeDetails{
|
return harmonytask.TaskTypeDetails{
|
||||||
Max: 128,
|
Max: 128,
|
||||||
Name: "WdPostRecoverDeclare",
|
Name: "WdPostRecover",
|
||||||
Cost: resources.Resources{
|
Cost: resources.Resources{
|
||||||
Cpu: 1,
|
Cpu: 1,
|
||||||
Gpu: 0,
|
Gpu: 0,
|
||||||
|
@ -4,10 +4,8 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"database/sql"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -579,12 +577,13 @@ func (t *WinPostTask) mineBasic(ctx context.Context) {
|
|||||||
taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
||||||
// First we check if the mining base includes blocks we may have mined previously to avoid getting slashed
|
// First we check if the mining base includes blocks we may have mined previously to avoid getting slashed
|
||||||
// select mining_tasks where epoch==base_epoch if win=true to maybe get base block cid which has to be included in our tipset
|
// select mining_tasks where epoch==base_epoch if win=true to maybe get base block cid which has to be included in our tipset
|
||||||
var baseBlockCid string
|
var baseBlockCids []string
|
||||||
err := tx.QueryRow(`SELECT mined_cid FROM mining_tasks WHERE epoch = $1 AND sp_id = $2 AND won = true`, baseEpoch, spID).Scan(&baseBlockCid)
|
err := tx.Select(&baseBlockCids, `SELECT mined_cid FROM mining_tasks WHERE epoch = $1 AND sp_id = $2 AND won = true`, baseEpoch, spID)
|
||||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("querying mining_tasks: %w", err)
|
return false, xerrors.Errorf("querying mining_tasks: %w", err)
|
||||||
}
|
}
|
||||||
if baseBlockCid != "" {
|
if len(baseBlockCids) >= 1 {
|
||||||
|
baseBlockCid := baseBlockCids[0]
|
||||||
c, err := cid.Parse(baseBlockCid)
|
c, err := cid.Parse(baseBlockCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("parsing mined_cid: %w", err)
|
return false, xerrors.Errorf("parsing mined_cid: %w", err)
|
||||||
|
@ -180,12 +180,13 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryAttachStorage:
|
||||||
// Single transaction to attach storage which is not present in the DB
|
// Single transaction to attach storage which is not present in the DB
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
|
|
||||||
var urls sql.NullString
|
var urls sql.NullString
|
||||||
var storageId sql.NullString
|
var storageId sql.NullString
|
||||||
err = dbi.harmonyDB.QueryRow(ctx,
|
err = tx.QueryRow(
|
||||||
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
"Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls)
|
||||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||||
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
return false, xerrors.Errorf("storage attach select fails: %v", err)
|
||||||
@ -200,7 +201,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
currUrls = union(currUrls, si.URLs)
|
currUrls = union(currUrls, si.URLs)
|
||||||
|
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
"UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10",
|
||||||
strings.Join(currUrls, ","),
|
strings.Join(currUrls, ","),
|
||||||
si.Weight,
|
si.Weight,
|
||||||
@ -220,7 +221,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert storage id
|
// Insert storage id
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"INSERT INTO storage_path "+
|
"INSERT INTO storage_path "+
|
||||||
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
"Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)",
|
||||||
si.ID,
|
si.ID,
|
||||||
@ -245,6 +246,11 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo,
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryAttachStorage
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,22 +290,29 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri
|
|||||||
|
|
||||||
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
||||||
} else {
|
} else {
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryDropPath:
|
||||||
// Single transaction to drop storage path and sector decls which have this as a storage path
|
// Single transaction to drop storage path and sector decls which have this as a storage path
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
// Drop storage path completely
|
// Drop storage path completely
|
||||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id)
|
_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop all sectors entries which use this storage path
|
// Drop all sectors entries which use this storage path
|
||||||
_, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id)
|
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryDropPath
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Warnw("Dropping sector storage", "path", id)
|
log.Warnw("Dropping sector storage", "path", id)
|
||||||
@ -373,9 +386,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
return xerrors.Errorf("invalid filetype")
|
return xerrors.Errorf("invalid filetype")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retryWait := time.Millisecond * 100
|
||||||
|
retryStorageDeclareSector:
|
||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
var currPrimary sql.NullBool
|
var currPrimary sql.NullBool
|
||||||
err = dbi.harmonyDB.QueryRow(ctx,
|
err = tx.QueryRow(
|
||||||
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
"SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||||
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary)
|
||||||
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
if err != nil && !strings.Contains(err.Error(), "no rows in result set") {
|
||||||
@ -385,7 +400,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
// If storage id already exists for this sector, update primary if need be
|
// If storage id already exists for this sector, update primary if need be
|
||||||
if currPrimary.Valid {
|
if currPrimary.Valid {
|
||||||
if !currPrimary.Bool && primary {
|
if !currPrimary.Bool && primary {
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
"UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4",
|
||||||
s.Miner, s.Number, ft, storageID)
|
s.Miner, s.Number, ft, storageID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -395,7 +410,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
log.Warnf("sector %v redeclared in %s", s, storageID)
|
log.Warnf("sector %v redeclared in %s", s, storageID)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
"INSERT INTO sector_location "+
|
"INSERT INTO sector_location "+
|
||||||
"values($1, $2, $3, $4, $5)",
|
"values($1, $2, $3, $4, $5)",
|
||||||
s.Miner, s.Number, ft, storageID, primary)
|
s.Miner, s.Number, ft, storageID, primary)
|
||||||
@ -407,6 +422,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac
|
|||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if harmonydb.IsErrSerialization(err) {
|
||||||
|
time.Sleep(retryWait)
|
||||||
|
retryWait *= 2
|
||||||
|
goto retryStorageDeclareSector
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -750,7 +770,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
_, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
|
|
||||||
fts := (read | write).AllSet()
|
fts := (read | write).AllSet()
|
||||||
err = dbi.harmonyDB.Select(ctx, &rows,
|
err = tx.Select(&rows,
|
||||||
`SELECT sector_filetype, read_ts, read_refs, write_ts
|
`SELECT sector_filetype, read_ts, read_refs, write_ts
|
||||||
FROM sector_location
|
FROM sector_location
|
||||||
WHERE miner_id=$1
|
WHERE miner_id=$1
|
||||||
@ -792,7 +812,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Acquire write locks
|
// Acquire write locks
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
`UPDATE sector_location
|
`UPDATE sector_location
|
||||||
SET write_ts = NOW(), write_lock_owner = $1
|
SET write_ts = NOW(), write_lock_owner = $1
|
||||||
WHERE miner_id=$2
|
WHERE miner_id=$2
|
||||||
@ -807,7 +827,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Acquire read locks
|
// Acquire read locks
|
||||||
_, err = dbi.harmonyDB.Exec(ctx,
|
_, err = tx.Exec(
|
||||||
`UPDATE sector_location
|
`UPDATE sector_location
|
||||||
SET read_ts = NOW(), read_refs = read_refs + 1
|
SET read_ts = NOW(), read_refs = read_refs + 1
|
||||||
WHERE miner_id=$1
|
WHERE miner_id=$1
|
||||||
|
Loading…
Reference in New Issue
Block a user