diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d6f92b0 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,60 @@ +name: CI + +on: [push] + +jobs: + test-library: + runs-on: ubuntu-latest + strategy: + matrix: + go-version: [ '1.19', '1.20', '1.21.x' ] + + steps: + - uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + + - name: Install dependencies + run: go get . + + - name: Go formatting analysis + run: | + if [ -n "$(gofmt -l .)" ]; then + gofmt -d . + exit 1 + fi + + - name: Go code quality analysis + run: go vet ./... + + - name: Go unit testing + run: | + go test -race $(go list ./... | grep -v /vendor/) -v -coverprofile=coverage.out + go tool cover -func=coverage.out + + - name: Upload coverage results + uses: actions/upload-artifact@v3 + with: + name: coverage + path: coverage.out + + test-lint: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + + - name: Install golangci-lint + run: | + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.52.2 + + - name: Run golangci-lint + run: ./bin/golangci-lint run -v diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..f0f728f --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,373 @@ +## Golden config for golangci-lint v1.52.2 +# +# This is the best config for golangci-lint based on my experience and opinion. +# It is very strict, but not extremely strict. +# Feel free to adopt and change it for your needs. + +run: + timeout: 1m # default 1m + +linters-settings: + errcheck: + disable-default-exclusions: false # disable built-in exclude list # default: false + check-type-assertions: true # default false + check-blank: false # default false + ignore: "fmt:.*" # default fmt:.* + exclude-functions: [] # see https://github.com/kisielk/errcheck#excluding-functions for details # default [] + gosimple: + go: "1.20" # default 1.13 + checks: [ "*" ] # https://staticcheck.io/docs/options#checks # default ["*"] + govet: + enable-all: true + disable: + - fieldalignment # too strict + settings: + shadow: + strict: true # default false + staticcheck: + go: "1.20" # default 1.13 + checks: + - "*" # https://staticcheck.io/docs/options#checks # default ["*"] + - "-SA1019" # Removed DEPRACATED subgroup to be able to use "deprecated" tag only in structs. + structcheck: + exported-fields: false # default false + unused: + check-exported: false # default false # TODO: enable after fixing false positives + varcheck: + exported-fields: false # default false # TODO: enable after fixing false positives + + bidichk: + # The following configurations check for all mentioned invisible unicode runes. + # All runes are enabled by default. + left-to-right-embedding: true # default true + right-to-left-embedding: true # default true + pop-directional-formatting: true # default true + left-to-right-override: true # default true + right-to-left-override: true # default true + left-to-right-isolate: true # default true + right-to-left-isolate: true # default true + first-strong-isolate: true # default true + pop-directional-isolate: true # default true + cyclop: + max-complexity: 30 # the maximal code complexity to report # default 10 + package-average: 10.0 # the maximal average package complexity. If it's higher than 0.0 (float) the check is enabled # default 0.0 + skip-tests: false # should ignore tests # default false + dupl: + threshold: 150 # default 150 + errorlint: + # Check whether fmt.Errorf uses the %w verb for formatting errors. See the readme for caveats + errorf: true # default true + # Check for plain type assertions and type switches + asserts: true # default true + # Check for plain error comparisons + comparison: true # default true + exhaustive: + check-generated: false # indicates whether to check switch statements in generated Go source files # default false + default-signifies-exhaustive: false # if true, presence of "default" case in switch statements satisfies exhaustiveness, even if all enum members are not listed # default false + ignore-enum-members: "" # enum members matching the supplied regex do not have to be listed in switch statements to satisfy exhaustiveness # default "" + package-scope-only: false # consider enums only in package scopes, not in inner scopes # default false + # forbidigo: + # forbid: # forbid the following identifiers # default ^(fmt\.Print(|f|ln)|print|println)$ + # - ^(fmt\.Print(|f|ln)|print|println)$ + # exclude-godoc-examples: true # exclude godoc examples from forbidigo checks # default is true + funlen: + lines: 100 # default 60 + statements: 50 # default 40 + gocognit: + min-complexity: 20 # minimal code complexity to report, 30 by default (but we recommend 10-20) + goconst: + match-constant: true # look for existing constants matching the values # default true + min-len: 3 # minimal length of string constant # default 3 + min-occurrences: 3 # minimum occurrences of constant string count to trigger issue # default 3 + numbers: true # search also for duplicated numbers # default false + min: 3 # minimum value, only works with goconst.numbers # default 3 + max: 3 # maximum value, only works with goconst.numbers # default 3 + ignore-calls: true # ignore when constant is not used as function argument # default true + ignore-tests: false # ignore test files # default false + gocritic: + settings: + captLocal: + paramsOnly: false # whether to restrict checker to params only # default true + elseif: + skipBalanced: false # whether to skip balanced if-else pairs # default true + #hugeParam: # disabled by default + # sizeThreshold: 80 # size in bytes that makes the warning trigger # default 80 + #nestingReduce: # disabled by default + # bodyWidth: 5 # min number of statements inside a branch to trigger a warning # default 5 + #rangeExprCopy: # disabled by default + # sizeThreshold: 512 # size in bytes that makes the warning trigger # default 512 + # skipTestFuncs: true # whether to check test functions # default true + #rangeValCopy: # disabled by default + # sizeThreshold: 128 # size in bytes that makes the warning trigger # default 128 + # skipTestFuncs: true # whether to check test functions # default true + #ruleguard: # disabled by default + # rules: "" # path to a gorules file # default "" + #tooManyResultsChecker: # disabled by default + # maxResults: 5 # maximum number of results # default 5 + #truncateCmp: # disabled by default + # skipArchDependent: true # whether to skip int/uint/uintptr types # default true + underef: + skipRecvDeref: false # whether to skip (*x).method() calls where x is a pointer receiver # default true + #unnamedResult: # disabled by default + # checkExported: false # whether to check exported functions # default false + gocyclo: + min-complexity: 30 # default 30 + godot: + scope: declarations # comments to be checked: `declarations` (default), `toplevel`, or `all` + exclude: [] # list of regexps for excluding particular comment lines from check # default [] + capital: false # check that each sentence starts with a capital letter # default false + period: true # check that each sentence ends with a period # default true + gomnd: + # List of enabled checks, see https://github.com/tommy-muehle/go-mnd/#checks for description. + checks: # default argument,case,condition,operation,return,assign + - argument + - case + - condition + - operation + - return + - assign + # List of numbers to exclude from analysis. The numbers should be written as string. + # Following values always ignored: "1", "1.0", "0" and "0.0" + ignored-numbers: [] # default [] + # List of file patterns to exclude from analysis. + # Following values always ignored: `.+_test.go` + ignored-files: [] # default [] + # List of function patterns to exclude from analysis. + # Following functions always ignored: `time.Date` + ignored-functions: ["strconv.ParseUint"] # default [] + gomoddirectives: + replace-allow-list: [] # list of allowed `replace` directives # default [] + replace-local: false # allow local `replace` directives # default false + exclude-forbidden: false # forbid the use of `exclude` directives # default false + retract-allow-no-explanation: false # allow to use `retract` directives without explanation # default false + gomodguard: + allowed: + modules: [] # default [] + domains: [] # default [] + blocked: + modules: + - github.com/golang/protobuf: + recommendations: + - google.golang.org/protobuf + reason: "see https://developers.google.com/protocol-buffers/docs/reference/go/faq#modules" + - github.com/satori/go.uuid: + recommendations: + - github.com/google/uuid + reason: "satori's package is not maintained" + - github.com/gofrs/uuid: + recommendations: + - github.com/google/uuid + reason: "see recommendation from dev-infra team: https://confluence.gtforge.com/x/gQI6Aw" + versions: [] # default [] + local_replace_directives: true # default false + lll: + line-length: 120 # default 120 + makezero: + always: false # default false + maligned: + suggest-new: true # default false + misspell: + locale: US + ignore-words: + - "" # default: "" + nakedret: + max-func-lines: 0 # default 30 + nestif: + min-complexity: 4 # default 5 + nilnil: + checked-types: # default [ptr, func, iface, map, chan] + - ptr + - func + - iface + - map + - chan + nolintlint: + allow-unused: false # default false + allow-leading-space: true # default true + allow-no-explanation: [funlen, gocognit, lll] # default [] + require-explanation: true # default false + require-specific: true # default false + prealloc: + simple: false # default true + range-loops: true # default true + for-loops: false # default false + predeclared: + ignore: "" # comma-separated list of predeclared identifiers to not report on # default "" + q: false # include method names and field names (i.e., qualified names) in checks # default false + promlinter: + # Promlinter cannot infer all metrics name in static analysis. + # Enable strict mode will also include the errors caused by failing to parse the args. + strict: false # default false + # Please refer to https://github.com/yeya24/promlinter#usage for detailed usage. + # disabled-linters: + # - "Help" + # - "MetricUnits" + # - "Counter" + # - "HistogramSummaryReserved" + # - "MetricTypeInName" + # - "ReservedChars" + # - "CamelCase" + # - "lintUnitAbbreviations" + revive: + # default rules are ignored if any of following settings is defined + #max-open-files: 0 # maximum number of open files at the same time # defaults 0 - unlimited + #ignore-generated-header: false # when set to false, ignores files with "GENERATED" header, similar to golint # default false + #confidence: 0.3 # default failure confidence, this means that linting errors with less than X confidence will be ignored # default 0.8 + #severity: "warning" # minimal rule severity to fail {"error", "warning"} # default "warning" + #enable-all-rules: false # default false + # There is a list of default rules, but it can be redefined, see https://github.com/mgechev/revive#available-rules + rules: + - name: "var-naming" + arguments: [["ID"], []] + # - name: "xxx" + # disabled: false + # arguments: [] + # severity: "xxx" + # allows to redefine rule severity (without changing default rules list) + #directives: + # - name: "xxx" + # severity: "xxx" + rowserrcheck: + packages: + - database/sql + - github.com/jmoiron/sqlx + # stylecheck: + # go: "1.18" # default 1.13 + # checks: [ "*" ] # https://staticcheck.io/docs/options#checks # default ["*"] + # dot-import-whitelist: [] # https://staticcheck.io/docs/options#dot_import_whitelist # default [] + # initialisms: [ "ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS" ] # https://staticcheck.io/docs/options#initialisms + # http-status-code-whitelist: [ "200", "400", "404", "500" ] # https://staticcheck.io/docs/options#http_status_code_whitelist + tenv: + all: true # check all functions in _test.go, not only test functions # default false + testpackage: + skip-regexp: (export|internal)_test\.go # default (export|internal)_test\.go + unparam: + check-exported: true # default false + wrapcheck: + ignoreSigs: [] # specifies substrings of signatures to ignore. Overrides default https://github.com/tomarrell/wrapcheck#configuration # default [] + ignoreSigRegexps: [] # this is similar to the ignoreSigs, but gives slightly more flexibility # default [] + ignorePackageGlobs: [] # see https://github.com/tomarrell/wrapcheck#configuration # default [] + +linters: + disable-all: true + enable: + ## enabled by default + # - deadcode + - errcheck + - gosimple + - govet + - ineffassign + - staticcheck + # - structcheck + - typecheck + - unused + # - varcheck + ## disabled by default + - asciicheck + - bidichk + - bodyclose + # - containedctx + - contextcheck + - cyclop + - decorder + - dogsled + - dupl + - durationcheck + - errchkjson + - errname + - errorlint + - exhaustive + - exportloopref + - funlen + # gochecknoglobals + - gochecknoinits + # - gocognit # NEED + - goconst + - gocritic + - gocyclo + - godot + # - godox + - goimports + - gomnd + - gomoddirectives + - gomodguard + - goprintffuncname + - gosec + - grouper + # - ifshort + - importas + # - lll # long long lines + - maintidx + - makezero + - nakedret + - nestif + - nilerr + - nilnil + - noctx + - nolintlint + - prealloc + - predeclared + - promlinter + - revive + - rowserrcheck + - sqlclosecheck + - tenv + - testpackage + - thelper + - tparallel + - unconvert + - unparam + - wastedassign + - whitespace + - wsl + ## disabled + #- depguard # replaced with gomodguard + #- exhaustivestruct # too strict - finds structs that have uninitialized fields # TODO: maybe enable for some packages? + #- forbidigo # Forbids identifiers + #- forcetypeassert # errcheck is used instead + #- gci # is not used - sorts imports + #- goerr113 # too strict - checks the errors handling expressions + #- gofumpt # replaced with goimports, gofumports is not available yet + #- goheader # is not used - checks that each file has the licence at the beginning + #- golint # deprecated - revive is used instead + #- interfacer # deprecated and has false positives + #- ireturn # good, but too strict - accept interfaces, return concrete types + #- maligned # deprecated + #- misspell # useless - correct commonly misspelled English words... quickly + #- nlreturn # too strict - requires a new line before return and branch statements + #- paralleltest # too many false positives + #- scopelint # deprecated + #- stylecheck # revive does the same + #- tagliatelle # is not used - checks the struct tags + #- wrapcheck # too strict - requires wrapping errors from external packages (even from the same repo) and interfaces + #- varnamelen # great idea, but too many false positives - checking length of variable's name matches its usage scope + +output: + uniq-by-line: false # default true + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 + exclude-rules: + - source: "^//\\s*go:generate\\s" + linters: + - lll + - source: "(noinspection|TODO)" + linters: + - godot + - source: "//noinspection" + linters: + - gocritic + - source: "^\\s+if _, ok := err\\.\\([^.]+\\.InternalError\\); ok {" + linters: + - errorlint + - path: "_test\\.go" + linters: + - bodyclose + - dupl + - funlen + - gochecknoinits + - goconst + - noctx + - wrapcheck + - wsl diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..11ee12d --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,136 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +We as members, contributors, and leaders pledge to make participation in our +community a harassment-free experience for everyone, regardless of age, body +size, visible or invisible disability, ethnicity, sex characteristics, gender +identity and expression, level of experience, education, socio-economic status, +nationality, personal appearance, race, religion, or sexual identity +and orientation. + +We pledge to act and interact in ways that contribute to an open, welcoming, +diverse, inclusive, and healthy community. + +## Our Standards + +Examples of behavior that contributes to a positive environment for our +community include: + +* Demonstrating empathy and kindness toward other people +* Being respectful of differing opinions, viewpoints, and experiences +* Giving and gracefully accepting constructive feedback +* Accepting responsibility and apologizing to those affected by our mistakes, + and learning from the experience +* Focusing on what is best not just for us as individuals, but for the + overall community + +Examples of unacceptable behavior include: + +* The use of sexualized language or imagery, and sexual attention or + advances of any kind +* Trolling, insulting or derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or email + address, without their explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Enforcement Responsibilities + +Community leaders are responsible for clarifying and enforcing our standards of +acceptable behavior and will take appropriate and fair corrective action in +response to any behavior that they deem inappropriate, threatening, offensive, +or harmful. + +Community leaders have the right and responsibility to remove, edit, or reject +comments, commits, code, wiki edits, issues, and other contributions that are +not aligned to this Code of Conduct, and will communicate reasons for moderation +decisions when appropriate. + +## Scope + +This Code of Conduct applies within all community spaces, and also applies when +an individual is officially representing the community in public spaces. +Examples of representing our community include using an official email address, +posting via an official social media account, or acting as an appointed +representative at an online or offline event. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported to the community leaders responsible for enforcement at +[INSERT CONTACT METHOD]. +All complaints will be reviewed and investigated promptly and fairly. + +All community leaders are obligated to respect the privacy and security of the +reporter of any incident. + +## Enforcement Guidelines + +Community leaders will follow these Community Impact Guidelines in determining +the consequences for any action they deem in violation of this Code of Conduct: + +### 1. Correction + +**Community Impact**: Use of inappropriate language or other behavior deemed +unprofessional or unwelcome in the community. + +**Consequence**: A private, written warning from community leaders, providing +clarity around the nature of the violation and an explanation of why the +behavior was inappropriate. A public apology may be requested. + +### 2. Warning + +**Community Impact**: A violation through a single incident or series +of actions. + +**Consequence**: A warning with consequences for continued behavior. No +interaction with the people involved, including unsolicited interaction with +those enforcing the Code of Conduct, for a specified period of time. This +includes avoiding interactions in community spaces as well as external channels +like social media. Violating these terms may lead to a temporary or +permanent ban. + +### 3. Temporary Ban + +**Community Impact**: A serious violation of community standards, including +sustained inappropriate behavior. + +**Consequence**: A temporary ban from any sort of interaction or public +communication with the community for a specified period of time. No public or +private interaction with the people involved, including unsolicited interaction +with those enforcing the Code of Conduct, is allowed during this period. +Violating these terms may lead to a permanent ban. + +### 4. Permanent Ban + +**Community Impact**: Demonstrating a pattern of violation of community +standards, including sustained inappropriate behavior, harassment of an +individual, or aggression toward or disparagement of classes of individuals. + +**Consequence**: A permanent ban from any sort of public interaction within +the community. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], +version 2.0, available at +[https://www.contributor-covenant.org/version/2/0/code_of_conduct.html][v2.0]. + +Community Impact Guidelines were inspired by +[Mozilla's code of conduct enforcement ladder][Mozilla CoC]. + +For answers to common questions about this code of conduct, see the FAQ at +[https://www.contributor-covenant.org/faq][FAQ]. Translations are available +at [https://www.contributor-covenant.org/translations][translations]. + +[homepage]: https://www.contributor-covenant.org + +[v2.0]: https://www.contributor-covenant.org/version/2/0/code_of_conduct.html + +[Mozilla CoC]: https://github.com/mozilla/diversity + +[FAQ]: https://www.contributor-covenant.org/faq + +[translations]: https://www.contributor-covenant.org/translations diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ffea26c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021-2023 KardinalAI + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5f3bf0a --- /dev/null +++ b/README.md @@ -0,0 +1,522 @@ +# Gorabbit + +

+ logo +

+ +Gorabbit is a wrapper that provides high level and robust RabbitMQ operations through a client or a manager. + +This wrapper depends on the official [Go RabbitMQ plugin](https://github.com/rabbitmq/amqp091-go). + +* [Installation](#installation) + * [Go Module](#go-module) + * [Environment Variables](#environment-variables) +* [Always On Mechanism](#always-on-mechanism) +* [Client](#client) + * [Initialization](#client-initialization) + * [Options](#client-options) + * [Default Options](#client-with-default-options) + * [Custom Options](#client-with-custom-options) + * [Builder](#client-options-using-the-builder) + * [Struct](#client-options-using-struct-initialization) + * [Disconnection](#client-disconnection) + * [Publishing](#publishing) + * [Consuming](#consuming) + * [Ready and Health Checks](#ready-and-health-checks) +* [Manager](#manager) + * [Initialization](#manager-initialization) + * [Options](#manager-options) + * [Default Options](#manager-with-default-options) + * [Custom Options](#manager-with-custom-options) + * [Builder](#manager-options-using-the-builder) + * [Struct](#manager-options-using-struct-initialization) + * [Disconnection](#manager-disconnection) + * [Operations](#manager-operations) + * [Exchange Creation](#exchange-creation) + * [Queue Creation](#queue-creation) + * [Binding Creation](#binding-creation) + * [Message Count](#queue-messages-count) + * [Push Message](#push-message) + * [Pop Message](#pop-message) + * [Purge Queue](#purge-queue) + * [Delete Queue](#delete-queue) + * [Delete Exchange](#delete-exchange) + * [Setup From Definitions](#setup-from-schema-definition-file) + +## Installation + +### Go module + +```bash +go get github.com/KardinalAI/gorabbit/v1 +``` + +### Environment variables + +The client's and manager's `Mode` can also be set via an environment variable that will **override** the manually +entered value. + +```dotenv +GORABBIT_MODE: debug # possible values: release or debug +``` + +The client and manager can also be completely disabled via the following environment variable: + +```dotenv +GORABBIT_DISABLED: true # possible values: true, false, 1, or 0 +``` + +## Always-on mechanism + +Here is a visual representation of the always-on mechanism of a connection and channel when the `KeepAlive` flag is set +to true. + +![Always on mechanism](assets/always-on-mechanism.png) + +## Client + +The gorabbit client offers 2 main functionalities: + +* Publishing +* Consuming + +Additionally, the client also provides a ready check and a health check. + +### Client initialization + +A client can be initialized via the constructor `NewClient`. This constructor takes `ClientOptions` as an optional +parameter. + +### Client options + +| Property | Description | Default Value | +|---------------------|---------------------------------------------------------|---------------| +| Host | The hostname of the RabbitMQ server | 127.0.0.1 | +| Port | The port of the RabbitMQ server | 5672 | +| Username | The plain authentication username | guest | +| Password | The plain authentication password | guest | +| Vhost | The specific vhost to use when connection to CloudAMQP | | +| UseTLS | The flag that activates the use of TLS (amqps) | false | +| KeepAlive | The flag that activates retry and re-connect mechanisms | true | +| RetryDelay | The delay between each retry and re-connection | 3 seconds | +| MaxRetry | The max number of message retry if it failed to process | 5 | +| PublishingCacheTTL | The time to live for a failed publish when set in cache | 60 seconds | +| PublishingCacheSize | The max number of failed publish to add into cache | 128 | +| Mode | The mode defines whether logs are shown or not | Release | + +### Client with default options + +Passing `nil` options will trigger the client to use default values (host, port, credentials, etc...) +via `DefaultClientOptions()`. + +```go +client := gorabbit.NewClient(nil) +``` + +You can also explicitly pass `DefaultClientOptions()` for a cleaner initialization. + +```go +client := gorabbit.NewClient(gorabbit.DefaultClientOptions()) +``` + +Finally, passing a `NewClientOptions()` method also initializes default values if not overwritten. + +```go +client := gorabbit.NewClient(gorabbit.NewClientOptions()) +``` + +### Client with options from environment variables + +You can instantiate a client from environment variables, without the need of manually specifying options in the code. + +```go +client := gorabbit.NewClientFromEnv() +``` + +Here are the following supported environment variables: + +* `RABBITMQ_HOST`: Defines the host, +* `RABBITMQ_PORT`: Defines the port, +* `RABBITMQ_USERNAME`: Defines the username, +* `RABBITMQ_PASSWORD`: Defines the password, +* `RABBITMQ_VHOST`: Defines the vhost, +* `RABBITMQ_USE_TLS`: Defines whether to use TLS or no. + +**Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.** + +### Client with custom options + +We can input custom values for a specific property, either via the built-in builder or via direct struct initialization. + +#### Client options using the builder + +`NewClientOptions()` and `DefaultClientOptions()` both return an instance of `*ClientOptions` that can act as a builder. + +```go +options := gorabbit.NewClientOptions(). + SetMode(gorabbit.Debug). + SetCredentials("root", "password"). + SetRetryDelay(5 * time.Second) + +client := gorabbit.NewClient(options) +``` + +> :information_source: There is a setter method for each property. + +#### Client options using struct initialization + +`ClientOptions` is an exported type, so it can be used directly. + +```go +options := gorabbit.ClientOptions { + Host: "localhost", + Port: 5673, + Username: "root", + Password: "password", + ... +} + +client := gorabbit.NewClient(&options) +``` + +> :warning: Direct initialization via the struct **does not use default values on missing properties**, so be sure to +> fill in every property available. + +### Client disconnection + +When a client is initialized, to prevent a leak, always disconnect it when no longer needed. + +```go +client := gorabbit.NewClient(gorabbit.DefaultClientOptions()) +defer client.Disconnect() +``` + +### Publishing + +To send a message, the client offers two simple methods: `Publish` and `PublishWithOptions`. The required arguments for +publishing are: + +* Exchange (which exchange the message should be sent to) +* Routing Key +* Payload (`interface{}`, the object will be marshalled internally) + +Example of sending a simple string + +```go +err := client.Publish("events_exchange", "event.foo.bar.created", "foo string") +``` + +Example of sending an object + +```go +type foo struct { + Action string +} + +err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"}) +``` + +Optionally, you can set the message's `Priority` and `DeliveryMode` via the `PublishWithOptions` method. + +```go +options := gorabbit.SendOptions(). + SetPriority(gorabbit.PriorityMedium). + SetDeliveryMode(gorabbit.Persistent) + +err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options) +``` + +> :information_source: If the `KeepAlive` flag is set to true when initializing the client, failed publishing will be +> cached once +> and re-published as soon as the channel is back up. +> +> ![publishing safeguard](assets/publishing-safeguard.png) + +### Consuming + +To consume messages, gorabbit offers a very simple asynchronous consumer method `Consume` that takes a `MessageConsumer` +as argument. Error handling, acknowledgement, negative acknowledgement and rejection are all done internally by the +consumer. + +```go +err := client.RegisterConsumer(gorabbit.MessageConsumer{ + Queue: "events_queue", + Name: "toto_consumer", + PrefetchSize: 0, + PrefetchCount: 10, + AutoAck: false, + ConcurrentProcess: false, + Handlers: gorabbit.MQTTMessageHandlers{ + "event.foo.bar.created": func (payload []byte) error { + fmt.Println(string(payload)) + + return nil + }, + }, +}) +``` + +* Queue: The queue to consume messages from +* Name: Unique identifier for the consumer +* PrefetchSize: The maximum size of messages that can be processed at the same time +* PrefetchCount: The maximum number of messages that can be processed at the same time +* AutoAck: Automatic acknowledgement of messages upon reception +* ConcurrentProcess: Asynchronous handling of deliveries +* Handlers: A list of handlers for specified routes + +**NB:** [RabbitMQ Wildcards](https://www.cloudamqp.com/blog/rabbitmq-topic-exchange-explained.html) are also supported. +If multiple routing keys have the same handler, a wildcard can be used, for example: +`event.foo.bar.*` or `event.foo.#`. + +> :information_source: If the `KeepAlive` flag is set to true when initializing the client, consumers will +> auto-reconnect after a connection loss. +> This mechanism is indefinite and therefore, consuming from a non-existent queue will trigger an error repeatedly but +> will not affect +> other consumptions. This is because each consumer has its **own channel**. +> +> ![consumer safeguard](assets/consumer-safeguard.png) + +### Ready and Health checks + +The client offers `IsReady()` and `IsHealthy()` checks that can be used for monitoring. + +**Ready:** Verifies that connections are opened and ready to launch new operations. + +**Healthy:** Verifies that both connections and channels are opened, ready and ongoing operations are working +(Consumers are consuming). + +## Manager + +The gorabbit manager offers multiple management operations: + +* Exchange, queue and bindings creation +* Exchange and queue deletion +* Queue evaluation: Exists, number of messages +* Queue operations: Pop message, push message, purge + +> :warning: A manager should only be used for either testing RabbitMQ functionalities or setting up a RabbitMQ server. +> The manager does not provide robust mechanisms of retry and reconnection like the client. + +### Manager initialization + +A manager can be initialized via the constructor `NewManager`. This constructor takes `ManagerOptions` as an optional +parameter. + +### Manager options + +| Property | Description | Default Value | +|---------------------|---------------------------------------------------------|---------------| +| Host | The hostname of the RabbitMQ server | 127.0.0.1 | +| Port | The port of the RabbitMQ server | 5672 | +| Username | The plain authentication username | guest | +| Password | The plain authentication password | guest | +| Vhost | The specific vhost to use when connection to CloudAMQP | | +| UseTLS | The flag that activates the use of TLS (amqps) | false | +| Mode | The mode defines whether logs are shown or not | Release | + +### Manager with default options + +Passing `nil` options will trigger the manager to use default values (host, port, credentials, etc...) +via `DefaultManagerOptions()`. + +```go +manager := gorabbit.NewManager(nil) +``` + +You can also explicitly pass `DefaultManagerOptions()` for a cleaner initialization. + +```go +manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions()) +``` + +Finally, passing a `NewManagerOptions()` method also initializes default values if not overwritten. + +```go +manager := gorabbit.NewManager(gorabbit.NewManagerOptions()) +``` + +### Manager with options from environment variables + +You can instantiate a manager from environment variables, without the need of manually specifying options in the code. + +```go +manager := gorabbit.NewManagerFromEnv() +``` + +Here are the following supported environment variables: + +* `RABBITMQ_HOST`: Defines the host, +* `RABBITMQ_PORT`: Defines the port, +* `RABBITMQ_USERNAME`: Defines the username, +* `RABBITMQ_PASSWORD`: Defines the password, +* `RABBITMQ_VHOST`: Defines the vhost, +* `RABBITMQ_USE_TLS`: Defines whether to use TLS or no. + +**Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.** + +### Manager with custom options + +We can input custom values for a specific property, either via the built-in builder or via direct struct initialization. + +#### Manager options using the builder + +`NewManagerOptions()` and `DefaultManagerOptions()` both return an instance of `*ManagerOptions` that can act as a +builder. + +```go +options := gorabbit.NewManagerOptions(). + SetMode(gorabbit.Debug). + SetCredentials("root", "password") + +manager := gorabbit.NewManager(options) +``` + +> :information_source: There is a setter method for each property. + +#### Manager options using struct initialization + +`ManagerOptions` is an exported type, so it can be used directly. + +```go +options := gorabbit.ManagerOptions { + Host: "localhost", + Port: 5673, + Username: "root", + Password: "password", + Mode: gorabbit.Debug, +} + +manager := gorabbit.NewManager(&options) +``` + +> :warning: Direct initialization via the struct **does not use default values on missing properties**, so be sure to +> fill in every property available. + +### Manager disconnection + +When a manager is initialized, to prevent a leak, always disconnect it when no longer needed. + +```go +manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions()) +defer manager.Disconnect() +``` + +### Manager operations + +The manager offers all necessary operations to manager a RabbitMQ server. + +#### Exchange creation + +Creates an exchange with optional arguments. + +```go +err := manager.CreateExchange(gorabbit.ExchangeConfig{ + Name: "events_exchange", + Type: gorabbit.ExchangeTypeTopic, + Persisted: false, + Args: nil, +}) +``` + +#### Queue creation + +Creates a queue with optional arguments and bindings if declared. + +```go +err := manager.CreateQueue(gorabbit.QueueConfig{ + Name: "events_queue", + Durable: false, + Exclusive: false, + Args: nil, + Bindings: &[]gorabbit.BindingConfig{ + { + RoutingKey: "event.foo.bar.created", + Exchange: "events_exchange", + }, + }, +}) +``` + +#### Binding creation + +Binds a queue to an exchange via a given routing key. + +```go +err := manager.BindExchangeToQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created") +``` + +#### Queue messages count + +Returns the number of messages in a queue, or an error if the queue does not exist. This method can also evaluate the +existence of a queue. + +```go +messageCount, err := manager.GetNumberOfMessages("events_queue") +``` + +#### Push message + +Pushes a single message to a given exchange. + +```go +err := manager.PushMessageToExchange("events_exchange", "event.foo.bar.created", "single_message_payload") +``` + +#### Pop message + +Retrieves a single message from a given queue and auto acknowledges it if `autoAck` is set to true. + +```go +message, err := manager.PopMessageFromQueue("events_queue", true) +``` + +#### Purge queue + +Deletes all messages from a given queue. + +```go +err := manager.PurgeQueue("events_queue") +``` + +#### Delete queue + +Deletes a given queue. + +```go +err := manager.DeleteQueue("events_queue") +``` + +#### Delete exchange + +Deletes a given exchange. + +```go +err := manager.DeleteExchange("events_exchange") +``` + +#### Setup from schema definition file + +You can setup exchanges, queues and bindings automatically by referencing a +[RabbitMQ Schema Definition](assets/definitions.example.json) JSON file. + +```go +err := manager.SetupFromDefinitions("/path/to/definitions.json") +``` + +> :warning: The standard RabbitMQ definitions file contains configurations for +> `users`, `vhosts` and `permissions`. Those configurations are not taken into consideration +> in the `SetupFromDefinitions` method. + +## Launch Local RabbitMQ Server + +To run a local rabbitMQ server quickly with a docker container, simply run the following command: + +```bash +docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management +``` + +It will launch a local RabbitMQ server mapped on port 5672, and the management dashboard will be mapped on +port 15672 accessible on localhost:15672 with a username "guest" and password "guest". + +## License +**Gorabbit** is licensed under the [MIT](LICENSE). \ No newline at end of file diff --git a/assets/always-on-mechanism.png b/assets/always-on-mechanism.png new file mode 100644 index 0000000..72f0741 Binary files /dev/null and b/assets/always-on-mechanism.png differ diff --git a/assets/consumer-safeguard.png b/assets/consumer-safeguard.png new file mode 100644 index 0000000..2af89ec Binary files /dev/null and b/assets/consumer-safeguard.png differ diff --git a/assets/definitions.example.json b/assets/definitions.example.json new file mode 100644 index 0000000..2787da1 --- /dev/null +++ b/assets/definitions.example.json @@ -0,0 +1,56 @@ +{ + "exchanges": [ + { + "name": "foo_exchange", + "vhost": "/", + "type": "topic", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + }, + { + "name": "bar_exchange", + "vhost": "/", + "type": "topic", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ], + "queues": [ + { + "name": "foo", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": {} + }, + { + "name": "bar", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": {} + } + ], + "bindings": [ + { + "source": "foo_exchange", + "vhost": "/", + "destination": "foo", + "destination_type": "queue", + "routing_key": "event.foo.#", + "arguments": {} + }, + { + "source": "bar_exchange", + "vhost": "/", + "destination": "bar", + "destination_type": "queue", + "routing_key": "event.bar.#", + "arguments": {} + } + ] +} diff --git a/assets/gorabbit-logo-lg.jpg b/assets/gorabbit-logo-lg.jpg new file mode 100644 index 0000000..9c5fd96 Binary files /dev/null and b/assets/gorabbit-logo-lg.jpg differ diff --git a/assets/gorabbit-logo-md.jpg b/assets/gorabbit-logo-md.jpg new file mode 100644 index 0000000..887fac0 Binary files /dev/null and b/assets/gorabbit-logo-md.jpg differ diff --git a/assets/gorabbit-logo-sm.jpg b/assets/gorabbit-logo-sm.jpg new file mode 100644 index 0000000..6f87576 Binary files /dev/null and b/assets/gorabbit-logo-sm.jpg differ diff --git a/assets/publishing-safeguard.png b/assets/publishing-safeguard.png new file mode 100644 index 0000000..4b4aecc Binary files /dev/null and b/assets/publishing-safeguard.png differ diff --git a/channel.go b/channel.go new file mode 100644 index 0000000..5bfd773 --- /dev/null +++ b/channel.go @@ -0,0 +1,596 @@ +package gorabbit + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// amqpChannels is a simple wrapper of an amqpChannel slice. +type amqpChannels []*amqpChannel + +// publishingChannel loops through all channels and returns the first available publisher channel if it exists. +func (a amqpChannels) publishingChannel() *amqpChannel { + for _, channel := range a { + if channel != nil && channel.connectionType == connectionTypePublisher { + return channel + } + } + + return nil +} + +// updateParentConnection updates every channel's parent connection. +func (a amqpChannels) updateParentConnection(conn *amqp.Connection) { + for _, channel := range a { + channel.connection = conn + } +} + +// amqpChannel holds information about the management of the native amqp.Channel. +type amqpChannel struct { + // ctx is the parent context and acts as a safeguard. + ctx context.Context + + // connection is the native amqp.Connection. + connection *amqp.Connection + + // channel is the native amqp.Channel. + channel *amqp.Channel + + // keepAlive is the flag that will define whether active guards and re-connections are enabled or not. + keepAlive bool + + // retryDelay defines the delay to wait before re-connecting if the channel was closed and the keepAlive flag is set to true. + retryDelay time.Duration + + // consumer is the MessageConsumer that holds all necessary information for the consumption of messages. + consumer *MessageConsumer + + // consumptionCtx holds the consumption context. + consumptionCtx context.Context + + // consumptionCancel is the cancel function of the consumptionCtx. + consumptionCancel context.CancelFunc + + // consumptionHealth manages the status of all active consumptions. + consumptionHealth consumptionHealth + + // publishingCache manages the caching of unpublished messages due to a connection error. + publishingCache *ttlMap[string, mqttPublishing] + + // maxRetry defines the retry header for each message. + maxRetry uint + + // closed is an inner property that switches to true if the channel was explicitly closed. + closed bool + + // logger logs events. + logger logger + + // releaseLogger forces logs not matter the mode. It is used to log important things. + releaseLogger logger + + // connectionType defines the connectionType. + connectionType connectionType +} + +// newConsumerChannel instantiates a new consumerChannel and amqpChannel for method inheritance. +// - ctx is the parent context. +// - connection is the parent amqp.Connection. +// - keepAlive will keep the channel alive if true. +// - retryDelay defines the delay between each retry, if the keepAlive flag is set to true. +// - consumer is the MessageConsumer that will hold consumption information. +// - maxRetry is the retry header for each message. +// - logger is the parent logger. +func newConsumerChannel(ctx context.Context, connection *amqp.Connection, keepAlive bool, retryDelay time.Duration, consumer *MessageConsumer, logger logger) *amqpChannel { + channel := &amqpChannel{ + ctx: ctx, + connection: connection, + keepAlive: keepAlive, + retryDelay: retryDelay, + logger: inheritLogger(logger, map[string]interface{}{ + "context": "channel", + "type": connectionTypeConsumer, + "consumer": consumer.Name, + "queue": consumer.Queue, + }), + releaseLogger: &stdLogger{ + logger: newLogrus(), + identifier: libraryName, + logFields: map[string]interface{}{ + "context": "channel", + "type": connectionTypeConsumer, + "consumer": consumer.Name, + "queue": consumer.Queue, + }, + }, + connectionType: connectionTypeConsumer, + consumptionHealth: make(consumptionHealth), + consumer: consumer, + } + + // We open an initial channel. + err := channel.open() + + // If the channel failed to open and the keepAlive flag is set to true, we want to retry until success. + if err != nil && keepAlive { + go channel.retry() + } + + return channel +} + +// newPublishingChannel instantiates a new publishingChannel and amqpChannel for method inheritance. +// - ctx is the parent context. +// - connection is the parent amqp.Connection. +// - keepAlive will keep the channel alive if true. +// - retryDelay defines the delay between each retry, if the keepAlive flag is set to true. +// - maxRetry defines the maximum number of times a message can be retried if its consumption failed. +// - publishingCacheSize is the maximum cache size of failed publishing. +// - publishingCacheTTL defines the time to live for each failed publishing that was put in cache. +// - logger is the parent logger. +func newPublishingChannel(ctx context.Context, connection *amqp.Connection, keepAlive bool, retryDelay time.Duration, maxRetry uint, publishingCacheSize uint64, publishingCacheTTL time.Duration, logger logger) *amqpChannel { + channel := &amqpChannel{ + ctx: ctx, + connection: connection, + keepAlive: keepAlive, + retryDelay: retryDelay, + logger: inheritLogger(logger, map[string]interface{}{ + "context": "channel", + "type": connectionTypePublisher, + }), + releaseLogger: &stdLogger{ + logger: newLogrus(), + identifier: libraryName, + logFields: map[string]interface{}{ + "context": "channel", + "type": connectionTypePublisher, + }, + }, + connectionType: connectionTypePublisher, + publishingCache: newTTLMap[string, mqttPublishing](publishingCacheSize, publishingCacheTTL), + maxRetry: maxRetry, + } + + // We open an initial channel. + err := channel.open() + + // If the channel failed to open and the keepAlive flag is set to true, we want to retry until success. + if err != nil && keepAlive { + go channel.retry() + } + + return channel +} + +// open opens a new amqp.Channel from the parent connection. +func (c *amqpChannel) open() error { + // If the channel is nil or closed we return an error. + if c.connection == nil || c.connection.IsClosed() { + err := errConnectionClosed + + c.logger.Error(err, "Could not open channel") + + return err + } + + // We request a channel from the parent connection. + channel, err := c.connection.Channel() + if err != nil { + c.logger.Error(err, "Could not open channel") + + return err + } + + c.channel = channel + + c.logger.Info("Channel opened") + + c.onChannelOpened() + + // If the keepAlive flag is set to true, we activate a new guard. + if c.keepAlive { + go c.guard() + } + + return nil +} + +// reconnect will indefinitely call the open method until a connection is successfully established or the context is canceled. +func (c *amqpChannel) retry() { + c.logger.Debug("Retry launched") + + for { + select { + case <-c.ctx.Done(): + c.logger.Debug("Retry stopped by the context") + + // If the context was canceled, we break out of the method. + return + default: + // Wait for the retryDelay. + time.Sleep(c.retryDelay) + + // If there is no channel or the current channel is closed, we open a new channel. + if !c.ready() { + err := c.open() + // If the operation succeeds, we break the loop. + if err == nil { + c.logger.Debug("Retry successful") + + return + } + + c.logger.Error(err, "Could not open new channel during retry") + } else { + // If the channel exists and is active, we break out. + return + } + } + } +} + +// guard is a channel safeguard that listens to channel close events and re-launches the channel. +func (c *amqpChannel) guard() { + c.logger.Debug("Guard launched") + + for { + select { + case <-c.ctx.Done(): + c.logger.Debug("Guard stopped by the context") + + // If the context was canceled, we break out of the method. + return + case err, ok := <-c.channel.NotifyClose(make(chan *amqp.Error)): + if !ok { + return + } + + if err != nil { + c.logger.Warn("Channel lost", logField{Key: "reason", Value: err.Reason}, logField{Key: "code", Value: err.Code}) + } + + // If the channel was explicitly closed, we do not want to retry. + if c.closed { + return + } + + c.onChannelClosed() + + go c.retry() + + return + } + } +} + +// close the channel only if it is ready. +func (c *amqpChannel) close() error { + if c.ready() { + err := c.channel.Close() + if err != nil { + c.logger.Error(err, "Could not close channel") + + return err + } + } + + c.closed = true + + return nil +} + +// ready returns true if the channel exists and is not closed. +func (c *amqpChannel) ready() bool { + return c.channel != nil && !c.channel.IsClosed() +} + +// healthy returns true if the channel exists and is not closed. +func (c *amqpChannel) healthy() bool { + if c.connectionType == connectionTypeConsumer { + return c.ready() && c.consumptionHealth.IsHealthy() + } + + return c.ready() +} + +// onChannelOpened is called when a channel is successfully opened. +func (c *amqpChannel) onChannelOpened() { + if c.connectionType == connectionTypeConsumer { + // We re-instantiate the consumptionContext and consumptionCancel. + c.consumptionCtx, c.consumptionCancel = context.WithCancel(c.ctx) + + // This is just a safeguard. + if c.consumer != nil { + c.logger.Info("Launching consumer", logField{Key: "event", Value: "onChannelOpened"}) + + // If the consumer is present we want to start consuming. + go c.consume() + } + } else { + // If the publishing cache is empty, nothing to do here. + if c.publishingCache == nil || c.publishingCache.Len() == 0 { + return + } + + c.logger.Info("Emptying publishing cache", logField{Key: "event", Value: "onChannelOpened"}) + + // For each cached unsuccessful message, we try publishing it again. + c.publishingCache.ForEach(func(key string, msg mqttPublishing) { + _ = c.channel.PublishWithContext(c.ctx, msg.Exchange, msg.RoutingKey, msg.Mandatory, msg.Immediate, msg.Msg) + + c.publishingCache.Delete(key) + }) + } +} + +// onChannelClosed is called when a channel is closed. +func (c *amqpChannel) onChannelClosed() { + if c.connectionType == connectionTypeConsumer { + c.logger.Info("Canceling consumptions", logField{Key: "event", Value: "onChannelClosed"}) + + // We cancel the consumptionCtx. + c.consumptionCancel() + } +} + +// getID returns a unique identifier for the channel. +func (c *amqpChannel) getID() string { + if c.consumer == nil { + return fmt.Sprintf("publisher_%s", uuid.NewString()) + } + + return fmt.Sprintf("%s_%s", c.consumer.Name, uuid.NewString()) +} + +// consume handles the consumption mechanism. +func (c *amqpChannel) consume() { + // TODO(Alex): Check if this can actually happen + // If the channel is not ready, we cannot consume. + if !c.ready() { + c.logger.Warn("Channel not ready, cannot launch consumer") + return + } + + // TODO(Alex): Double check why setting a prefetch size greater than 0 causes an error + // Set the QOS, which defines how many messages can be processed at the same time. + err := c.channel.Qos(c.consumer.PrefetchCount, c.consumer.PrefetchSize, false) + if err != nil { + c.logger.Error(err, "Could not define QOS for consumer") + + return + } + + deliveries, err := c.channel.Consume(c.consumer.Queue, c.getID(), c.consumer.AutoAck, false, false, false, nil) + + c.consumptionHealth.AddSubscription(c.consumer.Queue, err) + + if err != nil { + c.logger.Error(err, "Could not consume messages") + + // If the queue does not exist yet, we want to force a release log with a warning for better visibility. + if isErrorNotFound(err) { + c.releaseLogger.Warn("Queue does not exist", logField{Key: "queue", Value: c.consumer.Queue}) + } + + return + } + + for { + select { + case <-c.consumptionCtx.Done(): + return + case delivery := <-deliveries: + // When a queue is deleted midway, a delivery with no tag or ID is received. + if delivery.DeliveryTag == 0 && delivery.MessageId == "" { + c.logger.Warn("Queue has been deleted, stopping consumer") + + return + } + + // We copy the delivery for the concurrent process of it (otherwise we may process the wrong delivery + // if a new one is consumed while the previous is still being processed). + loopDelivery := delivery + + if c.consumer.ConcurrentProcess { + // We process the message asynchronously if the concurrency is set to true. + go c.processDelivery(&loopDelivery) + } else { + // Otherwise, we process the message synchronously. + c.processDelivery(&loopDelivery) + } + } + } +} + +// processDelivery is the logic that defines what to do with a processed delivery and its error. +func (c *amqpChannel) processDelivery(delivery *amqp.Delivery) { + handler := c.consumer.Handlers.FindFunc(delivery.RoutingKey) + + // If the handler doesn't exist for the received delivery, we negative acknowledge it without requeue. + if handler == nil { + c.logger.Debug("No handler found", logField{Key: "routingKey", Value: delivery.RoutingKey}) + + // If the consumer is not set to auto acknowledge the delivery, we negative acknowledge it without requeue. + if !c.consumer.AutoAck { + _ = delivery.Nack(false, false) + } + + return + } + + err := handler(delivery.Body) + + // If the consumer has the autoAck flag activated, we want to retry the delivery in case of an error. + if c.consumer.AutoAck { + if err != nil { + go c.retryDelivery(delivery, true) + } + + return + } + + // If there is no error, we can simply acknowledge the delivery. + if err == nil { + c.logger.Debug("Delivery successfully processed", logField{Key: "messageID", Value: delivery.MessageId}) + + _ = delivery.Ack(false) + + return + } + + // Otherwise we retry the delivery. + go c.retryDelivery(delivery, false) +} + +// retryDelivery processes a delivery retry based on its redelivery header. +func (c *amqpChannel) retryDelivery(delivery *amqp.Delivery, alreadyAcknowledged bool) { + c.logger.Debug("Delivery retry launched") + + for { + select { + case <-c.consumptionCtx.Done(): + c.logger.Debug("Delivery retry stopped by the consumption context") + + return + default: + // We wait for the retry delay before retrying a message. + time.Sleep(c.retryDelay) + + // We first extract the xDeathCountHeader. + maxRetryHeader, exists := delivery.Headers[xDeathCountHeader] + + // If the header doesn't exist. + if !exists { + c.logger.Debug("Delivery retry invalid") + + // We negative acknowledge the delivery without requeue if the autoAck flag is set to false. + if !alreadyAcknowledged { + _ = delivery.Nack(false, false) + } + + return + } + + // We then cast the value as an int32. + retriesCount, ok := maxRetryHeader.(int32) + + // If the casting fails,we negative acknowledge the delivery without requeue if the autoAck flag is set to false. + if !ok { + c.logger.Debug("Delivery retry invalid") + + if !alreadyAcknowledged { + _ = delivery.Nack(false, false) + } + + return + } + + // If the retries count is still greater than 0, we re-publish the delivery with a decremented xDeathCountHeader. + if retriesCount > 0 { + c.logger.Debug("Retrying delivery", logField{Key: "retriesLeft", Value: retriesCount - 1}) + + // We first negative acknowledge the existing delivery to remove it from queue if the autoAck flag is set to false. + if !alreadyAcknowledged { + _ = delivery.Nack(false, false) + } + + // We create a new publishing which is a copy of the old one but with a decremented xDeathCountHeader. + newPublishing := amqp.Publishing{ + ContentType: "application/json", + Body: delivery.Body, + Type: delivery.RoutingKey, + Priority: delivery.Priority, + DeliveryMode: delivery.DeliveryMode, + MessageId: delivery.MessageId, + Timestamp: delivery.Timestamp, + Headers: map[string]interface{}{ + xDeathCountHeader: int(retriesCount - 1), + }, + } + + // We work on a best-effort basis. We try to re-publish the delivery, but we do nothing if it fails. + _ = c.channel.PublishWithContext(c.ctx, delivery.Exchange, delivery.RoutingKey, false, false, newPublishing) + + return + } + + c.logger.Debug("Cannot retry delivery, max retries reached") + + // Otherwise, we negative acknowledge the delivery without requeue if the autoAck flag is set to false. + if !alreadyAcknowledged { + _ = delivery.Nack(false, false) + } + + return + } + } +} + +// publish will publish a message with the given configuration. +func (c *amqpChannel) publish(exchange string, routingKey string, payload []byte, options *publishingOptions) error { + publishing := &amqp.Publishing{ + ContentType: "application/json", + Body: payload, + Type: routingKey, + Priority: PriorityMedium.Uint8(), + DeliveryMode: Persistent.Uint8(), + MessageId: uuid.NewString(), + Timestamp: time.Now(), + Headers: map[string]interface{}{ + xDeathCountHeader: int(c.maxRetry), + }, + } + + // If options are declared, we add the option. + if options != nil { + publishing.Priority = options.priority() + publishing.DeliveryMode = options.mode() + } + + // If the channel is not ready, we cannot publish, but we send the message to cache if the keepAlive flag is set to true. + if !c.ready() { + err := errChannelClosed + + if c.keepAlive { + c.logger.Error(err, "Could not publish message, sending to cache") + + msg := mqttPublishing{ + Exchange: exchange, + RoutingKey: routingKey, + Mandatory: false, + Immediate: false, + Msg: *publishing, + } + + c.publishingCache.Put(msg.HashCode(), msg) + } else { + c.logger.Error(err, "Could not publish message") + } + + return err + } + + err := c.channel.PublishWithContext(c.ctx, exchange, routingKey, false, false, *publishing) + + // If the message could not be sent we return an error without caching it. + if err != nil { + c.logger.Error(err, "Could not publish message") + + // If the exchange does not exist yet, we want to force a release log with a warning for better visibility. + if isErrorNotFound(err) { + c.releaseLogger.Warn("The MQTT message was not sent, exchange does not exist", logField{Key: "exchange", Value: exchange}, logField{Key: "routingKey", Value: routingKey}) + } + + return err + } + + c.logger.Debug("Message successfully sent", logField{Key: "messageID", Value: publishing.MessageId}) + + return nil +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..2793ecf --- /dev/null +++ b/client.go @@ -0,0 +1,248 @@ +package gorabbit + +import ( + "context" + "fmt" + "os" +) + +// MQTTClient is a simple MQTT interface that offers basic client operations such as: +// - Publishing +// - Consuming +// - Disconnecting +// - Ready and health checks +type MQTTClient interface { + // Disconnect launches the disconnection process. + // This operation disables to client permanently. + Disconnect() error + + // Publish will send the desired payload through the selected channel. + // - exchange is the name of the exchange targeted for event publishing. + // - routingKey is the route that the exchange will use to forward the message. + // - payload is the object you want to send as a byte array. + // Returns an error if the connection to the RabbitMQ server is down. + Publish(exchange, routingKey string, payload interface{}) error + + // PublishWithOptions will send the desired payload through the selected channel. + // - exchange is the name of the exchange targeted for event publishing. + // - routingKey is the route that the exchange will use to forward the message. + // - payload is the object you want to send as a byte array. + // Optionally you can add publishingOptions for extra customization. + // Returns an error if the connection to the RabbitMQ server is down. + PublishWithOptions(exchange, routingKey string, payload interface{}, options *publishingOptions) error + + // RegisterConsumer will register a MessageConsumer for internal queue subscription and message processing. + // The MessageConsumer will hold a list of MQTTMessageHandlers to internalize message processing. + // Based on the return of error of each handler, the process of acknowledgment, rejection and retry of messages is + // fully handled internally. + // Furthermore, connection lost and channel errors are also internally handled by the connectionManager that will keep consumers + // alive if and when necessary. + RegisterConsumer(consumer MessageConsumer) error + + // IsReady returns true if the client is fully operational and connected to the RabbitMQ. + IsReady() bool + + // IsHealthy returns true if the client is ready (IsReady) and all channels are operating successfully. + IsHealthy() bool + + // GetHost returns the host used to initialize the client. + GetHost() string + + // GetPort returns the port used to initialize the client. + GetPort() uint + + // GetUsername returns the username used to initialize the client. + GetUsername() string + + // GetVhost returns the vhost used to initialize the client. + GetVhost() string + + // IsDisabled returns whether the client is disabled or not. + IsDisabled() bool +} + +type mqttClient struct { + // Host is the RabbitMQ server host name. + Host string + + // Port is the RabbitMQ server port number. + Port uint + + // Username is the RabbitMQ server allowed username. + Username string + + // Password is the RabbitMQ server allowed password. + Password string + + // Vhost is used for CloudAMQP connections to set the specific vhost. + Vhost string + + // logger defines the logger used, depending on the mode set. + logger logger + + // disabled completely disables the client if true. + disabled bool + + // connectionManager manages the connection and channel logic and high-level logic + // such as keep alive mechanism and health check. + connectionManager *connectionManager + + // ctx holds the global context used for the client. + ctx context.Context + + // cancel is the cancelFunc for the ctx. + cancel context.CancelFunc +} + +// NewClient will instantiate a new MQTTClient. +// If options is set to nil, the DefaultClientOptions will be used. +func NewClient(options *ClientOptions) MQTTClient { + // If no options is passed, we use the DefaultClientOptions. + if options == nil { + options = DefaultClientOptions() + } + + return newClientFromOptions(options) +} + +// NewClientFromEnv will instantiate a new MQTTClient from environment variables. +func NewClientFromEnv() MQTTClient { + options := NewClientOptionsFromEnv() + + return newClientFromOptions(options) +} + +func newClientFromOptions(options *ClientOptions) MQTTClient { + client := &mqttClient{ + Host: options.Host, + Port: options.Port, + Username: options.Username, + Password: options.Password, + Vhost: options.Vhost, + logger: &noLogger{}, + } + + // We check if the disabled flag is present, which will completely disable the MQTTClient. + if disabledOverride := os.Getenv("GORABBIT_DISABLED"); disabledOverride != "" { + switch disabledOverride { + case "1", "true": + client.disabled = true + return client + } + } + + // We check if the mode was overwritten with the environment variable "GORABBIT_MODE". + if modeOverride := os.Getenv("GORABBIT_MODE"); isValidMode(modeOverride) { + // We override the mode only if it is valid + options.Mode = modeOverride + } + + if options.Mode == Debug { + // If the mode is Debug, we want to actually log important events. + client.logger = newStdLogger() + } + + client.ctx, client.cancel = context.WithCancel(context.Background()) + + protocol := defaultProtocol + + if options.UseTLS { + protocol = securedProtocol + } + + dialURL := fmt.Sprintf("%s://%s:%s@%s:%d/%s", protocol, client.Username, client.Password, client.Host, client.Port, client.Vhost) + + client.connectionManager = newConnectionManager( + client.ctx, + dialURL, + options.KeepAlive, + options.RetryDelay, + options.MaxRetry, + options.PublishingCacheSize, + options.PublishingCacheTTL, + client.logger, + ) + + return client +} + +func (client *mqttClient) Publish(exchange string, routingKey string, payload interface{}) error { + return client.PublishWithOptions(exchange, routingKey, payload, nil) +} + +func (client *mqttClient) PublishWithOptions(exchange string, routingKey string, payload interface{}, options *publishingOptions) error { + // client is disabled, so we do nothing and return no error. + if client.disabled { + return nil + } + + return client.connectionManager.publish(exchange, routingKey, payload, options) +} + +func (client *mqttClient) RegisterConsumer(consumer MessageConsumer) error { + // client is disabled, so we do nothing and return no error. + if client.disabled { + return nil + } + + return client.connectionManager.registerConsumer(consumer) +} + +func (client *mqttClient) Disconnect() error { + // client is disabled, so we do nothing and return no error. + if client.disabled { + return nil + } + + err := client.connectionManager.close() + + if err != nil { + return err + } + + // cancel the context to stop all reconnection goroutines. + client.cancel() + + // disable the client to avoid trying to launch new operations. + client.disabled = true + + return nil +} + +func (client *mqttClient) IsReady() bool { + // client is disabled, so we do nothing and return true. + if client.disabled { + return true + } + + return client.connectionManager.isReady() +} + +func (client *mqttClient) IsHealthy() bool { + // client is disabled, so we do nothing and return true. + if client.disabled { + return true + } + + return client.connectionManager.isHealthy() +} + +func (client *mqttClient) GetHost() string { + return client.Host +} + +func (client *mqttClient) GetPort() uint { + return client.Port +} + +func (client *mqttClient) GetUsername() string { + return client.Username +} + +func (client *mqttClient) GetVhost() string { + return client.Vhost +} + +func (client *mqttClient) IsDisabled() bool { + return client.disabled +} diff --git a/client_options.go b/client_options.go new file mode 100644 index 0000000..4d6d6ef --- /dev/null +++ b/client_options.go @@ -0,0 +1,187 @@ +package gorabbit + +import ( + "time" + + "github.com/Netflix/go-env" +) + +// ClientOptions holds all necessary properties to launch a successful connection with an MQTTClient. +type ClientOptions struct { + // Host is the RabbitMQ server host name. + Host string + + // Port is the RabbitMQ server port number. + Port uint + + // Username is the RabbitMQ server allowed username. + Username string + + // Password is the RabbitMQ server allowed password. + Password string + + // Vhost is used for CloudAMQP connections to set the specific vhost. + Vhost string + + // UseTLS defines whether we use amqp or amqps protocol. + UseTLS bool + + // KeepAlive will determine whether the re-connection and retry mechanisms should be triggered. + KeepAlive bool + + // RetryDelay will define the delay for the re-connection and retry mechanism. + RetryDelay time.Duration + + // MaxRetry will define the number of retries when an amqpMessage could not be processed. + MaxRetry uint + + // PublishingCacheTTL defines the time to live for each publishing cache item. + PublishingCacheTTL time.Duration + + // PublishingCacheSize defines the max length of the publishing cache. + PublishingCacheSize uint64 + + // Mode will specify whether logs are enabled or not. + Mode string +} + +// DefaultClientOptions will return a ClientOptions with default values. +func DefaultClientOptions() *ClientOptions { + return &ClientOptions{ + Host: defaultHost, + Port: defaultPort, + Username: defaultUsername, + Password: defaultPassword, + Vhost: defaultVhost, + UseTLS: defaultUseTLS, + KeepAlive: defaultKeepAlive, + RetryDelay: defaultRetryDelay, + MaxRetry: defaultMaxRetry, + PublishingCacheTTL: defaultPublishingCacheTTL, + PublishingCacheSize: defaultPublishingCacheSize, + Mode: defaultMode, + } +} + +// NewClientOptions is the exported builder for a ClientOptions and will offer setter methods for an easy construction. +// Any non-assigned field will be set to default through DefaultClientOptions. +func NewClientOptions() *ClientOptions { + return DefaultClientOptions() +} + +// NewClientOptionsFromEnv will generate a ClientOptions from environment variables. Empty values will be taken as default +// through the DefaultClientOptions. +func NewClientOptionsFromEnv() *ClientOptions { + defaultOpts := DefaultClientOptions() + + fromEnv := new(RabbitMQEnvs) + + _, err := env.UnmarshalFromEnviron(fromEnv) + if err != nil { + return defaultOpts + } + + if fromEnv.Host != "" { + defaultOpts.Host = fromEnv.Host + } + + if fromEnv.Port > 0 { + defaultOpts.Port = fromEnv.Port + } + + if fromEnv.Username != "" { + defaultOpts.Username = fromEnv.Username + } + + if fromEnv.Password != "" { + defaultOpts.Password = fromEnv.Password + } + + if fromEnv.Vhost != "" { + defaultOpts.Vhost = fromEnv.Vhost + } + + defaultOpts.UseTLS = fromEnv.UseTLS + + return defaultOpts +} + +// SetHost will assign the Host. +func (c *ClientOptions) SetHost(host string) *ClientOptions { + c.Host = host + + return c +} + +// SetPort will assign the Port. +func (c *ClientOptions) SetPort(port uint) *ClientOptions { + c.Port = port + + return c +} + +// SetCredentials will assign the Username and Password. +func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions { + c.Username = username + c.Password = password + + return c +} + +// SetVhost will assign the Vhost. +func (c *ClientOptions) SetVhost(vhost string) *ClientOptions { + c.Vhost = vhost + + return c +} + +// SetUseTLS will assign the UseTLS status. +func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions { + c.UseTLS = use + + return c +} + +// SetKeepAlive will assign the KeepAlive status. +func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions { + c.KeepAlive = keepAlive + + return c +} + +// SetRetryDelay will assign the retry delay. +func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions { + c.RetryDelay = delay + + return c +} + +// SetMaxRetry will assign the max retry count. +func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions { + c.MaxRetry = retry + + return c +} + +// SetPublishingCacheTTL will assign the publishing cache item TTL. +func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions { + c.PublishingCacheTTL = ttl + + return c +} + +// SetPublishingCacheSize will assign the publishing cache max length. +func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions { + c.PublishingCacheSize = size + + return c +} + +// SetMode will assign the Mode if valid. +func (c *ClientOptions) SetMode(mode string) *ClientOptions { + if isValidMode(mode) { + c.Mode = mode + } + + return c +} diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..c90b662 --- /dev/null +++ b/connection.go @@ -0,0 +1,310 @@ +package gorabbit + +import ( + "context" + "net/url" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// amqpConnection holds information about the management of the native amqp.Connection. +type amqpConnection struct { + // ctx is the parent context and acts as a safeguard. + ctx context.Context + + // connection is the native amqp.Connection. + connection *amqp.Connection + + // uri represents the connection string to the RabbitMQ server. + uri string + + // keepAlive is the flag that will define whether active guards and re-connections are enabled or not. + keepAlive bool + + // retryDelay defines the delay to wait before re-connecting if we lose connection and the keepAlive flag is set to true. + retryDelay time.Duration + + // closed is an inner property that switches to true if the connection was explicitly closed. + closed bool + + // channels holds a list of active amqpChannel + channels amqpChannels + + // maxRetry defines the number of retries when publishing a message. + maxRetry uint + + // publishingCacheSize defines the maximum length of cached failed publishing. + publishingCacheSize uint64 + + // publishingCacheTTL defines the time to live for a cached failed publishing. + publishingCacheTTL time.Duration + + // logger logs events. + logger logger + + // connectionType defines the connectionType. + connectionType connectionType +} + +// newConsumerConnection initializes a new consumer amqpConnection with given arguments. +// - ctx is the parent context. +// - uri is the connection string. +// - keepAlive will keep the connection alive if true. +// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. +// - logger is the parent logger. +func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection { + return newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypeConsumer) +} + +// newPublishingConnection initializes a new publisher amqpConnection with given arguments. +// - ctx is the parent context. +// - uri is the connection string. +// - keepAlive will keep the connection alive if true. +// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. +// - maxRetry defines the publishing max retry header. +// - publishingCacheSize defines the maximum length of failed publishing cache. +// - publishingCacheTTL defines the time to live for failed publishing in cache. +// - logger is the parent logger. +func newPublishingConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, maxRetry uint, publishingCacheSize uint64, publishingCacheTTL time.Duration, logger logger) *amqpConnection { + conn := newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypePublisher) + + conn.maxRetry = maxRetry + conn.publishingCacheSize = publishingCacheSize + conn.publishingCacheTTL = publishingCacheTTL + + return conn +} + +// newConnection initializes a new amqpConnection with given arguments. +// - ctx is the parent context. +// - uri is the connection string. +// - keepAlive will keep the connection alive if true. +// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. +// - logger is the parent logger. +func newConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger, connectionType connectionType) *amqpConnection { + conn := &amqpConnection{ + ctx: ctx, + uri: uri, + keepAlive: keepAlive, + retryDelay: retryDelay, + channels: make(amqpChannels, 0), + logger: inheritLogger(logger, map[string]interface{}{ + "context": "connection", + "type": connectionType, + }), + connectionType: connectionType, + } + + conn.logger.Debug("Initializing new amqp connection", logField{Key: "uri", Value: conn.uriForLog()}) + + // We open an initial connection. + err := conn.open() + + // If the connection failed and the keepAlive flag is set to true, we want to re-connect until success. + if err != nil && keepAlive { + go conn.reconnect() + } + + return conn +} + +// open opens a new amqp.Connection with the help of a defined uri. +func (a *amqpConnection) open() error { + // If the uri is empty, we return an error. + if a.uri == "" { + return errEmptyURI + } + + a.logger.Debug("Connecting to RabbitMQ server", logField{Key: "uri", Value: a.uriForLog()}) + + // We request a connection from the RabbitMQ server. + conn, err := amqp.Dial(a.uri) + if err != nil { + a.logger.Error(err, "Connection failed") + + return err + } + + a.logger.Info("Connection successful", logField{Key: "uri", Value: a.uriForLog()}) + + a.connection = conn + + a.channels.updateParentConnection(a.connection) + + // If the keepAlive flag is set to true, we activate a new guard. + if a.keepAlive { + go a.guard() + } + + return nil +} + +// reconnect will indefinitely call the open method until a connection is successfully established or the context is canceled. +func (a *amqpConnection) reconnect() { + a.logger.Debug("Re-connection launched") + + for { + select { + case <-a.ctx.Done(): + a.logger.Debug("Re-connection stopped by the context") + + // If the context was canceled, we break out of the method. + return + default: + // Wait for the retryDelay. + time.Sleep(a.retryDelay) + + // If there is no connection or the current connection is closed, we open a new connection. + if !a.ready() { + err := a.open() + // If the operation succeeds, we break the loop. + if err == nil { + a.logger.Debug("Re-connection successful") + + return + } + + a.logger.Error(err, "Could not open new connection during re-connection") + } else { + // If the connection exists and is active, we break out. + return + } + } + } +} + +// guard is a connection safeguard that listens to connection close events and re-launches the connection. +func (a *amqpConnection) guard() { + a.logger.Debug("Guard launched") + + for { + select { + case <-a.ctx.Done(): + a.logger.Debug("Guard stopped by the context") + + // If the context was canceled, we break out of the method. + return + case err, ok := <-a.connection.NotifyClose(make(chan *amqp.Error)): + if !ok { + return + } + + if err != nil { + a.logger.Warn("Connection lost", logField{Key: "reason", Value: err.Reason}, logField{Key: "code", Value: err.Code}) + } + + // If the connection was explicitly closed, we do not want to re-connect. + if a.closed { + return + } + + go a.reconnect() + + return + } + } +} + +// close the connection only if it is ready. +func (a *amqpConnection) close() error { + if a.ready() { + for _, channel := range a.channels { + err := channel.close() + if err != nil { + return err + } + } + + err := a.connection.Close() + if err != nil { + a.logger.Error(err, "Could not close connection") + + return err + } + } + + a.closed = true + + a.logger.Info("Connection closed") + + return nil +} + +// ready returns true if the connection exists and is not closed. +func (a *amqpConnection) ready() bool { + return a.connection != nil && !a.connection.IsClosed() +} + +// healthy returns true if the connection exists, is not closed and all child channels are healthy. +func (a *amqpConnection) healthy() bool { + // If the connection is not ready, return false. + if !a.ready() { + return false + } + + // Verify that all connection channels are ready too. + for _, channel := range a.channels { + if !channel.healthy() { + return false + } + } + + return true +} + +// registerConsumer opens a new consumerChannel and registers the MessageConsumer. +func (a *amqpConnection) registerConsumer(consumer MessageConsumer) error { + for _, channel := range a.channels { + if channel.consumer != nil && channel.consumer.Queue == consumer.Queue { + err := errConsumerAlreadyExists + + a.logger.Error(err, "Could not register consumer", logField{Key: "consumer", Value: consumer.Name}) + + return err + } + } + + if err := consumer.Handlers.Validate(); err != nil { + return err + } + + channel := newConsumerChannel(a.ctx, a.connection, a.keepAlive, a.retryDelay, &consumer, a.logger) + + a.channels = append(a.channels, channel) + + a.logger.Info("Consumer registered", logField{Key: "consumer", Value: consumer.Name}) + + return nil +} + +func (a *amqpConnection) publish(exchange, routingKey string, payload []byte, options *publishingOptions) error { + publishingChannel := a.channels.publishingChannel() + if publishingChannel == nil { + publishingChannel = newPublishingChannel(a.ctx, a.connection, a.keepAlive, a.retryDelay, a.maxRetry, a.publishingCacheSize, a.publishingCacheTTL, a.logger) + + a.channels = append(a.channels, publishingChannel) + } + + return publishingChannel.publish(exchange, routingKey, payload, options) +} + +// uriForLog returns the uri with the password hidden for security measures. +func (a *amqpConnection) uriForLog() string { + if a.uri == "" { + return a.uri + } + + parsedURL, err := url.Parse(a.uri) + if err != nil { + return "" + } + + hiddenPassword := "xxxx" + + if parsedURL.User != nil { + parsedURL.User = url.UserPassword(parsedURL.User.Username(), hiddenPassword) + } + + return parsedURL.String() +} diff --git a/connection_manager.go b/connection_manager.go new file mode 100644 index 0000000..9d02c28 --- /dev/null +++ b/connection_manager.go @@ -0,0 +1,83 @@ +package gorabbit + +import ( + "context" + "encoding/json" + "time" +) + +type connectionManager struct { + // consumerConnection holds the independent consuming connection. + consumerConnection *amqpConnection + + // publisherConnection holds the independent publishing connection. + publisherConnection *amqpConnection +} + +// newConnectionManager instantiates a new connectionManager with given arguments. +func newConnectionManager( + ctx context.Context, + uri string, + keepAlive bool, + retryDelay time.Duration, + maxRetry uint, + publishingCacheSize uint64, + publishingCacheTTL time.Duration, + logger logger, +) *connectionManager { + c := &connectionManager{ + consumerConnection: newConsumerConnection(ctx, uri, keepAlive, retryDelay, logger), + publisherConnection: newPublishingConnection(ctx, uri, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger), + } + + return c +} + +// close offers the basic connection and channel close() mechanism but with extra higher level checks. +func (c *connectionManager) close() error { + if err := c.publisherConnection.close(); err != nil { + return err + } + + return c.consumerConnection.close() +} + +// isReady returns true if both consumerConnection and publishingConnection are ready. +func (c *connectionManager) isReady() bool { + if c.publisherConnection == nil || c.consumerConnection == nil { + return false + } + + return c.publisherConnection.ready() && c.consumerConnection.ready() +} + +// isHealthy returns true if both consumerConnection and publishingConnection are healthy. +func (c *connectionManager) isHealthy() bool { + if c.publisherConnection == nil || c.consumerConnection == nil { + return false + } + + return c.publisherConnection.healthy() && c.consumerConnection.healthy() +} + +// registerConsumer registers a new MessageConsumer. +func (c *connectionManager) registerConsumer(consumer MessageConsumer) error { + if c.consumerConnection == nil { + return errConsumerConnectionNotInitialized + } + + return c.consumerConnection.registerConsumer(consumer) +} + +func (c *connectionManager) publish(exchange, routingKey string, payload interface{}, options *publishingOptions) error { + if c.publisherConnection == nil { + return errPublisherConnectionNotInitialized + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + return err + } + + return c.publisherConnection.publish(exchange, routingKey, payloadBytes, options) +} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..691a863 --- /dev/null +++ b/constants.go @@ -0,0 +1,110 @@ +package gorabbit + +import ( + "errors" + "time" +) + +// Library name. +const libraryName = "Gorabbit" + +// Connection protocols. +const ( + defaultProtocol = "amqp" + securedProtocol = "amqps" +) + +// Default values for the ClientOptions and ManagerOptions. +const ( + defaultHost = "127.0.0.1" + defaultPort = 5672 + defaultUsername = "guest" + defaultPassword = "guest" + defaultVhost = "" + defaultUseTLS = false + defaultKeepAlive = true + defaultRetryDelay = 3 * time.Second + defaultMaxRetry = 5 + defaultPublishingCacheTTL = 60 * time.Second + defaultPublishingCacheSize = 128 + defaultMode = Release +) + +const ( + xDeathCountHeader = "x-death-count" +) + +// Connection Types. + +type connectionType string + +const ( + connectionTypeConsumer connectionType = "consumer" + connectionTypePublisher connectionType = "publisher" +) + +// Exchange Types + +type ExchangeType string + +const ( + ExchangeTypeTopic ExchangeType = "topic" + ExchangeTypeDirect ExchangeType = "direct" + ExchangeTypeFanout ExchangeType = "fanout" + ExchangeTypeHeaders ExchangeType = "headers" +) + +func (e ExchangeType) String() string { + return string(e) +} + +// Priority Levels. + +type MessagePriority uint8 + +const ( + PriorityLowest MessagePriority = 1 + PriorityVeryLow MessagePriority = 2 + PriorityLow MessagePriority = 3 + PriorityMedium MessagePriority = 4 + PriorityHigh MessagePriority = 5 + PriorityHighest MessagePriority = 6 +) + +func (m MessagePriority) Uint8() uint8 { + return uint8(m) +} + +// Delivery Modes. + +type DeliveryMode uint8 + +const ( + Transient DeliveryMode = 1 + Persistent DeliveryMode = 2 +) + +func (d DeliveryMode) Uint8() uint8 { + return uint8(d) +} + +// Logging Modes. +const ( + Release = "release" + Debug = "debug" +) + +func isValidMode(mode string) bool { + return mode == Release || mode == Debug +} + +// Errors. +var ( + errEmptyURI = errors.New("amqp uri is empty") + errChannelClosed = errors.New("channel is closed") + errConnectionClosed = errors.New("connection is closed") + errConsumerAlreadyExists = errors.New("consumer already exists") + errConsumerConnectionNotInitialized = errors.New("consumerConnection is not initialized") + errPublisherConnectionNotInitialized = errors.New("publisherConnection is not initialized") + errEmptyQueue = errors.New("queue is empty") +) diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..d37c347 --- /dev/null +++ b/consumer.go @@ -0,0 +1,224 @@ +package gorabbit + +import ( + "errors" + "fmt" + "strings" +) + +// MQTTMessageHandlers is a wrapper that holds a map[string]MQTTMessageHandlerFunc. +type MQTTMessageHandlers map[string]MQTTMessageHandlerFunc + +// MQTTMessageHandlerFunc is the function that will be called when a delivery is received. +type MQTTMessageHandlerFunc func(payload []byte) error + +// Validate verifies that all routing keys in the handlers are properly formatted and allowed. +func (mh MQTTMessageHandlers) Validate() error { + for k := range mh { + // A routing key cannot be empty. + if len(k) == 0 { + return errors.New("a routing key cannot be empty") + } + + // A routing key cannot be equal to the wildcard '#'. + if len(k) == 1 && k == "#" { + return errors.New("a routing key cannot be the wildcard '#'") + } + + // A routing key cannot contain spaces. + if strings.Contains(k, " ") { + return errors.New("a routing key cannot contain spaces") + } + + // If a routing key is not just made up of one word. + if strings.Contains(k, ".") { + // We need to make sure that we do not find an empty word or a '%' in the middle of the key. + split := strings.Split(k, ".") + + for i, v := range split { + // We cannot have empty strings. + if v == "" { + return fmt.Errorf("the routing key '%s' is not properly formatted", k) + } + + // The wildcard '#' is not allowed in the middle. + if v == "#" && i > 0 && i < len(split)-1 { + return fmt.Errorf("the wildcard '#' in the routing key '%s' is not allowed", k) + } + } + } + } + + return nil +} + +// matchesPrefixWildcard verifies that everything that comes after the '#' wildcard matches. +func (mh MQTTMessageHandlers) matchesPrefixWildcard(storedWords, words []string) bool { + // compareIndex starts after the wildcard in the storedWords array. + compareIndex := 1 + + // we initialize the wordIdx at -1. + wordIdx := -1 + + // Here we are searching for the first occurrence of the first word after the '#' wildcard + // of the storedWords in the words. + for i, w := range words { + if w == storedWords[compareIndex] { + // We can now start comparing at 'i'. + wordIdx = i + break + } + } + + // If we did not find the first word, then surely the key does not match. + if wordIdx == -1 { + return false + } + + // If the length of storedWords is not the same as the length of words after the wildcard, + // then surely the key does not match. + if len(storedWords)-compareIndex != len(words)-wordIdx { + return false + } + + // Now we can compare, word by word if the routing keys matches. + for i := wordIdx; i < len(words); i++ { + // Be careful, if we find '*' then it should match no matter what. + if storedWords[compareIndex] != words[i] && storedWords[compareIndex] != "*" { + return false + } + + // We move right in the storedWords. + compareIndex++ + } + + return true +} + +// matchesSuffixWildcard verifies that everything that comes before the '#' wildcard matches. +func (mh MQTTMessageHandlers) matchesSuffixWildcard(storedWords, words []string) bool { + backCount := 2 + + // compareIndex starts before the wildcard in the storedWords array. + compareIndex := len(storedWords) - backCount + + // we initialize the wordIdx at -1. + wordIdx := -1 + + // Here we are searching for the first occurrence of the first word before the '#' wildcard + // of the storedWords in the words. + for i, w := range words { + if w == storedWords[compareIndex] { + wordIdx = i + break + } + } + + // If we did not find the first word, then surely the key does not match. + if wordIdx == -1 { + return false + } + + // If the indexes are not the same then surely the key does not match. + if compareIndex != wordIdx { + return false + } + + // Now we can compare, word by word, going backwards if the routing keys matches. + for i := wordIdx; i > -1; i-- { + // Be careful, if we find '*' then it should match no matter what. + if storedWords[compareIndex] != words[i] && storedWords[compareIndex] != "*" { + return false + } + + // We move left in the storedWords. + compareIndex-- + } + + return true +} + +// matchesSuffixWildcard verifies that 2 keys match word by word. +func (mh MQTTMessageHandlers) matchesKey(storedWords, words []string) bool { + // If the lengths are not the same then surely the key does not match. + if len(storedWords) != len(words) { + return false + } + + // Now we can compare, word by word if the routing keys matches. + for i, word := range words { + // Be careful, if we find '*' then it should match no matter what. + if storedWords[i] != word && storedWords[i] != "*" { + return false + } + } + + return true +} + +func (mh MQTTMessageHandlers) FindFunc(routingKey string) MQTTMessageHandlerFunc { + // We first check for a direct match + if fn, found := mh[routingKey]; found { + return fn + } + + // Split the routing key into individual words. + words := strings.Split(routingKey, ".") + + // Check if any of the registered keys match the routing key. + for key, fn := range mh { + // Split the registered key into individual words. + storedWords := strings.Split(key, ".") + + //nolint: gocritic,nestif // We need this if-else block + if storedWords[0] == "#" { + if !mh.matchesPrefixWildcard(storedWords, words) { + continue + } + } else if storedWords[len(storedWords)-1] == "#" { + if !mh.matchesSuffixWildcard(storedWords, words) { + continue + } + } else { + if !mh.matchesKey(storedWords, words) { + continue + } + } + + return fn + } + + // No matching keys were found. + return nil +} + +// MessageConsumer holds all the information needed to consume messages. +type MessageConsumer struct { + // Queue defines the queue from which we want to consume messages. + Queue string + + // Name is a unique identifier of the consumer. Should be as explicit as possible. + Name string + + // PrefetchSize defines the max size of messages that are allowed to be processed at the same time. + // This property is dropped if AutoAck is set to true. + PrefetchSize int + + // PrefetchCount defines the max number of messages that are allowed to be processed at the same time. + // This property is dropped if AutoAck is set to true. + PrefetchCount int + + // AutoAck defines whether a message is directly acknowledged or not when being consumed. + AutoAck bool + + // ConcurrentProcess will make MQTTMessageHandlers run concurrently for faster consumption, if set to true. + ConcurrentProcess bool + + // Handlers is the list of defined handlers. + Handlers MQTTMessageHandlers +} + +// HashCode returns a unique identifier for the defined consumer. +func (c MessageConsumer) HashCode() string { + return fmt.Sprintf("%s-%s", c.Queue, c.Name) +} diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..106bdff --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,160 @@ +package gorabbit_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/KardinalAI/gorabbit/v1" +) + +func TestMQTTMessageHandlers_Validate(t *testing.T) { + tests := []struct { + handlers gorabbit.MQTTMessageHandlers + expectedError error + }{ + { + handlers: gorabbit.MQTTMessageHandlers{ + "event.user.#": func(payload []byte) error { return nil }, + "event.email.*.generated": func(payload []byte) error { return nil }, + "event.*.space.boom": func(payload []byte) error { return nil }, + "*.toto.order.passed": func(payload []byte) error { return nil }, + "#.toto": func(payload []byte) error { return nil }, + }, + expectedError: nil, + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("a routing key cannot be empty"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + " ": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("a routing key cannot contain spaces"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "#": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("a routing key cannot be the wildcard '#'"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "toto.#.titi": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("the wildcard '#' in the routing key 'toto.#.titi' is not allowed"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "toto titi": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("a routing key cannot contain spaces"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "toto..titi": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("the routing key 'toto..titi' is not properly formatted"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + ".toto.titi": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("the routing key '.toto.titi' is not properly formatted"), + }, + { + handlers: gorabbit.MQTTMessageHandlers{ + "toto.titi.": func(payload []byte) error { return nil }, + }, + expectedError: errors.New("the routing key 'toto.titi.' is not properly formatted"), + }, + } + + for _, test := range tests { + err := test.handlers.Validate() + + assert.Equal(t, test.expectedError, err) + } +} + +func TestMQTTMessageHandlers_FindFunc(t *testing.T) { + handlers := gorabbit.MQTTMessageHandlers{ + "event.user.#": func(payload []byte) error { return nil }, + "event.email.*.generated": func(payload []byte) error { return nil }, + "event.*.space.boom": func(payload []byte) error { return nil }, + "*.toto.order.passed": func(payload []byte) error { return nil }, + "#.toto": func(payload []byte) error { return nil }, + } + + tests := []struct { + input string + shouldMatch bool + }{ + { + input: "event.user.plan.generated", + shouldMatch: true, + }, + { + input: "event.user.password.generated.before.awakening.the.titan", + shouldMatch: true, + }, + { + input: "event.email.subject.generated", + shouldMatch: true, + }, + { + input: "event.email.toto.generated", + shouldMatch: true, + }, + { + input: "event.email.titi.generated", + shouldMatch: true, + }, + { + input: "event.email.order.created", + shouldMatch: false, + }, + { + input: "event.toto.space.boom", + shouldMatch: true, + }, + { + input: "event.toto.space.not_boom", + shouldMatch: false, + }, + { + input: "command.toto.order.passed", + shouldMatch: true, + }, + { + input: "command.toto.order.passed.please", + shouldMatch: false, + }, + { + input: "event.toto", + shouldMatch: true, + }, + { + input: "event.space.space.toto", + shouldMatch: true, + }, + { + input: "event.toto.space", + shouldMatch: false, + }, + } + + for _, test := range tests { + fn := handlers.FindFunc(test.input) + + if test.shouldMatch { + assert.NotNil(t, fn) + } else { + assert.Nil(t, fn) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..161ad27 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module github.com/KardinalAI/gorabbit/v1 + +go 1.20 + +require ( + github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d + github.com/google/uuid v1.4.0 + github.com/rabbitmq/amqp091-go v1.9.0 + github.com/sirupsen/logrus v1.9.3 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.15.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..dcf62bb --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d h1:wvStE9wLpws31NiWUx+38wny1msZ/tm+eL5xmm4Y7So= +github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRyW7fC9XJOWQ+NdAv8VLG7ys7l3x4ozEGLUQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..58c411b --- /dev/null +++ b/logger.go @@ -0,0 +1,128 @@ +package gorabbit + +import ( + "os" + + "github.com/sirupsen/logrus" +) + +type logField struct { + Key string + Value interface{} +} + +// logger is the interface that defines log methods. +type logger interface { + Error(error, string, ...logField) + + Warn(string, ...logField) + + Info(string, ...logField) + + Debug(string, ...logField) +} + +// stdLogger logs to stdout using logrus (https://github.com/sirupsen/logrus). +type stdLogger struct { + logger *logrus.Logger + identifier string + logFields map[string]interface{} +} + +func newStdLogger() logger { + return &stdLogger{ + logger: newLogrus(), + identifier: libraryName, + logFields: nil, + } +} + +func (l stdLogger) getExtraFields(fields []logField) map[string]interface{} { + extraFields := make(map[string]interface{}) + + for k, field := range l.logFields { + extraFields[k] = field + } + + for _, extraField := range fields { + extraFields[extraField.Key] = extraField.Value + } + + return extraFields +} + +func (l stdLogger) Error(err error, s string, fields ...logField) { + log := l.logger.WithField("library", l.identifier) + + extraFields := l.getExtraFields(fields) + + log.WithFields(extraFields).WithError(err).Error(s) +} + +func (l stdLogger) Warn(s string, fields ...logField) { + log := l.logger.WithField("library", l.identifier) + + extraFields := l.getExtraFields(fields) + + log.WithFields(extraFields).Warn(s) +} + +func (l stdLogger) Info(s string, fields ...logField) { + log := l.logger.WithField("library", l.identifier) + + extraFields := l.getExtraFields(fields) + + log.WithFields(extraFields).Info(s) +} + +func (l stdLogger) Debug(s string, fields ...logField) { + log := l.logger.WithField("library", l.identifier) + + extraFields := l.getExtraFields(fields) + + log.WithFields(extraFields).Debug(s) +} + +// noLogger does not log at all, this is the default. +type noLogger struct{} + +func (l noLogger) Error(_ error, _ string, _ ...logField) {} + +func (l noLogger) Warn(_ string, _ ...logField) {} + +func (l noLogger) Info(_ string, _ ...logField) {} + +func (l noLogger) Debug(_ string, _ ...logField) {} + +func newLogrus() *logrus.Logger { + log := &logrus.Logger{ + Out: os.Stdout, + Formatter: &logrus.JSONFormatter{ + DisableTimestamp: true, + }, + Level: logrus.DebugLevel, + } + + logLevel := os.Getenv("LOG_LEVEL") + if logLevel != "" { + lvl, err := logrus.ParseLevel(logLevel) + if err == nil { + log.Level = lvl + } + } + + return log +} + +func inheritLogger(parent logger, logFields map[string]interface{}) logger { + switch v := parent.(type) { + case *stdLogger: + return &stdLogger{ + logger: v.logger, + identifier: libraryName, + logFields: logFields, + } + default: + return parent + } +} diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..1595102 --- /dev/null +++ b/manager.go @@ -0,0 +1,542 @@ +package gorabbit + +import ( + "context" + "encoding/json" + "fmt" + "os" + "time" + + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" +) + +// MQTTManager is a simple MQTT interface that offers basic management operations such as: +// - Creation of queue, exchange and bindings +// - Deletion of queues and exchanges +// - Purge of queues +// - Queue evaluation (existence and number of messages) +type MQTTManager interface { + // Disconnect launches the disconnection process. + // This operation disables to manager permanently. + Disconnect() error + + // CreateQueue will create a new queue from QueueConfig. + CreateQueue(config QueueConfig) error + + // CreateExchange will create a new exchange from ExchangeConfig. + CreateExchange(config ExchangeConfig) error + + // BindExchangeToQueueViaRoutingKey will bind an exchange to a queue via a given routingKey. + // Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist. + BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error + + // GetNumberOfMessages retrieves the number of messages currently sitting in a given queue. + // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. + GetNumberOfMessages(queue string) (int, error) + + // PushMessageToExchange pushes a message to a given exchange with a given routing key. + // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist. + PushMessageToExchange(exchange, routingKey string, payload interface{}) error + + // PopMessageFromQueue retrieves the first message of a queue. The message can then be auto-acknowledged or not. + // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist or is empty. + PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error) + + // PurgeQueue will empty a queue of all its current messages. + // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. + PurgeQueue(queue string) error + + // DeleteQueue permanently deletes an existing queue. + // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. + DeleteQueue(queue string) error + + // DeleteExchange permanently deletes an existing exchange. + // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist. + DeleteExchange(exchange string) error + + // SetupFromDefinitions loads a definitions.json file and automatically sets up exchanges, queues and bindings. + SetupFromDefinitions(path string) error + + // GetHost returns the host used to initialize the manager. + GetHost() string + + // GetPort returns the port used to initialize the manager. + GetPort() uint + + // GetUsername returns the username used to initialize the manager. + GetUsername() string + + // GetVhost returns the vhost used to initialize the manager. + GetVhost() string + + // IsDisabled returns whether the manager is disabled or not. + IsDisabled() bool +} + +type mqttManager struct { + // Host is the RabbitMQ server host name. + Host string + + // Port is the RabbitMQ server port number. + Port uint + + // Username is the RabbitMQ server allowed username. + Username string + + // Password is the RabbitMQ server allowed password. + Password string + + // Vhost is used for CloudAMQP connections to set the specific vhost. + Vhost string + + // logger defines the logger used, depending on the mode set. + logger logger + + // disabled completely disables the manager if true. + disabled bool + + // connection holds the single connection to the RabbitMQ server. + connection *amqp.Connection + + // channel holds the single channel from the connection. + channel *amqp.Channel +} + +// NewManager will instantiate a new MQTTManager. +// If options is set to nil, the DefaultManagerOptions will be used. +func NewManager(options *ManagerOptions) (MQTTManager, error) { + // If no options is passed, we use the DefaultManagerOptions. + if options == nil { + options = DefaultManagerOptions() + } + + return newManagerFromOptions(options) +} + +// NewManagerFromEnv will instantiate a new MQTTManager from environment variables. +func NewManagerFromEnv() (MQTTManager, error) { + options := NewManagerOptionsFromEnv() + + return newManagerFromOptions(options) +} + +func newManagerFromOptions(options *ManagerOptions) (MQTTManager, error) { + manager := &mqttManager{ + Host: options.Host, + Port: options.Port, + Username: options.Username, + Password: options.Password, + Vhost: options.Vhost, + logger: &noLogger{}, + } + + // We check if the disabled flag is present, which will completely disable the MQTTManager. + if disabledOverride := os.Getenv("GORABBIT_DISABLED"); disabledOverride != "" { + switch disabledOverride { + case "1", "true": + manager.disabled = true + return manager, nil + } + } + + // We check if the mode was overwritten with the environment variable "GORABBIT_MODE". + if modeOverride := os.Getenv("GORABBIT_MODE"); isValidMode(modeOverride) { + // We override the mode only if it is valid + options.Mode = modeOverride + } + + if options.Mode == Debug { + // If the mode is Debug, we want to actually log important events. + manager.logger = newStdLogger() + } + + protocol := defaultProtocol + + if options.UseTLS { + protocol = securedProtocol + } + + dialURL := fmt.Sprintf("%s://%s:%s@%s:%d/%s", protocol, manager.Username, manager.Password, manager.Host, manager.Port, manager.Vhost) + + var err error + + manager.connection, err = amqp.Dial(dialURL) + if err != nil { + return manager, err + } + + manager.channel, err = manager.connection.Channel() + if err != nil { + return manager, err + } + + return manager, nil +} + +func (manager *mqttManager) Disconnect() error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // We close the manager's channel only if it is opened. + if manager.channel != nil && !manager.channel.IsClosed() { + err := manager.channel.Close() + if err != nil { + return err + } + } + + // We close the manager's connection only if it is opened. + if manager.connection != nil && !manager.connection.IsClosed() { + return manager.connection.Close() + } + + return nil +} + +func (manager *mqttManager) CreateQueue(config QueueConfig) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We declare the queue via the channel. + _, err := manager.channel.QueueDeclare( + config.Name, // name + config.Durable, // durable + false, // delete when unused + config.Exclusive, // exclusive + false, // no-wait + config.Args, + ) + + if err != nil { + return err + } + + // If bindings are also declared, we create the bindings too. + if config.Bindings != nil { + for _, binding := range config.Bindings { + err = manager.BindExchangeToQueueViaRoutingKey(binding.Exchange, config.Name, binding.RoutingKey) + + if err != nil { + return err + } + } + } + + return nil +} + +func (manager *mqttManager) CreateExchange(config ExchangeConfig) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We declare the exchange via the channel. + return manager.channel.ExchangeDeclare( + config.Name, // name + config.Type.String(), // type + config.Persisted, // durable + !config.Persisted, // auto-deleted + false, // internal + false, // no-wait + config.Args, // arguments + ) +} + +func (manager *mqttManager) BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We bind the queue to a given exchange and routing key via the channel. + return manager.channel.QueueBind( + queue, + routingKey, + exchange, + false, + nil, + ) +} + +func (manager *mqttManager) GetNumberOfMessages(queue string) (int, error) { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return -1, nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return -1, err + } + + // We passively declare the queue via the channel, this will return the existing queue or an error if it doesn't exist. + q, err := manager.channel.QueueDeclarePassive( + queue, + false, + false, + false, + false, + nil, + ) + + if err != nil { + return -1, err + } + + return q.Messages, nil +} + +func (manager *mqttManager) PushMessageToExchange(exchange, routingKey string, payload interface{}) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We convert the payload to a []byte. + payloadBytes, err := json.Marshal(payload) + if err != nil { + return err + } + + // We build the amqp.Publishing object. + publishing := amqp.Publishing{ + ContentType: "application/json", + Body: payloadBytes, + Type: routingKey, + Priority: PriorityMedium.Uint8(), + DeliveryMode: Transient.Uint8(), + MessageId: uuid.NewString(), + Timestamp: time.Now(), + } + + // We push the message via the channel. + return manager.channel.PublishWithContext(context.TODO(), exchange, routingKey, false, false, publishing) +} + +func (manager *mqttManager) PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error) { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + //nolint: nilnil // We must return + return nil, nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return nil, err + } + + // We get the message via the channel. + m, ok, err := manager.channel.Get(queue, autoAck) + + if err != nil { + return nil, err + } + + // If the queue is empty. + if !ok { + return nil, errEmptyQueue + } + + return &m, nil +} + +func (manager *mqttManager) PurgeQueue(queue string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We purge the queue via the channel. + _, err := manager.channel.QueuePurge(queue, false) + + if err != nil { + return err + } + + return nil +} + +func (manager *mqttManager) DeleteQueue(queue string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We delete the queue via the channel. + _, err := manager.channel.QueueDelete(queue, false, false, false) + + if err != nil { + return err + } + + return nil +} + +func (manager *mqttManager) DeleteExchange(exchange string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We delete the exchange via the channel. + return manager.channel.ExchangeDelete(exchange, false, false) +} + +func (manager *mqttManager) SetupFromDefinitions(path string) error { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return nil + } + + // If the manager is not ready, we return its error. + if ready, err := manager.ready(); !ready { + return err + } + + // We read the definitions.json file. + definitions, err := os.ReadFile(path) + if err != nil { + return err + } + + def := new(SchemaDefinitions) + + // We parse the definitions.json file into the corresponding struct. + err = json.Unmarshal(definitions, def) + if err != nil { + return err + } + + for _, queue := range def.Queues { + // We create the queue. + err = manager.CreateQueue(QueueConfig{ + Name: queue.Name, + Durable: queue.Durable, + Exclusive: false, + }) + + if err != nil { + return err + } + } + + for _, exchange := range def.Exchanges { + // We create the exchange. + err = manager.CreateExchange(ExchangeConfig{ + Name: exchange.Name, + Type: ExchangeType(exchange.Type), + Persisted: exchange.Durable, + }) + + if err != nil { + return err + } + } + + for _, binding := range def.Bindings { + // We bind the given exchange to the given queue via the given routing key. + err = manager.BindExchangeToQueueViaRoutingKey(binding.Source, binding.Destination, binding.RoutingKey) + + if err != nil { + return err + } + } + + return nil +} + +func (manager *mqttManager) checkChannel() error { + var err error + + // If the connection is nil or closed, we must request a new channel. + if manager.channel == nil || manager.channel.IsClosed() { + manager.channel, err = manager.connection.Channel() + } + + return err +} + +func (manager *mqttManager) ready() (bool, error) { + // Manager is disabled, so we do nothing and return no error. + if manager.disabled { + return true, nil + } + + // If the connection is nil or closed, we return an error because the manager is not ready. + if manager.connection == nil || manager.connection.IsClosed() { + return false, errConnectionClosed + } + + // We check the channel as it might have been closed, and we need to request a new one. + if err := manager.checkChannel(); err != nil { + return false, err + } + + // If the channel is still nil or closed, we return an error because the manager is not ready. + if manager.channel == nil || manager.channel.IsClosed() { + return false, errChannelClosed + } + + return true, nil +} + +func (manager *mqttManager) GetHost() string { + return manager.Host +} + +func (manager *mqttManager) GetPort() uint { + return manager.Port +} + +func (manager *mqttManager) GetUsername() string { + return manager.Username +} + +func (manager *mqttManager) GetVhost() string { + return manager.Vhost +} + +func (manager *mqttManager) IsDisabled() bool { + return manager.disabled +} diff --git a/manager_options.go b/manager_options.go new file mode 100644 index 0000000..79f4bd0 --- /dev/null +++ b/manager_options.go @@ -0,0 +1,128 @@ +package gorabbit + +import "github.com/Netflix/go-env" + +// ManagerOptions holds all necessary properties to launch a successful connection with an MQTTManager. +type ManagerOptions struct { + // Host is the RabbitMQ server host name. + Host string + + // Port is the RabbitMQ server port number. + Port uint + + // Username is the RabbitMQ server allowed username. + Username string + + // Password is the RabbitMQ server allowed password. + Password string + + // Vhost is used for CloudAMQP connections to set the specific vhost. + Vhost string + + // UseTLS defines whether we use amqp or amqps protocol. + UseTLS bool + + // Mode will specify whether logs are enabled or not. + Mode string +} + +// DefaultManagerOptions will return a ManagerOptions with default values. +func DefaultManagerOptions() *ManagerOptions { + return &ManagerOptions{ + Host: defaultHost, + Port: defaultPort, + Username: defaultUsername, + Password: defaultPassword, + Vhost: defaultVhost, + UseTLS: defaultUseTLS, + Mode: defaultMode, + } +} + +// NewManagerOptions is the exported builder for a ManagerOptions and will offer setter methods for an easy construction. +// Any non-assigned field will be set to default through DefaultManagerOptions. +func NewManagerOptions() *ManagerOptions { + return DefaultManagerOptions() +} + +// NewManagerOptionsFromEnv will generate a ManagerOptions from environment variables. Empty values will be taken as default +// through the DefaultManagerOptions. +func NewManagerOptionsFromEnv() *ManagerOptions { + defaultOpts := DefaultManagerOptions() + + fromEnv := new(RabbitMQEnvs) + + _, err := env.UnmarshalFromEnviron(fromEnv) + if err != nil { + return defaultOpts + } + + if fromEnv.Host != "" { + defaultOpts.Host = fromEnv.Host + } + + if fromEnv.Port > 0 { + defaultOpts.Port = fromEnv.Port + } + + if fromEnv.Username != "" { + defaultOpts.Username = fromEnv.Username + } + + if fromEnv.Password != "" { + defaultOpts.Password = fromEnv.Password + } + + if fromEnv.Vhost != "" { + defaultOpts.Vhost = fromEnv.Vhost + } + + defaultOpts.UseTLS = fromEnv.UseTLS + + return defaultOpts +} + +// SetHost will assign the host. +func (m *ManagerOptions) SetHost(host string) *ManagerOptions { + m.Host = host + + return m +} + +// SetPort will assign the port. +func (m *ManagerOptions) SetPort(port uint) *ManagerOptions { + m.Port = port + + return m +} + +// SetCredentials will assign the username and password. +func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions { + m.Username = username + m.Password = password + + return m +} + +// SetVhost will assign the Vhost. +func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions { + m.Vhost = vhost + + return m +} + +// SetUseTLS will assign the UseTLS status. +func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions { + m.UseTLS = use + + return m +} + +// SetMode will assign the mode if valid. +func (m *ManagerOptions) SetMode(mode string) *ManagerOptions { + if isValidMode(mode) { + m.Mode = mode + } + + return m +} diff --git a/model.go b/model.go new file mode 100644 index 0000000..c60a9ef --- /dev/null +++ b/model.go @@ -0,0 +1,129 @@ +package gorabbit + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +type SchemaDefinitions struct { + Exchanges []struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Type string `json:"type"` + Durable bool `json:"durable"` + AutoDelete bool `json:"auto_delete"` + Internal bool `json:"internal"` + Arguments struct { + } `json:"arguments"` + } `json:"exchanges"` + Queues []struct { + Name string `json:"name"` + Vhost string `json:"vhost"` + Durable bool `json:"durable"` + AutoDelete bool `json:"auto_delete"` + Arguments struct { + } `json:"arguments"` + } `json:"queues"` + Bindings []struct { + Source string `json:"source"` + Vhost string `json:"vhost"` + Destination string `json:"destination"` + DestinationType string `json:"destination_type"` + RoutingKey string `json:"routing_key"` + Arguments struct { + } `json:"arguments"` + } `json:"bindings"` +} + +type ExchangeConfig struct { + Name string `yaml:"name"` + Type ExchangeType `yaml:"type"` + Persisted bool `yaml:"persisted"` + Args map[string]interface{} `yaml:"args"` +} + +type QueueConfig struct { + Name string `yaml:"name"` + Durable bool `yaml:"durable"` + Exclusive bool `yaml:"exclusive"` + Args map[string]interface{} `yaml:"args"` + Bindings []BindingConfig `yaml:"bindings"` +} + +type BindingConfig struct { + RoutingKey string `yaml:"routing_key"` + Exchange string `yaml:"exchange"` +} + +type publishingOptions struct { + messagePriority *MessagePriority + deliveryMode *DeliveryMode +} + +func SendOptions() *publishingOptions { + return &publishingOptions{} +} + +func (m *publishingOptions) priority() uint8 { + if m.messagePriority == nil { + return PriorityMedium.Uint8() + } + + return m.messagePriority.Uint8() +} + +func (m *publishingOptions) mode() uint8 { + if m.deliveryMode == nil { + return Persistent.Uint8() + } + + return m.deliveryMode.Uint8() +} + +func (m *publishingOptions) SetPriority(priority MessagePriority) *publishingOptions { + m.messagePriority = &priority + + return m +} + +func (m *publishingOptions) SetMode(mode DeliveryMode) *publishingOptions { + m.deliveryMode = &mode + + return m +} + +type consumptionHealth map[string]bool + +func (s consumptionHealth) IsHealthy() bool { + for _, v := range s { + if !v { + return false + } + } + + return true +} + +func (s consumptionHealth) AddSubscription(queue string, err error) { + s[queue] = err == nil +} + +type mqttPublishing struct { + Exchange string + RoutingKey string + Mandatory bool + Immediate bool + Msg amqp.Publishing +} + +func (m mqttPublishing) HashCode() string { + return m.Msg.MessageId +} + +type RabbitMQEnvs struct { + Host string `env:"RABBITMQ_HOST"` + Port uint `env:"RABBITMQ_PORT"` + Username string `env:"RABBITMQ_USERNAME"` + Password string `env:"RABBITMQ_PASSWORD"` + Vhost string `env:"RABBITMQ_VHOST"` + UseTLS bool `env:"RABBITMQ_USE_TLS"` +} diff --git a/ttl_map.go b/ttl_map.go new file mode 100644 index 0000000..2e8fb6d --- /dev/null +++ b/ttl_map.go @@ -0,0 +1,79 @@ +package gorabbit + +import ( + "sync" + "time" +) + +type ttlMapValue[V any] struct { + value V + createdAt time.Time +} + +type ttlMap[K comparable, V any] struct { + m map[K]ttlMapValue[V] + l sync.Mutex +} + +func newTTLMap[K comparable, V any](ln uint64, maxTTL time.Duration) *ttlMap[K, V] { + m := &ttlMap[K, V]{m: make(map[K]ttlMapValue[V], ln)} + + go func() { + const tickFraction = 3 + + for now := range time.Tick(maxTTL / tickFraction) { + m.l.Lock() + for k := range m.m { + issueDate := m.m[k].createdAt + if now.Sub(issueDate) >= maxTTL { + delete(m.m, k) + } + } + m.l.Unlock() + } + }() + + return m +} + +func (m *ttlMap[K, V]) Len() int { + return len(m.m) +} + +func (m *ttlMap[K, V]) Put(k K, v V) { + m.l.Lock() + + defer m.l.Unlock() + + if _, ok := m.m[k]; !ok { + m.m[k] = ttlMapValue[V]{value: v, createdAt: time.Now()} + } +} + +func (m *ttlMap[K, V]) Get(k K) (V, bool) { + m.l.Lock() + + defer m.l.Unlock() + + v, found := m.m[k] + + innerVal := v.value + + return innerVal, found +} + +func (m *ttlMap[K, V]) ForEach(process func(k K, v V)) { + for key, value := range m.m { + innerVal := value.value + + process(key, innerVal) + } +} + +func (m *ttlMap[K, V]) Delete(k K) { + m.l.Lock() + + defer m.l.Unlock() + + delete(m.m, k) +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..c6f77b4 --- /dev/null +++ b/utils.go @@ -0,0 +1,25 @@ +package gorabbit + +import ( + "errors" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// Error Utils. +const ( + codeNotFound = 404 +) + +// isErrorNotFound checks if the error returned by a connection or channel has the 404 code. +func isErrorNotFound(err error) bool { + var amqpError *amqp.Error + + errors.As(err, &amqpError) + + if amqpError == nil { + return false + } + + return amqpError.Code == codeNotFound +}