Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Dec 15, 2023
1 parent 6c5e4ec commit b8aa5bf
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 115 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build_packages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ concurrency:
cancel-in-progress: true

on:
pull_request:
release:
types:
- published
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/run_fvt_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ concurrency:
cancel-in-progress: true

on:
workflow_call:
pull_request:

env:
DEBIAN_FRONTEND: noninteractive
Expand Down Expand Up @@ -90,7 +90,7 @@ jobs:
./test/setup_env.sh
./test/prepare_plugins.sh
- name: run fvt tests
timeout-minutes: 12
timeout-minutes: 2
run: ./test/run_jmeter.sh with_edgex=true
- uses: actions/upload-artifact@v3
if: always()
Expand Down
20 changes: 17 additions & 3 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/valyala/fastjson"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/message"
Expand Down Expand Up @@ -64,7 +65,14 @@ func (c *FastJsonConverter) Decode(b []byte) (interface{}, error) {
return c.decodeWithSchema(b, c.schema)
}

func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (interface{}, error) {
func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.JsonStreamField) (m interface{}, err error) {
defer func() {
if err != nil {
conf.Log.Infof("decodeWithSchema,json:%v, schema:%v,err:%v", string(b), schema, err)
} else {
conf.Log.Infof("decodeWithSchema,json:%v, schema:%v,map:%v", string(b), schema, m)
}
}()
var p fastjson.Parser
v, err := p.ParseBytes(b)
if err != nil {
Expand Down Expand Up @@ -183,7 +191,7 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
if err != nil {
return nil, err
}
childMap, err := f.decodeObject(childObj, schema[key].Properties)
childMap, err := f.decodeObject(childObj, field.Properties)
if err != nil {
return nil, err
}
Expand All @@ -197,7 +205,11 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
if err != nil {
return nil, err
}
subList, err := f.decodeArray(childArray, schema[key].Items)
var items *ast.JsonStreamField
if field != nil {
items = field.Items
}
subList, err := f.decodeArray(childArray, items)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,6 +267,7 @@ func extractNumberValue(name string, v *fastjson.Value, field *ast.JsonStreamFie
}
return bv, nil
default:
conf.Log.Infof("%v has wrong type:%v, expect:%v", name, fastjson.TypeNumber.String(), field.Type)
return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, fastjson.TypeNumber.String(), field.Type)
}
}
Expand All @@ -276,6 +289,7 @@ func extractStringValue(name string, v *fastjson.Value, field *ast.JsonStreamFie
case field.Type == "boolean":
return getBooleanFromValue(v)
default:
conf.Log.Infof("%v has wrong type:%v, expect:%v", name, fastjson.TypeString.String(), field.Type)
return nil, fmt.Errorf("%v has wrong type:%v, expect:%v", name, fastjson.TypeString.String(), field.Type)
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/processor/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (p *StreamProcessor) ExecStmt(statement string) (result []string, err error
var r string
r, err = p.execDrop(s, ast.TypeStream)
result = append(result, r)
r = fmt.Sprintf("stream %s is dropped.", s.Name)
log.Printf("%s", r)
case *ast.DropTableStatement:
var r string
r, err = p.execDrop(s, ast.TypeTable)
Expand Down
8 changes: 8 additions & 0 deletions internal/topo/planner/dataSourcePlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"strings"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/message"
)
Expand Down Expand Up @@ -135,6 +136,10 @@ func (p *DataSourcePlan) extract(expr ast.Expr) (ast.Expr, ast.Expr) {
}

func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
for _, field := range fields {
conf.Log.Infof("PruneColumns before, datasource:%v, field:%v", p.name, field.String())
}

// init values
err := p.getProps()
if err != nil {
Expand Down Expand Up @@ -206,6 +211,9 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
}
}
p.getAllFields()
for key := range p.streamFields {
conf.Log.Infof("PruneColumns after, datasource:%v, name:%v", p.name, key)
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions internal/topo/planner/filterPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package planner

import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/ast"
)
Expand Down Expand Up @@ -71,6 +72,7 @@ func (p *FilterPlan) PushDownPredicate(condition ast.Expr) (ast.Expr, LogicalPla
}

func (p *FilterPlan) PruneColumns(fields []ast.Expr) error {
conf.Log.Infof("filter PruneColumns, condition:%v", p.condition.String())
f := getFields(p.condition)
return p.baseLogicalPlan.PruneColumns(append(fields, f...))
}
Expand Down
8 changes: 7 additions & 1 deletion internal/topo/planner/joinPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package planner

import "github.com/lf-edge/ekuiper/pkg/ast"
import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/ast"
)

type JoinPlan struct {
baseLogicalPlan
Expand Down Expand Up @@ -92,6 +95,9 @@ func extractCondition(condition ast.Expr) (unpushable ast.Expr, pushable ast.Exp
}

func (p *JoinPlan) PruneColumns(fields []ast.Expr) error {
for _, j := range p.joins {
conf.Log.Infof("join PruneColumns, join:%v", j.Expr.String())
}
f := getFields(p.joins)
return p.baseLogicalPlan.PruneColumns(append(fields, f...))
}
4 changes: 4 additions & 0 deletions internal/topo/planner/projectPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package planner
import (
"strconv"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/ast"
)

Expand Down Expand Up @@ -95,6 +96,9 @@ func (p *ProjectPlan) BuildExplainInfo() {
}

func (p *ProjectPlan) PruneColumns(fields []ast.Expr) error {
for _, field := range fields {
conf.Log.Infof("proj PruneColumns, field:%v", field.String())
}
f := getFields(p.fields)
return p.baseLogicalPlan.PruneColumns(append(fields, f...))
}
9 changes: 8 additions & 1 deletion internal/topo/planner/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package planner

import "github.com/lf-edge/ekuiper/pkg/ast"
import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/ast"
)

func getRefSources(node ast.Node) ([]ast.StreamName, bool) {
result := make(map[ast.StreamName]bool)
Expand Down Expand Up @@ -74,5 +77,9 @@ func getFields(node ast.Node) []ast.Expr {
}
return true
})

for _, r := range result {
conf.Log.Infof("getFields after, field:%v", r.String())
}
return result
}
4 changes: 2 additions & 2 deletions test/change_stream_rule.jmx
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
<elementProp name="" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">{&#xd;
&quot;sql&quot; : &quot;create stream demo (temperature float, light string) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
&quot;sql&quot; : &quot;create stream demo (temperature float, light bigint) WITH (FORMAT=\&quot;JSON\&quot;, DATASOURCE=\&quot;devices/+/messages\&quot;,STRICT_VALIDATION=\&quot;true\&quot; )&quot;&#xd;
}</stringProp>
<stringProp name="Argument.metadata">=</stringProp>
</elementProp>
Expand Down Expand Up @@ -181,7 +181,7 @@
&quot;qos&quot;: 1,&#xd;
&quot;clientId&quot;: &quot;demo_001&quot;&#xd;
}&#xd;
}&#xd;
},{ &quot;log&quot;:{} }&#xd;
]&#xd;
}</stringProp>
<stringProp name="Argument.metadata">=</stringProp>
Expand Down
4 changes: 2 additions & 2 deletions test/lookup_table_memory.jmx
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
&quot;topic&quot;: &quot;alertVal&quot;,&#xd;
&quot;sendSingle&quot;: true&#xd;
}&#xd;
}&#xd;
},{ &quot;log&quot;:{} }&#xd;
]&#xd;
}</stringProp>
<stringProp name="Argument.metadata">=</stringProp>
Expand Down Expand Up @@ -305,7 +305,7 @@
&quot;topic&quot;: &quot;rule/alert&quot;,&#xd;
&quot;sendSingle&quot;: true&#xd;
}&#xd;
}&#xd;
},{ &quot;log&quot;:{} }&#xd;
]&#xd;
}</stringProp>
<stringProp name="Argument.metadata">=</stringProp>
Expand Down
103 changes: 0 additions & 103 deletions test/run_jmeter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,108 +59,5 @@ fvt_dir=`pwd`

rm -rf jmeter_logs

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/streams_test.jmx -Dbase="$base_dir" -l jmeter_logs/stream_test.jtl -j jmeter_logs/stream_test.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/rule_test.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/rule_test.jtl -j jmeter_logs/rule_test.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_all_rule.jmx -l jmeter_logs/select_all_rule.jtl -j jmeter_logs/select_all_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_condition_rule.jmx -l jmeter_logs/select_condition_rule.jtl -j jmeter_logs/select_condition_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_aggr_rule.jmx -l jmeter_logs/select_aggr_rule.jtl -j jmeter_logs/select_aggr_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/change_rule_status.jmx -l jmeter_logs/change_rule_status.jtl -j jmeter_logs/change_rule_status.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/change_stream_rule.jmx -l jmeter_logs/change_stream_rule.jtl -j jmeter_logs/change_stream_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_aggr_rule_order.jmx -l jmeter_logs/select_aggr_rule_order.jtl -j jmeter_logs/select_aggr_rule_order.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/rule_pipeline.jmx -l jmeter_logs/rule_pipeline.jtl -j jmeter_logs/rule_pipeline.log
echo -e "---------------------------------------------\n"

if test $with_edgex = true; then
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_edgex_condition_rule.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_condition_rule.jtl -j jmeter_logs/select_edgex_condition_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_sink_rule.jtl -j jmeter_logs/edgex_sink_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_edgex_meta_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_meta_rule.jtl -j jmeter_logs/select_edgex_meta_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_redis_share_connection_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_redis_share_connection_sink_rule.jtl -j jmeter_logs/edgex_redis_share_connection_sink_rule.log
echo -e "---------------------------------------------\n"


/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_array_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_array_rule.jtl -j jmeter_logs/edgex_array_rule.log
echo -e "---------------------------------------------\n"
fi

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/portable_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/portable_end_2_end.jtl -j jmeter_logs/portable_end_2_end.log
echo -e "---------------------------------------------\n"


/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/select_countwindow_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_countwindow_rule.jtl -j jmeter_logs/select_countwindow_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/http_pull_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/http_pull_rule.jtl -j jmeter_logs/http_pull_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/binary_image_process.jmx -Dfvt="$fvt_dir" -l jmeter_logs/binary_image_process.jtl -j jmeter_logs/binary_image_process.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/table_static.jmx -Dfvt="$fvt_dir" -l jmeter_logs/table_static.jtl -j jmeter_logs/table_static.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/table_cont.jmx -Dfvt="$fvt_dir" -l jmeter_logs/table_cont.jtl -j jmeter_logs/table_cont.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/shared_source_rules.jmx -Dfvt="$fvt_dir" -l jmeter_logs/shared_source_rules.jtl -j jmeter_logs/shared_source_rules.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_condition_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_condition_rule.jtl -j jmeter_logs/graph_condition_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_order_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_order_rule.jtl -j jmeter_logs/graph_group_order_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_group_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_group_rule.jtl -j jmeter_logs/graph_group_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_join_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_join_rule.jtl -j jmeter_logs/graph_join_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_mix_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_mix_rule.jtl -j jmeter_logs/graph_mix_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/graph_window_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/graph_window_rule.jtl -j jmeter_logs/graph_window_rule.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_memory.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_memory.jtl -j jmeter_logs/lookup_table_memory.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_redis.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_redis.jtl -j jmeter_logs/lookup_table_redis.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_sql.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_sql.jtl -j jmeter_logs/lookup_table_sql.log
echo -e "---------------------------------------------\n"

echo -e "-------------------- management test ------------------------\n"
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/management_test/data_import_export.jmx -Dfvt="$fvt_dir" -l jmeter_logs/data_import_export.jtl -j jmeter_logs/data_import_export.log
echo -e "---------------------------------------------\n"
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/management_test/rule_metrics.jmx -Dfvt="$fvt_dir" -l jmeter_logs/rule_metrics.jtl -j jmeter_logs/rule_metrics.log
echo -e "---------------------------------------------\n"

0 comments on commit b8aa5bf

Please sign in to comment.