Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/watermill #199

Merged
merged 17 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,40 @@ SQLITE_FILEPATH=database/golang-api.db

#JWT
JWT_SECRET=ThisIsKey

# Message queue config
MQ_TRACK=true
MQ_DEBUG=true
DEAD_LETTER_QUEUE=dead_queue
HANDLER_NAME=handler

# sql
MQ_DIALECT=sql
MQ_DB_DIALECT=postgres
MQ_DB_HOST=localhost
MQ_DB_PORT=5432
MQ_DB_USERNAME=golang-api
MQ_DB_PASSWORD=golang-api
MQ_DB_NAME=golang-api
MQ_DB_QUERYSTRING=sslmode=disable

# Rabbitmq
# MQ_DIALECT=amqp
# AMQB_URI=amqp://guest:guest@localhost:5672/

# Redis
# MQ_DIALECT=redis
# REDIS_HOST=
# CONSUMER_GROUP=
# REDIS_USERNAME=
# REDIS_PASSWORD=

# kafka
# MQ_DIALECT=kafka
# KAFKA_BROKER=
# CONSUMER_GROUP=

# googleCloud
# MQ_DIALECT=googlecloud
# PROJECT_ID=
# SUBSCRIPTION_ID=
76 changes: 75 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

- [Migrations](#migrations)

- [Messaging queue](#messaging-queue)

- [Code Walk-through](#code-walk-through)
- [Config](#config)
- [Command](#command)
Expand Down Expand Up @@ -87,8 +89,80 @@ Migrations are like **version control for your database**, allowing your team to
- **RUN :** To run migration there is two command `make migration-up` && `make migration-down`.
- Migration needs `-- +migrate Up` and `-- +migrate Down` respectively in starting of files, this is required because we are using [sql-migrate](https://github.com/rubenv/sql-migrate) package

### **Messaging Queue**
- We are using [watermill](https://watermill.io/) package for messaging queue.
- Watermill is a Golang library for working efficiently with message streams. It is intended for building event-driven applications.


- #### Multiple Message Queue Broker Support
- We are supporting 5 types of message queue broker at this time `rabbitmq`, `redis`, `googleCloud`,`sql(postgres,mysql)` & `kafka`
- It allows us to switch to message queue broker without changing too much stuff.
- Watermill package allows us to do that.
- We have environment variable `MQ_DIALECT` where you can set to message queue broker type.
- Need to change env accoeding to message queue broker.
- #### Creating An Worker
- All of the workers for your application are stored in the `cli/workers` directory.
- To create a new job add a new file into the `cli/workers` directory.
- #### Class Structure
- Workers class are very simple, consisting of a single method `Handle`. `Handle` executes when a message is received.
- The `Handle()` method should return an `error` if the job fails.
```go
type WelcomeMail struct {
FirstName string
LastName string
Email string
Roles string
}
// Handle executes the job.
func (w WelcomeMail) Handle() error {
return nil
}
```
- #### Register Worker
- After creating the struct, you need to register it in `cli/workers/worker_handler.go`, so that it can be called correctly.
- To register a new worker add struct to `RegisterWorkerStruct` function.
```go
func RegisterWorkerStruct() []interface{} {
return []interface{}{
WelcomeMail{},
// ...
}
}
```
- #### Command to run worker
```go
go run app.go worker --retry-delay 400 --retry-count 3 --topic user
// --retry-delay 400 --retry-count 3 are optional
// --retry-delay 400 means it will retry after 400ms
// --retry-count 3 means it will retry 3 times

```

- #### Publish Message
- The `InitPubliser` function initializes a `WatermillPubliser` based on the provided configuration.
```go
pub, err := watermill.InitPubliser(cfg)
if err != nil {
// Handle error
}
```
- The `Publish` method on `WatermillPubliser` is used to publish a message to a specific topic(queue name). The message is encoded using the Go `encoding/gob` package before being sent.
```go
// Worker struct must be registered before publishing
err := pub.Publish(topic, workerStruct)
if err != nil {
// Handle error
}
```
- #### Dead Letter Queue
- The `dead letter queue`, also known as the `poison queue` in watermill, is a designated destination for messages that have failed to undergo processing by a consumer.
chintansakhiya marked this conversation as resolved.
Show resolved Hide resolved
- The name of this queue is specified in the `DEAD_LETTER_QUEUE` environment variable, we are storing failed job into database.
- Command to run dead letter queue
```go
go run app.go dead-letter-queue
```

---
### **Code Walk-through**
- #### Config:
- We are using [envconfig](https://github.com/kelseyhightower/envconfig) which binds env values to struct elements.
Expand Down Expand Up @@ -165,7 +239,7 @@ Migrations are like **version control for your database**, allowing your team to
- Each controller must have their struct which contains model object, that will be use to call function of models.
- #### Utils:
- We have define some common methods inside `utils` like json response.
- We need common function which handles json response that's why we have created (json_response.go)[utils/json_response.go]
- We need common function which handles json response that's why we have created [json_response.go](utils/json_response.go)
- Similarly we have created different files for different use in utils.
---
### **Testcases**
Expand Down
7 changes: 6 additions & 1 deletion cli/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Improwised/golang-api/config"
"github.com/Improwised/golang-api/database"
"github.com/Improwised/golang-api/pkg/events"
"github.com/Improwised/golang-api/pkg/watermill"
"github.com/Improwised/golang-api/routes"
"github.com/gofiber/fiber/v2"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -43,8 +44,12 @@ func GetAPICommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {
return err
}

pub, err := watermill.InitPublisher(cfg,false)
if err != nil {
return err
}
// setup routes
err = routes.Setup(app, db, logger, cfg, events, promMetrics)
err = routes.Setup(app, db, logger, cfg, events, promMetrics, pub)
if err != nil {
return err
}
Expand Down
41 changes: 41 additions & 0 deletions cli/dead_letter_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cli

import (
"go.uber.org/zap"

"github.com/Improwised/golang-api/cli/workers"
"github.com/Improwised/golang-api/config"
"github.com/Improwised/golang-api/pkg/watermill"
"github.com/spf13/cobra"
)

type DeadLetterQ struct {
Handler string `json:"handler_poisoned"`
Reason string `json:"reason_poisoned"`
Subscriber string `json:"subscriber_poisoned"`
Topic string `json:"topic_poisoned"`
}

// GetAPICommandDef runs app
func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {

workerCommand := cobra.Command{
Use: "dead-letter-queue",
Short: "To start dead-letter queue",
Long: `This queue is used to store failed job from all worker`,
RunE: func(cmd *cobra.Command, args []string) error {

// Init worker
subscriber, err := watermill.InitSubscriber(cfg, true)
if err != nil {
return err
}

// run worker with topic(queue name) and process function
// it will run failed job until it get success
err = subscriber.Run(cfg.MQ.DeadLetterQ, "dead_letter_queue", workers.Process)
return err
},
}
return workerCommand
}
7 changes: 6 additions & 1 deletion cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import (
func Init(cfg config.AppConfig, logger *zap.Logger) error {
migrationCmd := GetMigrationCommandDef(cfg)
apiCmd := GetAPICommandDef(cfg, logger)
workerCmd := GetWorkerCommandDef(cfg, logger)
workerCmd.PersistentFlags().Int("retry-delay", 100, "time intertval for two retry in ms")
workerCmd.PersistentFlags().Int("retry-count", 3, "number of retry")
workerCmd.PersistentFlags().String("topic", "", "topic name(queue name)")

deadQueueCmd := GetDeadQueueCommandDef(cfg, logger)
rootCmd := &cobra.Command{Use: "golang-api"}
rootCmd.AddCommand(&migrationCmd, &apiCmd)
rootCmd.AddCommand(&migrationCmd, &apiCmd, &workerCmd, &deadQueueCmd)
return rootCmd.Execute()
}
56 changes: 56 additions & 0 deletions cli/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cli

import (
"go.uber.org/zap"

"github.com/Improwised/golang-api/cli/workers"
"github.com/Improwised/golang-api/config"
"github.com/Improwised/golang-api/pkg/watermill"
"github.com/spf13/cobra"
)

// GetAPICommandDef runs app

func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {

workerCommand := cobra.Command{
Use: "worker",
Short: "To start worker",
Long: `To start worker`,
RunE: func(cmd *cobra.Command, args []string) error {
// Get details from flag
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}

retryCount, err := cmd.Flags().GetInt("retry-count")
if err != nil {
return err
}

delay, err := cmd.Flags().GetInt("retry-delay")
if err != nil {
return err
}

// Init subscriber
subscriber, err := watermill.InitSubscriber(cfg, false)
if err != nil {
return err
}

// init router for add middleware,retry count,etc
router, err := subscriber.InitRouter(cfg, delay, retryCount)
if err != nil {
return err
}

// run worker with topic(queue name) and process function
err = router.Run(topic, cfg.MQ.HandlerName, workers.Process)
return err

},
}
return workerCommand
}
32 changes: 32 additions & 0 deletions cli/workers/user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package workers

import (
"log"

helpers "github.com/Improwised/golang-api/helpers/smtp"
)

type WelcomeMail struct {
FirstName string `json:"first_name" `
LastName string `json:"last_name"`
Email string `json:"email"`
Roles string `json:"roles"`
}

func (w WelcomeMail) Handle() error {
log.Println("sending mail")

smtp := helpers.NewSMTPHelper("localhost", "2525", "root", "pass")
smtp.SetSubject("welocme")
smtp.SetPlainBody([]byte("welcome to our org"))

smtp.SetSender("support@improwised.com")
smtp.SetReceivers([]string{w.Email})

if err := smtp.SendMail(); err != nil {
return err
}

log.Printf("mail send to %v", w.Email)
return nil
}
45 changes: 45 additions & 0 deletions cli/workers/worker_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package workers

import (
"bytes"
"encoding/gob"

"github.com/ThreeDotsLabs/watermill/message"
)

func init() {
for _, v := range RegisterWorkerStruct() {
gob.Register(v)
}
}

// Register all worker struct here befour run worker for proper unmarshalling
func RegisterWorkerStruct() []interface{} {
return []interface{}{
WelcomeMail{},
// ...
}
}

// Handler interface for all worker struct
type Handler interface {
Handle() error
}

// process all worker struct and call Handle function according to struct
func Process(msg *message.Message) error {
buf := bytes.NewBuffer(msg.Payload)
dec := gob.NewDecoder(buf)

var result Handler
err := dec.Decode(&result)
if err != nil {
return err
}

if err := result.Handle(); err != nil {
return err
}
msg.Ack()
return nil
}
1 change: 1 addition & 0 deletions config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type AppConfig struct {
Port string `envconfig:"APP_PORT"`
Secret string `envconfig:"JWT_SECRET"`
DB DBConfig
MQ MQConfig
}

// GetConfig Collects all configs
Expand Down
Loading
Loading