Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine show segment ouputs #292

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 103 additions & 62 deletions states/etcd/show/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/samber/lo"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
Expand All @@ -25,6 +27,15 @@ type SegmentParam struct {
Level string `name:"level" default:"" desc:"target segment level"`
}

type segStats struct {
binlogLogSize map[int64]int64
binlogMemSize map[int64]int64
deltaLogSize int64
deltaMemSize int64
statsLogSize int64
statsMemSize int64
}

// SegmentCommand returns show segments command.
func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
Expand All @@ -46,79 +57,81 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
var small, other int
var smallCnt, otherCnt int64

var (
binlogLogSize = make(map[int64]int64)
binlogMemSize = make(map[int64]int64)
deltaLogSize int64
deltaMemSize int64
statsLogSize int64
statsMemSize int64
)

for _, info := range segments {
if info.State != models.SegmentStateDropped {
totalRC += info.NumOfRows
healthy++
}
switch info.State {
case models.SegmentStateGrowing:
growing++
case models.SegmentStateSealed:
sealed++
case models.SegmentStateFlushing, models.SegmentStateFlushed:
flushed++
if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
small++
smallCnt += info.NumOfRows
} else {
other++
otherCnt += info.NumOfRows
}
case models.SegmentStateDropped:
dropped++
collectionID2SegStats := make(map[int64]*segStats)
collectionID2Segments := lo.GroupBy(segments, func(s *models.Segment) int64 {
return s.CollectionID
})

for collectionID, segs := range collectionID2Segments {
fmt.Printf("===============================CollectionID: %d===========================\n", collectionID)
collectionID2SegStats[collectionID] = &segStats{
binlogLogSize: make(map[int64]int64),
binlogMemSize: make(map[int64]int64),
}

switch p.Format {
case "table":
PrintSegmentInfo(info, p.Detail)
case "line":
fmt.Printf("SegmentID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n", info.ID, info.State.String(),
info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
case "statistics":
for _, info := range segs {
if info.State != models.SegmentStateDropped {
for _, binlog := range info.GetBinlogs() {
for _, log := range binlog.Binlogs {
binlogLogSize[binlog.FieldID] += log.LogSize
binlogMemSize[binlog.FieldID] += log.MemSize
}
totalRC += info.NumOfRows
healthy++
}
switch info.State {
case models.SegmentStateGrowing:
growing++
case models.SegmentStateSealed:
sealed++
case models.SegmentStateFlushing, models.SegmentStateFlushed:
flushed++
if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
small++
smallCnt += info.NumOfRows
} else {
other++
otherCnt += info.NumOfRows
}
for _, delta := range info.GetDeltalogs() {
for _, log := range delta.Binlogs {
deltaLogSize += log.LogSize
deltaMemSize += log.MemSize
case models.SegmentStateDropped:
dropped++
}

switch p.Format {
case "table":
PrintSegmentInfo(info, p.Detail)
case "line":
fmt.Printf("SegmentID: %d PartitionID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n",
info.ID, info.PartitionID, info.State.String(), info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
case "statistics":
if info.State != models.SegmentStateDropped {
for _, binlog := range info.GetBinlogs() {
for _, log := range binlog.Binlogs {
collectionID2SegStats[collectionID].binlogLogSize[binlog.FieldID] += log.LogSize
collectionID2SegStats[collectionID].binlogMemSize[binlog.FieldID] += log.MemSize
}
}
}
for _, statslog := range info.GetStatslogs() {
for _, binlog := range statslog.Binlogs {
statsLogSize += binlog.LogSize
statsMemSize += binlog.MemSize
for _, delta := range info.GetDeltalogs() {
for _, log := range delta.Binlogs {
collectionID2SegStats[collectionID].deltaLogSize += log.LogSize
collectionID2SegStats[collectionID].deltaMemSize += log.MemSize
}
}
for _, statslog := range info.GetStatslogs() {
for _, binlog := range statslog.Binlogs {
collectionID2SegStats[collectionID].statsLogSize += binlog.LogSize
collectionID2SegStats[collectionID].statsMemSize += binlog.MemSize
}
}
}
default:
err := fmt.Errorf("unsupport format:%s\n", p.Format)
return err
}
}
if p.Format == "statistics" {
outputStats("Collection", collectionID2SegStats[collectionID])
}
fmt.Printf("\n")
}

if p.Format == "statistics" {
var totalBinlogLogSize int64
var totalBinlogMemSize int64
for fieldID, logSize := range binlogLogSize {
memSize := binlogMemSize[fieldID]
fmt.Printf("\t field binlog size[%d]: %s, mem size[%d]: %s\n", fieldID, hrSize(logSize), fieldID, hrSize(memSize))
totalBinlogLogSize += logSize
totalBinlogMemSize += memSize
}
fmt.Printf("--- Total binlog size: %s, mem size: %s\n", hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
fmt.Printf("--- Total deltalog size: %s, mem size: %s\n", hrSize(deltaLogSize), hrSize(deltaMemSize))
fmt.Printf("--- Total statslog size: %s, mem size: %s\n", hrSize(statsLogSize), hrSize(statsMemSize))
outputStats("Total", lo.Values(collectionID2SegStats)...)
}

fmt.Printf("--- Growing: %d, Sealed: %d, Flushed: %d, Dropped: %d\n", growing, sealed, flushed, dropped)
Expand All @@ -127,6 +140,34 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
return nil
}

func outputStats(scope string, stats ...*segStats) {
var totalBinlogLogSize int64
var totalBinlogMemSize int64
var TotalDeltaLogSize int64
var TotalDeltaMemSize int64
var TotalStatsLogSize int64
var TotalStatsMemSize int64
for _, s := range stats {
for fieldID, logSize := range s.binlogLogSize {
memSize := s.binlogMemSize[fieldID]
if scope != "Total" {
fmt.Printf("field[%d] binlog size: %s, mem size: %s\n", fieldID, hrSize(logSize), hrSize(memSize))
}
totalBinlogLogSize += logSize
totalBinlogMemSize += memSize
}

TotalDeltaLogSize += s.deltaLogSize
TotalDeltaMemSize += s.deltaMemSize
TotalStatsLogSize += s.statsLogSize
TotalStatsMemSize += s.statsMemSize
}

fmt.Printf("--- %s binlog size: %s, mem size: %s\n", scope, hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
fmt.Printf("--- %s deltalog size: %s, mem size: %s\n", scope, hrSize(TotalDeltaLogSize), hrSize(TotalDeltaMemSize))
fmt.Printf("--- %s statslog size: %s, mem size: %s\n", scope, hrSize(TotalStatsLogSize), hrSize(TotalStatsMemSize))
}

func hrSize(size int64) string {
sf := float64(size)
units := []string{"Bytes", "KB", "MB", "GB"}
Expand Down
Loading