-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathconnection_manager.go
97 lines (79 loc) · 2.61 KB
/
connection_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package gorabbit
import (
"context"
"time"
)
// connectionManager bundles the two AMQP connections used by the library (one
// for consuming, one for publishing) together with the marshaller used to
// encode outgoing payloads.
type connectionManager struct {
	// consumerConnection holds the independent consuming connection.
	// It is created by newConsumerConnection and is the target of registerConsumer.
	consumerConnection *amqpConnection
	// publisherConnection holds the independent publishing connection.
	// It is created by newPublishingConnection and is the target of publish.
	publisherConnection *amqpConnection
	// marshaller holds the marshaller used to encode messages.
	// publish calls its Marshal method on the payload before sending.
	marshaller Marshaller
}
// newConnectionManager builds a connectionManager holding one consumer
// connection and one publisher connection, both created from the same uri,
// keep-alive flag, retry delay, logger and marshaller.
//
// An empty connectionName is replaced by libraryName. maxRetry,
// publishingCacheSize and publishingCacheTTL are forwarded to the publisher
// connection only.
func newConnectionManager(
	ctx context.Context,
	uri string,
	connectionName string,
	keepAlive bool,
	retryDelay time.Duration,
	maxRetry uint,
	publishingCacheSize uint64,
	publishingCacheTTL time.Duration,
	logger logger,
	marshaller Marshaller,
) *connectionManager {
	// Fall back to the library name when the caller did not name the connection.
	name := connectionName
	if name == "" {
		name = libraryName
	}

	// The consumer connection is created before the publisher connection.
	consumer := newConsumerConnection(ctx, uri, name, keepAlive, retryDelay, logger, marshaller)
	publisher := newPublishingConnection(
		ctx, uri, name, keepAlive, retryDelay, maxRetry,
		publishingCacheSize, publishingCacheTTL, logger, marshaller,
	)

	return &connectionManager{
		consumerConnection:  consumer,
		publisherConnection: publisher,
		marshaller:          marshaller,
	}
}
// close shuts down the publisher connection and then the consumer connection.
//
// Both connections are always attempted: a failure to close the publisher does
// not prevent the consumer from being closed, so neither connection is leaked.
// A connection that was never initialized (nil) is skipped.
//
// It returns the first error encountered (the publisher's error takes
// precedence over the consumer's), or nil when every close succeeded.
func (c *connectionManager) close() error {
	var firstErr error

	if c.publisherConnection != nil {
		firstErr = c.publisherConnection.close()
	}

	if c.consumerConnection != nil {
		// Keep the earlier publisher error, if any, as the reported one.
		if err := c.consumerConnection.close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
// isReady reports whether the manager can be used: both the publisher and the
// consumer connection must exist, and each must report ready().
// The publisher is queried first; the consumer is only queried if the
// publisher is ready.
func (c *connectionManager) isReady() bool {
	pub, con := c.publisherConnection, c.consumerConnection

	return pub != nil && con != nil && pub.ready() && con.ready()
}
// isHealthy reports whether both the publisher and the consumer connection
// exist and each reports healthy().
// The publisher is queried first; the consumer is only queried if the
// publisher is healthy.
func (c *connectionManager) isHealthy() bool {
	pub, con := c.publisherConnection, c.consumerConnection

	return pub != nil && con != nil && pub.healthy() && con.healthy()
}
// registerConsumer hands the given MessageConsumer to the consumer connection
// and returns whatever that registration returns.
// It returns errConsumerConnectionNotInitialized when the manager has no
// consumer connection.
func (c *connectionManager) registerConsumer(consumer MessageConsumer) error {
	conn := c.consumerConnection
	if conn == nil {
		return errConsumerConnectionNotInitialized
	}

	return conn.registerConsumer(consumer)
}
// publish encodes payload with the manager's marshaller and sends the
// resulting bytes through the publisher connection to the given exchange and
// routingKey, forwarding options unchanged.
//
// It returns errPublisherConnectionNotInitialized when the manager has no
// publisher connection (the payload is not marshalled in that case), the
// marshaller's error if encoding fails (nothing is published), or otherwise
// the result of the publisher connection's publish call.
func (c *connectionManager) publish(exchange, routingKey string, payload interface{}, options *PublishingOptions) error {
	publisher := c.publisherConnection
	if publisher == nil {
		return errPublisherConnectionNotInitialized
	}

	body, marshalErr := c.marshaller.Marshal(payload)
	if marshalErr != nil {
		return marshalErr
	}

	return publisher.publish(exchange, routingKey, body, options)
}