Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Jul 10, 2024
1 parent 49df13d commit 2791bde
Show file tree
Hide file tree
Showing 16 changed files with 2,392 additions and 2,273 deletions.
28 changes: 16 additions & 12 deletions cli/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,26 @@ func InitDeps(
),
}

dispatchServiceRegistry := map[string]notification.Dispatcher{
notification.DispatchKindBulkNotification: notification.NewDispatchBulkNotificationService(
notificationDeps,
notifierRegistry,
routerRegistry,
),
notification.DispatchKindSingleNotification: notification.NewDispatchSingleNotificationService(
// dispatchServiceRegistry := map[string]notification.Dispatcher{
// notification.DispatchKindBulkNotification: notification.NewDispatchBulkNotificationService(
// notificationDeps,
// notifierRegistry,
// routerRegistry,
// ),
// notification.DispatchKindSingleNotification: notification.NewDispatchSingleNotificationService(
// notificationDeps,
// notifierRegistry,
// routerRegistry,
// ),
// }

notificationService := notification.NewService(
notificationDeps,
notification.NewDispatchBulkNotificationService(
notificationDeps,
notifierRegistry,
routerRegistry,
),
}

notificationService := notification.NewService(
notificationDeps,
dispatchServiceRegistry,
)

alertService := alert.NewService(
Expand Down
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func StartServer(ctx context.Context, cfg config.Config) error {
}

// propagating the config explicitly
cfg.Notification.SubscriptionV2Enabled = cfg.Service.SubscriptionV2Enabled
// cfg.Notification.SubscriptionV2Enabled = cfg.Service.SubscriptionV2Enabled

defer cleanUpTelemetry()

Expand Down
4 changes: 1 addition & 3 deletions core/notification/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ type Config struct {
DLQHandler HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
GroupBy []string `mapstructure:"group_by" yaml:"group_by"`

// experimental: derived from service.Config
SubscriptionV2Enabled bool
EnableSilenceFeature bool
EnableSilenceFeature bool
}

type HandlerConfig struct {
Expand Down
107 changes: 102 additions & 5 deletions core/notification/dispatch_bulk_notification_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *DispatchBulkNotificationService) getRouter(notificationRouterKind strin
return selectedRouter, nil
}

func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Context, ns []Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error) {
func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Context, ns []Notification, continueWhenError bool) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error) {
for _, n := range ns {
if err := n.Validate(RouterSubscriber); err != nil {
return nil, nil, err
Expand All @@ -70,7 +70,9 @@ func (s *DispatchBulkNotificationService) prepareMetaMessages(ctx context.Contex
errMessage = fmt.Sprintf("not matching any subscription for notification: %s", string(nJson))
}
s.deps.Logger.Warn(errMessage)
continue
if continueWhenError {
continue
}
}
return nil, nil, err
}
Expand All @@ -97,9 +99,25 @@ func (s *DispatchBulkNotificationService) Dispatch(ctx context.Context, ns []Not
return nil, err
}

metaMessages, notificationLogs, err = s.prepareMetaMessages(ctx, notifications)
if err != nil {
return nil, err
for _, n := range ns {
switch n.Type {
case TypeAlert:
metaMsgs, nfLogs, err := s.fetchMetaMessagesByRouter(ctx, n, RouterSubscriber)
if err != nil {
return nil, err
}
metaMessages = append(metaMessages, metaMsgs...)
notificationLogs = append(notificationLogs, nfLogs...)
case TypeEvent:
metaMsgs, nfLogs, err := s.fetchMetaMessagesForEvents(ctx, n)
if err != nil {
return nil, err
}
metaMessages = append(metaMessages, metaMsgs...)
notificationLogs = append(notificationLogs, nfLogs...)
default:
return nil, errors.ErrInternal.WithMsgf("unknown notification type %s", n.Type)
}
}

if len(metaMessages) == 0 {
Expand Down Expand Up @@ -137,6 +155,56 @@ func (s *DispatchBulkNotificationService) Dispatch(ctx context.Context, ns []Not
return notificationIDs, nil
}

func (s *DispatchBulkNotificationService) fetchMetaMessagesByRouter(ctx context.Context, n Notification, flow string) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error) {
if err := n.Validate(flow); err != nil {
return nil, nil, err
}

router, err := s.getRouter(flow)
if err != nil {
return nil, nil, err
}

// var (
// messages []Message
// notificationLogs []log.Notification
// )

metaMessages, notificationLogs, err = router.PrepareMetaMessages(ctx, n)
if err != nil {
if errors.Is(err, ErrRouteSubscriberNoMatchFound) {
errMessage := fmt.Sprintf("not matching any subscription for notification: %v", n)
nJson, err := json.MarshalIndent(n, "", " ")
if err == nil {
errMessage = fmt.Sprintf("not matching any subscription for notification: %s", string(nJson))
}
s.deps.Logger.Warn(errMessage)
}
return nil, nil, err
}

return metaMessages, notificationLogs, nil

// if len(metaMessages) == 0 && len(notificationLogs) == 0 {
// return nil, fmt.Errorf("something wrong and no messages will be sent with notification: %v", n)
// }

// if err := s.deps.LogService.LogNotifications(ctx, notificationLogs...); err != nil {
// return nil, fmt.Errorf("failed logging notifications: %w", err)
// }

// reducedMetaMessages, err := ReduceMetaMessages(metaMessages, s.deps.Cfg.GroupBy)
// if err != nil {
// return nil, err
// }

// messages, err = s.RenderMessages(ctx, reducedMetaMessages)
// if err != nil {
// return nil, err
// }
// return messages, nil
}

func (s *DispatchBulkNotificationService) getNotifierPlugin(receiverType string) (Notifier, error) {
notifierPlugin, exist := s.notifierPlugins[receiverType]
if !exist {
Expand All @@ -145,6 +213,35 @@ func (s *DispatchBulkNotificationService) getNotifierPlugin(receiverType string)
return notifierPlugin, nil
}

func (s *DispatchBulkNotificationService) fetchMetaMessagesForEvents(ctx context.Context, n Notification) ([]MetaMessage, []log.Notification, error) {
if len(n.ReceiverSelectors) == 0 && len(n.Labels) == 0 {
return nil, nil, errors.ErrInvalid.WithMsgf("no receivers found")
}

var (
metaMessages = []MetaMessage{}
notificationLogs = []log.Notification{}
)
if len(n.ReceiverSelectors) != 0 {
generatedMetaMessages, nfLog, err := s.fetchMetaMessagesByRouter(ctx, n, RouterReceiver)
if err != nil {
return nil, nil, err
}
metaMessages = append(metaMessages, generatedMetaMessages...)
notificationLogs = append(notificationLogs, nfLog...)
}

if len(n.Labels) != 0 {
generatedMetaMessages, nfLog, err := s.fetchMetaMessagesByRouter(ctx, n, RouterSubscriber)
if err != nil {
return nil, nil, err
}
metaMessages = append(metaMessages, generatedMetaMessages...)
notificationLogs = append(notificationLogs, nfLog...)
}
return metaMessages, notificationLogs, nil
}

func (s *DispatchBulkNotificationService) RenderMessages(ctx context.Context, metaMessages []MetaMessage) (messages []Message, err error) {
for _, mm := range metaMessages {
notifierPlugin, err := s.getNotifierPlugin(mm.ReceiverType)
Expand Down
Loading

0 comments on commit 2791bde

Please sign in to comment.