diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index b95315b..a4a81ce 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -4,6 +4,8 @@ import java.io.File; import java.io.IOException; import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -13,6 +15,7 @@ import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanEventType; +import com.github.theprez.manzan.ManzanEventType; import com.github.theprez.manzan.WatchStarter; import com.github.theprez.manzan.routes.ManzanRoute; import com.github.theprez.manzan.routes.event.FileEvent; @@ -22,6 +25,8 @@ import com.ibm.as400.access.ErrorCompletingRequestException; import com.ibm.as400.access.ObjectDoesNotExistException; +import static com.github.theprez.manzan.routes.ManzanRoute.createRecipientList; + public class DataConfig extends Config { private final static int DEFAULT_INTERVAL = 5; @@ -46,6 +51,9 @@ public synchronized Map getRoutes() throws IOException, AS4 return m_routes; } final Map ret = new LinkedHashMap(); + final List watchEvents = new ArrayList<>(); + final String schema = ApplicationConfig.get().getLibrary(); + for (final String section : getIni().keySet()) { final String type = getIni().get(section, "type"); if (StringUtils.isEmpty(type)) { @@ -53,14 +61,15 @@ public synchronized Map getRoutes() throws IOException, AS4 } if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) { continue; + } else if ( type.equals("watch")){ + // We will handle the watch events separately as the logic is a bit more complicated + watchEvents.add(section); + continue; } final String name = section; - final String schema = ApplicationConfig.get().getLibrary(); final String format = getOptionalString(name, "format"); int userInterval = getOptionalInt(name, "interval"); final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL; - int userNumToProcess = getOptionalInt(name, "numToProcess"); - final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS; final List destinations = new LinkedList(); for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) { d = d.trim(); @@ -73,28 +82,6 @@ public synchronized Map getRoutes() throws IOException, AS4 } } switch (type) { - case "watch": - String id = getRequiredString(name, "id"); - String strwch = getRequiredString(name, "strwch"); - String sqlRouteName = name + "sql"; - String socketRouteName = name + "socket"; - - ManzanEventType eventType; - if(strwch.contains("WCHMSGQ")) { - eventType = ManzanEventType.WATCH_MSG; - } else if(strwch.contains("WCHLICLOG")) { - eventType = ManzanEventType.WATCH_VLOG; - } else if(strwch.contains("WCHPAL")) { - eventType = ManzanEventType.WATCH_PAL; - } else { - throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified"); - } - - ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess)); - ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess)); - WatchStarter ws = new WatchStarter(id, strwch); - ws.strwch(); - break; case "file": String file = getRequiredString(name, "file"); String filter = getOptionalString(name, "filter"); @@ -104,7 +91,62 @@ public synchronized Map getRoutes() throws IOException, AS4 throw new RuntimeException("Unknown destination type: " + type); } } + + // We will create a formatMap to store the format for each watch session, as well + // as a destMap to store the destinations for each watch session + final Map formatMap = new HashMap<>(); + final Map destMap = new HashMap<>(); + + for (int i = 0; i < watchEvents.size(); i++) { + final String section = watchEvents.get(i); + final String name = section; + int userNumToProcess = getOptionalInt(name, "numToProcess"); + final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS; + final String format = getOptionalString(name, "format"); + String strwch = getRequiredString(name, "strwch"); + String id = getRequiredString(name, "id"); + + ManzanEventType eventType; + if(strwch.contains("WCHMSGQ")) { + eventType = ManzanEventType.WATCH_MSG; + } else if(strwch.contains("WCHLICLOG")) { + eventType = ManzanEventType.WATCH_VLOG; + } else if(strwch.contains("WCHPAL")) { + eventType = ManzanEventType.WATCH_PAL; + } else { + throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified"); + } + + int userInterval = getOptionalInt(name, "interval"); + final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL; + final List destinations = new LinkedList(); + for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) { + d = d.trim(); + if (!m_destinations.contains(d)) { + throw new RuntimeException( + "No destination configured named '" + d + "' for data source '" + name + "'"); + } + if (StringUtils.isNonEmpty(d)) { + destinations.add(d); + } + } + + // Build the maps + String destString = createRecipientList(destinations); + formatMap.put(id.toUpperCase(), format); + destMap.put(id.toUpperCase(), destString); + + String sqlRouteName = name + "sql"; + ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess)); + WatchStarter ws = new WatchStarter(id, strwch); + ws.strwch(); + } + + if (watchEvents.size() > 0){ + // After iterating over the loop, the formatMap and destMap are complete. Now create the route. + final String routeName = "socketWatcher"; + ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap)); + } return m_routes = ret; } - } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java index 4fc6f62..a7c4577 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java @@ -104,6 +104,13 @@ protected String getString(final Exchange _exchange, final String _attr) { } protected void setRecipientList(final List _destinations) throws IOException { + m_recipientList = createRecipientList(_destinations); + if (StringUtils.isEmpty(m_recipientList)) { + throw new IOException("Message watch for '" + m_name + "' has no valid destinations"); + } + } + + public static String createRecipientList(final List _destinations) throws IOException { String destinationsStr = ""; for (final String dest : _destinations) { if (StringUtils.isEmpty(dest)) { @@ -111,10 +118,6 @@ protected void setRecipientList(final List _destinations) throws IOExcep } destinationsStr += "direct:" + dest.toLowerCase().trim() + ","; } - m_recipientList = destinationsStr.replaceFirst(",$", "").trim(); - - if (StringUtils.isEmpty(m_recipientList)) { - throw new IOException("Message watch for '" + m_name + "' has no valid destinations"); - } + return destinationsStr.replaceFirst(",$", "").trim(); } } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java index f0c30ed..e8e55ac 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java @@ -1,28 +1,25 @@ package com.github.theprez.manzan.routes.event; -import java.io.IOException; -import java.util.List; import java.util.Map; import org.apache.camel.model.dataformat.JsonLibrary; -import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanEventType; import com.github.theprez.manzan.ManzanMessageFormatter; import com.github.theprez.manzan.routes.ManzanRoute; public class WatchMsgEventSockets extends ManzanRoute { - private final ManzanMessageFormatter m_formatter; + private final Map m_formatMap; + private final Map m_destMap; private final String m_socketIp = "0.0.0.0"; private final String m_socketPort = "8080"; - public WatchMsgEventSockets(final String _name, final String _format, - final List _destinations, final String _schema, final int _interval, final int _numToProcess) - throws IOException { + public WatchMsgEventSockets(final String _name, final Map _formatMap, + final Map _destMap) { super(_name); - m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); - super.setRecipientList(_destinations); + m_formatMap = _formatMap; + m_destMap = _destMap; } //@formatter:off @@ -32,16 +29,22 @@ public void configure() { .unmarshal().json(JsonLibrary.Jackson, Map.class) .routeId("manzan_msg:"+m_name) .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) - .setHeader("session_id", simple("${body[sessionId]}")) + .setHeader("session_id", simple("${body[SESSION_ID]}")) .setHeader("data_map", simple("${body}")) .marshal().json(true) //TODO: skip this if we are applying a format .setBody(simple("${body}\n")) .process(exchange -> { - if (null != m_formatter) { + String sessionId = exchange.getIn().getHeader("session_id", String.class); + String format = m_formatMap.get(sessionId); + if (format != null) { + ManzanMessageFormatter m_formatter = new ManzanMessageFormatter(format); exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); } + String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap + exchange.getIn().setHeader("destinations", destinations); }) - .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end(); + .recipientList(header("destinations")) + .parallelProcessing().stopOnException().end(); } //@formatter:on } \ No newline at end of file