From b8aa5bf079b3af87d5562878e4fdb4aa07a56bf7 Mon Sep 17 00:00:00 2001 From: yisaer Date: Thu, 14 Dec 2023 18:22:20 +0800 Subject: [PATCH] fix Signed-off-by: yisaer --- .github/workflows/build_packages.yaml | 1 - .github/workflows/run_fvt_tests.yaml | 4 +- internal/converter/json/converter.go | 20 ++++- internal/processor/stream.go | 2 + internal/topo/planner/dataSourcePlan.go | 8 ++ internal/topo/planner/filterPlan.go | 2 + internal/topo/planner/joinPlan.go | 8 +- internal/topo/planner/projectPlan.go | 4 + internal/topo/planner/util.go | 9 ++- test/change_stream_rule.jmx | 4 +- test/lookup_table_memory.jmx | 4 +- test/run_jmeter.sh | 103 ------------------------ 12 files changed, 54 insertions(+), 115 deletions(-) diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index f1a2334410..245d97fdeb 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -5,7 +5,6 @@ concurrency: cancel-in-progress: true on: - pull_request: release: types: - published diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index 0fc61bd0e2..66622fffc6 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -5,7 +5,7 @@ concurrency: cancel-in-progress: true on: - workflow_call: + pull_request: env: DEBIAN_FRONTEND: noninteractive @@ -90,7 +90,7 @@ jobs: ./test/setup_env.sh ./test/prepare_plugins.sh - name: run fvt tests - timeout-minutes: 12 + timeout-minutes: 2 run: ./test/run_jmeter.sh with_edgex=true - uses: actions/upload-artifact@v3 if: always() diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index 48efd88614..ac28bb211e 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -20,6 +20,7 @@ import ( "github.com/valyala/fastjson" + "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/cast" "github.com/lf-edge/ekuiper/pkg/message" @@ -64,7 +65,14 @@ func (c *FastJsonConverter) Decode(b []byte) (interface{}, error) { return c.decodeWithSchema(b, c.schema) } -func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (interface{}, error) { +func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (m interface{}, err error) { + defer func() { + if err != nil { + conf.Log.Infof("decodeWithSchema,json:%v, schema:%v,err:%v", string(b), schema, err) + } else { + conf.Log.Infof("decodeWithSchema,json:%v, schema:%v,map:%v", string(b), schema, m) + } + }() var p fastjson.Parser v, err := p.ParseBytes(b) if err != nil { @@ -183,7 +191,7 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string if err != nil { return nil, err } - childMap, err := f.decodeObject(childObj, schema[key].Properties) + childMap, err := f.decodeObject(childObj, field.Properties) if err != nil { return nil, err } @@ -197,7 +205,11 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string if err != nil { return nil, err } - subList, err := f.decodeArray(childArray, schema[key].Items) + var items *ast.JsonStreamField + if field != nil { + items = field.Items + } + subList, err := f.decodeArray(childArray, items) if err != nil { return nil, err } @@ -255,6 +267,7 @@ func extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamFie } return bv, nil default: + conf.Log.Infof("%v has wrong type:%v, expect:%v", name, fastjson.TypeNumber.String(), field.Type) return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, fastjson.TypeNumber.String(), field.Type) } } @@ -276,6 +289,7 @@ func extractStringValue(name string, v *fastjson.Value, field *ast.JsonStreamFie case field.Type == "boolean": return getBooleanFromValue(v) default: + conf.Log.Infof("%v has wrong type:%v, expect:%v", name, fastjson.TypeString.String(), field.Type) return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, fastjson.TypeString.String(), field.Type) } } diff --git a/internal/processor/stream.go b/internal/processor/stream.go index c1ca63655d..c8e15f0871 100644 --- a/internal/processor/stream.go +++ b/internal/processor/stream.go @@ -105,6 +105,8 @@ func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error var r string r, err = p.execDrop(s, ast.TypeStream) result = append(result, r) + r = fmt.Sprintf("stream %s is dropped.", s.Name) + log.Printf("%s", r) case *ast.DropTableStatement: var r string r, err = p.execDrop(s, ast.TypeTable) diff --git a/internal/topo/planner/dataSourcePlan.go b/internal/topo/planner/dataSourcePlan.go index 88d92f920b..dcc4028d59 100644 --- a/internal/topo/planner/dataSourcePlan.go +++ b/internal/topo/planner/dataSourcePlan.go @@ -19,6 +19,7 @@ import ( "sort" "strings" + "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/message" ) @@ -135,6 +136,10 @@ func (p *DataSourcePlan) extract(expr ast.Expr) (ast.Expr, ast.Expr) { } func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error { + for _, field := range fields { + conf.Log.Infof("PruneColumns before, datasource:%v, field:%v", p.name, field.String()) + } + // init values err := p.getProps() if err != nil { @@ -206,6 +211,9 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error { } } p.getAllFields() + for key := range p.streamFields { + conf.Log.Infof("PruneColumns after, datasource:%v, name:%v", p.name, key) + } return nil } diff --git a/internal/topo/planner/filterPlan.go b/internal/topo/planner/filterPlan.go index 6e2995bc3b..55c31ec093 100644 --- a/internal/topo/planner/filterPlan.go +++ b/internal/topo/planner/filterPlan.go @@ -15,6 +15,7 @@ package planner import ( + "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/ast" ) @@ -71,6 +72,7 @@ func (p *FilterPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPla } func (p *FilterPlan) PruneColumns(fields []ast.Expr) error { + conf.Log.Infof("filter PruneColumns, condition:%v", p.condition.String()) f := getFields(p.condition) return p.baseLogicalPlan.PruneColumns(append(fields, f...)) } diff --git a/internal/topo/planner/joinPlan.go b/internal/topo/planner/joinPlan.go index b40d7897eb..3fa2136dbf 100644 --- a/internal/topo/planner/joinPlan.go +++ b/internal/topo/planner/joinPlan.go @@ -14,7 +14,10 @@ package planner -import "github.com/lf-edge/ekuiper/pkg/ast" +import ( + "github.com/lf-edge/ekuiper/internal/conf" + "github.com/lf-edge/ekuiper/pkg/ast" +) type JoinPlan struct { baseLogicalPlan @@ -92,6 +95,9 @@ func extractCondition(condition ast.Expr) (unpushable ast.Expr, pushable ast.Exp } func (p *JoinPlan) PruneColumns(fields []ast.Expr) error { + for _, j := range p.joins { + conf.Log.Infof("join PruneColumns, join:%v", j.Expr.String()) + } f := getFields(p.joins) return p.baseLogicalPlan.PruneColumns(append(fields, f...)) } diff --git a/internal/topo/planner/projectPlan.go b/internal/topo/planner/projectPlan.go index d384ae6c93..c6fead61c6 100644 --- a/internal/topo/planner/projectPlan.go +++ b/internal/topo/planner/projectPlan.go @@ -17,6 +17,7 @@ package planner import ( "strconv" + "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/pkg/ast" ) @@ -95,6 +96,9 @@ func (p *ProjectPlan) BuildExplainInfo() { } func (p *ProjectPlan) PruneColumns(fields []ast.Expr) error { + for _, field := range fields { + conf.Log.Infof("proj PruneColumns, field:%v", field.String()) + } f := getFields(p.fields) return p.baseLogicalPlan.PruneColumns(append(fields, f...)) } diff --git a/internal/topo/planner/util.go b/internal/topo/planner/util.go index ca43889d38..5814d943be 100644 --- a/internal/topo/planner/util.go +++ b/internal/topo/planner/util.go @@ -14,7 +14,10 @@ package planner -import "github.com/lf-edge/ekuiper/pkg/ast" +import ( + "github.com/lf-edge/ekuiper/internal/conf" + "github.com/lf-edge/ekuiper/pkg/ast" +) func getRefSources(node ast.Node) ([]ast.StreamName, bool) { result := make(map[ast.StreamName]bool) @@ -74,5 +77,9 @@ func getFields(node ast.Node) []ast.Expr { } return true }) + + for _, r := range result { + conf.Log.Infof("getFields after, field:%v", r.String()) + } return result } diff --git a/test/change_stream_rule.jmx b/test/change_stream_rule.jmx index 8a14060ade..5bc3f85da4 100644 --- a/test/change_stream_rule.jmx +++ b/test/change_stream_rule.jmx @@ -132,7 +132,7 @@ false { -"sql" : "create stream demo (temperature float, light string) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\",STRICT_VALIDATION=\"true\" )" +"sql" : "create stream demo (temperature float, light bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\",STRICT_VALIDATION=\"true\" )" } = @@ -181,7 +181,7 @@ "qos": 1, "clientId": "demo_001" } - } + },{ "log":{} } ] } = diff --git a/test/lookup_table_memory.jmx b/test/lookup_table_memory.jmx index a549a7ec8b..13bec30786 100644 --- a/test/lookup_table_memory.jmx +++ b/test/lookup_table_memory.jmx @@ -181,7 +181,7 @@ "topic": "alertVal", "sendSingle": true } - } + },{ "log":{} } ] } = @@ -305,7 +305,7 @@ "topic": "rule/alert", "sendSingle": true } - } + },{ "log":{} } ] } = diff --git a/test/run_jmeter.sh b/test/run_jmeter.sh index c724bffb0b..2bfbf75d50 100755 --- a/test/run_jmeter.sh +++ b/test/run_jmeter.sh @@ -59,108 +59,5 @@ fvt_dir=`pwd` rm -rf jmeter_logs -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/streams_test.jmx -Dbase="$base_dir" -l jmeter_logs/stream_test.jtl -j jmeter_logs/stream_test.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/rule_test.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/rule_test.jtl -j jmeter_logs/rule_test.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_all_rule.jmx -l jmeter_logs/select_all_rule.jtl -j jmeter_logs/select_all_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_condition_rule.jmx -l jmeter_logs/select_condition_rule.jtl -j jmeter_logs/select_condition_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_aggr_rule.jmx -l jmeter_logs/select_aggr_rule.jtl -j jmeter_logs/select_aggr_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/change_rule_status.jmx -l jmeter_logs/change_rule_status.jtl -j jmeter_logs/change_rule_status.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/change_stream_rule.jmx -l jmeter_logs/change_stream_rule.jtl -j jmeter_logs/change_stream_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_aggr_rule_order.jmx -l jmeter_logs/select_aggr_rule_order.jtl -j jmeter_logs/select_aggr_rule_order.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/rule_pipeline.jmx -l jmeter_logs/rule_pipeline.jtl -j jmeter_logs/rule_pipeline.log -echo -e "---------------------------------------------\n" - -if test $with_edgex = true; then - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_edgex_condition_rule.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_condition_rule.jtl -j jmeter_logs/select_edgex_condition_rule.log - echo -e "---------------------------------------------\n" - - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_sink_rule.jtl -j jmeter_logs/edgex_sink_rule.log - echo -e "---------------------------------------------\n" - - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_edgex_meta_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_meta_rule.jtl -j jmeter_logs/select_edgex_meta_rule.log - echo -e "---------------------------------------------\n" - - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log - echo -e "---------------------------------------------\n" - - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_redis_share_connection_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_redis_share_connection_sink_rule.jtl -j jmeter_logs/edgex_redis_share_connection_sink_rule.log - echo -e "---------------------------------------------\n" - - - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_array_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_array_rule.jtl -j jmeter_logs/edgex_array_rule.log - echo -e "---------------------------------------------\n" -fi - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/portable_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/portable_end_2_end.jtl -j jmeter_logs/portable_end_2_end.log -echo -e "---------------------------------------------\n" - - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_countwindow_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_countwindow_rule.jtl -j jmeter_logs/select_countwindow_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/http_pull_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/http_pull_rule.jtl -j jmeter_logs/http_pull_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/binary_image_process.jmx -Dfvt="$fvt_dir" -l jmeter_logs/binary_image_process.jtl -j jmeter_logs/binary_image_process.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/table_static.jmx -Dfvt="$fvt_dir" -l jmeter_logs/table_static.jtl -j jmeter_logs/table_static.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/table_cont.jmx -Dfvt="$fvt_dir" -l jmeter_logs/table_cont.jtl -j jmeter_logs/table_cont.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/shared_source_rules.jmx -Dfvt="$fvt_dir" -l jmeter_logs/shared_source_rules.jtl -j jmeter_logs/shared_source_rules.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_condition_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_condition_rule.jtl -j jmeter_logs/graph_condition_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_order_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_order_rule.jtl -j jmeter_logs/graph_group_order_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_rule.jtl -j jmeter_logs/graph_group_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_join_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_join_rule.jtl -j jmeter_logs/graph_join_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_mix_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_mix_rule.jtl -j jmeter_logs/graph_mix_rule.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_window_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_window_rule.jtl -j jmeter_logs/graph_window_rule.log -echo -e "---------------------------------------------\n" - /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_memory.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_memory.jtl -j jmeter_logs/lookup_table_memory.log echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_redis.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_redis.jtl -j jmeter_logs/lookup_table_redis.log -echo -e "---------------------------------------------\n" - -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_sql.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_sql.jtl -j jmeter_logs/lookup_table_sql.log -echo -e "---------------------------------------------\n" - -echo -e "-------------------- management test ------------------------\n" -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/management_test/data_import_export.jmx -Dfvt="$fvt_dir" -l jmeter_logs/data_import_export.jtl -j jmeter_logs/data_import_export.log -echo -e "---------------------------------------------\n" -/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/management_test/rule_metrics.jmx -Dfvt="$fvt_dir" -l jmeter_logs/rule_metrics.jtl -j jmeter_logs/rule_metrics.log -echo -e "---------------------------------------------\n" \ No newline at end of file