diff --git a/cmd/go-judge/stream/stream.go b/cmd/go-judge/stream/stream.go index 832b1a8..530264b 100644 --- a/cmd/go-judge/stream/stream.go +++ b/cmd/go-judge/stream/stream.go @@ -79,15 +79,19 @@ func Start(baseCtx context.Context, s Stream, w worker.Worker, srcPrefix []strin if err != nil { return fmt.Errorf("convert exec request: %w", err) } - logger.Sugar().Debugf("request: %+v", rq) - defer func() { + closeFunc := func() { for _, f := range streamIn { f.Close() } + streamIn = nil for _, f := range streamOut { f.Close() } - }() + streamOut = nil + } + defer closeFunc() + + logger.Sugar().Debugf("request: %+v", rq) var wg errgroup.Group execCtx, execCancel := context.WithCancel(baseCtx) @@ -120,6 +124,8 @@ func Start(baseCtx context.Context, s Stream, w worker.Worker, srcPrefix []strin err = sendLoop(ctx, s, outCh, rtCh, logger) cancel() + closeFunc() + streamOut = nil wg.Wait() return err } diff --git a/cmd/go-judge/ws_executor/websocket.go b/cmd/go-judge/ws_executor/websocket.go index 9618348..c361f65 100644 --- a/cmd/go-judge/ws_executor/websocket.go +++ b/cmd/go-judge/ws_executor/websocket.go @@ -197,7 +197,10 @@ func (h *wsHandle) handleStream(c *gin.Context) { w := &streamWrapper{ctx: ctx, conn: conn, sendCh: make(chan stream.Response)} go w.sendLoop() - stream.Start(ctx, w, h.worker, h.srcPrefix, h.logger) + if err := stream.Start(ctx, w, h.worker, h.srcPrefix, h.logger); err != nil { + h.logger.Sugar().Debugln("stream start: ", err) + c.Error(err) + } } type contextMap struct {