# port_scanner.py
'''Interactive TCP port scanner that looks up known CVEs for detected OpenSSH services.'''
import ipaddress
import json
import os
import re
import socket
from datetime import datetime
from urllib.parse import urlparse

import requests
from validators import url
# # # # # # # # # # # # # # # # # # # # # # # # #
# currently only checks for cves on ssh service #
# and only extracts them from the nist cve api #
# # # # # # # # # # # # # # # # # # # # # # # # #
# folder (created on demand by port_scanner) in which the scan reports are stored
OUTPUT_FOLDER = 'port_scan_reports'
# file with the raw nist api response: written by search_cve_by_keyword,
# read back by extract_cves and removed once the scan has finished
NIST_API_OUTPUT = 'nist_output.json'
def extract_ssh_version(input_string):
    '''pull the OpenSSH version out of a service banner

    Looks for an "OpenSSH_<version>" token (digits and dots, optionally
    followed by letters and digits such as "p1") anywhere in input_string.

    Returns:
        The version text (e.g. "8.2p1"), or None after printing a notice
        when the banner contains no such token.
    '''
    found = re.search(r'OpenSSH_([\d\.]+[a-z]*\d*)', input_string)
    # guard clause: nothing that looks like an OpenSSH version in the banner
    if found is None:
        print('Version not found in the input string.')
        return None
    # group 1 is the part right after the "OpenSSH_" prefix
    return found.group(1)
def search_cve_by_keyword(keyword):
    '''search for cve using nist api

    Sends keyword as the ``keywordSearch`` parameter to the NVD CVE API 2.0
    and, on success, saves the raw response body to NIST_API_OUTPUT so that
    extract_cves() can read it back.

    Args:
        keyword: search term, e.g. the version returned by
            extract_ssh_version(). None or an empty string skips the search.

    Returns:
        The decoded JSON response, or None when no keyword was given, the
        request failed, or the response body was not valid JSON.
    '''
    if not keyword:
        # extract_ssh_version() returns None for unrecognised banners; without
        # this guard the api would be queried for the literal text "None"
        print('No keyword given, skipping CVE search.')
        return None
    search_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    try:
        # params= makes requests percent-encode the keyword (spaces, '&', '#', ...)
        response = requests.get(search_url, params={"keywordSearch": keyword}, timeout=10)
        response.raise_for_status()  # Check for errors
        # decode before saving so an invalid body never reaches the dump file
        cve_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error making API request: {e}")
        return None
    except ValueError as e:
        # older requests versions raise a plain ValueError for invalid JSON
        print(f"Error decoding API response: {e}")
        return None
    # Save the response content to a file (read back by extract_cves)
    with open(NIST_API_OUTPUT, "w", encoding="utf-8") as file:
        file.write(response.text)
    return cve_data
def _best_base_score(metrics, prefix):
    '''return the base score of the newest metric family whose name starts with prefix, or None'''
    # reverse name order puts e.g. "cvssMetricV31" before "cvssMetricV30"
    for family in sorted((name for name in metrics if name.startswith(prefix)), reverse=True):
        entries = metrics[family]
        if entries:
            score = entries[0].get('cvssData', {}).get('baseScore')
            if score is not None:
                return score
    return None


def _format_cve(cve):
    '''render one cve dictionary as the text block used on screen and in the report'''
    return (
        f"ID: {cve['ID']}\n"
        f"More details: \nhttps://nvd.nist.gov/vuln/detail/{cve['ID']}\n"
        f"cvssV3: {cve.get('cvssV3', 'N/A')}\n"
        f"cvssV2: {cve.get('cvssV2', 'N/A')}\n"
        f"Published: {cve['Published']}\n"
        f"Last Modified: {cve['Last Modified']}\n"
    )


def extract_cves(output_path):
    '''looks through response from nist api for specific cve details

    Reads the response stored in NIST_API_OUTPUT, collects id, dates and the
    cvss v3 / v2 base scores of every cve, sorts them by score (v3 first,
    then v2, highest first), prints the five highest rated ones and appends
    all of them to the report.

    Args:
        output_path: report file the cve details are appended to.

    Raises:
        OSError: NIST_API_OUTPUT cannot be read or output_path cannot be written.
        ValueError: NIST_API_OUTPUT does not contain valid JSON.
        KeyError: a cve entry lacks 'cve', 'id', 'published' or 'lastModified'.
    '''
    with open(NIST_API_OUTPUT, 'r', encoding='utf-8') as file:
        data = json.load(file)
    vulnerabilities = data.get('vulnerabilities') or []
    # total results; fall back to the number of entries when the field is missing
    total_results = data.get('totalResults')
    if total_results is None:
        total_results = len(vulnerabilities)
    print(f"Total CVEs: {total_results}\n")
    # list to store each cve as dictionary
    cve_list = []
    for vulnerability in vulnerabilities:
        cve = vulnerability['cve']
        cve_dict = {
            "ID": cve['id'],
            # keep "YYYY-MM-DDTHH:MM:SS", drop the fractional seconds
            "Published": cve['published'][:19],
            "Last Modified": cve['lastModified'][:19],
        }
        # pick the scores by family name, however many families are present;
        # families other than v3/v2 are skipped so they are not mislabelled
        metrics = cve.get('metrics') or {}
        for label, prefix in (('cvssV3', 'cvssMetricV3'), ('cvssV2', 'cvssMetricV2')):
            score = _best_base_score(metrics, prefix)
            if score is not None:
                cve_dict[label] = score
        cve_list.append(cve_dict)
    # sort the list based on cvssV3 (if it exists) then cvssV2
    sorted_cve_list = sorted(cve_list, key=lambda x: (x.get('cvssV3', 0), x.get('cvssV2', 0)), reverse=True)
    # only print 5 highest cvss rated cves
    shown = sorted_cve_list[:5]
    for cve in shown:
        print(_format_cve(cve))
    if len(sorted_cve_list) > len(shown):
        print(f"For the rest of the cve:s check {output_path}")
    # open the report in append mode and write the sorted details of all CVEs
    with open(output_path, 'a', encoding='utf-8') as result:
        result.writelines(_format_cve(cve) + '\n' for cve in sorted_cve_list)
def _resolve_scan_host(target):
    '''return the host to connect to for a full url or an ip-address

    Raises:
        ValueError: target is neither a valid url nor a valid ip-address.
    '''
    if url(target):
        # sockets need the bare hostname, not "scheme://host/path"
        host = urlparse(target).hostname
        if host:
            return host
    # ip_address raises ValueError for anything that is not an ip-address
    ipaddress.ip_address(target)
    return target


def _prompt_port(prompt, lowest):
    '''ask until the user enters a port number between lowest and 65535'''
    while True:
        try:
            port = int(input(prompt))
        except ValueError:
            print('Error: Not a valid number.')
            continue
        if lowest <= port <= 65535:
            return port
        print(f'Error: Port number must be between {lowest} and 65535.')


def _service_name(port, protocol='tcp'):
    '''return the registered service name for port, or "unknown" if there is none'''
    try:
        return socket.getservbyport(port, protocol)
    except OSError:
        return 'unknown'


def _read_banner(connection):
    '''return up to 1024 bytes sent by the service, or b"" if it stays silent'''
    try:
        return connection.recv(1024)
    except OSError:
        # timeouts and resets just mean that there is no banner to show
        return b''


def _report_ssh_cves(banner_text, output_path):
    '''look up cves for the OpenSSH version in banner_text and add them to the report'''
    version = extract_ssh_version(banner_text)
    if version is None:
        return
    if search_cve_by_keyword(version) is None:
        return
    try:
        extract_cves(output_path)
    except (OSError, ValueError, LookupError, TypeError) as err:
        # the cve lookup is an extra; a broken api response must not abort the scan
        print(f'Could not process CVE data: {err}')


def port_scanner():
    '''function that scans for open ports and running services

    Asks for a target (full url or ip-address) and a port range, tries a tcp
    connection to every port in the range and writes the open ports with
    their service names to a new report file in OUTPUT_FOLDER. Banners longer
    than 15 bytes are printed; a banner on port 22 also triggers a cve lookup
    for the detected OpenSSH version.
    '''
    # make sure the output folder exists
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)
    # ask for target, name output file
    while True:
        user_target_host = input('Enter target url/ip-address: ')
        try:
            scan_host = _resolve_scan_host(user_target_host)
        except ValueError:
            print('Error: Not a valid url/ip-address.\nMake sure to enter full url...')
            continue
        break
    # keep the file name free of path separators and of ':' (illegal on Windows)
    safe_host = re.sub(r'[^A-Za-z0-9._-]', '_', scan_host)
    auto_output_file = f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_scan_report_{safe_host}.txt'
    output_path = os.path.join(OUTPUT_FOLDER, auto_output_file)
    # ask for port range
    user_starting_port = _prompt_port('Enter a starting port number: ', 0)
    user_ending_port = _prompt_port('Enter an ending port number: ', user_starting_port)
    print(f'\nStarting port scan on {user_target_host}...')
    with open(output_path, mode='a', encoding='utf-8') as result:
        result.write(f'Port scanning results for {user_target_host}:\nPORT NUMBER : STATUS : SERVICE\n\n')
    for port in range(user_starting_port, user_ending_port + 1):
        try:
            connection = socket.create_connection((scan_host, port), timeout=2)
        except OSError:
            # closed or filtered port, or unreachable host: nothing to report
            continue
        with connection:
            # grabbing banner for service version
            banner = _read_banner(connection)
        with open(output_path, mode='a', encoding='utf-8') as result:
            result.write(f'Port {port} : open : {_service_name(port)}\n')
        # making sure banner contains something more than a few white-spaces
        if len(banner) > 15:
            banner_text = banner.decode('utf-8', errors='replace')
            print(f'port {port} is open {banner_text}')
            if port == 22:
                _report_ssh_cves(banner_text, output_path)
        else:
            print(f'port {port} is open')
    with open(output_path, mode='a', encoding='utf-8') as result:
        result.write(f'\nScan completed: {datetime.now().strftime("%Y-%m-%d-%H:%M:%S")}\n')
    print(f'Port scan successfully completed!\nResults saved in {output_path}')
    # the api dump only exists when a cve lookup was actually made
    try:
        os.remove(NIST_API_OUTPUT)
    except FileNotFoundError:
        pass