Skip to content

Commit

Permalink
fix: improves memory leak in new container stream (#3076)
Browse files Browse the repository at this point in the history
  • Loading branch information
amir20 authored Jul 9, 2024
1 parent 00f2c4f commit ce2b008
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 26 deletions.
26 changes: 13 additions & 13 deletions internal/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream
return err
}

g := docker.NewEventGenerator(reader, container)
g := docker.NewEventGenerator(out.Context(), reader, container)

for {
select {
case event := <-g.Events:
out.Send(&pb.StreamLogsResponse{
Event: logEventToPb(event),
})
case e := <-g.Errors:
return e
case <-out.Context().Done():
return nil
}
for event := range g.Events {
out.Send(&pb.StreamLogsResponse{
Event: logEventToPb(event),
})
}

select {
case e := <-g.Errors:
return e
default:
return nil
}
}

Expand All @@ -80,7 +80,7 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe
return err
}

g := docker.NewEventGenerator(reader, container)
g := docker.NewEventGenerator(out.Context(), reader, container)

for {
select {
Expand Down
19 changes: 14 additions & 5 deletions internal/docker/event_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package docker
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -28,6 +29,7 @@ type EventGenerator struct {
tty bool
wg sync.WaitGroup
containerID string
ctx context.Context
}

var bufPool = sync.Pool{
Expand All @@ -38,14 +40,15 @@ var bufPool = sync.Pool{

var ErrBadHeader = fmt.Errorf("dozzle/docker: unable to read header")

func NewEventGenerator(reader io.Reader, container Container) *EventGenerator {
func NewEventGenerator(ctx context.Context, reader io.Reader, container Container) *EventGenerator {
generator := &EventGenerator{
reader: bufio.NewReader(reader),
buffer: make(chan *LogEvent, 100),
Errors: make(chan error, 1),
Events: make(chan *LogEvent),
tty: container.Tty,
containerID: container.ID,
ctx: ctx,
}
generator.wg.Add(2)
go generator.consumeReader()
Expand All @@ -56,6 +59,7 @@ func NewEventGenerator(reader io.Reader, container Container) *EventGenerator {
func (g *EventGenerator) processBuffer() {
var current, next *LogEvent

loop:
for {
if g.next != nil {
current = g.next
Expand All @@ -64,18 +68,23 @@ func (g *EventGenerator) processBuffer() {
} else {
event, ok := <-g.buffer
if !ok {
close(g.Events)
break
break loop
}

current = event
next = g.peek()
}

checkPosition(current, next)

g.Events <- current
select {
case g.Events <- current:
case <-g.ctx.Done():
break loop
}
}

close(g.Events)

g.wg.Done()
}

Expand Down
9 changes: 5 additions & 4 deletions internal/docker/event_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package docker
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"reflect"
"strings"
Expand All @@ -19,7 +20,7 @@ func TestEventGenerator_Events_tty(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader(input))

g := NewEventGenerator(reader, Container{Tty: true})
g := NewEventGenerator(context.Background(), reader, Container{Tty: true})
event := <-g.Events

require.NotNil(t, event, "Expected event to not be nil, but got nil")
Expand All @@ -30,7 +31,7 @@ func TestEventGenerator_Events_non_tty(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))

g := NewEventGenerator(reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
event := <-g.Events

require.NotNil(t, event, "Expected event to not be nil, but got nil")
Expand All @@ -41,7 +42,7 @@ func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))

g := NewEventGenerator(reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
<-g.Events
_, ok := <-g.Events

Expand All @@ -52,7 +53,7 @@ func TestEventGenerator_Events_routines_done(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))

g := NewEventGenerator(reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
<-g.Events
assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done")
}
Expand Down
5 changes: 3 additions & 2 deletions internal/support/docker/client_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (d *dockerClientService) LogsBetweenDates(ctx context.Context, container do
return nil, err
}

g := docker.NewEventGenerator(reader, container)
g := docker.NewEventGenerator(ctx, reader, container)
return g.Events, nil
}

Expand All @@ -57,10 +57,11 @@ func (d *dockerClientService) StreamLogs(ctx context.Context, container docker.C
return err
}

g := docker.NewEventGenerator(reader, container)
g := docker.NewEventGenerator(ctx, reader, container)
for event := range g.Events {
events <- event
}

select {
case e := <-g.Errors:
return e
Expand Down
4 changes: 4 additions & 0 deletions internal/support/docker/multi_host_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, conta
for _, client := range m.clients {
client.SubscribeContainersStarted(ctx, newContainers)
}
go func() {
<-ctx.Done()
close(newContainers)
}()

go func() {
for container := range newContainers {
Expand Down
3 changes: 2 additions & 1 deletion internal/web/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func createRouter(h *handler) *chi.Mux {

r.Get("/healthcheck", h.healthcheck)

// r.Mount("/debug", middleware.Profiler())
})

if base != "/" {
Expand All @@ -126,6 +125,8 @@ func createRouter(h *handler) *chi.Mux {

fileServer = http.FileServer(http.FS(h.content))

// r.Mount("/debug", middleware.Profiler())

return r
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"watch:backend": "LIVE_FS=true DEV=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex.server",
"dev": "concurrently --kill-others \"npm:watch:*\"",
"build": "vite build",
"preview": "LIVE_FS=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex",
"preview": "LIVE_FS=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex.server",
"release": "bumpp",
"test": "TZ=UTC vitest",
"typecheck": "vue-tsc --noEmit",
Expand Down

0 comments on commit ce2b008

Please sign in to comment.