diff --git a/states/etcd/show/segment.go b/states/etcd/show/segment.go
index d2d3d21..17c4ee0 100644
--- a/states/etcd/show/segment.go
+++ b/states/etcd/show/segment.go
@@ -7,6 +7,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/samber/lo"
+
 	"github.com/milvus-io/birdwatcher/framework"
 	"github.com/milvus-io/birdwatcher/models"
 	"github.com/milvus-io/birdwatcher/states/etcd/common"
@@ -25,6 +27,15 @@ type SegmentParam struct {
 	Level string `name:"level" default:"" desc:"target segment level"`
 }
 
+type segStats struct {
+	binlogLogSize map[int64]int64
+	binlogMemSize map[int64]int64
+	deltaLogSize  int64
+	deltaMemSize  int64
+	statsLogSize  int64
+	statsMemSize  int64
+}
+
 // SegmentCommand returns show segments command.
 func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) error {
 	segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
@@ -46,79 +57,80 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
 	var small, other int
 	var smallCnt, otherCnt int64
 
-	var (
-		binlogLogSize = make(map[int64]int64)
-		binlogMemSize = make(map[int64]int64)
-		deltaLogSize  int64
-		deltaMemSize  int64
-		statsLogSize  int64
-		statsMemSize  int64
-	)
-
-	for _, info := range segments {
-		if info.State != models.SegmentStateDropped {
-			totalRC += info.NumOfRows
-			healthy++
-		}
-		switch info.State {
-		case models.SegmentStateGrowing:
-			growing++
-		case models.SegmentStateSealed:
-			sealed++
-		case models.SegmentStateFlushing, models.SegmentStateFlushed:
-			flushed++
-			if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
-				small++
-				smallCnt += info.NumOfRows
-			} else {
-				other++
-				otherCnt += info.NumOfRows
-			}
-		case models.SegmentStateDropped:
-			dropped++
+	collectionID2SegStats := make(map[int64]*segStats)
+	collectionID2Segments := lo.GroupBy(segments, func(s *models.Segment) int64 {
+		return s.CollectionID
+	})
+
+	for collectionID, segs := range collectionID2Segments {
+		fmt.Printf("===============================CollectionID: %d===========================\n", collectionID)
+		collectionID2SegStats[collectionID] = &segStats{
+			binlogLogSize: make(map[int64]int64),
+			binlogMemSize: make(map[int64]int64),
 		}
 
-		switch p.Format {
-		case "table":
-			PrintSegmentInfo(info, p.Detail)
-		case "line":
-			fmt.Printf("SegmentID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n", info.ID, info.State.String(),
-				info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
-		case "statistics":
+		for _, info := range segs {
 			if info.State != models.SegmentStateDropped {
-				for _, binlog := range info.GetBinlogs() {
-					for _, log := range binlog.Binlogs {
-						binlogLogSize[binlog.FieldID] += log.LogSize
-						binlogMemSize[binlog.FieldID] += log.MemSize
-					}
+				totalRC += info.NumOfRows
+				healthy++
+			}
+			switch info.State {
+			case models.SegmentStateGrowing:
+				growing++
+			case models.SegmentStateSealed:
+				sealed++
+			case models.SegmentStateFlushing, models.SegmentStateFlushed:
+				flushed++
+				if float64(info.NumOfRows)/float64(info.MaxRowNum) < 0.2 {
+					small++
+					smallCnt += info.NumOfRows
+				} else {
+					other++
+					otherCnt += info.NumOfRows
 				}
-				for _, delta := range info.GetDeltalogs() {
-					for _, log := range delta.Binlogs {
-						deltaLogSize += log.LogSize
-						deltaMemSize += log.MemSize
+			case models.SegmentStateDropped:
+				dropped++
+			}
+
+			switch p.Format {
+			case "table":
+				PrintSegmentInfo(info, p.Detail)
+			case "line":
+				fmt.Printf("SegmentID: %d PartitionID: %d State: %s, Level: %s, Row Count:%d, PartitionStatsVersion:%d \n",
+					info.ID, info.PartitionID, info.State.String(), info.Level.String(), info.NumOfRows, info.PartitionStatsVersion)
+			case "statistics":
+				if info.State != models.SegmentStateDropped {
+					for _, binlog := range info.GetBinlogs() {
+						for _, log := range binlog.Binlogs {
+							collectionID2SegStats[collectionID].binlogLogSize[binlog.FieldID] += log.LogSize
+							collectionID2SegStats[collectionID].binlogMemSize[binlog.FieldID] += log.MemSize
+						}
 					}
-				}
-				for _, statslog := range info.GetStatslogs() {
-					for _, binlog := range statslog.Binlogs {
-						statsLogSize += binlog.LogSize
-						statsMemSize += binlog.MemSize
+					for _, delta := range info.GetDeltalogs() {
+						for _, log := range delta.Binlogs {
+							collectionID2SegStats[collectionID].deltaLogSize += log.LogSize
+							collectionID2SegStats[collectionID].deltaMemSize += log.MemSize
+						}
+					}
+					for _, statslog := range info.GetStatslogs() {
+						for _, binlog := range statslog.Binlogs {
+							collectionID2SegStats[collectionID].statsLogSize += binlog.LogSize
+							collectionID2SegStats[collectionID].statsMemSize += binlog.MemSize
+						}
 					}
 				}
+			default:
+				return fmt.Errorf("unsupported format: %q", p.Format)
 			}
 		}
+		if p.Format == "statistics" {
+			outputStats("Collection", collectionID2SegStats[collectionID])
+		}
+		fmt.Printf("\n")
 	}
+
 	if p.Format == "statistics" {
-		var totalBinlogLogSize int64
-		var totalBinlogMemSize int64
-		for fieldID, logSize := range binlogLogSize {
-			memSize := binlogMemSize[fieldID]
-			fmt.Printf("\t field binlog size[%d]: %s, mem size[%d]: %s\n", fieldID, hrSize(logSize), fieldID, hrSize(memSize))
-			totalBinlogLogSize += logSize
-			totalBinlogMemSize += memSize
-		}
-		fmt.Printf("--- Total binlog size: %s, mem size: %s\n", hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
-		fmt.Printf("--- Total deltalog size: %s, mem size: %s\n", hrSize(deltaLogSize), hrSize(deltaMemSize))
-		fmt.Printf("--- Total statslog size: %s, mem size: %s\n", hrSize(statsLogSize), hrSize(statsMemSize))
+		outputStats("Total", lo.Values(collectionID2SegStats)...)
 	}
 
 	fmt.Printf("--- Growing: %d, Sealed: %d, Flushed: %d, Dropped: %d\n", growing, sealed, flushed, dropped)
@@ -127,6 +139,34 @@ func (c *ComponentShow) SegmentCommand(ctx context.Context, p *SegmentParam) err
 	return nil
 }
 
+func outputStats(scope string, stats ...*segStats) {
+	var totalBinlogLogSize int64
+	var totalBinlogMemSize int64
+	var totalDeltaLogSize int64
+	var totalDeltaMemSize int64
+	var totalStatsLogSize int64
+	var totalStatsMemSize int64
+	for _, s := range stats {
+		for fieldID, logSize := range s.binlogLogSize {
+			memSize := s.binlogMemSize[fieldID]
+			if scope != "Total" {
+				fmt.Printf("field[%d] binlog size: %s, mem size: %s\n", fieldID, hrSize(logSize), hrSize(memSize))
+			}
+			totalBinlogLogSize += logSize
+			totalBinlogMemSize += memSize
+		}
+
+		totalDeltaLogSize += s.deltaLogSize
+		totalDeltaMemSize += s.deltaMemSize
+		totalStatsLogSize += s.statsLogSize
+		totalStatsMemSize += s.statsMemSize
+	}
+
+	fmt.Printf("--- %s binlog size: %s, mem size: %s\n", scope, hrSize(totalBinlogLogSize), hrSize(totalBinlogMemSize))
+	fmt.Printf("--- %s deltalog size: %s, mem size: %s\n", scope, hrSize(totalDeltaLogSize), hrSize(totalDeltaMemSize))
+	fmt.Printf("--- %s statslog size: %s, mem size: %s\n", scope, hrSize(totalStatsLogSize), hrSize(totalStatsMemSize))
+}
+
 func hrSize(size int64) string {
 	sf := float64(size)
 	units := []string{"Bytes", "KB", "MB", "GB"}