Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Streaming for PNE/Patchfinder #168

Merged
merged 80 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
2327b02
Added TODOs for job streaming
dylan-mulligan Oct 9, 2023
858f60e
Improved test for debugging
dylan-mulligan Oct 9, 2023
bca9f83
Merge branch 'main' into pne-patchfinder-job-streaming
dylan-mulligan Nov 2, 2023
7b1726f
Implemented basic job streaming (untested)
dylan-mulligan Nov 2, 2023
0aec5f7
Logging cleanup
dylan-mulligan Nov 2, 2023
85299a0
Merge branch 'main' into pne-patchfinder-job-streaming
dylan-mulligan Nov 6, 2023
8fea4ee
Job streaming queue implemented
dylan-mulligan Nov 6, 2023
498bce8
Test fixes
dylan-mulligan Nov 6, 2023
054d97e
More test fixes
dylan-mulligan Nov 6, 2023
36b142b
Converted batch job processing functionality to single job processing…
dylan-mulligan Nov 9, 2023
aebafdf
Added Messenger init logging and re-enable ssl
dylan-mulligan Nov 15, 2023
8d8ae65
Fixed cloning issues and improved rabbit implementation
dylan-mulligan Nov 15, 2023
58ff59b
Manual test messages
dylan-mulligan Nov 15, 2023
314a8de
Test fixes
dylan-mulligan Nov 15, 2023
d40fd52
added missing imports
dylan-mulligan Nov 15, 2023
69dd539
Implemented futures for proper job result collection and updated test…
dylan-mulligan Nov 20, 2023
4a4fa42
Updated default env vars
dylan-mulligan Nov 20, 2023
4018eaf
Patch/fix job streaming
dylan-mulligan Nov 21, 2023
bf98538
Extracted shared messenger component for patch/fix
dylan-mulligan Nov 21, 2023
16916b4
Fix tests
dylan-mulligan Nov 21, 2023
6244221
Remove deprecated methods
dylan-mulligan Nov 21, 2023
19eae52
Changed reconciler to stream cve ids to pne one at a time
ctevse Nov 8, 2023
d937f00
Refactored reconciler so the controller no longer has a dependency on…
ctevse Nov 10, 2023
53199dc
Modified Reconciler to send a stream of cve's to pne
ctevse Nov 20, 2023
f2f1e49
Changed pne messenger to accept object with CVE ID from reconciler
ctevse Nov 20, 2023
0a394fe
Refactored characterizer dependencies into characterizer package
ctevse Nov 21, 2023
b96410c
Refactored reconciler dependencies into reconciler package
ctevse Nov 21, 2023
6b72862
Refactored PNE to accept one job at a time
ctevse Nov 21, 2023
0157524
Fixed tests
ctevse Nov 21, 2023
5be0c25
Added guard against empty messages
ctevse Nov 21, 2023
f2922d7
Update env vars for pf/ff queues
dylan-mulligan Nov 21, 2023
192e287
Update output messages for job streaming to pf/ff
dylan-mulligan Nov 21, 2023
b1ff373
test fix
dylan-mulligan Nov 21, 2023
84585f4
test fixes
dylan-mulligan Nov 21, 2023
6571eb2
test fix
dylan-mulligan Nov 21, 2023
6455f17
test fixes
dylan-mulligan Nov 21, 2023
befc68d
Cleaned up gitcontroller tests to ensure there aren't issues with tes…
ctevse Nov 21, 2023
6a588de
Updated patchfinder to use JUnit5
ctevse Nov 22, 2023
a0dbc02
Disabled tests that are failing for reasons out of our control atm
ctevse Nov 22, 2023
4409786
Polished new rabbit implementation
dylan-mulligan Nov 22, 2023
837bafa
Fixed log
dylan-mulligan Nov 22, 2023
ff69079
Added TODOs for job streaming
dylan-mulligan Oct 9, 2023
85ffffb
Implemented basic job streaming (untested)
dylan-mulligan Nov 2, 2023
6c42532
Logging cleanup
dylan-mulligan Nov 2, 2023
3c8e2d7
Job streaming queue implemented
dylan-mulligan Nov 6, 2023
ce03f70
Test fixes
dylan-mulligan Nov 6, 2023
ffd9fe8
More test fixes
dylan-mulligan Nov 6, 2023
9dc3aee
Converted batch job processing functionality to single job processing…
dylan-mulligan Nov 9, 2023
120cea3
Added Messenger init logging and re-enable ssl
dylan-mulligan Nov 15, 2023
96a4cc7
Fixed cloning issues and improved rabbit implementation
dylan-mulligan Nov 15, 2023
e37a54b
Manual test messages
dylan-mulligan Nov 15, 2023
460bf28
Test fixes
dylan-mulligan Nov 15, 2023
a06cd58
added missing imports
dylan-mulligan Nov 15, 2023
1b3e966
Implemented futures for proper job result collection and updated test…
dylan-mulligan Nov 20, 2023
9b3f72c
Updated default env vars
dylan-mulligan Nov 20, 2023
3f21dab
Patch/fix job streaming
dylan-mulligan Nov 21, 2023
6304587
Extracted shared messenger component for patch/fix
dylan-mulligan Nov 21, 2023
d9759cc
Fix tests
dylan-mulligan Nov 21, 2023
133812b
Remove deprecated methods
dylan-mulligan Nov 21, 2023
172e476
Changed reconciler to stream cve ids to pne one at a time
ctevse Nov 8, 2023
5bacf40
Refactored reconciler so the controller no longer has a dependency on…
ctevse Nov 10, 2023
9049182
Modified Reconciler to send a stream of cve's to pne
ctevse Nov 20, 2023
8324e29
Changed pne messenger to accept object with CVE ID from reconciler
ctevse Nov 20, 2023
b2673d1
Refactored characterizer dependencies into characterizer package
ctevse Nov 21, 2023
409c200
Refactored reconciler dependencies into reconciler package
ctevse Nov 21, 2023
5f6f5eb
Refactored PNE to accept one job at a time
ctevse Nov 21, 2023
59a6709
Fixed tests
ctevse Nov 21, 2023
2dd5e6f
Added guard against empty messages
ctevse Nov 21, 2023
28bc4dc
Update env vars for pf/ff queues
dylan-mulligan Nov 21, 2023
8d05b10
Update output messages for job streaming to pf/ff
dylan-mulligan Nov 21, 2023
5d8cc7e
test fix
dylan-mulligan Nov 21, 2023
9cfd33e
test fixes
dylan-mulligan Nov 21, 2023
067a991
test fix
dylan-mulligan Nov 21, 2023
dfe0e34
test fixes
dylan-mulligan Nov 21, 2023
e30b1de
Cleaned up gitcontroller tests to ensure there aren't issues with tes…
ctevse Nov 21, 2023
c17263c
Updated patchfinder to use JUnit5
ctevse Nov 22, 2023
debcde6
Disabled tests that are failing for reasons out of our control atm
ctevse Nov 22, 2023
dc69c4a
Polished new rabbit implementation
dylan-mulligan Nov 22, 2023
ca54c06
Fixed log
dylan-mulligan Nov 22, 2023
6ebd20b
Merge remote-tracking branch 'origin/pne-patchfinder-job-streaming' i…
dylan-mulligan Nov 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions patchfinder/env.list
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ RABBIT_HOST=host.docker.internal
RABBIT_PORT=5672
RABBIT_USERNAME=guest
RABBIT_PASSWORD=guest
PF_INPUT_QUEUE=PNE_OUT
PF_INPUT_QUEUE=PNE_OUT_PATCH
FF_INPUT_QUEUE=PNE_OUT_FIX

# --- PATCH FINDER VARS ---
PF_INPUT_MODE=off
PF_INPUT_MODE=rabbit
CVE_LIMIT=20
ADDRESS_BASES=https://www.github.com/,https://www.gitlab.com/
MAX_THREADS=10
CLONE_COMMIT_THRESHOLD=1000
CLONE_COMMIT_LIMIT=50000
CLONE_COMMIT_THRESHOLD=250
CLONE_COMMIT_LIMIT=200000
CLONE_PATH=nvip_data/patch-repos
PATCH_SRC_URL_PATH=nvip_data/source_dict.json

# --- FIX FINDER VARS ---
FF_INPUT_MODE=dev
FF_INPUT_MODE=off
20 changes: 17 additions & 3 deletions patchfinder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
<version>1.0</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand Down Expand Up @@ -39,10 +37,20 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<version>2.22.2</version>
<!-- Other configuration options -->
</plugin>
</plugins>
Expand Down Expand Up @@ -132,6 +140,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
Expand Down
44 changes: 24 additions & 20 deletions patchfinder/src/main/java/FixFinderMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
* SOFTWARE.
*/

import db.DatabaseHelper;
import env.FixFinderEnvVars;
import env.SharedEnvVars;
import messenger.Messenger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import fixes.FixFinder;
Expand All @@ -37,6 +40,13 @@
*/
public class FixFinderMain extends Thread {
private final static Logger logger = LogManager.getLogger(FixFinderMain.class);
private final DatabaseHelper databaseHelper;
private final Messenger messenger;

public FixFinderMain(DatabaseHelper dbh, Messenger messenger) {
this.databaseHelper = dbh;
this.messenger = messenger;
}

/**
* Entry point for the FixFinder, initializes necessary classes and start listening for jobs with RabbitMQ
Expand All @@ -45,18 +55,24 @@ public class FixFinderMain extends Thread {
public void run() {
logger.info("Starting FixFinder...");

// Init FixFinder
FixFinder.init();
// Get input mode
final String inputMode = FixFinderEnvVars.getInputMode();

// Determine run mode and start PatchFinder
switch (FixFinderEnvVars.getInputMode()) {
switch (inputMode) {
case "db":
// Init FixFinder
FixFinder.init(this.databaseHelper);
runDb();
break;
case "rabbit":
// Init FixFinder
FixFinder.init(this.databaseHelper);
runRabbit();
break;
case "dev":
// Init FixFinder
FixFinder.init(this.databaseHelper);
runDev();
break;
default:
Expand All @@ -70,32 +86,20 @@ private void runDb() {
List<String> cveIds = new ArrayList<>(FixFinder.getDatabaseHelper().getCves(FixFinderEnvVars.getCveLimit()));
logger.info("Successfully got {} CVEs from the database", cveIds.size());

try {
FixFinder.run(cveIds);
} catch (Exception e) {
logger.error("A fatal error attempting to complete jobs: {}", e.toString());
}
for (String cveId : cveIds) FixFinder.run(cveId);
}

// TODO: Support end message
private void runRabbit() {
// TODO: RabbitMQ integration, wait until PoC is accepted to complete this
throw new UnsupportedOperationException();
// Start job handling
messenger.startHandlingFixJobs(SharedEnvVars.getFixFinderInputQueue());
}

private void runDev() {
// Manually enter CVEs for development
List<String> cveIds = new ArrayList<>();
cveIds.add("CVE-2023-38571");

try {
FixFinder.run(cveIds);
} catch (Exception e) {
logger.error("A fatal error attempting to complete jobs: {}", e.toString());
}
}

public static void main(String[] args) {
FixFinderMain finder = new FixFinderMain();
finder.start();
for (String cveId : cveIds) FixFinder.run(cveId);
}
}
47 changes: 20 additions & 27 deletions patchfinder/src/main/java/PatchFinderMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
* SOFTWARE.
*/

import db.DatabaseHelper;
import env.PatchFinderEnvVars;
import env.SharedEnvVars;
import messenger.Messenger;
import model.CpeGroup;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
Expand All @@ -41,6 +42,13 @@
*/
public class PatchFinderMain extends Thread {
private final static Logger logger = LogManager.getLogger(PatchFinderMain.class);
private final DatabaseHelper databaseHelper;
private final Messenger messenger;

public PatchFinderMain(DatabaseHelper dbh, Messenger messenger) {
this.databaseHelper = dbh;
this.messenger = messenger;
}

/**
* Entry point for the PatchFinder, initializes necessary classes and start listening for jobs with RabbitMQ
Expand All @@ -49,7 +57,7 @@ public class PatchFinderMain extends Thread {
public void run() {
logger.info("Starting PatchFinder...");
// Init PatchFinder
PatchFinder.init();
PatchFinder.init(this.databaseHelper);

// Determine run mode and start PatchFinder
switch (PatchFinderEnvVars.getInputMode()) {
Expand All @@ -71,36 +79,21 @@ private void runDb() {
final int affectedProductsCount = affectedProducts.values().stream().map(CpeGroup::getVersionsCount).reduce(0, Integer::sum);
logger.info("Successfully got {} CVEs mapped to {} affected products from the database", affectedProducts.size(), affectedProductsCount);
try {
PatchFinder.run(affectedProducts, PatchFinderEnvVars.getCveLimit());
// TODO: Delegate to threads
for (String cveId : affectedProducts.keySet()) {
PatchFinder.run(cveId, affectedProducts.get(cveId));
}

// When all threads are done, write source dict to file
PatchFinder.writeSourceDict();
} catch (IOException e) {
logger.error("A fatal error attempting to complete jobs: {}", e.toString());
}
}

// TODO: Support end message
private void runRabbit() {
// Start busy-wait loop
final Messenger rabbitMQ = new Messenger(
PatchFinderEnvVars.getRabbitHost(),
PatchFinderEnvVars.getRabbitVHost(),
PatchFinderEnvVars.getRabbitPort(),PatchFinderEnvVars.getRabbitUsername(),
PatchFinderEnvVars.getRabbitPassword(),
PatchFinderEnvVars.getRabbitInputQueue()
);
logger.info("Starting busy-wait loop for jobs...");
while(true) {
try {
// Wait and get jobs
final List<String> jobs = rabbitMQ.waitForProductNameExtractorMessage(PatchFinderEnvVars.getRabbitPollInterval());

// If null is returned, either and error occurred or intentional program quit
if(jobs == null) break;

// Otherwise, run received jobs
PatchFinder.run(jobs);
} catch (IOException | InterruptedException e) {
logger.error("A fatal error occurred during job waiting: {}", e.toString());
break;
}
}
// Start job handling
messenger.startHandlingPatchJobs(SharedEnvVars.getPatchFinderInputQueue());
}
}
28 changes: 26 additions & 2 deletions patchfinder/src/main/java/PatchFixMain.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import db.DatabaseHelper;
import env.SharedEnvVars;
import messenger.Messenger;

public class PatchFixMain {
public static void main(String[] args) {
new PatchFinderMain().start();
new FixFinderMain().start();
SharedEnvVars.initializeEnvVars(false);

// Init dbh
final DatabaseHelper dbh = new DatabaseHelper(
SharedEnvVars.getDatabaseType(),
SharedEnvVars.getHikariUrl(),
SharedEnvVars.getHikariUser(),
SharedEnvVars.getHikariPassword()
);

// Init messenger
final Messenger m = new Messenger(
SharedEnvVars.getRabbitHost(),
SharedEnvVars.getRabbitVHost(),
SharedEnvVars.getRabbitPort(),
SharedEnvVars.getRabbitUsername(),
SharedEnvVars.getRabbitPassword()
);

// Init and start Patchfinder/Fixfinder with dbh and messenger instances
new PatchFinderMain(dbh, m).start();
new FixFinderMain(dbh, m).start();
}
}
41 changes: 33 additions & 8 deletions patchfinder/src/main/java/db/DatabaseHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ public Set<String> getExistingPatchCommitShas() {
* Collects a map of CPEs with their correlated CVE and Vuln ID used for
* collecting patches given a list of CVE ids.
*
* @param cveIds CVEs to get affected products for
* @param cveId CVEs to get affected products for
* @return a map of affected products
*/
public Map<String, CpeGroup> getAffectedProducts(List<String> cveIds) {
public Map<String, CpeGroup> getAffectedProducts(String cveId) {
Map<String, CpeGroup> affectedProducts = new HashMap<>();
// Prepare statement
try (Connection conn = getConnection();
Expand All @@ -213,16 +213,14 @@ public Map<String, CpeGroup> getAffectedProducts(List<String> cveIds) {
) {
// Execute correct statement and get result set
ResultSet res = null;
if(cveIds == null) {
if(cveId == null) {
res = getAll.executeQuery();
parseAffectedProducts(affectedProducts, res);
}
else {
for (String cveId : cveIds) {
getById.setString(1, cveId);
res = getById.executeQuery();
parseAffectedProducts(affectedProducts, res);
}
getById.setString(1, cveId);
res = getById.executeQuery();
parseAffectedProducts(affectedProducts, res);
}

} catch (Exception e) {
Expand Down Expand Up @@ -412,6 +410,33 @@ public List<String> getCves(int cveLimit) {
return cves;
}

public int[] insertFixes(Set<Fix> fixes) {
int failedInserts = 0;
int existingInserts = 0;

for (Fix fix : fixes) {
try {
final int result = this.insertFix(fix);
// Result of operation, 0 for OK, 1 for failed, 2 for already exists
switch (result) {
case 1:
failedInserts++;
break;
case 2:
existingInserts++;
break;
default:
break;
}
}
catch (SQLException e) {
logger.error("Failed to insert fix {}: {}", fix, e.toString());
}
}

return new int[] {failedInserts, existingInserts};
}

/**
* Method for inserting a fix into the fixes table
* Should also check for duplicates
Expand Down
Loading
Loading