Skip to content

Commit

Permalink
GODRIVER-2335 Preemptively cancel in progress operations when SDAM he…
Browse files Browse the repository at this point in the history
…artbeats timeout. (#1549)
  • Loading branch information
qingyang-hu authored Feb 13, 2024
1 parent 556e2f2 commit 9c6603d
Show file tree
Hide file tree
Showing 22 changed files with 1,572 additions and 83 deletions.
5 changes: 3 additions & 2 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *primitive.ObjectID `json:"serviceId"`
Error error `json:"error"`
ServiceID *primitive.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
8 changes: 8 additions & 0 deletions internal/eventtest/eventtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (tpm *TestPoolMonitor) IsPoolCleared() bool {
})
return len(poolClearedEvents) > 0
}

// Interruptions returns the number of interruptions in the events recorded by the testPoolMonitor.
func (tpm *TestPoolMonitor) Interruptions() int {
interruptions := tpm.Events(func(evt *event.PoolEvent) bool {
return evt.Interruption
})
return len(interruptions)
}
2 changes: 2 additions & 0 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
switch strings.ToLower(key) {
case "appname":
clientOpts.SetAppName(value.(string))
case "connecttimeoutms":
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
case "heartbeatfrequencyms":
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
case "loadbalanced":
Expand Down
69 changes: 69 additions & 0 deletions mongo/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,70 @@ func newCollectionEntityOptions(id string, databaseID string, collectionName str
return options
}

type task struct {
name string
execute func() error
}

type backgroundRoutine struct {
tasks chan *task
wg sync.WaitGroup
err error
}

func (b *backgroundRoutine) start() {
b.wg.Add(1)

go func() {
defer b.wg.Done()

for t := range b.tasks {
if b.err != nil {
continue
}

ch := make(chan error)
go func(task *task) {
ch <- task.execute()
}(t)
select {
case err := <-ch:
if err != nil {
b.err = fmt.Errorf("error running operation %s: %v", t.name, err)
}
case <-time.After(10 * time.Second):
b.err = fmt.Errorf("timed out after 10 seconds")
}
}
}()
}

func (b *backgroundRoutine) stop() error {
close(b.tasks)
b.wg.Wait()
return b.err
}

func (b *backgroundRoutine) addTask(name string, execute func() error) bool {
select {
case b.tasks <- &task{
name: name,
execute: execute,
}:
return true
default:
return false
}
}

func newBackgroundRoutine() *backgroundRoutine {
routine := &backgroundRoutine{
tasks: make(chan *task, 10),
}

return routine
}

type clientEncryptionOpts struct {
KeyVaultClient string `bson:"keyVaultClient"`
KeyVaultNamespace string `bson:"keyVaultNamespace"`
Expand All @@ -136,6 +200,7 @@ type EntityMap struct {
successValues map[string]int32
iterationValues map[string]int32
clientEncryptionEntities map[string]*mongo.ClientEncryption
routinesMap sync.Map // maps thread name to *backgroundRoutine
evtLock sync.Mutex
closed atomic.Value
// keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects.
Expand Down Expand Up @@ -283,6 +348,10 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
err = em.addCollectionEntity(entityOptions)
case "session":
err = em.addSessionEntity(entityOptions)
case "thread":
routine := newBackgroundRoutine()
em.routinesMap.Store(entityOptions.ID, routine)
routine.start()
case "bucket":
err = em.addGridFSBucketEntity(entityOptions)
case "clientEncryption":
Expand Down
7 changes: 6 additions & 1 deletion mongo/integration/unified/event_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type cmapEvent struct {
ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`

PoolClearedEvent *struct {
HasServiceID *bool `bson:"hasServiceId"`
HasServiceID *bool `bson:"hasServiceId"`
InterruptInUseConnections *bool `bson:"interruptInUseConnections"`
} `bson:"poolClearedEvent"`
}

Expand Down Expand Up @@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
return newEventVerificationError(idx, client, "error verifying serviceID: %v", err)
}
}
if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption {
return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v",
expectInterruption, actual.Interruption)
}
default:
return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance")
}
Expand Down
35 changes: 30 additions & 5 deletions mongo/integration/unified/testrunner_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool {
return lp.IterationsEntityID != ""
}

func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error {
args := operation.Arguments
func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
args := op.Arguments

switch operation.Name {
switch op.Name {
case "failPoint":
clientID := lookupString(args, "client")
client, err := entities(ctx).client(clientID)
Expand Down Expand Up @@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
}
}
return nil
case "runOnThread":
operationRaw, err := args.LookupErr("operation")
if err != nil {
return fmt.Errorf("'operation' argument not found in runOnThread operation")
}
threadOp := new(operation)
if err := operationRaw.Unmarshal(threadOp); err != nil {
return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
}
thread := lookupString(args, "thread")
routine, ok := entities(ctx).routinesMap.Load(thread)
if !ok {
return fmt.Errorf("run on unknown thread: %s", thread)
}
routine.(*backgroundRoutine).addTask(threadOp.Name, func() error {
return threadOp.execute(ctx, loopDone)
})
return nil
case "waitForThread":
thread := lookupString(args, "thread")
routine, ok := entities(ctx).routinesMap.Load(thread)
if !ok {
return fmt.Errorf("wait for unknown thread: %s", thread)
}
return routine.(*backgroundRoutine).stop()
case "waitForEvent":
var wfeArgs waitForEventArguments
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil {
return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err)
}

Expand All @@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD

return waitForEvent(wfeCtx, wfeArgs)
default:
return fmt.Errorf("unrecognized testRunner operation %q", operation.Name)
return fmt.Errorf("unrecognized testRunner operation %q", op.Name)
}
}

Expand Down
6 changes: 6 additions & 0 deletions mongo/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ var (
// the "find" and one for the "getMore", but we send three for both.
"A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773",

// GODRIVER-2577: The following spec tests require canceling ops immediately, but the current logic clears pools
// and cancels in-progress ops after two the heartbeat failures.
"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",

// TODO(GODRIVER-2843): Fix and unskip these test cases.
"Find operation with snapshot": "Test fails frequently. See GODRIVER-2843",
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)",
"poolOptions": {
"backgroundThreadIntervalMS": 10000
},
"operations": [
{
"name": "ready"
},
{
"name": "checkOut"
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear",
"interruptInUseConnections": true
},
{
"name": "waitForEvent",
"event": "ConnectionPoolCleared",
"count": 1,
"timeout": 1000
},
{
"name": "waitForEvent",
"event": "ConnectionClosed",
"count": 2,
"timeout": 1000
},
{
"name": "close"
}
],
"events": [
{
"type": "ConnectionCheckedOut",
"connectionId": 1,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 2,
"address": 42
},
{
"type": "ConnectionPoolCleared",
"interruptInUseConnections": true
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42
},
{
"type": "ConnectionPoolClosed",
"address": 42
}
],
"ignore": [
"ConnectionCreated",
"ConnectionPoolReady",
"ConnectionReady",
"ConnectionCheckOutStarted",
"ConnectionPoolCreated",
"ConnectionCheckedIn"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
version: 1
style: unit
description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)
poolOptions:
# ensure it's not involved by default
backgroundThreadIntervalMS: 10000
operations:
- name: ready
- name: checkOut
- name: checkOut
label: conn
- name: clear
interruptInUseConnections: true
- name: waitForEvent
event: ConnectionPoolCleared
count: 1
timeout: 1000
- name: waitForEvent
event: ConnectionClosed
count: 2
timeout: 1000
- name: close
events:
- type: ConnectionCheckedOut
connectionId: 1
address: 42
- type: ConnectionCheckedOut
connectionId: 2
address: 42
- type: ConnectionPoolCleared
interruptInUseConnections: true
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionClosed
reason: stale
address: 42
- type: ConnectionPoolClosed
address: 42
ignore:
- ConnectionCreated
- ConnectionPoolReady
- ConnectionReady
- ConnectionCheckOutStarted
- ConnectionPoolCreated
- ConnectionCheckedIn
Loading

0 comments on commit 9c6603d

Please sign in to comment.