Skip to content

Commit

Permalink
Update executor.go
Browse files Browse the repository at this point in the history
  • Loading branch information
rushuinet committed Oct 17, 2022
1 parent 07a8f9a commit bac6dce
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func NewExecutor(opts ...Option) Executor {

func newExecutor(opts ...Option) *executor {
options := newOptions(opts...)
executor := &executor{
e := &executor{
opts: options,
}
return executor
return e
}

type executor struct {
Expand All @@ -73,7 +73,7 @@ func (e *executor) Init(opts ...Option) {
e.runList = &taskList{
data: make(map[string]*Task),
}
e.address = ":" + e.opts.ExecutorPort
e.address = e.opts.ExecutorIp + ":" + e.opts.ExecutorPort
go e.registry()
}

Expand All @@ -93,7 +93,7 @@ func (e *executor) Run() (err error) {
mux.HandleFunc("/idleBeat", e.idleBeat)
// 创建服务器
server := &http.Server{
Addr: e.address,
Addr: ":" + e.opts.ExecutorPort,
WriteTimeout: time.Second * 3,
Handler: mux,
}
Expand All @@ -119,10 +119,11 @@ func (e *executor) RegTask(pattern string, task TaskFunc) {
return
}

//运行一个任务
// 运行一个任务
func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) {
e.mu.Lock()
defer e.mu.Unlock()

req, _ := ioutil.ReadAll(request.Body)
param := &RunReq{}
err := json.Unmarshal(req, &param)
Expand Down Expand Up @@ -173,7 +174,7 @@ func (e *executor) runTask(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write(returnGeneral())
}

//删除一个任务
// 删除一个任务
func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {
e.mu.Lock()
defer e.mu.Unlock()
Expand All @@ -191,7 +192,7 @@ func (e *executor) killTask(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write(returnGeneral())
}

//任务日志
// 任务日志
func (e *executor) taskLog(writer http.ResponseWriter, request *http.Request) {
var res *LogRes
data, err := ioutil.ReadAll(request.Body)
Expand Down Expand Up @@ -227,6 +228,7 @@ func (e *executor) beat(writer http.ResponseWriter, request *http.Request) {
func (e *executor) idleBeat(writer http.ResponseWriter, request *http.Request) {
e.mu.Lock()
defer e.mu.Unlock()
defer request.Body.Close()
req, _ := ioutil.ReadAll(request.Body)
param := &idleBeatReq{}
err := json.Unmarshal(req, &param)
Expand All @@ -244,7 +246,7 @@ func (e *executor) idleBeat(writer http.ResponseWriter, request *http.Request) {
_, _ = writer.Write(returnGeneral())
}

//注册执行器到调度中心
// 注册执行器到调度中心
func (e *executor) registry() {

t := time.NewTimer(time.Second * 0) //初始立即执行
Expand Down Expand Up @@ -285,7 +287,7 @@ func (e *executor) registry() {
}
}

//执行器注册摘除
// 执行器注册摘除
func (e *executor) registryRemove() {
t := time.NewTimer(time.Second * 0) //初始立即执行
defer t.Stop()
Expand All @@ -304,19 +306,20 @@ func (e *executor) registryRemove() {
e.log.Error("执行器摘除失败:" + err.Error())
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
e.log.Info("执行器摘除成功:" + string(body))
_ = res.Body.Close()
}

//回调任务列表
// 回调任务列表
func (e *executor) callback(task *Task, code int64, msg string) {
e.runList.Del(Int64ToStr(task.Id))
res, err := e.post("/api/callback", string(returnCall(task.Param, code, msg)))
if err != nil {
e.log.Error("callback err : ", err.Error())
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
e.log.Error("callback ReadAll err : ", err.Error())
Expand All @@ -325,7 +328,7 @@ func (e *executor) callback(task *Task, code int64, msg string) {
e.log.Info("任务回调成功:" + string(body))
}

//post
// post
func (e *executor) post(action, body string) (resp *http.Response, err error) {
request, err := http.NewRequest("POST", e.opts.ServerAddr+action, strings.NewReader(body))
if err != nil {
Expand Down

0 comments on commit bac6dce

Please sign in to comment.