Skip to content

Commit

Permalink
Properly propagate general configuration to providers loaded from a r…
Browse files Browse the repository at this point in the history
…egistry.

Removed superfluous Semaphore in the ListRecords scenario.
Use a ScheduledThreadPoolExecutor for managing the number of threads, instead of a Semaphore.
Give the thread the name of the provider its processing.
  • Loading branch information
menzowindhouwer committed Nov 15, 2016
1 parent ee9c295 commit 269f4e3
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 68 deletions.
23 changes: 0 additions & 23 deletions src/main/java/ORG/oclc/oai/harvester2/verb/ListRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
public class ListRecords extends HarvesterVerb {
private static Logger logger = LogManager.getLogger(ListRecords.class);

private static Semaphore semaphore = new Semaphore(1);

/**
* Mock object constructor (for unit testing purposes)
*/
Expand Down Expand Up @@ -109,27 +107,6 @@ public ListRecords(String baseURL, String resumptionToken, int timeout, Path tem
super(getRequestURL(baseURL, resumptionToken), timeout, temp);
}

public void harvest(String requestURL, int timeout, Path temp) throws MalformedURLException, IOException {
if (semaphore!=null) {
for (;;) {
try {
logger.debug("request ListRecords verb");
semaphore.acquire();
logger.debug("acquired ListRecords verb");
break;
} catch (InterruptedException e) { }
}
}
try {
super.harvest(requestURL, timeout, temp);
} finally {
if (semaphore!=null) {
semaphore.release();
logger.debug("released ListRecords verb");
}
}
}

/**
* Get the oai:resumptionToken from the response
*
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/nl/mpi/oai/harvester/control/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@ private void parseProviders(Node base) throws
provider.setMaxRetryCount(maxRetryCount);
provider.setRetryDelay(retryDelay);
provider.setExclusive(exclusive);
} else {
provider.setTimeout(getTimeout());
provider.setMaxRetryCount(getMaxRetryCount());
provider.setRetryDelay(getRetryDelay());
provider.setExclusive(false);
}
providers.add(provider);
}
Expand Down
19 changes: 8 additions & 11 deletions src/main/java/nl/mpi/oai/harvester/control/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import javax.xml.xpath.XPathExpressionException;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;


/**
Expand All @@ -45,25 +47,20 @@ public class Main {

private static void runHarvesting(Configuration config) {
config.log();

ExecutorService executor = new ScheduledThreadPoolExecutor(config.getMaxJobs());

// Start a new worker thread for each provider. The Worker class
// is responsible for honouring the configured limit of
// concurrent worker threads.
Worker.setConcurrentLimit(config.getMaxJobs());
// create a CycleFactory
CycleFactory factory = new CycleFactory();
// get a cycle based on the overview file
File OverviewFile = new File (config.getOverviewFile());
Cycle cycle = factory.createCycle(OverviewFile);

for (Provider provider : config.getProviders()) {

// create a new working, passing one and the same for each cycle
Worker worker = new Worker(
provider, config.getActionSequences(), cycle);

worker.startWorker();


// create a new worker, passing one and the same for each cycle
Worker worker = new Worker(provider, config.getActionSequences(), cycle);
executor.execute(worker);
}
}

Expand Down
37 changes: 3 additions & 34 deletions src/main/java/nl/mpi/oai/harvester/control/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class Worker implements Runnable {

private static final Logger logger = LogManager.getLogger(Worker.class);

/** A standard semaphore is used to track the number of running threads. */
private static Semaphore semaphore;

/** The provider this worker deals with. */
private final Provider provider;

Expand All @@ -65,15 +62,6 @@ retrieve each record in the list individually. ListRecords: skip the
// kj: annotate
Endpoint endpoint;

/**
* Set the maximum number of concurrent worker threads.
*
* @param num number of running threads that may not be exceeded
*/
public static void setConcurrentLimit(int num) {
semaphore = new Semaphore(num);
}

/**
* Associate a provider and action actionSequences with a scenario
*
Expand All @@ -97,29 +85,14 @@ public Worker(Provider provider, List<ActionSequence> actionSequences,
this.scenarioName = endpoint.getScenario();
}

/**
* <br>Start this worker thread <br><br>
*
* This method will block for as long as necessary until a thread can be
* started without violating the limit.
*/
public void startWorker() {
for (;;) {
try {
semaphore.acquire();
break;
} catch (InterruptedException e) { }
}
Thread t = new Thread(this);
t.start();
}

@Override
public void run() {
Throwable t = null;
try {
logger.debug("Welcome to OAI Harvest Manager worker!");
provider.init();

Thread.currentThread().setName(provider.getName());

// setting specific log filename
ThreadContext.put("logFileName", Util.toFileFormat(provider.getName()).replaceAll("/",""));
Expand Down Expand Up @@ -232,12 +205,8 @@ public void run() {
else
logger.info("Processing finished for " + provider);

semaphore.release();

logger.debug("Goodbye from OAI Harvest Manager worker!");
}
}

}


}

0 comments on commit 269f4e3

Please sign in to comment.