diff --git a/Makefile b/Makefile index 68d97227b..c3c46f71e 100644 --- a/Makefile +++ b/Makefile @@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen docsgen-openrpc-bin: api-gen actors-gen $(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd -docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker +docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider docsgen-md-full: docsgen-md-bin ./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md @@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin ./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md docsgen-md-worker: docsgen-md-bin ./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md +docsgen-md-provider: docsgen-md-bin + ./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 018629600..5a05c8d0e 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r i = &api.GatewayStruct{} t = reflect.TypeOf(new(struct{ api.Gateway })).Elem() permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal)) + case "Provider": + i = &api.LotusProviderStruct{} + t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem() + permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal)) default: panic("unknown type") } diff --git a/cmd/lotus-provider/migrate.go b/cmd/lotus-provider/migrate.go index 819499402..3869c7dfb 100644 --- a/cmd/lotus-provider/migrate.go +++ b/cmd/lotus-provider/migrate.go @@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) { dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"` } + var layerMaybe string + if name != "base" { + layerMaybe = "--layer=" + name + } + msg += ` To work with the config: ` + cliCommandColor(`lotus-provider `+dbSettings+` config help `) msg += ` To run Lotus Provider: in its own machine or cgroup without other files, use the command: -` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`) +` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe) fmt.Println(msg) return nil } diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 577b5b5f9..a3211b176 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -117,7 +117,7 @@ var wdPostTaskCmd = &cli.Command{ } fmt.Print(".") } - log.Infof("Result:", result.String) + log.Infof("Result: %s", result.String) return nil }, } diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index bf19ee537..de97aa766 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -176,7 +176,7 @@ var runCmd = &cli.Command{ } } log.Infow("This lotus_provider instance handles", - "miner_addresses", maddrs, + "miner_addresses", minerAddressesToStrings(maddrs), "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr) @@ -457,3 +457,11 @@ func (p *ProviderAPI) Shutdown(context.Context) error { close(p.ShutdownChan) return nil } + +func minerAddressesToStrings(maddrs []dtypes.MinerAddress) []string { + strs := make([]string, len(maddrs)) + for i, addr := range maddrs { + strs[i] = address.Address(addr).String() + } + return strs +} diff --git a/documentation/en/api-v0-methods-provider.md b/documentation/en/api-v0-methods-provider.md new file mode 100644 index 000000000..fc4a2daf7 --- /dev/null +++ b/documentation/en/api-v0-methods-provider.md @@ -0,0 +1,25 @@ +# Groups +* [](#) + * [Shutdown](#Shutdown) + * [Version](#Version) +## + + +### Shutdown + + +Perms: admin + +Inputs: `null` + +Response: `{}` + +### Version + + +Perms: admin + +Inputs: `null` + +Response: `131840` + diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 595e5b63a..7577c5cf5 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -146,6 +146,11 @@ func New( TaskTypeDetails: c.TypeDetails(), TaskEngine: e, } + + if len(h.Name) > 16 { + return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name) + } + e.handlers = append(e.handlers, &h) e.taskMap[h.TaskTypeDetails.Name] = &h } @@ -171,7 +176,7 @@ func New( continue // not really fatal, but not great } } - if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) { + if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) { log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) } } @@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() { continue } if len(unownedTasks) > 0 { - accepted := v.considerWork("poller", unownedTasks) + accepted := v.considerWork(workSourcePoller, unownedTasks) if accepted { return // accept new work slowly and in priority order } diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 79a156fef..34f7a5c3e 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error } } +const ( + workSourcePoller = "poller" + workSourceRecover = "recovered" +) + // considerWork is called to attempt to start work on a task-id of this task type. // It presumes single-threaded calling, so there should not be a multi-threaded re-entry. // The only caller should be the one work poller thread. This does spin off other threads, @@ -87,22 +92,25 @@ top: return false } - // 4. Can we claim the work for our hostname? - ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) - if err != nil { - log.Error(err) - return false - } - if ct == 0 { - log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) - var tryAgain = make([]TaskID, 0, len(ids)-1) - for _, id := range ids { - if id != *tID { - tryAgain = append(tryAgain, id) - } + // if recovering we don't need to try to claim anything because those tasks are already claimed by us + if from != workSourceRecover { + // 4. Can we claim the work for our hostname? + ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID) + if err != nil { + log.Error(err) + return false + } + if ct == 0 { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "already Taken", "name", h.Name) + var tryAgain = make([]TaskID, 0, len(ids)-1) + for _, id := range ids { + if id != *tID { + tryAgain = append(tryAgain, id) + } + } + ids = tryAgain + goto top } - ids = tryAgain - goto top } h.Count.Add(1) diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index bee22ff69..0db0c0b51 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -324,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("marshaling message: %w", err) } + var sendTaskID *harmonytask.TaskID taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { _, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`, msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id) @@ -331,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return false, xerrors.Errorf("inserting message into db: %w", err) } + sendTaskID = &id + return true, nil }) + if sendTaskID == nil { + return cid.Undef, xerrors.Errorf("failed to add task") + } + // wait for exec var ( pollInterval = 50 * time.Millisecond @@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS for { var err error - var sigCidStr, sendError string + var sigCidStr, sendError *string var sendSuccess *bool - err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError) + err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError) if err != nil { return cid.Undef, xerrors.Errorf("getting cid for task: %w", err) } @@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS continue } + if sigCidStr == nil || sendError == nil { + // should never happen because sendSuccess is already not null here + return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen") + } + if !*sendSuccess { - sendErr = xerrors.Errorf("send error: %s", sendError) + sendErr = xerrors.Errorf("send error: %s", *sendError) } else { - sigCid, err = cid.Parse(sigCidStr) + sigCid, err = cid.Parse(*sigCidStr) if err != nil { return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err) } diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index 6006f3c35..12f8522b5 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, - Name: "WdPostRecoverDeclare", + Name: "WdPostRecover", Cost: resources.Resources{ Cpu: 1, Gpu: 0,