Skip to content

Commit

Permalink
add msg
Browse files Browse the repository at this point in the history
  • Loading branch information
rushuinet committed Sep 7, 2020
1 parent 128a9e3 commit f7b10c1
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 25 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
3.任务注册,像写http.Handler一样方便
4.任务panic处理
5.阻塞策略处理
6.任务完成支持返回执行备注
```

## Example
Expand All @@ -16,6 +17,7 @@ package main
import (
xxl "github.com/xxl-job/go-client"
"github.com/xxl-job/go-client/example/task"
"log"
)
func main() {
Expand All @@ -29,9 +31,9 @@ func main() {
exec.Init()
exec.RegTask("task.test", task.Test)
exec.RegTask("task.test2", task.Test2)
exec.Run()
exec.RegTask("task.panic", task.Panic)
log.Fatal(exec.Run())
}
```
# see
github.com/xxl-job/go-client/example/
2 changes: 1 addition & 1 deletion example/task/panic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import (
xxl "github.com/xxl-job/go-client"
)

func Panic(cxt context.Context, param *xxl.RunReq) {
func Panic(cxt context.Context, param *xxl.RunReq) (msg string) {
panic("test panic")
}
5 changes: 3 additions & 2 deletions example/task/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
xxl "github.com/xxl-job/go-client"
)

func Test(cxt context.Context, param *xxl.RunReq) {
fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams)
func Test(cxt context.Context, param *xxl.RunReq) (msg string) {
fmt.Println("test one task" + param.ExecutorHandler + " param:" + param.ExecutorParams + "log_id" + xxl.Int64ToStr(param.LogID))
return "test done"
}
2 changes: 1 addition & 1 deletion example/task/test2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

func Test2(cxt context.Context, param *xxl.RunReq) {
func Test2(cxt context.Context, param *xxl.RunReq) (msg string) {
num := 1
for {

Expand Down
18 changes: 11 additions & 7 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) {
defer e.mu.Unlock()
req, _ := ioutil.ReadAll(request.Body)
param := &RunReq{}
json.Unmarshal(req, &param)
err := json.Unmarshal(req, &param)
if err != nil {
writer.Write(returnCall(param, 500, "params err"))
log.Println("参数解析错误:" + string(req))
return
}
log.Printf("任务参数:%v", param)
if !e.regList.Exists(param.ExecutorHandler) {
writer.Write(returnCall(param, 500, "Task not registered"))
Expand Down Expand Up @@ -138,7 +143,7 @@ func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {
defer e.mu.Unlock()
req, _ := ioutil.ReadAll(request.Body)
param := &killReq{}
json.Unmarshal(req, &param)
_ = json.Unmarshal(req, &param)
if !e.runList.Exists(Int64ToStr(param.JobID)) {
writer.Write(returnKill(param, 500))
log.Println("任务[" + Int64ToStr(param.JobID) + "]没有运行")
Expand All @@ -154,8 +159,7 @@ func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {
func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) {
data, _ := ioutil.ReadAll(request.Body)
req := &logReq{}
json.Unmarshal(data, &req)

_ = json.Unmarshal(data, &req)
writer.Write(returnLog(req, 200))
}

Expand All @@ -181,12 +185,12 @@ func (e *executor) registry() {
}
body, err := ioutil.ReadAll(result.Body)
res := &res{}
json.Unmarshal(body, &res)
_ = json.Unmarshal(body, &res)
if res.Code != 200 {
log.Println("执行器注册失败:" + string(body))
}
log.Println("执行器注册成功:" + string(body))
result.Body.Close()
_ = result.Body.Close()
t.Reset(time.Second * time.Duration(20)) //20秒心跳防止过期
}
}
Expand All @@ -210,7 +214,7 @@ func (e *executor) registryRemove() {
}
body, err := ioutil.ReadAll(res.Body)
log.Println("执行器摘除成功:" + string(body))
res.Body.Close()
_ = res.Body.Close()
}

//回调任务列表
Expand Down
8 changes: 0 additions & 8 deletions optinos.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type Option func(o *Options)
var (
DefaultExecutorPort = "9999"
DefaultRegistryKey = "golang-jobs"
DefaultTimeOut = 10 * time.Second
)

// 设置调度中心地址
Expand All @@ -51,13 +50,6 @@ func AccessToken(token string) Option {
}
}

// 请求令牌
func Timeout(timeout time.Duration) Option {
return func(o *Options) {
o.Timeout = timeout
}
}

// 设置执行器IP
func ExecutorIp(ip string) Option {
return func(o *Options) {
Expand Down
7 changes: 3 additions & 4 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

//任务执行函数
type TaskFunc func(cxt context.Context, param *RunReq)
type TaskFunc func(cxt context.Context, param *RunReq) string

//任务
type Task struct {
Expand All @@ -23,16 +23,15 @@ type Task struct {

//运行任务
func (t *Task) Run(callback func(code int64, msg string)) {
t.Ext, t.Cancel = context.WithCancel(context.Background())
defer func(cancel func()) {
if err := recover(); err != nil {
log.Println(t.Info()+" panic: ", err)
callback(500, "task panic:"+fmt.Sprintf("%v", err))
cancel()
}
}(t.Cancel)
t.fn(t.Ext, t.Param)
callback(200, "")
msg := t.fn(t.Ext, t.Param)
callback(200, msg)
return
}

Expand Down

0 comments on commit f7b10c1

Please sign in to comment.