Skip to content

Commit

Permalink
Support optional destinations (#182)
Browse files Browse the repository at this point in the history
Signed-off-by: Sanjula Ganepola <Sanjula.Ganepola@ibm.com>
  • Loading branch information
SanjulaGanepola authored Jan 2, 2025
1 parent bbd3b25 commit af3961c
Showing 1 changed file with 48 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
Expand Down Expand Up @@ -61,7 +60,7 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
}
if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) {
continue;
} else if ( type.equals("watch")){
} else if (type.equals("watch")) {
// We will handle the watch events separately as the logic is a bit more complicated
watchEvents.add(section);
continue;
Expand Down Expand Up @@ -100,50 +99,63 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
for (int i = 0; i < watchEvents.size(); i++) {
final String section = watchEvents.get(i);
final String name = section;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final String format = getOptionalString(name, "format");
String strwch = getRequiredString(name, "strwch");
String id = getRequiredString(name, "id");

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}
// Required fields
String id = getRequiredString(name, "id");
String strwch = getRequiredString(name, "strwch");

int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
String userDestinations = getOptionalString(name, "destinations");
if (userDestinations != null) {
// Optional fields
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final String format = getOptionalString(name, "format");

// Determine the event type
ManzanEventType eventType;
if (strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if (strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if (strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);

// Process the destinations
final List<String> destinations = new LinkedList<String>();
for (String d : userDestinations.split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);
}
}
}

// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);
// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType,
interval, numToProcess));
}

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
// Create the watcher
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}

if (watchEvents.size() > 0){
// After iterating over the loop, the formatMap and destMap are complete. Now create the route.
if (watchEvents.size() > 0) {
// After iterating over the loop, the formatMap and destMap are complete. Now
// create the route.
final String routeName = "socketWatcher";
ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap));
}
Expand Down

0 comments on commit af3961c

Please sign in to comment.