Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(libbeat): use math/rand/v2 and drop seed init #42025

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions libbeat/cfgfile/glob_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package cfgfile

import (
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"
"time"

Expand All @@ -30,16 +28,13 @@ import (

func TestGlobWatcher(t *testing.T) {
// Create random temp directory
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
dir, err := ioutil.TempDir("", id)
defer os.RemoveAll(dir)
assert.NoError(t, err)
dir := t.TempDir()
glob := dir + "/*.yml"

gcd := NewGlobWatcher(glob)

content := []byte("test\n")
err = ioutil.WriteFile(dir+"/config1.yml", content, 0644)
err := ioutil.WriteFile(dir+"/config1.yml", content, 0644)
assert.NoError(t, err)
err = ioutil.WriteFile(dir+"/config2.yml", content, 0644)
assert.NoError(t, err)
Expand Down
25 changes: 0 additions & 25 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ package instance

import (
"context"
cryptRand "crypto/rand"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"math"
"math/big"
"math/rand"
"net"
"os"
"os/user"
Expand Down Expand Up @@ -183,27 +179,6 @@ func defaultCertReloadConfig() certReloadConfig {

var debugf = logp.MakeDebug("beat")

func init() {
initRand()
}

// initRand initializes the runtime random number generator seed using
// global, shared cryptographically strong pseudo random number generator.
//
// On linux Reader might use getrandom(2) or /udev/random. On windows systems
// CryptGenRandom is used.
func initRand() {
n, err := cryptRand.Int(cryptRand.Reader, big.NewInt(math.MaxInt64))
var seed int64
if err != nil {
// fallback to current timestamp
seed = time.Now().UnixNano()
} else {
seed = n.Int64()
}
rand.Seed(seed) //nolint:staticcheck // need seed from cryptographically strong PRNG.
}

// Run initializes and runs a Beater implementation. name is the name of the
// Beat (e.g. packetbeat or metricbeat). version is version number of the Beater
// implementation. bt is the `Creator` callback for creating a new beater
Expand Down
4 changes: 2 additions & 2 deletions libbeat/common/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package backoff

import (
"math/rand"
"math/rand/v2"
"time"
)

Expand Down Expand Up @@ -55,7 +55,7 @@ func (b *EqualJitterBackoff) Reset() {
func (b *EqualJitterBackoff) Wait() bool {
// Make sure we have always some minimal back off and jitter.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))
backoff := time.Duration(temp + rand.Int64N(temp))

// increase duration for next wait.
b.duration *= 2
Expand Down
4 changes: 2 additions & 2 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"io"
"math/rand"
"math/rand/v2"
"strconv"
"time"

Expand Down Expand Up @@ -217,7 +217,7 @@ func (r *reporter) initLoop(c config) {
// Select one configured endpoint by random and check if xpack is available
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := r.out[rand.Intn(len(r.out))]
client := r.out[rand.IntN(len(r.out))]
err := client.Connect(ctx)
if err == nil {
closing(log, client)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import (
"context"
"fmt"
"math/rand"
"math/rand/v2"
"testing"
"time"

Expand Down Expand Up @@ -83,7 +83,7 @@
// drop old index preparing test
_, _, _ = client.conn.Delete(index, "", "", nil)

batch := encodeBatch(client, outest.NewBatch(beat.Event{

Check failure on line 86 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
Expand Down Expand Up @@ -135,7 +135,7 @@
}

publish := func(event beat.Event) {
batch := encodeBatch(client, outest.NewBatch(event))

Check failure on line 138 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -217,7 +217,7 @@
_, _, _ = client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(deadletterIndex, "", "", nil)

batch := encodeBatch(client, outest.NewBatch(beat.Event{

Check failure on line 220 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
Expand Down Expand Up @@ -472,6 +472,6 @@
panic("no elasticsearch client")
}

client := grp.Clients[rand.Intn(L)]
client := grp.Clients[rand.IntN(L)]
return client.(outputs.NetworkClient)
}
2 changes: 1 addition & 1 deletion libbeat/outputs/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"math/rand/v2"
"strings"

"github.com/elastic/beats/v7/libbeat/publisher"
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/kafka/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka

import "math/rand"
import "math/rand/v2"

// common helpers used by unit+integration tests

Expand All @@ -35,8 +35,8 @@ func randASCIIBytes(length int) []byte {

func randChar() byte {
start, end := 'a', 'z'
if rand.Int31n(2) == 1 {
if rand.Int32N(2) == 1 {
start, end = 'A', 'Z'
}
return byte(rand.Int31n(end-start+1) + start)
return byte(rand.Int32N(end-start+1) + start)
}
4 changes: 2 additions & 2 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"math/rand/v2"
"strings"
"time"

Expand Down Expand Up @@ -335,7 +335,7 @@ func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Durat
// apply about equaly distributed jitter in second half of the interval, such that the wait
// time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter := rand.Int63n(limit + 1)
jitter := rand.Int64N(limit + 1)
return time.Duration(limit + jitter)
}
}
4 changes: 2 additions & 2 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"math/rand/v2"
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -58,7 +58,7 @@ type eventInfo struct {
func TestKafkaPublish(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("kafka"))

id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
id := strconv.Itoa(rand.Int())
testTopic := fmt.Sprintf("test-libbeat-%s", id)
logType := fmt.Sprintf("log-type-%s", id)

Expand Down
13 changes: 5 additions & 8 deletions libbeat/outputs/kafka/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"math/rand"
"math/rand/v2"
"strconv"

"github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -153,15 +153,14 @@ func cfgRandomPartitioner(_ *logp.Logger, config *config.C) (func() partitioner,
}

return func() partitioner {
generator := rand.New(rand.NewSource(rand.Int63()))
N := cfg.GroupEvents
count := cfg.GroupEvents
partition := int32(0)

return func(_ *message, numPartitions int32) (int32, error) {
if N == count {
count = 0
partition = int32(generator.Intn(int(numPartitions)))
partition = int32(rand.IntN(int(numPartitions)))
}
count++
return partition, nil
Expand All @@ -182,7 +181,7 @@ func cfgRoundRobinPartitioner(_ *logp.Logger, config *config.C) (func() partitio
return func() partitioner {
N := cfg.GroupEvents
count := N
partition := rand.Int31()
partition := rand.Int32()

return func(_ *message, numPartitions int32) (int32, error) {
if N == count {
Expand Down Expand Up @@ -218,12 +217,11 @@ func cfgHashPartitioner(log *logp.Logger, config *config.C) (func() partitioner,
}

func makeHashPartitioner() partitioner {
generator := rand.New(rand.NewSource(rand.Int63()))
hasher := fnv.New32a()

return func(msg *message, numPartitions int32) (int32, error) {
if msg.key == nil {
return int32(generator.Intn(int(numPartitions))), nil
return int32(rand.IntN(int(numPartitions))), nil
}

hash := msg.hash
Expand All @@ -242,7 +240,6 @@ func makeHashPartitioner() partitioner {
}

func makeFieldsHashPartitioner(log *logp.Logger, fields []string, dropFail bool) partitioner {
generator := rand.New(rand.NewSource(rand.Int63()))
hasher := fnv.New32a()

return func(msg *message, numPartitions int32) (int32, error) {
Expand All @@ -264,7 +261,7 @@ func makeFieldsHashPartitioner(log *logp.Logger, fields []string, dropFail bool)
return -1, err
}

msg.hash = generator.Uint32()
msg.hash = rand.Uint32()
} else {
msg.hash = hasher.Sum32()
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/generator/es_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package generator

import (
"encoding/base64"
"math/rand"
"math/rand/v2"
"sync"
"time"
)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/stress/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package stress

import (
"context"
"math/rand"
"math/rand/v2"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -81,7 +81,7 @@ func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error {
min := int64(config.MinWait)
max := int64(config.MaxWait)
if max > 0 && min < max {
waitFor := rand.Int63n(max-min) + min
waitFor := rand.Int64N(max-min) + min

// TODO: make wait interruptable via `Close`
time.Sleep(time.Duration(waitFor))
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package pipeline

import (
"context"
"math/rand"
"math/rand/v2"
"sync"
"time"

Expand Down Expand Up @@ -169,7 +169,7 @@ func randomBatch(min, max int) *mockBatch {

// randIntBetween returns a random integer in [min, max)
func randIntBetween(min, max int) int {
return rand.Intn(max-min) + min
return rand.IntN(max-min) + min
}

func waitUntilTrue(duration time.Duration, fn func() bool) bool {
Expand Down
6 changes: 1 addition & 5 deletions libbeat/template/load_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"math/rand/v2"
"net/http"
"net/http/httptest"
"path/filepath"
Expand All @@ -45,10 +45,6 @@ import (
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

type testTemplate struct {
t *testing.T
client ESClient
Expand Down
Loading