Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only create one socket listener #175

Merged
merged 13 commits into from
Dec 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -13,6 +15,7 @@

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
Expand All @@ -22,6 +25,8 @@
import com.ibm.as400.access.ErrorCompletingRequestException;
import com.ibm.as400.access.ObjectDoesNotExistException;

import static com.github.theprez.manzan.routes.ManzanRoute.createRecipientList;

public class DataConfig extends Config {

private final static int DEFAULT_INTERVAL = 5;
Expand All @@ -46,21 +51,25 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
return m_routes;
}
final Map<String, ManzanRoute> ret = new LinkedHashMap<String, ManzanRoute>();
final List<String> watchEvents = new ArrayList<>();
final String schema = ApplicationConfig.get().getLibrary();

for (final String section : getIni().keySet()) {
final String type = getIni().get(section, "type");
if (StringUtils.isEmpty(type)) {
throw new RuntimeException("Type not specified for data source [" + section + "]");
}
if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) {
continue;
} else if ( type.equals("watch")){
// We will handle the watch events separately as the logic is a bit more complicated
watchEvents.add(section);
continue;
}
final String name = section;
final String schema = ApplicationConfig.get().getLibrary();
final String format = getOptionalString(name, "format");
int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
Expand All @@ -73,28 +82,6 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
}
}
switch (type) {
case "watch":
String id = getRequiredString(name, "id");
String strwch = getRequiredString(name, "strwch");
String sqlRouteName = name + "sql";
String socketRouteName = name + "socket";

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}

ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess));
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
break;
case "file":
String file = getRequiredString(name, "file");
String filter = getOptionalString(name, "filter");
Expand All @@ -104,7 +91,62 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
throw new RuntimeException("Unknown destination type: " + type);
}
}

// We will create a formatMap to store the format for each watch session, as well
// as a destMap to store the destinations for each watch session
final Map<String, String> formatMap = new HashMap<>();
final Map<String, String> destMap = new HashMap<>();

for (int i = 0; i < watchEvents.size(); i++) {
final String section = watchEvents.get(i);
final String name = section;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final String format = getOptionalString(name, "format");
String strwch = getRequiredString(name, "strwch");
String id = getRequiredString(name, "id");

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}

int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);
}
}

// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}

if (watchEvents.size() > 0){
// After iterating over the loop, the formatMap and destMap are complete. Now create the route.
final String routeName = "socketWatcher";
ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap));
}
return m_routes = ret;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,20 @@ protected String getString(final Exchange _exchange, final String _attr) {
}

protected void setRecipientList(final List<String> _destinations) throws IOException {
m_recipientList = createRecipientList(_destinations);
if (StringUtils.isEmpty(m_recipientList)) {
throw new IOException("Message watch for '" + m_name + "' has no valid destinations");
}
}

public static String createRecipientList(final List<String> _destinations) throws IOException {
String destinationsStr = "";
for (final String dest : _destinations) {
if (StringUtils.isEmpty(dest)) {
continue;
}
destinationsStr += "direct:" + dest.toLowerCase().trim() + ",";
}
m_recipientList = destinationsStr.replaceFirst(",$", "").trim();

if (StringUtils.isEmpty(m_recipientList)) {
throw new IOException("Message watch for '" + m_name + "' has no valid destinations");
}
return destinationsStr.replaceFirst(",$", "").trim();
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
package com.github.theprez.manzan.routes.event;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.camel.model.dataformat.JsonLibrary;

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanMessageFormatter;
import com.github.theprez.manzan.routes.ManzanRoute;

public class WatchMsgEventSockets extends ManzanRoute {

private final ManzanMessageFormatter m_formatter;
private final Map<String, String> m_formatMap;
private final Map<String, String> m_destMap;
private final String m_socketIp = "0.0.0.0";
private final String m_socketPort = "8080";

public WatchMsgEventSockets(final String _name, final String _format,
final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
public WatchMsgEventSockets(final String _name, final Map<String, String> _formatMap,
final Map<String, String> _destMap) {
super(_name);
m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
super.setRecipientList(_destinations);
m_formatMap = _formatMap;
m_destMap = _destMap;
}

//@formatter:off
Expand All @@ -32,16 +29,22 @@ public void configure() {
.unmarshal().json(JsonLibrary.Jackson, Map.class)
.routeId("manzan_msg:"+m_name)
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setHeader("session_id", simple("${body[sessionId]}"))
.setHeader("session_id", simple("${body[SESSION_ID]}"))
.setHeader("data_map", simple("${body}"))
.marshal().json(true) //TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
.process(exchange -> {
if (null != m_formatter) {
String sessionId = exchange.getIn().getHeader("session_id", String.class);
String format = m_formatMap.get(sessionId);
if (format != null) {
ManzanMessageFormatter m_formatter = new ManzanMessageFormatter(format);
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap
exchange.getIn().setHeader("destinations", destinations);
})
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
.recipientList(header("destinations"))
.parallelProcessing().stopOnException().end();
}
//@formatter:on
}
Loading