Skip to content

Commit

Permalink
Merge pull request #10 from harisheoran/feature/history
Browse files Browse the repository at this point in the history
Created some json response helpers
  • Loading branch information
harisheoran authored Dec 14, 2024
2 parents a1e1930 + c77575d commit b3cbd48
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 59 deletions.
1 change: 1 addition & 0 deletions chat-server/cmd/server/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type app struct {
messageController postgre.MessageController
userController postgre.UserController
kafkaProducer *kafka.Writer
kafkaConsumer *kafka.Reader
appConfig *AppConfig
}

Expand Down
44 changes: 38 additions & 6 deletions chat-server/cmd/server/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import (
this file contains logic for realtime chat
- Web Socket listening and broadcasting
- Redis publishing and subscribing
- Kafka producing
- Kafka producing and consuming
*/

// broadcast the message to every connection
func (app *app) broadcastMessages() {
// var message Message
Expand Down Expand Up @@ -54,12 +53,12 @@ func (app *app) publishToRedis() {
if err != nil {
panic(err)
}
app.infologger.Println("message published to redis")

app.infologger.Println("message published to redis")
}
}

// subscribe to the Redis channel
// subscribe to the Redis channel and pass to the broadcast function
func (app *app) subscribeToRedis() {
// There is no error because go-redis automatically reconnects on error.
pubsub := app.redisConnection.Subscribe(ctx, myChannel)
Expand All @@ -81,6 +80,7 @@ func (app *app) subscribeToRedis() {
}

// produce message to the kafka channel
// subscribe to the redis channel and produce it to kafka
func (app *app) produceToKafka() {

// There is no error because go-redis automatically reconnects on error.
Expand All @@ -104,7 +104,6 @@ func (app *app) produceToKafka() {
app.errorlogger.Println("Unable to unmarshal message:", err)
continue
}
fmt.Println("From Redis subscribed value is: ", message)

// publish to Kafka broker
err = app.kafkaProducer.WriteMessages(context.Background(), kafka.Message{
Expand All @@ -114,8 +113,41 @@ func (app *app) produceToKafka() {
if err != nil {
app.errorlogger.Println("Unable to produce to kafka", err)
} else {
app.infologger.Println("MESSAGE SENT: ", message.Payload)
app.infologger.Println("message produced to kafka", message.Payload)
}
}

}

// consume from kafka and save to the database
func (app *app) consumeFromKafka() {

for {
consumedMessage, err := app.kafkaConsumer.ReadMessage(context.Background())
if err != nil {
app.errorlogger.Println("Could not read message: ", err)
} else {
app.infologger.Println("message comnsumed from kafka", consumedMessage.Value)
}

this := string(consumedMessage.Value[0])
fmt.Printf("THIS %s", this)

/*
messageToSave := model.Message{
Msg: string(consumedMessage.Value),
}
err = app.messageController.InsertMessage(&messageToSave)
if err != nil {
// send json response also here
//
//
app.errorlogger.Println("unable to save the consumed messsage into the database")
}
*/

app.infologger.Println("consumed message saved into db successfully")
}

}
17 changes: 11 additions & 6 deletions chat-server/cmd/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package main

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"text/template"

Expand All @@ -18,20 +16,26 @@ Handlers for all the routes present in routes.go file
*/

func (app *app) healthHandler(w http.ResponseWriter, request *http.Request) {
fmt.Fprintf(w, "my chat system's health is OK!. %s", app.appConfig.env, version)
healthResponse := map[string]string{
"message": "Health is Ok!",
"env": app.appConfig.env,
"version": version,
}
app.sendJSON(w, http.StatusOK, healthResponse)
}

// main chat handler which upgrade http / https connection to web socket
func (app *app) chatHandler(w http.ResponseWriter, request *http.Request) {
webSocketConnection, err := upgrader.Upgrade(w, request, nil)
if err != nil {
log.Println("ERROR: upgrading the connection to web socket", err)
app.errorlogger.Println("ERROR: upgrading the connection to web socket", err)
app.internalServerErrorJSONResponse(w)
return
}

app.infologger.Println("connection upgraded to Web Socket")

//defer webSocketConnection.Close()
defer webSocketConnection.Close()

client[webSocketConnection] = true

Expand All @@ -42,7 +46,8 @@ func (app *app) chatHandler(w http.ResponseWriter, request *http.Request) {
var message Message
mt, messageByte, err := connection.ReadMessage()
if err != nil {
log.Printf("ERROR: Unable to read the message from client %v: %v", webSocketConnection.RemoteAddr(), err)
app.errorlogger.Printf("ERROR: Unable to read the message from client %v: %v", webSocketConnection.RemoteAddr(), err)
app.internalServerErrorJSONResponse(w)
delete(client, connection)
return
}
Expand Down
52 changes: 50 additions & 2 deletions chat-server/cmd/server/helper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,59 @@
package main

import "net/http"
import (
"encoding/json"
"net/http"
)

/*
Helper functions to handle common and repeated tasks
*/
func sendJSONResponse(w http.ResponseWriter, statusCode int, response any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(response)
}

// send json responsei
func (app *app) sendJSON(w http.ResponseWriter, statusCode int, data interface{}) error {
jsonData, err := json.MarshalIndent(data, "", "\t")
if err != nil {
return err
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(jsonData)

return nil
}

func (app *app) readJSON(request *http.Request, target interface{}) error {

err := json.NewDecoder(request.Body).Decode(&target)
if err != nil {
return err
}

func (app *app) InternalServerResponse(w http.ResponseWriter) {
return nil
}

/*
Error JSON response helpers
*/

// server error response in JSON
func (app *app) serverErrorJsonResponse(w http.ResponseWriter, statusCode int, data interface{}) {
app.errorlogger.Println(data)
err := app.sendJSON(w, statusCode, "")
if err != nil {
app.errorlogger.Println("Unable to send internal server error response ", err)
w.WriteHeader(500)
}
}

// internal server error response in JSON
func (app *app) internalServerErrorJSONResponse(w http.ResponseWriter) {
message := "The server encountered an Internal Error and could not process the request."
app.serverErrorJsonResponse(w, http.StatusInternalServerError, message)
}
29 changes: 0 additions & 29 deletions chat-server/cmd/server/kafka.go

This file was deleted.

26 changes: 23 additions & 3 deletions chat-server/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func main() {
userController: postgre.UserController{
DbConnection: databaseConnection,
},
kafkaProducer: createProducer(),
kafkaProducer: createKafkaProducer(),
kafkaConsumer: createKafkaConsumer(),
appConfig: &appConfig,
}

Expand All @@ -119,6 +120,7 @@ func main() {
go app.subscribeToRedis()
go app.broadcastMessages()
go app.produceToKafka()
// go app.consumeFromKafka()

// start the server
port := fmt.Sprintf(":%d", app.appConfig.port)
Expand Down Expand Up @@ -192,8 +194,8 @@ func kafkaInitialize() (*kafka.Dialer, error) {
return dialer, nil
}

// create a producer
func createProducer() *kafka.Writer {
// create a kafka producer
func createKafkaProducer() *kafka.Writer {
dailer, err := kafkaInitialize()
if err != nil {
log.Println("unable to authenticate or initialize with Kafka: ", err)
Expand All @@ -208,3 +210,21 @@ func createProducer() *kafka.Writer {

return producer
}

// create a kafka consumer
func createKafkaConsumer() *kafka.Reader {

dialer, err := kafkaInitialize()
if err != nil {
log.Println("unable to authenticate or initialize with Kafka: ", err)
}

// init consumer
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaUrl},
Topic: TOPIC_NAME,
Dialer: dialer,
})

return consumer
}
2 changes: 1 addition & 1 deletion chat-server/cmd/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (app *app) router() http.Handler {
mainRouter.HandleFunc("/v1/home", app.homeHandler)
mainRouter.HandleFunc("/v1/health", app.healthHandler)
mainRouter.HandleFunc("/v1/chat", app.chatHandler)
mainRouter.HandleFunc("/v1/auth", app.authHandler)
mainRouter.HandleFunc("/v1/auth", app.authHandler).Methods("POST")

return mainRouter
}
12 changes: 0 additions & 12 deletions chat-server/cmd/server/utils.go

This file was deleted.

1 change: 1 addition & 0 deletions chat-server/tmp/build-errors.log
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1
exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1exit status 1

0 comments on commit b3cbd48

Please sign in to comment.