diff --git a/README.md b/README.md index 124b582..f0304d0 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ 3.任务注册,像写http.Handler一样方便 4.任务panic处理 5.阻塞策略处理 +6.任务完成支持返回执行备注 ``` ## Example @@ -16,6 +17,7 @@ package main import ( xxl "github.com/xxl-job/go-client" "github.com/xxl-job/go-client/example/task" + "log" ) func main() { @@ -29,9 +31,9 @@ func main() { exec.Init() exec.RegTask("task.test", task.Test) exec.RegTask("task.test2", task.Test2) - exec.Run() + exec.RegTask("task.panic", task.Panic) + log.Fatal(exec.Run()) } - ``` # see github.com/xxl-job/go-client/example/ diff --git a/example/task/panic.go b/example/task/panic.go index ab3b333..b1305a9 100644 --- a/example/task/panic.go +++ b/example/task/panic.go @@ -5,6 +5,6 @@ import ( xxl "github.com/xxl-job/go-client" ) -func Panic(cxt context.Context, param *xxl.RunReq) { +func Panic(cxt context.Context, param *xxl.RunReq) (msg string) { panic("test panic") } diff --git a/example/task/test.go b/example/task/test.go index 6deb947..ea7c346 100644 --- a/example/task/test.go +++ b/example/task/test.go @@ -6,6 +6,7 @@ import ( xxl "github.com/xxl-job/go-client" ) -func Test(cxt context.Context, param *xxl.RunReq) { - fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams) +func Test(cxt context.Context, param *xxl.RunReq) (msg string) { + fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams + "log_id" + xxl.Int64ToStr(param.LogID)) + return "test done" } diff --git a/example/task/test2.go b/example/task/test2.go index 36dba92..ecfc2c4 100644 --- a/example/task/test2.go +++ b/example/task/test2.go @@ -7,7 +7,7 @@ import ( "time" ) -func Test2(cxt context.Context, param *xxl.RunReq) { +func Test2(cxt context.Context, param *xxl.RunReq) (msg string) { num := 1 for { diff --git a/executor.go b/executor.go index 31c699a..757561f 100644 --- a/executor.go +++ b/executor.go @@ -95,7 +95,12 @@ func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) { defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &RunReq{} - json.Unmarshal(req, ¶m) + err := json.Unmarshal(req, ¶m) + if err != nil { + writer.Write(returnCall(param, 500, "params err")) + log.Println("参数解析错误:" + string(req)) + return + } log.Printf("任务参数:%v", param) if !e.regList.Exists(param.ExecutorHandler) { writer.Write(returnCall(param, 500, "Task not registered")) @@ -138,7 +143,7 @@ func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) { defer e.mu.Unlock() req, _ := ioutil.ReadAll(request.Body) param := &killReq{} - json.Unmarshal(req, ¶m) + _ = json.Unmarshal(req, ¶m) if !e.runList.Exists(Int64ToStr(param.JobID)) { writer.Write(returnKill(param, 500)) log.Println("任务[" + Int64ToStr(param.JobID) + "]没有运行") @@ -154,8 +159,7 @@ func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) { func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) { data, _ := ioutil.ReadAll(request.Body) req := &logReq{} - json.Unmarshal(data, &req) - + _ = json.Unmarshal(data, &req) writer.Write(returnLog(req, 200)) } @@ -181,12 +185,12 @@ func (e *executor) registry() { } body, err := ioutil.ReadAll(result.Body) res := &res{} - json.Unmarshal(body, &res) + _ = json.Unmarshal(body, &res) if res.Code != 200 { log.Println("执行器注册失败:" + string(body)) } log.Println("执行器注册成功:" + string(body)) - result.Body.Close() + _ = result.Body.Close() t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期 } } @@ -210,7 +214,7 @@ func (e *executor) registryRemove() { } body, err := ioutil.ReadAll(res.Body) log.Println("执行器摘除成功:" + string(body)) - res.Body.Close() + _ = res.Body.Close() } //回调任务列表 diff --git a/optinos.go b/optinos.go index f210ac2..8ac8c24 100644 --- a/optinos.go +++ b/optinos.go @@ -34,7 +34,6 @@ type Option func(o *Options) var ( DefaultExecutorPort = "9999" DefaultRegistryKey = "golang-jobs" - DefaultTimeOut = 10 * time.Second ) // 设置调度中心地址 @@ -51,13 +50,6 @@ func AccessToken(token string) Option { } } -// 请求令牌 -func Timeout(timeout time.Duration) Option { - return func(o *Options) { - o.Timeout = timeout - } -} - // 设置执行器IP func ExecutorIp(ip string) Option { return func(o *Options) { diff --git a/task.go b/task.go index 64be769..32804cd 100644 --- a/task.go +++ b/task.go @@ -7,7 +7,7 @@ import ( ) //任务执行函数 -type TaskFunc func(cxt context.Context, param *RunReq) +type TaskFunc func(cxt context.Context, param *RunReq) string //任务 type Task struct { @@ -23,7 +23,6 @@ type Task struct { //运行任务 func (t *Task) Run(callback func(code int64, msg string)) { - t.Ext, t.Cancel = context.WithCancel(context.Background()) defer func(cancel func()) { if err := recover(); err != nil { log.Println(t.Info()+" panic: ", err) @@ -31,8 +30,8 @@ func (t *Task) Run(callback func(code int64, msg string)) { cancel() } }(t.Cancel) - t.fn(t.Ext, t.Param) - callback(200, "") + msg := t.fn(t.Ext, t.Param) + callback(200, msg) return }