diff --git a/Makefile b/Makefile index 68d97227b..c3c46f71e 100644 --- a/Makefile +++ b/Makefile @@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen docsgen-openrpc-bin: api-gen actors-gen $(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd -docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker +docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider docsgen-md-full: docsgen-md-bin ./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md @@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin ./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md docsgen-md-worker: docsgen-md-bin ./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md +docsgen-md-provider: docsgen-md-bin + ./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 018629600..5a05c8d0e 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r i = &api.GatewayStruct{} t = reflect.TypeOf(new(struct{ api.Gateway })).Elem() permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal)) + case "Provider": + i = &api.LotusProviderStruct{} + t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem() + permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal)) default: panic("unknown type") } diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 4e3f31fba..ad2116cb9 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 
fe52522fd..48ad91135 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 30e2c6dd1..a16ed4f26 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index de03ea05b..8e9d63d79 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/build/version.go b/build/version.go index 0758686f6..215940e7a 100644 --- a/build/version.go +++ b/build/version.go @@ -37,7 +37,7 @@ func BuildTypeString() string { } // BuildVersion is the local build version -const BuildVersion = "1.25.1-dev" +const BuildVersion = "1.25.2-dev" func UserVersion() string { if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" { diff --git a/chain/exchange/cbor_gen.go b/chain/exchange/cbor_gen.go index e66b6d798..71c75869d 100644 --- a/chain/exchange/cbor_gen.go +++ b/chain/exchange/cbor_gen.go @@ -306,9 +306,9 @@ func (t *Response) UnmarshalCBOR(r io.Reader) (err error) { return nil } -var lengthBufCompactedMessages = []byte{132} +var lengthBufCompactedMessagesCBOR = []byte{132} -func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { +func (t *CompactedMessagesCBOR) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -316,12 +316,12 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write(lengthBufCompactedMessages); err != nil { + if _, err := cw.Write(lengthBufCompactedMessagesCBOR); err != nil { return err } // t.Bls ([]*types.Message) (slice) - if len(t.Bls) > cbg.MaxLength { + if len(t.Bls) > 150000 { return xerrors.Errorf("Slice value in field t.Bls was too long") } @@ -334,7 +334,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { } } - // t.BlsIncludes ([][]uint64) (slice) + // t.BlsIncludes 
([]exchange.messageIndices) (slice) if len(t.BlsIncludes) > cbg.MaxLength { return xerrors.Errorf("Slice value in field t.BlsIncludes was too long") } @@ -343,24 +343,13 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { return err } for _, v := range t.BlsIncludes { - if len(v) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field v was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(v))); err != nil { + if err := v.MarshalCBOR(cw); err != nil { return err } - for _, v := range v { - - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(v)); err != nil { - return err - } - - } } // t.Secpk ([]*types.SignedMessage) (slice) - if len(t.Secpk) > cbg.MaxLength { + if len(t.Secpk) > 150000 { return xerrors.Errorf("Slice value in field t.Secpk was too long") } @@ -373,7 +362,7 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { } } - // t.SecpkIncludes ([][]uint64) (slice) + // t.SecpkIncludes ([]exchange.messageIndices) (slice) if len(t.SecpkIncludes) > cbg.MaxLength { return xerrors.Errorf("Slice value in field t.SecpkIncludes was too long") } @@ -382,26 +371,15 @@ func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { return err } for _, v := range t.SecpkIncludes { - if len(v) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field v was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(v))); err != nil { + if err := v.MarshalCBOR(cw); err != nil { return err } - for _, v := range v { - - if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(v)); err != nil { - return err - } - - } } return nil } -func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { - *t = CompactedMessages{} +func (t *CompactedMessagesCBOR) UnmarshalCBOR(r io.Reader) (err error) { + *t = CompactedMessagesCBOR{} cr := cbg.NewCborReader(r) @@ -430,7 +408,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { return err } - if extra > 
cbg.MaxLength { + if extra > 150000 { return fmt.Errorf("t.Bls: array too large (%d)", extra) } @@ -471,7 +449,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { } } - // t.BlsIncludes ([][]uint64) (slice) + // t.BlsIncludes ([]exchange.messageIndices) (slice) maj, extra, err = cr.ReadHeader() if err != nil { @@ -487,7 +465,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { } if extra > 0 { - t.BlsIncludes = make([][]uint64, extra) + t.BlsIncludes = make([]messageIndices, extra) } for i := 0; i < int(extra); i++ { @@ -499,47 +477,13 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { _ = extra _ = err - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } + { - if extra > cbg.MaxLength { - return fmt.Errorf("t.BlsIncludes[i]: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.BlsIncludes[i] = make([]uint64, extra) - } - - for j := 0; j < int(extra); j++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.BlsIncludes[i][j] = uint64(extra) - - } + if err := t.BlsIncludes[i].UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.BlsIncludes[i]: %w", err) } - } + } } } @@ -550,7 +494,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { return err } - if extra > cbg.MaxLength { + if extra > 150000 { return fmt.Errorf("t.Secpk: array too large (%d)", extra) } @@ -591,7 +535,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { } } - // t.SecpkIncludes ([][]uint64) (slice) + // t.SecpkIncludes ([]exchange.messageIndices) (slice) maj, extra, err = cr.ReadHeader() if err != nil { @@ -607,7 +551,7 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) 
(err error) { } if extra > 0 { - t.SecpkIncludes = make([][]uint64, extra) + t.SecpkIncludes = make([]messageIndices, extra) } for i := 0; i < int(extra); i++ { @@ -619,47 +563,13 @@ func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { _ = extra _ = err - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } + { - if extra > cbg.MaxLength { - return fmt.Errorf("t.SecpkIncludes[i]: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.SecpkIncludes[i] = make([]uint64, extra) - } - - for j := 0; j < int(extra); j++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.SecpkIncludes[i][j] = uint64(extra) - - } + if err := t.SecpkIncludes[i].UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.SecpkIncludes[i]: %w", err) } - } + } } } diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 120b554a1..769c375ca 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "fmt" + "io" "math/rand" "time" @@ -23,6 +24,10 @@ import ( "github.com/filecoin-project/lotus/lib/peermgr" ) +// Set the max exchange message size to 120MiB. Purely based on gas numbers, we can include ~8MiB of +// messages per block, so I've set this to 120MiB to be _very_ safe. +const maxExchangeMessageSize = (15 * 8) << 20 + // client implements exchange.Client, using the libp2p ChainExchange protocol // as the fetching mechanism. type client struct { @@ -434,10 +439,11 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque log.Warnw("CloseWrite err", "error", err) } - // Read response. 
+ // Read response, limiting the size of the response to maxExchangeMessageSize as we allow a + // lot of messages (10k+) but they'll mostly be quite small. var res Response err = cborutil.ReadCborRPC( - bufio.NewReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline)), + bufio.NewReader(io.LimitReader(incrt.New(stream, ReadResMinSpeed, ReadResDeadline), maxExchangeMessageSize)), &res) if err != nil { c.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length) diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index 5e12d31cc..cd25f4a43 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -154,6 +154,8 @@ type BSTipSet struct { // FIXME: The logic to decompress this structure should belong // // to itself, not to the consumer. +// +// NOTE: Max messages is: BlockMessageLimit (10k) * MaxTipsetSize (15) = 150k type CompactedMessages struct { Bls []*types.Message BlsIncludes [][]uint64 diff --git a/chain/exchange/protocol_encoding.go b/chain/exchange/protocol_encoding.go new file mode 100644 index 000000000..040dd0d40 --- /dev/null +++ b/chain/exchange/protocol_encoding.go @@ -0,0 +1,125 @@ +package exchange + +import ( + "fmt" + "io" + + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + types "github.com/filecoin-project/lotus/chain/types" +) + +// Type used for encoding/decoding compacted messages. This is a custom type as we need custom limits. +// - Max messages is 150,000 as that's 15 times the max block size (in messages). It needs to be +// large enough to cover a full tipset full of full blocks. +type CompactedMessagesCBOR struct { + Bls []*types.Message `cborgen:"maxlen=150000"` + BlsIncludes []messageIndices + + Secpk []*types.SignedMessage `cborgen:"maxlen=150000"` + SecpkIncludes []messageIndices +} + +// Unmarshal into the "decoding" struct, then copy into the actual struct. 
+func (t *CompactedMessages) UnmarshalCBOR(r io.Reader) (err error) { + var c CompactedMessagesCBOR + if err := c.UnmarshalCBOR(r); err != nil { + return err + } + t.Bls = c.Bls + t.BlsIncludes = make([][]uint64, len(c.BlsIncludes)) + for i, v := range c.BlsIncludes { + t.BlsIncludes[i] = v.v + } + t.Secpk = c.Secpk + t.SecpkIncludes = make([][]uint64, len(c.SecpkIncludes)) + for i, v := range c.SecpkIncludes { + t.SecpkIncludes[i] = v.v + } + return nil +} + +// Copy into the encoding struct, then marshal. +func (t *CompactedMessages) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + var c CompactedMessagesCBOR + c.Bls = t.Bls + c.BlsIncludes = make([]messageIndices, len(t.BlsIncludes)) + for i, v := range t.BlsIncludes { + c.BlsIncludes[i].v = v + } + c.Secpk = t.Secpk + c.SecpkIncludes = make([]messageIndices, len(t.SecpkIncludes)) + for i, v := range t.SecpkIncludes { + c.SecpkIncludes[i].v = v + } + return c.MarshalCBOR(w) +} + +// this needs to be a struct or cborgen will peek into it and ignore the Unmarshal/Marshal functions +type messageIndices struct { + v []uint64 +} + +func (t *messageIndices) UnmarshalCBOR(r io.Reader) (err error) { + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra > uint64(build.BlockMessageLimit) { + return fmt.Errorf("cbor input had wrong number of fields") + } + + if extra > 0 { + t.v = make([]uint64, extra) + } + + for i := 0; i < int(extra); i++ { + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.v[i] = extra + + } + return nil +} + +func (t *messageIndices) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if len(t.v) > 
build.BlockMessageLimit { + return xerrors.Errorf("Slice value in field v was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.v))); err != nil { + return err + } + for _, v := range t.v { + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, v); err != nil { + return err + } + } + return nil +} diff --git a/cmd/lotus-provider/migrate.go b/cmd/lotus-provider/migrate.go index 819499402..3869c7dfb 100644 --- a/cmd/lotus-provider/migrate.go +++ b/cmd/lotus-provider/migrate.go @@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) { dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"` } + var layerMaybe string + if name != "base" { + layerMaybe = "--layer=" + name + } + msg += ` To work with the config: ` + cliCommandColor(`lotus-provider `+dbSettings+` config help `) msg += ` To run Lotus Provider: in its own machine or cgroup without other files, use the command: -` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`) +` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe) fmt.Println(msg) return nil } diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 001108712..1602739ed 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -83,7 +83,10 @@ var wdPostTaskCmd = &cli.Command{ return xerrors.Errorf("cannot get miner id %w", err) } var id int64 - _, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + + retryDelay := time.Millisecond * 10 + retryAddTask: + _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) if err != nil { log.Error("inserting harmony_task: ", err) @@ -103,6 +106,11 @@ var wdPostTaskCmd = &cli.Command{ return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + 
time.Sleep(retryDelay) + retryDelay *= 2 + goto retryAddTask + } return xerrors.Errorf("writing SQL transaction: %w", err) } fmt.Printf("Inserted task %v. Waiting for success ", id) @@ -118,7 +126,7 @@ var wdPostTaskCmd = &cli.Command{ } fmt.Print(".") } - log.Infof("Result:", result.String) + log.Infof("Result: %s", result.String) return nil }, } diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 87d073963..975762f0a 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -120,6 +120,7 @@ var runCmd = &cli.Command{ } taskEngine, err := tasks.StartTasks(ctx, dependencies) + if err != nil { return nil } @@ -135,4 +136,4 @@ var runCmd = &cli.Command{ <-finishCh return nil }, -} +} \ No newline at end of file diff --git a/documentation/en/api-v0-methods-provider.md b/documentation/en/api-v0-methods-provider.md new file mode 100644 index 000000000..fc4a2daf7 --- /dev/null +++ b/documentation/en/api-v0-methods-provider.md @@ -0,0 +1,25 @@ +# Groups +* [](#) + * [Shutdown](#Shutdown) + * [Version](#Version) +## + + +### Shutdown + + +Perms: admin + +Inputs: `null` + +Response: `{}` + +### Version + + +Perms: admin + +Inputs: `null` + +Response: `131840` + diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index f3fb8a9e1..82b6b300a 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -7,7 +7,7 @@ USAGE: lotus-miner [global options] command [command options] [arguments...] VERSION: - 1.25.1-dev + 1.25.2-dev COMMANDS: init Initialize a lotus miner repo diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index cf0b9fd36..73774503c 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -7,7 +7,7 @@ USAGE: lotus-worker [global options] command [command options] [arguments...] 
VERSION: - 1.25.1-dev + 1.25.2-dev COMMANDS: run Start lotus worker diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index a6a6ea30a..959aaefac 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -7,7 +7,7 @@ USAGE: lotus [global options] command [command options] [arguments...] VERSION: - 1.25.1-dev + 1.25.2-dev COMMANDS: daemon Start a lotus daemon process diff --git a/gen/main.go b/gen/main.go index 0cd3999c3..942b3ac2c 100644 --- a/gen/main.go +++ b/gen/main.go @@ -92,7 +92,7 @@ func main() { err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange", exchange.Request{}, exchange.Response{}, - exchange.CompactedMessages{}, + exchange.CompactedMessagesCBOR{}, exchange.BSTipSet{}, ) if err != nil { diff --git a/lib/harmony/harmonydb/harmonydb.go b/lib/harmony/harmonydb/harmonydb.go index 0fed176d2..5ec9f5a25 100644 --- a/lib/harmony/harmonydb/harmonydb.go +++ b/lib/harmony/harmonydb/harmonydb.go @@ -10,6 +10,8 @@ import ( "sort" "strconv" "strings" + "sync" + "sync/atomic" "time" logging "github.com/ipfs/go-log/v2" @@ -33,6 +35,8 @@ type DB struct { cfg *pgxpool.Config schema string hostnames []string + BTFPOnce sync.Once + BTFP atomic.Uintptr } var logger = logging.Logger("harmonydb") diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 788ca4a34..7fcf76dcd 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -3,13 +3,17 @@ package harmonydb import ( "context" "errors" + "runtime" "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/samber/lo" ) +var errTx = errors.New("Cannot use a non-transaction func in a transaction") + // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. // In any package, raw strings will satisfy compilation. 
Ex: // @@ -22,6 +26,9 @@ type rawStringOnly string // Note, for CREATE & DROP please keep these permanent and express // them in the ./sql/ files (next number). func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error) { + if db.usedInTransaction() { + return 0, errTx + } res, err := db.pgx.Exec(ctx, string(sql), arguments...) return int(res.RowsAffected()), err } @@ -55,6 +62,9 @@ type Query struct { // fmt.Println(id, name) // } func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error) { + if db.usedInTransaction() { + return &Query{}, errTx + } q, err := db.pgx.Query(ctx, string(sql), arguments...) return &Query{q}, err } @@ -66,6 +76,10 @@ type Row interface { Scan(...any) error } +type rowErr struct{} + +func (rowErr) Scan(_ ...any) error { return errTx } + // QueryRow gets 1 row using column order matching. // This is a timesaver for the special case of wanting the first row returned only. // EX: @@ -74,6 +88,9 @@ type Row interface { // var ID = 123 // err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=?", ID).Scan(&name, &pet) func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row { + if db.usedInTransaction() { + return rowErr{} + } return db.pgx.QueryRow(ctx, string(sql), arguments...) } @@ -92,6 +109,9 @@ Ex: err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=?", pet) */ func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { + if db.usedInTransaction() { + return errTx + } return pgxscan.Select(ctx, db.pgx, sliceOfStructPtr, string(sql), arguments...) } @@ -100,10 +120,32 @@ type Tx struct { ctx context.Context } +// usedInTransaction is a helper to prevent nesting transactions +// & non-transaction calls in transactions. It only checks 20 frames. +// Fast: This memory should all be in CPU Caches. 
+func (db *DB) usedInTransaction() bool { + var framePtrs = (&[20]uintptr{})[:] // 20 can be stack-local (no alloc) + framePtrs = framePtrs[:runtime.Callers(3, framePtrs)] // skip past our caller. + return lo.Contains(framePtrs, db.BTFP.Load()) // Unsafe read @ beginTx overlap, but 'return false' is correct there. +} + // BeginTransaction is how you can access transactions using this library. // The entire transaction happens in the function passed in. // The return must be true or a rollback will occur. +// Be sure to test the error for IsErrSerialization() if you want to retry +// +// when there is a DB serialization error. +// +//go:noinline func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) { + db.BTFPOnce.Do(func() { + fp := make([]uintptr, 20) + runtime.Callers(1, fp) + db.BTFP.Store(fp[0]) + }) + if db.usedInTransaction() { + return false, errTx + } tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return false, err @@ -156,3 +198,8 @@ func IsErrUniqueContraint(err error) bool { var e2 *pgconn.PgError return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation } + +func IsErrSerialization(err error) bool { + var e2 *pgconn.PgError + return errors.As(err, &e2) && e2.Code == pgerrcode.SerializationFailure +} diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 595e5b63a..7577c5cf5 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -146,6 +146,11 @@ func New( TaskTypeDetails: c.TypeDetails(), TaskEngine: e, } + + if len(h.Name) > 16 { + return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name) + } + e.handlers = append(e.handlers, &h) e.taskMap[h.TaskTypeDetails.Name] = &h } @@ -171,7 +176,7 @@ func New( continue // not really fatal, but not great } } - if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) { + if !h.considerWork(workSourceRecover, 
[]TaskID{TaskID(w.ID)}) { log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) } } @@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() { continue } if len(unownedTasks) > 0 { - accepted := v.considerWork("poller", unownedTasks) + accepted := v.considerWork(workSourcePoller, unownedTasks) if accepted { return // accept new work slowly and in priority order } diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 79a156fef..ccfcc1f6f 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -25,6 +25,8 @@ type taskTypeHandler struct { func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { var tID TaskID + retryWait := time.Millisecond * 100 +retryAddTask: _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { // create taskID (from DB) _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) @@ -44,11 +46,21 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name) return } + if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 + goto retryAddTask + } log.Error("Could not add task. AddTasFunc failed: %v", err) return } } +const ( + workSourcePoller = "poller" + workSourceRecover = "recovered" +) + // considerWork is called to attempt to start work on a task-id of this task type. // It presumes single-threaded calling, so there should not be a multi-threaded re-entry. // The only caller should be the one work poller thread. This does spin off other threads, @@ -87,22 +99,25 @@ top: return false } - // 4. Can we claim the work for our hostname? 
- ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) - if err != nil { - log.Error(err) - return false - } - if ct == 0 { - log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) - var tryAgain = make([]TaskID, 0, len(ids)-1) - for _, id := range ids { - if id != *tID { - tryAgain = append(tryAgain, id) - } + // if recovering we don't need to try to claim anything because those tasks are already claimed by us + if from != workSourceRecover { + // 4. Can we claim the work for our hostname? + ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) + if err != nil { + log.Error(err) + return false + } + if ct == 0 { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) + var tryAgain = make([]TaskID, 0, len(ids)-1) + for _, id := range ids { + if id != *tID { + tryAgain = append(tryAgain, id) + } + } + ids = tryAgain + goto top } - ids = tryAgain - goto top } h.Count.Add(1) @@ -153,7 +168,8 @@ top: func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() - + retryWait := time.Millisecond * 100 +retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) @@ -206,6 +222,11 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, wo return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 + goto retryRecordCompletion + } log.Error("Could not record transaction: ", err) return } diff --git a/provider/lpmessage/sender.go 
b/provider/lpmessage/sender.go index 8d6cd4027..0db0c0b51 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -76,7 +76,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b SignedData []byte `db:"signed_data"` } - err = s.db.QueryRow(ctx, `select from_key, nonce, to_addr, unsigned_data, unsigned_cid from message_sends where id = $1`, taskID).Scan(&dbMsg) + err = s.db.QueryRow(ctx, ` + SELECT from_key, nonce, to_addr, unsigned_data, unsigned_cid + FROM message_sends + WHERE send_task_id = $1`, taskID).Scan( + &dbMsg.FromKey, &dbMsg.Nonce, &dbMsg.ToAddr, &dbMsg.UnsignedData, &dbMsg.UnsignedCid) if err != nil { return false, xerrors.Errorf("getting message from db: %w", err) } @@ -96,8 +100,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b } // try to acquire lock - cn, err := s.db.Exec(ctx, `INSERT INTO message_send_locks (from_key, task_id, claimed_at) VALUES ($1, $2, CURRENT_TIMESTAMP) - ON CONFLICT (from_key) DO UPDATE SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID) + cn, err := s.db.Exec(ctx, ` + INSERT INTO message_send_locks (from_key, task_id, claimed_at) + VALUES ($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (from_key) DO UPDATE + SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP + WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID) if err != nil { return false, xerrors.Errorf("acquiring send lock: %w", err) } @@ -114,7 +121,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b // defer release db send lock defer func() { - _, err2 := s.db.Exec(ctx, `delete from message_send_locks where from_key = $1 and task_id = $2`, dbMsg.FromKey, taskID) + _, err2 := s.db.Exec(ctx, ` + DELETE from message_send_locks WHERE from_key = $1 AND task_id = $2`, dbMsg.FromKey, taskID) if err2 != nil { log.Errorw("releasing send lock", "task_id", taskID, 
"from", dbMsg.FromKey, "error", err2) @@ -135,7 +143,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b // get nonce from db var dbNonce *uint64 - r := s.db.QueryRow(ctx, `select max(nonce) from message_sends where from_key = $1 and send_success = true`, msg.From.String()) + r := s.db.QueryRow(ctx, ` + SELECT MAX(nonce) FROM message_sends WHERE from_key = $1 AND send_success = true`, msg.From.String()) if err := r.Scan(&dbNonce); err != nil { return false, xerrors.Errorf("getting nonce from db: %w", err) } @@ -164,7 +173,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b // write to db - n, err := s.db.Exec(ctx, `update message_sends set nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 where send_task_id = $5`, + n, err := s.db.Exec(ctx, ` + UPDATE message_sends SET nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 + WHERE send_task_id = $5`, msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID) if err != nil { return false, xerrors.Errorf("updating db record: %w", err) @@ -198,7 +209,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b sendError = err.Error() } - _, err = s.db.Exec(ctx, `update message_sends set send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP where send_task_id = $3`, sendSuccess, sendError, taskID) + _, err = s.db.Exec(ctx, ` + UPDATE message_sends SET send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP + WHERE send_task_id = $3`, sendSuccess, sendError, taskID) if err != nil { return false, xerrors.Errorf("updating db record: %w", err) } @@ -311,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("marshaling message: %w", err) } + var sendTaskID *harmonytask.TaskID taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { _, err := tx.Exec(`insert into 
message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`, msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id) @@ -318,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return false, xerrors.Errorf("inserting message into db: %w", err) } + sendTaskID = &id + return true, nil }) + if sendTaskID == nil { + return cid.Undef, xerrors.Errorf("failed to add task") + } + // wait for exec var ( pollInterval = 50 * time.Millisecond @@ -334,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS for { var err error - var sigCidStr, sendError string + var sigCidStr, sendError *string var sendSuccess *bool - err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError) + err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError) if err != nil { return cid.Undef, xerrors.Errorf("getting cid for task: %w", err) } @@ -353,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS continue } + if sigCidStr == nil || sendError == nil { + // should never happen because sendSuccess is already not null here + return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen") + } + if !*sendSuccess { - sendErr = xerrors.Errorf("send error: %s", sendError) + sendErr = xerrors.Errorf("send error: %s", *sendError) } else { - sigCid, err = cid.Parse(sigCidStr) + sigCid, err = cid.Parse(*sigCidStr) if err != nil { return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err) } diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index 6006f3c35..12f8522b5 100644 --- 
a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, - Name: "WdPostRecoverDeclare", + Name: "WdPostRecover", Cost: resources.Resources{ Cpu: 1, Gpu: 0, diff --git a/provider/lpwinning/winning_task.go b/provider/lpwinning/winning_task.go index f02ffa1ae..907b594fd 100644 --- a/provider/lpwinning/winning_task.go +++ b/provider/lpwinning/winning_task.go @@ -4,10 +4,8 @@ import ( "bytes" "context" "crypto/rand" - "database/sql" "encoding/binary" "encoding/json" - "errors" "time" "github.com/ipfs/go-cid" @@ -579,12 +577,13 @@ func (t *WinPostTask) mineBasic(ctx context.Context) { taskFn(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { // First we check if the mining base includes blocks we may have mined previously to avoid getting slashed // select mining_tasks where epoch==base_epoch if win=true to maybe get base block cid which has to be included in our tipset - var baseBlockCid string - err := tx.QueryRow(`SELECT mined_cid FROM mining_tasks WHERE epoch = $1 AND sp_id = $2 AND won = true`, baseEpoch, spID).Scan(&baseBlockCid) - if err != nil && !errors.Is(err, sql.ErrNoRows) { + var baseBlockCids []string + err := tx.Select(&baseBlockCids, `SELECT mined_cid FROM mining_tasks WHERE epoch = $1 AND sp_id = $2 AND won = true`, baseEpoch, spID) + if err != nil { return false, xerrors.Errorf("querying mining_tasks: %w", err) } - if baseBlockCid != "" { + if len(baseBlockCids) >= 1 { + baseBlockCid := baseBlockCids[0] c, err := cid.Parse(baseBlockCid) if err != nil { return false, xerrors.Errorf("parsing mined_cid: %w", err) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 1e4abfab1..e6bf3e5da 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ 
-180,12 +180,13 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } } + retryWait := time.Millisecond * 100 +retryAttachStorage: // Single transaction to attach storage which is not present in the DB _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - var urls sql.NullString var storageId sql.NullString - err = dbi.harmonyDB.QueryRow(ctx, + err = tx.QueryRow( "Select storage_id, urls FROM storage_path WHERE storage_id = $1", string(si.ID)).Scan(&storageId, &urls) if err != nil && !strings.Contains(err.Error(), "no rows in result set") { return false, xerrors.Errorf("storage attach select fails: %v", err) @@ -200,7 +201,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } currUrls = union(currUrls, si.URLs) - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "UPDATE storage_path set urls=$1, weight=$2, max_storage=$3, can_seal=$4, can_store=$5, groups=$6, allow_to=$7, allow_types=$8, deny_types=$9 WHERE storage_id=$10", strings.Join(currUrls, ","), si.Weight, @@ -220,7 +221,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, } // Insert storage id - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "INSERT INTO storage_path "+ "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", si.ID, @@ -245,6 +246,11 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 + goto retryAttachStorage + } return err } @@ -284,22 +290,29 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri log.Warnw("Dropping sector path endpoint", "path", id, "url", url) } else { + retryWait := time.Millisecond * 100 + retryDropPath: // Single transaction to drop storage path and sector decls which have this as a storage path _, err := 
dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // Drop storage path completely - _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM storage_path WHERE storage_id=$1", id) + _, err = tx.Exec("DELETE FROM storage_path WHERE storage_id=$1", id) if err != nil { return false, err } // Drop all sectors entries which use this storage path - _, err = dbi.harmonyDB.Exec(ctx, "DELETE FROM sector_location WHERE storage_id=$1", id) + _, err = tx.Exec("DELETE FROM sector_location WHERE storage_id=$1", id) if err != nil { return false, err } return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 + goto retryDropPath + } return err } log.Warnw("Dropping sector storage", "path", id) @@ -373,9 +386,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac return xerrors.Errorf("invalid filetype") } + retryWait := time.Millisecond * 100 +retryStorageDeclareSector: _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { var currPrimary sql.NullBool - err = dbi.harmonyDB.QueryRow(ctx, + err = tx.QueryRow( "SELECT is_primary FROM sector_location WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", uint64(s.Miner), uint64(s.Number), int(ft), string(storageID)).Scan(&currPrimary) if err != nil && !strings.Contains(err.Error(), "no rows in result set") { @@ -385,7 +400,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac // If storage id already exists for this sector, update primary if need be if currPrimary.Valid { if !currPrimary.Bool && primary { - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "UPDATE sector_location set is_primary = TRUE WHERE miner_id=$1 and sector_num=$2 and sector_filetype=$3 and storage_id=$4", s.Miner, s.Number, ft, storageID) if err != nil { @@ -395,7 +410,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID 
storifac log.Warnf("sector %v redeclared in %s", s, storageID) } } else { - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( "INSERT INTO sector_location "+ "values($1, $2, $3, $4, $5)", s.Miner, s.Number, ft, storageID, primary) @@ -407,6 +422,11 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac return true, nil }) if err != nil { + if harmonydb.IsErrSerialization(err) { + time.Sleep(retryWait) + retryWait *= 2 + goto retryStorageDeclareSector + } return err } @@ -750,7 +770,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac _, err := dbi.harmonyDB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { fts := (read | write).AllSet() - err = dbi.harmonyDB.Select(ctx, &rows, + err = tx.Select(&rows, `SELECT sector_filetype, read_ts, read_refs, write_ts FROM sector_location WHERE miner_id=$1 @@ -792,7 +812,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac } // Acquire write locks - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( `UPDATE sector_location SET write_ts = NOW(), write_lock_owner = $1 WHERE miner_id=$2 @@ -807,7 +827,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac } // Acquire read locks - _, err = dbi.harmonyDB.Exec(ctx, + _, err = tx.Exec( `UPDATE sector_location SET read_ts = NOW(), read_refs = read_refs + 1 WHERE miner_id=$1