-
Notifications
You must be signed in to change notification settings - Fork 1
/
jenkins.py
executable file
·1601 lines (1331 loc) · 60.7 KB
/
jenkins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Run Jenkins job, retrieve build artifacts and save them to local file system
__author__ = "Mads Meisner-Jensen"
import os
import sys
import argparse
import time
import datetime
import requests
import threading
import traceback
import configparser
import xml.dom.minidom as minidom
import urllib
from requests.auth import HTTPBasicAuth
from pathlib import Path
from pprint import pprint, pformat
#####################################################################
# Helpers
#####################################################################
import collections
# Named tuples holding ANSI escape sequences.  Every field defaults to the
# empty string, so all colour codes expand to nothing until color_enable()
# rebinds fg/style/color with real escape sequences.
Color = collections.namedtuple(
    'Color',
    "reset black red green yellow blue magenta cyan white grey "
    "ired igreen iyellow iblue imagenta icyan iwhite",
)
Color.__new__.__defaults__ = ("",) * len(Color._fields)

# The type name used to be 'Color' here (copy/paste slip), which made
# repr(style) indistinguishable from a Color instance.
Style = collections.namedtuple('Style', "bold dim under inv")
Style.__new__.__defaults__ = ("",) * len(Style._fields)

fg = Color()
style = Style()

# Colours by purpose (what is being logged) rather than by hue
ColorLog = collections.namedtuple('ColorLog', "send recv info note progress error warn miniwarn")
ColorLog.__new__.__defaults__ = ("",) * len(ColorLog._fields)
color = ColorLog()
def color_enable(force=False):
    """
    Rebind the module level ``fg``, ``style`` and ``color`` tuples to real
    ANSI escape sequences.

    Nothing is changed unless ``force`` is true or stdout is a terminal and
    ``os.name`` is not ``'nt'``.

    :param force: Enable colours regardless of terminal/platform detection
    """
    global fg, style, color
    use_ansi = force or (sys.stdout.isatty() and os.name != 'nt')
    if not use_ansi:
        return

    # Normal colours are SGR codes 30..37, the bright variants are 90..97
    normal = "black red green yellow blue magenta cyan white".split()
    bright = "grey ired igreen iyellow iblue imagenta icyan iwhite".split()
    codes = {"reset": "\033[0m"}
    codes.update((name, f"\033[{30 + offset}m") for offset, name in enumerate(normal))
    codes.update((name, f"\033[{90 + offset}m") for offset, name in enumerate(bright))
    fg = Color(**codes)

    style = Style(bold="\033[1m", dim="\033[2m", under="\033[4m", inv="\033[7m")

    bold_white = fg.white + style.bold
    dim_white = fg.white + style.dim
    color = ColorLog(
        send=fg.igreen,
        recv=fg.green,
        info=bold_white,
        note=fg.iyellow,
        progress=dim_white,
        error=fg.ired,
        warn=fg.iyellow,
        miniwarn=fg.iyellow,
    )
def xml_get_first_child_node_of_tag(dom, tag):
    """
    Return the leading text node of the first ``<tag>`` element in ``dom``.

    See https://docs.python.org/3.6/library/xml.dom.minidom.html
    See https://docs.python.org/3.6/library/xml.dom.html

    :param dom: result of minidom.parseString(config_xml)
    :param tag: name of XML tag
    :return: The first child of the first matching element if that child is
             a text node; None if there is no such element, it has no
             children, or its first child is not a text node
    """
    matches = dom.getElementsByTagName(tag)
    if not matches:
        return None
    children = matches[0].childNodes
    if not children or not children.length > 0:
        return None
    first = matches[0].firstChild
    return first if first.nodeType == minidom.Node.TEXT_NODE else None
def is_posix():
    """Return True when the ``posix`` module can be imported, else False."""
    try:
        import posix  # noqa: F401 -- imported only to probe availability
    except ImportError:
        return False
    return True
def key_value_str_to_dict(s):
    """
    Parse a comma separated list of ``key=value`` pairs into a dictionary.

    Only the first ``=`` of each pair separates key from value, so values
    may themselves contain ``=``.  Empty items (empty input string, doubled
    or trailing commas) are ignored.

    :param s: String like ``"delay=0,branch=main"``
    :return: Dictionary mapping each key to its (string) value
    :raises ValueError: If a non-empty item contains no ``=``
    """
    d = {}
    for kv in s.split(","):
        if not kv:
            continue
        key, sep, value = kv.partition("=")
        if not sep:
            raise ValueError(f"Expected 'key=value' but got: '{kv}'")
        d[key] = value
    return d
def deltatimeToHumanStr(deltaTime, decimalPlaces=0, separator=' '):
    """
    Format number of seconds or a datetime.timedelta object into a short human readable string

    >>> deltatimeToHumanStr(datetime.timedelta(seconds=1))
    '1s'
    >>> deltatimeToHumanStr(datetime.timedelta(seconds=1.750), 1)
    '1.8s'
    >>> deltatimeToHumanStr(datetime.timedelta(seconds=1.750), 3)
    '1.750s'
    >>> deltatimeToHumanStr(datetime.timedelta(hours=3, seconds=1.2), 1)
    '3h 0m 1.2s'
    >>> deltatimeToHumanStr(datetime.timedelta(days=6, hours=14, minutes=44, seconds=55))
    '6d 14h 44m 55s'
    >>> deltatimeToHumanStr(123)
    '2m 3s'
    >>> deltatimeToHumanStr(0)
    '0s'
    >>> deltatimeToHumanStr(-5)
    '-5s'

    :param deltaTime: Either number of seconds or a timedelta object
    :param decimalPlaces: Number of decimal places for the seconds part
    :param separator: Separator between each part
    :return: String with the largest non-zero unit first and every smaller
             unit after it; the seconds part is always present
    """
    if not isinstance(deltaTime, datetime.timedelta):
        deltaTime = datetime.timedelta(seconds=deltaTime)

    # A negative timedelta is stored as negative days plus positive seconds,
    # so format the magnitude and put the sign in front.
    sign = ""
    if deltaTime < datetime.timedelta(0):
        sign = "-"
        deltaTime = -deltaTime

    d = deltaTime.days
    h, s = divmod(deltaTime.seconds, 3600)
    m, s = divmod(s, 60)
    s = float(s) + float(deltaTime.microseconds) / 1000000

    parts = []
    if d > 0:
        parts.append(f"{d}d")
    # Once a larger unit is shown, all smaller units are shown too (even if 0)
    if h > 0 or parts:
        parts.append(f"{h}h")
    if m > 0 or parts:
        parts.append(f"{m}m")
    # Always emit seconds so that a zero duration yields '0s' instead of ''
    parts.append(f"{s:.{decimalPlaces}f}s")
    return sign + separator.join(parts)
def timestamp_ms_to_datetime(ts_ms):
    """
    Format a Unix timestamp given in milliseconds as ``YYYY-MM-DD HH:MM:SS``.

    :param ts_ms: Milliseconds since the epoch (anything ``int()`` accepts)
    :return: Formatted time as produced by ``datetime.fromtimestamp()``
    """
    seconds = int(ts_ms) / 1000
    return datetime.datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
def timestamp_ms_to_deltatime(ts_ms):
    """
    Describe how long ago a millisecond Unix timestamp was, relative to now.

    :param ts_ms: Milliseconds since the epoch (anything ``int()`` accepts)
    :return: Elapsed time formatted by ``deltatimeToHumanStr()``
    """
    then = datetime.datetime.fromtimestamp(int(ts_ms) / 1000)
    elapsed = datetime.datetime.now() - then
    return deltatimeToHumanStr(elapsed)
def timestamp_ms_to_datetime_and_deltatime(ts_ms):
    """
    Combine absolute and relative time of a millisecond Unix timestamp.

    :param ts_ms: Milliseconds since the epoch
    :return: String of the form ``"<date time> (<elapsed> ago)"``
    """
    return "{} ({} ago)".format(
        timestamp_ms_to_datetime(ts_ms),
        timestamp_ms_to_deltatime(ts_ms),
    )
#####################################################################
# Jenkins
#####################################################################
# Jenkins has some advanced REST methods where one can select specific JSON fields, like:
# http://jenkins.lan/job/JOBNAME/api/json?tree=builds[number,result,duration,url,actions[parameters[name,value]]]
# In this code, we just retrieve everything and filter stuff ourselves
class JenkinsException(Exception):
    """Error raised by this script for Jenkins specific failures."""
class Config(object):
    """
    Settings of the Jenkins client, optionally loaded from an INI file.

    Every instance attribute assigned in ``__init__`` is one option of the
    ``[global]`` section of the config file.  Its initial value is the
    default and also decides how the option is parsed (bool, int, float or
    string).

    - `build_params_default` are the default parameters used for
      parameterized builds.
    - See https://wiki.jenkins.io/display/JENKINS/Parameterized+Build
    - Jenkins only allows remotely triggered builds if job config
      contains `authToken` thus resulting in REST API request:
      `http://server/job/myjob/buildWithParameters?token=TOKEN&PARAMETER=Value`
    """
    FILENAME = os.path.expanduser("~/.jenkins.ini")

    def __init__(self):
        self.server_url = "https://jenkins.url.not.set"
        self.check_certificate = True
        self.auth_user = ""
        self.auth_password = ""
        # Set this to False to completely disable Jenkins linter before
        # new groovy script is uploaded (using --groovy option).
        # NOTE: because the default is a string, read() delivers the value
        # from the config file as a string too (e.g. "False").
        self.jenkins_crumb = ""
        self.build_params_default = "delay=0"
        self.console_poll_interval = 2
        self.console_log_dir = "/tmp/jenkins-log"
        self.stop_job_on_user_abort = True

    @staticmethod
    def _new_parser():
        """Create a parser that treats '%' literally (passwords/tokens may contain it)."""
        return configparser.ConfigParser(interpolation=None)

    def read(self, obj, filename=None):
        """
        Load the ``[global]`` section and copy every option to ``self`` and ``obj``.

        Options missing from the file keep their current value.

        :param obj: Object receiving a copy of every option; it must already
                    have an attribute of the same name for each option.
                    Its ``config_was_read_ok`` attribute is set to True.
        :param filename: path to configfile, defaults to ``Config.FILENAME``
        :return: The filepath of the file that was read, or None if the
                 file does not exist
        :raises RuntimeError: If ``obj`` lacks an attribute for an option
        """
        if not filename:
            filename = Config.FILENAME
        if not os.path.exists(filename):
            return None

        # ConfigParser stores values as strings, so you have to convert them yourself
        cfg = self._new_parser()
        cfg.read(filename)

        # Read the global section (which are the instance variables of this class)
        section = "global"
        for name, cur_value in list(vars(self).items()):
            # bool must be tested before int because bool is a subclass of int
            if isinstance(cur_value, bool):
                value = cfg.getboolean(section, name, fallback=cur_value)
            elif isinstance(cur_value, int):
                value = cfg.getint(section, name, fallback=cur_value)
            elif isinstance(cur_value, float):
                value = cfg.getfloat(section, name, fallback=cur_value)
            else:
                value = cfg.get(section, name, fallback=cur_value)
                # strip quotes from config value if it is a string
                if value is not None:
                    value = value.strip('"')
            if value is not None:
                setattr(self, name, value)
                if not hasattr(obj, name):
                    raise RuntimeError(f"{obj} does no have attr '{name}'")
                setattr(obj, name, value)
        obj.config_was_read_ok = True
        return filename

    def write(self):
        """Print the current settings to stdout as an INI ``[global]`` section."""
        cfg = self._new_parser()
        cfg["global"] = {name: getattr(self, name) for name in vars(self)}
        cfg.write(sys.stdout)
        print("# note that the username is without the email domain, e.g. 'foo' instead of 'foo@bla.org'")
class Jenkins(object):
    """
    Jenkins Commandline Client that can:
    - Start a build job, optionally with parameters
    - Wait for completion
    - Print console output on the fly
    - Fetch artifacts of build job (latestSuccessful or specific build number)
    - Get project info
    - Fetch full console log of given project (last build or specific build number)

    [Remote Access API](https://www.jenkins.io/doc/book/using/remote-access-api/)

    The server is addressed through the `server_url` instance attribute,
    which `Config.read()` copies onto the instance from the config file.
    NOTE(review): this text used to mention a `JENKINS_URL` class variable;
    no such variable is defined in the visible part of the class - confirm.

    https://javadoc.jenkins-ci.org/hudson/model/FreeStyleProject.html

    Python Jenkins Modules
    - https://python-jenkins.readthedocs.io/en/latest
    - https://opendev.org/jjb/python-jenkins
    - https://jenkinsapi.readthedocs.io/en/latest
    - https://github.com/pycontribs/jenkinsapi

    Other jenkins CLI implementations:
    - https://github.com/jenkins-zh/jenkins-cli (go)
    - https://github.com/m-sureshraj/jenni
      Jenkins personal assistant - CLI tool to interact with Jenkins server (nodejs)
    """
    # presumably the maximum time in seconds to wait for a build - TODO confirm unit at the point of use
    BUILD_WAIT_TIMEOUT = 7200
    # Symbolic build identifiers accepted in place of a build number.
    # set_job_name_and_id() resolves user input against this tuple and
    # print_project() reports the first four.
    BUILD_NAMES = (
        'lastBuild', 'lastCompletedBuild', 'lastFailedBuild', 'lastSuccessfulBuild',
        # Following three are typically not that interesting
        'lastStableBuild', 'lastUnstableBuild', 'lastUnsuccessfulBuild'
    )
def __init__(self, verbose=1):
    """
    Set up a client with built-in defaults.

    :param verbose: Verbosity level that the ``echo_*`` helpers compare
                    their ``level`` argument against
    """
    # disable InsecureRequestWarning: "Unverified HTTPS request" warnings
    requests.packages.urllib3.disable_warnings(requests.urllib3.exceptions.InsecureRequestWarning)

    # --- Configurable settings (Config.read() overwrites these from the config file) ---
    self.config_was_read_ok = False
    self.server_url = None
    self.check_certificate = True
    self.auth_user = ""
    self.auth_password = ""
    self.jenkins_crumb = ""
    self.build_params_default = ""
    self.console_poll_interval = 2
    self.console_log_dir = "/tmp/jenkins-log"
    self.stop_job_on_user_abort = False

    # --- State of the connection and of the job being worked on ---
    self._conn_ok = False
    self.job_name = ""
    self.job_id = ""
    self.job_id_low = 0
    self.job_id_high = 0
    self.job_started = None
    self.artifacts = None
    # True until get_console_output() replaces it with the log file path
    self.console_output_file = True

    # --- Logging switches (see log_enable() for the request/response flags) ---
    self.verbose = verbose
    self.log_progress = True
    self.log_req = False
    self.log_resp_status = False
    self.log_resp_headers = False
    self.log_resp_text = False
    self.log_resp_json = False
def __str__(self):
    """Return a short description naming the server URL and the auth user."""
    return "<Jenkins {} auth={}>".format(self.server_url, self.auth_user)
def log_enable(self, flags):
    """
    Switch request/response logging on or off from a string of flag letters.

    Each switch is set to True when its letter occurs in ``flags`` and to
    False otherwise; response headers are enabled by ``h`` or by ``rr``.

    :param flags: String of flag letters, see ``get_log_help()``
    """
    single_letter_switches = (
        ("log_req", "s"),
        ("log_resp_status", "r"),
        ("log_resp_text", "t"),
        ("log_resp_json", "j"),
    )
    for attr, letter in single_letter_switches:
        setattr(self, attr, letter in flags)
    self.log_resp_headers = any(token in flags for token in ("h", "rr"))
@staticmethod
def get_log_help():
return "s = send, r = response status, h = response headers, t = response text, j = response pretty json"
def echo_progress(self, s):
    """Print ``s`` in the progress colour unless ``self.log_progress`` is false."""
    if not self.log_progress:
        return
    print(f"{color.progress}{s}{fg.reset}")
def echo_note(self, s, level=0, file=None):
    """
    Print ``s`` in the note colour when verbosity is at least ``level``.

    :param s: Text to print
    :param level: Minimum value of ``self.verbose`` for the text to appear
    :param file: Passed on to ``print()``
    """
    if self.verbose < level:
        return
    print(f"{color.note}{s}{fg.reset}", file=file)
def echo_info(self, s, level=0):
    """
    Print ``s`` in the info colour when verbosity is at least ``level``.

    :param s: Text to print
    :param level: Minimum value of ``self.verbose`` for the text to appear
    """
    if self.verbose < level:
        return
    print(f"{color.info}{s}{fg.reset}")
def echo_verb(self, s, level=1):
    """
    Print ``s`` in the info colour; like ``echo_info()`` but ``level``
    defaults to 1, so the text is hidden when ``self.verbose`` is 0.

    :param s: Text to print
    :param level: Minimum value of ``self.verbose`` for the text to appear
    """
    if self.verbose < level:
        return
    print(f"{color.info}{s}{fg.reset}")
def echo_color(self, s, color=None, level=0, file=None):
    """
    Print ``s`` in an explicitly chosen colour when verbosity is at least ``level``.

    :param s: Text to print
    :param color: Escape sequence to prefix the text with; defaults to the
                  current ``fg.white``
    :param level: Minimum value of ``self.verbose`` for the text to appear
    :param file: Passed on to ``print()``; None means the current ``sys.stdout``
    """
    # The defaults are resolved at call time: the previous default
    # ``Color.white`` was the namedtuple field descriptor (not a string), and
    # ``fg`` is rebound by color_enable() after this function is defined.
    if color is None:
        color = fg.white
    if self.verbose >= level:
        print(f"{color}{s}{fg.reset}", file=file)
def log_response(self, response, force=False):
    """
    Print details of ``response`` according to the logging switches.

    With ``log_resp_headers`` (or ``force``) the request headers, status and
    response headers are printed; otherwise with ``log_resp_status`` only the
    status line is printed.

    :param response: ``requests.Response`` object
    :param force: Print the full details regardless of the switches
    :return: ``response``, unchanged
    """
    show_full = self.log_resp_headers or force
    status_line = f"Response: {response.status_code} {response.reason}"
    if show_full:
        print(f"{color.send}Request headers: {response.request.headers}{fg.reset}")
        print(f"{color.recv}{status_line}\n{response.headers}{fg.reset}")
    elif self.log_resp_status:
        print(f"{color.recv}{status_line}{fg.reset}")
    return response
def job_id_iter(self):
    """
    Iterate over all job IDs selected by the user.

    Yields every number of the inclusive range ``job_id_low..job_id_high``
    if both are positive (user supplied something like ``foobaz/23..28``),
    otherwise yields just ``self.job_id``.
    """
    has_range = self.job_id_low > 0 and self.job_id_high > 0
    if not has_range:
        yield self.job_id
        return
    yield from range(self.job_id_low, self.job_id_high + 1)
def set_job_name_and_id(self, job_name, job_id='lastSuccessfulBuild'):
    """
    Store the job name and build identifier selected by the user.

    ``job_name`` may carry the build identifier as ``NAME/ID``.  The
    identifier is either numeric, a range ``LOW..HIGH`` (stored in
    ``job_id_low``/``job_id_high``, with ``job_id`` set to the lower bound),
    the word ``last``, or a case-insensitive substring of exactly one entry
    of ``Jenkins.BUILD_NAMES``.

    :param job_name: Job name, optionally followed by ``/ID``; nothing is
                     done when it is empty
    :param job_id: Build identifier used when ``job_name`` carries none
    :raises ValueError: If the identifier matches no or several build names
    """
    if not job_name:
        return

    pieces = job_name.split("/")
    if len(pieces) == 2:
        job_name, job_id = pieces

    if ".." in job_id:
        bounds = job_id.split("..")
        if len(bounds) == 2:
            first, second = int(bounds[0]), int(bounds[1])
            # accept the bounds in either order
            if first > second:
                first, second = second, first
            self.job_id_low, self.job_id_high = first, second
            job_id = str(first)

    self.job_name = job_name
    self.job_id = job_id

    # Purely numeric build number: nothing to resolve
    if all(ch in '0123456789' for ch in job_id):
        return

    if job_id == "last":
        self.job_id = Jenkins.BUILD_NAMES[0]
        return

    wanted = job_id.lower()
    candidates = [build for build in Jenkins.BUILD_NAMES if wanted in build.lower()]
    if len(candidates) > 1:
        raise ValueError(f"job_id matches several build types: {candidates}")
    if len(candidates) == 0:
        bts = ",".join(Jenkins.BUILD_NAMES)
        raise ValueError(f"Bad job_id: '{job_id}'\nAllowed job_id is numeric or one of: {bts}")
    self.job_id = candidates[0]
def assert_auth_is_valid(self):
    """
    Verify that both username and password/API-token are configured.

    :raises ValueError: Naming the first credential that is not set
    """
    hint = "Add it to the jenkins config file or supply it with --auth option"
    required = (
        ("auth_user", "Jenkins username is not set"),
        ("auth_password", "Jenkins password/API-token is not set"),
    )
    for attr, problem in required:
        if not getattr(self, attr):
            raise ValueError(f"{problem}: {hint}")
def request(self, url, method="GET", params=None, headers=None, data=None, auth=None, **kwargs):
    """
    Send one HTTP request, falling back to basic auth on a 403 response.

    see https://stackoverflow.com/questions/16907684/fetching-a-url-from-a-basic-auth-protected-jenkins-server-with-urllib2
    and https://findwork.dev/blog/advanced-usage-python-requests-timeouts-retries-hooks/

    :param url: URL of the form "http://jenkins.lan/job/{name}"
    :param method: GET or POST
    :param params: Dictionary of request params
    :param headers: Dictionary of user headers
    :param data: Dictionary of form data
    :param auth: True to use authenticated request
    :param kwargs: Passed on to ``requests.request()``
    :return: requests.Response object
    :raises requests.exceptions.HTTPError: via ``raise_for_status()`` when
            the final status code is 400 or above
    """
    def basic_auth():
        return HTTPBasicAuth(username=self.auth_user, password=self.auth_password)

    def send(credentials):
        return requests.request(method, url, headers=headers, params=params, data=data,
                                verify=self.check_certificate, auth=credentials, **kwargs)

    if auth:
        auth = basic_auth()

    if self.log_req:
        q = "?" + urllib.parse.urlencode(params) if params else ""
        auth_str = f"with HTTPBasicAuth(username={self.auth_user})" if auth else ""
        print(f"{color.send}{method} {url}{q}{fg.reset} {auth_str}")

    response = send(auth)

    # An anonymous request that was refused is repeated once with
    # credentials, provided that we have any.
    refused_anonymous = not auth and response.status_code == 403
    if refused_anonymous and self.auth_user and self.auth_password:
        self.log_response(response)
        self.echo_progress("Retrying with HTTPBasicAuth")
        auth = basic_auth()
        response = send(auth)

    self.log_response(response)
    if response.status_code >= 400:
        # make sure the failing request is shown even if request logging is off
        if not self.log_req:
            print(f"{color.send}{response.request.method} {response.url}{fg.reset}")
        response.raise_for_status()
    return response
def request_api_json(self, url, params=None, try_auth=False, **kwargs):
    """
    Send request and return response as JSON

    :param url: URL of the form "http://some.server.loc/job/{name}/api/json";
                the "/api/json" suffix is appended when missing
    :param params: Dictionary of request params
    :param try_auth: Use authenticated request if possible
    :param kwargs: Passed on to ``self.request()``
    :return: JSON response object
    """
    # Avoid ".../" + "/api/json" => "//api/json" when the URL (e.g. a
    # configured server_url) ends with a slash
    url = url.rstrip("/")
    if not url.endswith("/api/json"):
        url += "/api/json"

    # Use authenticated request if we have username and password available.
    # bool() keeps the flag a real boolean; the bare and/and chain would
    # evaluate to the password string itself.
    with_auth = bool(try_auth and self.auth_user and self.auth_password)
    response = self.request(url, params=params, auth=with_auth, **kwargs)
    if self.log_resp_text:
        print(response.text)
    jr = response.json()
    if self.log_resp_json:
        pprint(jr)
    return jr
def assert_connectivity(self):
    """
    Check that the Jenkins server can be reached and set ``self._conn_ok``.

    The probe honours ``self.check_certificate`` like every other request
    of this class.  With verification hard-wired off, a missing or expired
    CA certificate would pass here and only fail on the first real request.

    :raises JenkinsException: If the TLS handshake/certificate check fails
    """
    # NOTE: we could also use requests.get(..., cert=FILEPATH) to pass in the CA certificate
    self.echo_info(f"Checking Jenkins connectivity: {self.server_url}")
    try:
        requests.get(self.server_url, verify=self.check_certificate)
    except requests.exceptions.SSLError as err:
        error_msg_cacert = """
It seems you don't have the Jenkins (self-signed) CA certificate installed
or the certificate has expired.
"""
        raise JenkinsException(error_msg_cacert) from err
    self._conn_ok = True
def get_job_url(self, name=None):
    """
    Build the URL of a job and remember the job name.

    :param name: Job name; stored in ``self.job_name`` when given, otherwise
                 ``self.job_name`` is used
    :return: ``"{server_url}/job/{name}"``
    :raises ValueError: If neither ``name`` nor ``self.job_name`` is set
    """
    if name:
        self.job_name = name
    else:
        name = self.job_name
    if not name:
        raise ValueError("JOB name argument mssing")
    return "{}/job/{}".format(self.server_url, name)
def get_job_id_url(self, name=None, job_id=None):
    """
    Build the URL of one build of a job and remember the build identifier.

    :param name: Job name, see ``get_job_url()``
    :param job_id: Build number or symbolic build name; stored in
                   ``self.job_id`` when given, otherwise ``self.job_id``
                   is used
    :return: ``"{job_url}/{job_id}"``
    :raises ValueError: If no build identifier is available
    """
    if job_id:
        self.job_id = job_id
    else:
        job_id = self.job_id
    if not job_id:
        # Build the hint from this instance and the running program instead
        # of the script's module level ``jen``/``prog`` variables, which do
        # not exist when the class is used from other code.
        prog_name = os.path.basename(sys.argv[0])
        job_name = name or self.job_name
        raise ValueError(f"Missing job ID. Try again with something like: {prog_name} ... {job_name}/last")
    return self.get_job_url(name) + f"/{job_id}"
def list_projects(self):
    """
    Print the name of every job on the server, one per line.

    When ``self.verbose`` is set, the last component of the job's
    ``_class`` is printed after the name.
    """
    url = f"{self.server_url}"
    # Use this instance rather than the script's global ``jen`` object
    jr = self.request_api_json(url, {'tree': "jobs[name]"})
    for job in jr.get('jobs') or []:
        name = job.get('name')
        if not self.verbose:
            print(name)
            continue
        _class = job.get('_class')
        if _class:
            _class = _class.split(".")[-1]
        print(name, _class)
def list_queue(self):
    """
    Print the items of the build queue.

    When ``self.job_name`` is set, only items of that job are printed.
    Items that ``get_queue()`` could not classify (None, or without
    ``_class``) are skipped instead of raising.
    """
    w = 13

    def print_queue_item(i, j):
        _class = j.get('_class')
        if not _class:
            return
        print(f"{i:3d} {_class:{w}} '{j.get('name')}' {j.get('id')}")
        # 'name' and 'id' are on the header line already
        for k in ('inQueueSince', 'timestamp', 'why'):
            if k not in j:
                continue
            print(f" {k:{w}} {j.get(k)}")

    for i, item in enumerate(self.get_queue()):
        if not item:
            continue
        # .get(): an item without a 'task' has no 'name' key
        if self.job_name and self.job_name != item.get('name'):
            continue
        print_queue_item(i, item)
def get_queue(self):
    """
    Fetch the build queue and yield its items in a print-friendly form.

    Each yielded item is the JSON dictionary of the queue item with
    ``_class`` shortened (e.g. ``Buildable``), ``inQueueSince`` and
    ``timestamp`` converted to readable strings, ``name`` set to the name of
    the item's task (None if it has no task) and an overlong ``why``
    truncated.  Items whose class is not Waiting/Blocked/Buildable are
    left out.
    """
    def fixup_queue_item(j):
        _class = j.get('_class')
        if _class:
            _class = _class.split(".")[-1].replace("Queue$", "").replace("Item", "")
            j['_class'] = _class
            if _class not in ("Waiting", "Blocked", "Buildable"):
                return None
        for k in ('inQueueSince', ):
            if k in j:
                j[k] = timestamp_ms_to_datetime_and_deltatime(j[k])
        timestamp = j.get('timestamp')
        if timestamp:
            j['timestamp'] = timestamp_ms_to_datetime(timestamp)
        task = j.get('task')
        # Always provide the key so that callers can rely on item['name']
        j['name'] = task.get('name') if task else j.get('name')
        # 'why' may be present with a null value
        why = j.get('why') or ""
        # Only truncate when that saves a meaningful amount of text
        if len(why) - 70 >= 12:
            hidden = len(why) - 60
            j['why'] = why[:60] + f" ... [{hidden} more]"
        return j

    url = f"{self.server_url}/queue"
    # Use this instance rather than the script's global ``jen`` object
    jr = self.request_api_json(url)
    for item in jr.get('items') or []:
        fixed = fixup_queue_item(item)
        if fixed is not None:
            yield fixed
def get_queue_by_job(self, name):
    """
    Yield the queue items (see ``get_queue()``) that belong to one job.

    :param name: Job name compared against each item's ``name``
    """
    self.echo_info("Getting Jenkins queue")
    yield from (entry for entry in self.get_queue() if entry['name'] == name)
def list_nodes(self, oneline=True):
    """
    Print the build nodes (computers) of the server.

    ``self.job_name`` is passed to ``get_nodes()`` as search string, so a
    name given on the command line filters the nodes.

    :param oneline: Print one line per node; otherwise a header line
                    followed by selected fields
    """
    w = 13

    def print_computer(i, j):
        _class = j.get('_class')
        if not _class:
            return
        displayName = j['displayName']
        desc = j.get('description')
        labels = j.get('labels')
        numExecutors = j.get('numExecutors')
        idle_busy = "idle" if j.get('idle') is True else "busy"
        on_offline = "offline" if j.get('offline') is True else "online"
        # 'monitorData' is absent from unauthenticated responses and the
        # monitor entry may be missing or null, so every step is optional.
        monitor_data = j.get('monitorData') or {}
        temp_space = monitor_data.get('hudson.node_monitors.TemporarySpaceMonitor') or {}
        size = temp_space.get('size')
        if size is not None:
            # bytes -> MiB (integer shift) -> GiB
            disk_free = "disk_free={:.1f}GB".format((size >> 20) / 1024)
        else:
            disk_free = ""
        if oneline:
            print(f"{i:3d} {_class} {idle_busy} {numExecutors} {on_offline} '{displayName}' labels='{labels}' desc='{desc}' {disk_free}")
        else:
            print(f"{i:3d} {_class:{w}} '{displayName}' {desc}")
            for k in ('labels', 'idle', 'numExecutors'):
                print(f" {k:{w}} {j.get(k)}")

    for i, item in enumerate(self.get_nodes(search=self.job_name)):
        print_computer(i, item)
def get_nodes(self, search=None):
    """
    Fetch the build nodes (computers) and yield them in a print-friendly form.

    Each yielded item is the JSON dictionary of the computer with ``_class``
    shortened (e.g. ``Master``, ``Slave``) and ``labels`` set to the space
    separated names of its assigned labels.

    :param search: When given, only nodes whose labels or display name
                   contain this string are yielded
    """
    def fixup_computer(j):
        _class = j.get('_class') or ""
        _class = _class.split(".")[-1].replace("Hudson$", "").replace("Computer", "")
        j['_class'] = _class
        j['labels'] = " ".join(x['name'] for x in j.get('assignedLabels') or [])
        return j

    # authenticated request adds "monitorData" dictionary to response
    url = f"{self.server_url}/computer"
    # Use this instance rather than the script's global ``jen`` object
    jr = self.request_api_json(url, try_auth=True)
    for item in jr.get('computer') or []:
        c = fixup_computer(item)
        if not search or search in c['labels'] or search in c['displayName']:
            yield c
@staticmethod
def job_get_param_definition(jr):
props = jr.get('property')
job_params = []
for prop in props:
if len(prop) == 1:
continue
if prop.get('_class') != "hudson.model.ParametersDefinitionProperty":
continue
params = prop.get('parameterDefinitions')
for param in params:
job_param = {
'name': param.get('name'),
'default': param.get('defaultParameterValue', {}).get('value'),
'description': param.get('description'),
}
job_params.append(job_param)
return job_params
def print_project(self, name=None, show_only_all_builds=False):
    """
    Print info on a single project

    :param name: Job name, see ``get_job_url()``
    :param show_only_all_builds: Print one line per build of the job
                                 instead of the project summary
    """
    jr = self.request_api_json(self.get_job_url(name))
    short_class = jr.get('_class')
    if short_class:
        short_class = short_class.split(".")[-1]
    jr['class'] = short_class

    # Make dictionary of symbolic build names to build number,
    # e.g. { 'last': 94, 'lastSuccessful': 94, 'lastFailed': 92, ... }
    name_to_number = {}
    for build_name in Jenkins.BUILD_NAMES[:4]:
        build_ref = jr.get(build_name)
        if not build_ref:
            continue
        name_to_number[build_name.replace("Build", "")] = build_ref.get('number')

    if show_only_all_builds:
        all_builds = self.build_get(job_id="all")
        for jr_build in reversed(all_builds.get('builds')):
            self.build_print(jr_build, oneline=True, name_to_number=name_to_number)
        return

    w = 16
    # 'inQueue' is apparently always False?
    for field in ('fullName', 'description', 'class'):
        print(f"{field:{w}} {jr.get(field)}")

    def print_property(prop):
        # Only parameter definitions have a printer so far
        prop_class = prop.get('_class')
        if prop_class != "hudson.model.ParametersDefinitionProperty":
            self.echo_color(f" No handler for printing class {prop_class}",
                            color=color.miniwarn, file=sys.stderr)
            return
        params = prop.get('parameterDefinitions')
        print("parameterDefinitions:")
        name_width = max([len(p.get('name')) for p in params]) + 1
        for param in params:
            param_name = param.get('name')
            defval = param.get('defaultParameterValue', {}).get('value')
            print(f" {param_name:{name_width}} {defval}")

    # A property holding nothing but its '_class' entry is considered empty
    for prop in jr.get('property'):
        if len(prop) > 1:
            print_property(prop)

    # Iterate only over distinct/unique build numbers
    for number in sorted(set(name_to_number.values())):
        self.build_print(self.build_get(job_id=number), name_to_number=name_to_number)

    if self.verbose:
        queue = list(self.get_queue_by_job(self.job_name))
        if queue:
            print(f"{len(queue)} build jobs queued for {self.job_name}")
        else:
            print(f"No build jobs queued for {self.job_name}")
def build_print(self, jr, oneline=False, name_to_number=None):
    """
    Print the result, timing and symbolic names of one build.

    :param jr: JSON object of a build (number, result, building, timestamp,
               duration, estimatedDuration)
    :param oneline: Print a single summary line instead of a block
    :param name_to_number: Dictionary of symbolic build name to build
                           number; names mapping to this build are printed
    """
    if name_to_number is None:
        name_to_number = {}
    number = jr.get('number')
    result = jr.get('result')
    building = jr.get('building')
    timestamp = timestamp_ms_to_datetime_and_deltatime(jr.get('timestamp'))
    # durations are given in milliseconds; treat a missing value as 0
    duration = deltatimeToHumanStr((jr.get('duration') or 0) / 1000)
    estDuration = deltatimeToHumanStr((jr.get('estimatedDuration') or 0) / 1000)
    sym_names = " ".join(name for name, num in name_to_number.items() if num == number)
    if oneline:
        # 'result' is null while a build is running and None does not
        # support a width in a format spec
        if result is None:
            result = "BUILDING" if building else "-"
        print(f"{number:4} {result:10} {duration:>10} {timestamp} {sym_names}")
    else:
        w = 12
        print(f"build {number:{w - 2}} {sym_names}")
        for name in ('result', 'building'):
            value = jr.get(name)
            if value:
                print(f" {name:{w}} {value}")
        for name, value in (('timestamp', timestamp), ('duration', duration), ('estDuration', estDuration)):
            print(f" {name:{w}} {value}")
def build_get(self, name=None, job_id=None):
    """
    Get Jenkins job result

    For example, to get artifacts::

        jr = jenkins.build_get()
        artifacts = jr.get('artifacts', [])
        >>> [{'displayPath': 'foo.zip', 'fileName': 'foo.zip', 'relativePath': 'foo.zip'}],

    When the resulting ``self.job_id`` is ``"all"``, the job itself is
    queried for the list of its builds instead of a single build.

    See https://stackoverflow.com/questions/54119863/get-build-details-for-all-builds-of-all-jobs-from-jenkins-rest-api

    :param name: Jenkins job name
    :param job_id: Jenkins job ID
    :return: JSON response object of request "{self.server_url}/job/{name}/{job_id}"
    """
    # Called first in every case because it also records name and job_id on self
    build_url = self.get_job_id_url(name=name, job_id=job_id)
    if self.job_id != "all":
        return self.request_api_json(build_url, params={})
    all_builds_tree = 'builds[number,result,timestamp,duration,estimatedDuration]'
    return self.request_api_json(self.get_job_url(), params={'tree': all_builds_tree})
def get_config_as_xml_and_dom(self, name=None):
    """
    Get Jenkins job config XML

    A note is printed to stderr when the response does not look like the
    expected job configuration; the result is returned nevertheless.

    :param name: Jenkins job name
    :return: XML text, xml.dom.minidom.Document object
    """
    url = self.get_job_url(name=name) + "/config.xml"
    xml_text = self.request(url, auth=True).text
    dom = minidom.parseString(xml_text)

    # warn on unexpected content
    if not xml_text.startswith('<?xml version='):
        self.echo_note("Content of response is not XML as expected", file=sys.stderr)
    root_tags = ("flow-definition", "project")
    first_tag = dom.documentElement.tagName
    if first_tag not in root_tags:
        self.echo_note(f"Root element is '{first_tag}' but expected one of: {root_tags}", file=sys.stderr)
    return xml_text, dom
def get_system_log(self):
    """Request ``{server_url}/api/system/logs`` with authentication and print the raw response content."""
    logs_url = f"{self.server_url}/api/system/logs"
    reply = self.request(logs_url, method="GET", auth=True)
    print(reply.content)
def post_config_xml(self, xml_text=None, filename=None, name=None):
    """
    Post config.xml to Jenkins job

    Exactly one of ``xml_text`` and ``filename`` must be supplied.

    :param xml_text: The new configuration as XML text
    :param filename: Path of a file holding the new configuration
    :param name: Jenkins job name, see ``get_job_url()``
    :raises ValueError: If both or neither of ``xml_text``/``filename`` are given
    :raises requests.exceptions.HTTPError: If the server refuses the
            request; for status 500 a hint is printed first
    """
    job_url = self.get_job_url(name=name)
    url = f"{job_url}/config.xml"
    if filename and xml_text:
        raise ValueError("Ambiguous arguments: both xml_text and filename supplied")
    if not xml_text:
        if not filename:
            raise ValueError("Missing arguments: supply either xml_text or filename")
        with open(filename, "r") as f:
            xml_text = f.read()
        saved_file = filename
    else:
        # nothing was saved to disk; the label is only used in the log message
        saved_file = None
        filename = "new config"
    try:
        self.request(url, method="POST", data=xml_text, auth=True)
        self.echo_info(f"Posted {filename} to {self.job_name} config.xml")
    except requests.exceptions.HTTPError as e:
        r = e.response
        if r.status_code == 500:
            new_config_msg = ""
            if saved_file:
                new_config_msg = f"New config.xml was saved to {saved_file}\n"
            print(f"""
POST of config.xml was refused on server.
{new_config_msg}
You can inspect the Jenkins server logs for the exact cause here:
{self.server_url}/log/all
""")
        raise
def pipeline_linter_is_valid(self, script_text):
    """
    Validate a Jenkinsfile with the linter of the Jenkins server.

    See https://www.jenkins.io/doc/book/pipeline/development

    Linting is skipped (and reported as success) when ``jenkins_crumb`` is
    False.  The config file delivers this option as a string, so the text
    ``false`` (any case) is accepted as well.

    :param script_text: Content of the Jenkinsfile
    :return: True on success
    """
    crumb_setting = self.jenkins_crumb
    # if user does not want linting then return success
    if crumb_setting is False or str(crumb_setting).strip().lower() == "false":
        return True

    if not crumb_setting:
        self.echo_note(f"Configuration does not contain 'jenkins_crumb'\n"
                       f"I will request one and print it. You should add it to the config file like:\n"
                       f"jenkins_crumb=\"INSERT-CRUMB-HERE\"")
        url = self.server_url + '/crumbIssuer/api/xml?xpath=//crumb'
        response = self.request(url, "GET")
        crumb = response.content.decode("utf8").replace("<crumb>", "").replace("</crumb>", "")
        print(f'jenkins_crumb=\"{crumb}\"')
    else:
        crumb = crumb_setting

    url = self.server_url + "/pipeline-model-converter/validate"
    headers = {'Jenkins-Crumb': crumb}
    form_data = {'jenkinsfile': script_text}
    response = self.request(url, method="POST", headers=headers, data=form_data)
    text = response.content.decode("utf8").rstrip()
    print(text)
    # The linter reports failure as "Errors encountered validating Jenkinsfile:"
    return not text.startswith("Errors")
def get_groovy_script(self):
    """
    Return the text of the first ``<script>`` element of the job's config.xml.

    :return: The script text, or "" if no such text node was found
    """
    _xml, dom = self.get_config_as_xml_and_dom()
    script_node = xml_get_first_child_node_of_tag(dom, "script")
    if not script_node:
        return ""
    return script_node.nodeValue
def get_config_replace_script_and_post(self, filename):
    """
    Replace the ``<script>`` of the job's config.xml with a local file and post it.

    The script is linted first.  A timestamped backup of the current
    config.xml and the new config.xml are written next to the console
    logs before the new configuration is posted.

    :param filename: Path of the groovy script (Jenkinsfile)
    :raises JenkinsException: If the linter reports errors
    :raises ValueError: If the config has no ``<script>`` text to replace
    """
    with open(filename, "r") as f:
        script_text = f.read()
    if not self.pipeline_linter_is_valid(script_text):
        raise JenkinsException("Not posting groovy file due to Jenkins linter errors")

    xml, dom = self.get_config_as_xml_and_dom()
    ts = datetime.datetime.fromtimestamp(time.time())

    backup_file, _ = self.make_output_filename_and_symlink(with_job_id=False)
    backup_file += ts.strftime("-config.xml.%Y%m%d-%H%M%S")
    with open(backup_file, "w") as f:
        f.write(xml)
    self.echo_info(f"Wrote backup of config.xml to {backup_file}")

    node = xml_get_first_child_node_of_tag(dom, "script")
    if not node:
        raise ValueError("<script> element not found in config")
    node.nodeValue = script_text
    self.echo_info(f"Replaced config.xml <script> with file '{filename}'")

    new_config, _ = self.make_output_filename_and_symlink(with_job_id=False)
    new_config += ts.strftime("-new-config.xml")
    # toxml() with an encoding returns bytes, hence binary mode
    with open(new_config, "wb") as f:
        f.write(dom.toxml('utf-8'))
    self.echo_info(f"Wrote new version of config.xml to {new_config}")
    self.post_config_xml(filename=new_config)
def make_output_filename_and_symlink(self, with_job_id=True):
    """
    Compute the base path for output files of the current job and create
    its directory.

    :param with_job_id: Append ``-{job_id}`` to the file path
    :return: Tuple of (base file path, base path of the "latest" symlink);
             the symlink path is "" when ``is_posix()`` is false
    """
    base = f"{self.console_log_dir}/{self.job_name}"
    symlink = f"{base}-latest" if is_posix() else ""
    logfile = f"{base}-{self.job_id}" if with_job_id else f"{base}"
    os.makedirs(os.path.dirname(base), exist_ok=True)
    return logfile, symlink
def get_console_output_for_job(self, name, job_id, fout, stdout):
    """
    Fetch the console log of one build, following it until the server
    reports that there is no more data.

    The log is polled through ``logText/progressiveText``; the
    ``X-Text-Size`` header of each response is used as the start offset of
    the next request and polling stops when ``X-More-Data`` is absent.

    :param name: Jenkins job name
    :param job_id: Jenkins job ID
    :param fout: Open text file receiving the log, or None
    :param stdout: True to also write the log to ``sys.stdout``
    """
    job_url = self.get_job_id_url(name=name, job_id=job_id)
    offset = 0
    started_at = time.time()
    last_output_at = last_progress_at = started_at
    while True:
        r = self.request(f"{job_url}/logText/progressiveText?start={offset}")
        chunk = r.text
        if chunk:
            last_output_at = last_progress_at = time.time()
            if stdout:
                sys.stdout.write(chunk)
            if fout:
                fout.write(chunk)
                fout.flush()
        more_data = r.headers.get('X-More-Data')
        offset = r.headers.get('X-Text-Size')
        if not more_data:
            return
        time.sleep(self.console_poll_interval)
        # Tell the user we are still alive after 10 quiet seconds
        if time.time() - last_progress_at < 10:
            continue
        last_progress_at = time.time()
        since_start = deltatimeToHumanStr(time.time() - started_at)
        since_output = deltatimeToHumanStr(time.time() - last_output_at)
        self.echo_progress(
            f"Waiting for output: started {since_start} ago, last output {since_output} ago")
def get_console_output(self):
    """
    Fetch the console log of every selected build (see ``job_id_iter()``).

    https://jenkins.lan/job/openwrt/17/logText/progressiveText?start=0

    Unless ``self.console_output_file`` is false, each log is written to its
    own file, whose path is then stored in ``self.console_output_file``.
    The log is echoed to stdout, and the "latest" symlink updated, only
    when a single build is selected.
    """
    job_ids = list(self.job_id_iter())
    # only print to stdout if we are getting log of a single job
    is_only_one_job = len(job_ids) == 1
    for job_id in job_ids:
        # The log file name is derived from self.job_id, so select the build
        # before computing it; otherwise every build of a range would be
        # written to the file of the previous one.
        self.job_id = job_id
        fout = None
        symlink = ""
        try:
            if self.console_output_file:
                logfile, symlink = self.make_output_filename_and_symlink()
                self.console_output_file = f"{logfile}-console.log"
                fout = open(self.console_output_file, "w")
                if symlink and is_only_one_job:
                    symlink = f"{symlink}-console.log"
                    if os.path.islink(symlink):
                        os.remove(symlink)
                    os.symlink(os.path.basename(self.console_output_file), symlink)
            self.get_console_output_for_job(name=None, job_id=job_id, fout=fout, stdout=is_only_one_job)
        finally:
            if fout:
                fout.close()
        if self.console_output_file:
            self.echo_info(f"Wrote console output to {self.console_output_file}")
            if symlink and is_only_one_job:
                self.echo_info(f"Wrote symlink {symlink}")
def req_waitfor_key_value(self, url, key, wait_msg="result", timeout=60, interval=1):
"""
Continuously poll ``url`` (api/json) and return when JSON response object
contains ``key`` and is non-empty
:param url: Jenkins job url
:param key: JSON key to wait for
:param wait_msg: User meesage printed on console
:param timeout: Timeout
:param interval: Poll interval
:return: JSON response object
"""
if not interval:
interval = max(int(timeout / 30), 5)
started = time.time()
deadline = started + timeout
elapsed = 0
while time.time() < deadline:
jr = self.request_api_json(url)
# only return if key exists AND has a value
value = jr.get(key)
if value is not None:
self.echo_verb(f"Got: {key}")
return jr
time.sleep(interval)
elapsed = time.time() - started