-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathrepo-s3-cleanup
executable file
·421 lines (360 loc) · 17.6 KB
/
repo-s3-cleanup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
#!/usr/bin/env python3
'''Delete old tarballs in aliBuild's S3 remote store.
This script only considers the specified packages' tarballs for deletion,
leaving tarballs they depend on in place (but see the warning about false
positives below).
Symlinks to deleted tarballs are however cleaned up from
TARS/<arch>/<package>/<tarball>, and symlinks under the respective
TARS/<arch>/dist*/<package>/<package>-<version>/ are deleted as well.
If a tarball is considered for deletion, but another tarball depends on it (and
the latter is not deleted), then the former will be left in place, even if it
would otherwise be old enough for deletion. This prevents breaking existing
packages' dependency trees.
'''
import logging
import os
import sys
import typing
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from fnmatch import fnmatchcase
from boto3 import client
from botocore.exceptions import ClientError
if typing.TYPE_CHECKING:
from collections.abc import Iterable, Sequence, Set
SymlinkMapping = dict[str, str]
'''Map symlink basenames to associated data.
The data is either their target in the store or their associated package
name.
'''
def main(args: Namespace) -> int:
'''Script entry point.'''
setup_logger(verbose=args.verbose)
log = logging.getLogger(__name__)
try:
s3c = client('s3', endpoint_url=args.endpoint_url,
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'])
except KeyError as err:
log.fatal('the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment'
' variables are required', exc_info=err)
return 1
enabled_archs: 'list[str]' = [
prefix for prefix, _, _
in get_hierarchy(s3c, args.bucket, 'TARS/', recursive=False)
if prefix.endswith('/') and
any(fnmatchcase(prefix[4:].strip('/'), pattern)
for pattern in args.architecture_patterns or ['*'])
]
log.info('matched architectures: %s',
', '.join(arch[4:].strip('/') for arch in enabled_archs))
symlink_mapping, package_mapping = \
load_all_symlinks(s3c, args.bucket, enabled_archs)
consider_tarballs = frozenset(
target for basename, target in symlink_mapping.items()
if package_mapping[basename] in args.packages
)
del package_mapping
# Scan TARS/<arch>/store/*/*/*.tar.gz to find main tarballs.
cutoff = datetime.now(timezone(timedelta(0), 'UTC')) - \
timedelta(days=args.max_age)
to_delete: 'set[str]' = set()
sizes: 'dict[str, int]' = {}
for arch_path in enabled_archs:
for key, mtime, size in get_hierarchy(s3c, args.bucket,
arch_path + 'store/'):
assert mtime is not None, 'we should have keys here, not prefixes'
if key not in consider_tarballs:
continue
if mtime >= cutoff:
log.debug('tarball mtime %s newer than %d days; skipping: %s',
mtime.strftime('%Y-%m-%d %H:%M:%SZ'),
args.max_age, key)
continue
log.debug('tarball mtime %s older than %d days (size: %s): %s',
mtime.strftime('%Y-%m-%d %H:%M:%SZ'),
args.max_age, format_byte_size(size), key)
sizes[key] = size
to_delete.add(key)
log.debug('found %d candidate tarballs for deletion; total size: %s',
len(to_delete), format_byte_size(sum(sizes.values())))
log.debug('fetching dist symlink names; this may take a while')
dist_symlinks = sorted(
key
for arch_prefix in enabled_archs
for dist in ('dist/', 'dist-direct/', 'dist-runtime/')
for key, _, _ in get_hierarchy(s3c, args.bucket, arch_prefix + dist)
if key.endswith('.tar.gz') and key[len(arch_prefix):].count('/') == 3
)
# Fetch symlinks pointing to the tarball to be deleted.
to_delete, dependencies = get_keys_for_deletion(to_delete, dist_symlinks,
symlink_mapping)
deletable = find_deletable(to_delete, dependencies)
success = delete_objects(s3c, args.bucket, to_delete, do_it=args.do_it)
if not success:
log.error('encountered errors during deletion; see above for details')
log.info('%s %d objects, freeing %s',
'deleted' if args.do_it else 'would delete',
len(deletable),
format_byte_size(sum(sizes.get(k, 0) for k in deletable)))
blocked = to_delete - deletable
log.info('blocked from deletion: %d objects, totalling %s', len(blocked),
format_byte_size(sum(sizes.get(k, 0) for k in blocked)))
return 0 if success else 1
def get_keys_for_deletion(tarball_keys_to_delete: 'Set[str]',
dist_symlinks: 'list[str]',
symlink_mapping: 'SymlinkMapping') \
-> 'tuple[set[str], list[tuple[str, str]]]':
'''Return keys which should be deleted for the given tarball.'''
log = logging.getLogger(__name__)
symlinks: 'set[str]' = set()
dependencies: 'list[tuple[str, str]]' = []
# We assume dist_symlinks is sorted, so that all symlinks in the same
# directory are consecutive elements in the list.
this_dir: 'str | None' = None
this_dir_consider_deletion: bool = False
this_dir_contents: 'list[str]' = []
def delete_symlink_dir():
'''If necessary, register this directory's symlinks for deletion.'''
if not this_dir_consider_deletion or this_dir is None:
return
# This dist directory belongs to a tarball that we want to delete.
# Delete all the symlinks inside this directory if and only if we're
# deleting the associated tarball.
symlinks.update(this_dir_contents)
dependencies.extend((symlink_mapping[main_package], symlink)
for symlink in this_dir_contents)
log.debug('found %d dependencies of tarball to delete under %s/',
len(this_dir_contents), this_dir)
for symlink_key in dist_symlinks:
symlink_dir, valid, symlink_basename = symlink_key.rpartition('/')
assert valid, 'expected slash in symlink key; got %r' + symlink_key
# Save the contents of this dist subdirectory, in case we need to do
# something with its contents later.
if this_dir != symlink_dir:
log.debug('listing dependencies under %s', symlink_dir)
delete_symlink_dir()
this_dir = symlink_dir
this_dir_consider_deletion = False
this_dir_contents.clear()
this_dir_contents.append(symlink_key)
arch = symlink_dir.split('/', 2)[1]
package_and_version = os.path.basename(symlink_dir)
main_package = f'{package_and_version}.{arch}.tar.gz'
if symlink_basename != main_package:
# We've found a symlink inside another package's dist dir; add a
# dependency relation.
dependencies.append((symlink_mapping[main_package],
symlink_mapping[symlink_basename]))
elif symlink_mapping[symlink_basename] in tarball_keys_to_delete:
# If we've found the dist dir belonging to a tarball that we want
# to delete. Add all symlinks in the same dir to the list of
# symlinks to consider for deletion.
this_dir_consider_deletion = True
# Make sure the very last symlink directory isn't forgotten.
delete_symlink_dir()
return symlinks | tarball_keys_to_delete, dependencies
def find_deletable(to_delete: 'set[str]',
dependencies: 'list[tuple[str, str]]') -> 'set[str]':
'''Delete old orphan packages and return files deleted and their sizes.'''
log = logging.getLogger(__name__)
# Sort dependency pairs so that we process tarballs in store/ first. This
# ought to speed up resolving transitive dependencies slightly, as the
# store/ tarballs have the "real" dependency and the ones in dist*/ just
# depend on their respective store/ tarball.
dependencies.sort(key=lambda dep: dep[1], reverse=True)
# A tarball can be deleted if nothing else depends on it, with dependency
# relationships having been calculated earlier from dist*/ symlinks.
blockers: 'defaultdict[str, set[str]]' = defaultdict(set)
converged = False
while not converged:
converged = True
for up, down in dependencies:
if up not in to_delete and up not in blockers[down]:
# This dependency relationship blocks deletion of `down`.
blockers[down].add(up)
converged = False
elif blockers[up] - blockers[down]:
# There is a transitive dependency here. `up` is blocked from
# deletion, so block `down` as well.
blockers[down] |= blockers[up]
converged = False
for item, reverse_deps in blockers.items():
if reverse_deps and item in to_delete:
log.debug('not deleting %s: blocked by %r', item, reverse_deps)
return to_delete - {down for down, ups in blockers.items() if ups}
def delete_objects(s3c, bucket: str, to_delete: 'Iterable[str]',
*, do_it: bool = False) -> bool:
'''Perform the "real" deletion for the given objects, if requested.'''
log = logging.getLogger(__name__)
delete_batch: 'list[dict[typing.Literal["Key"], str]]' = []
success: bool = True
def delete_current_batch() -> bool:
'''Delete all objects in the currently-accumulated batch.'''
batch_success = True
response = s3c.delete_objects(Bucket=bucket, Delete={
'Quiet': False, 'Objects': delete_batch,
})
for deleted in response.get('Deleted', ()):
log.info('deleted: %s', deleted['Key'])
for error in response.get('Errors', ()):
log.error('error %s for %s: %s', error['Code'], error['Key'],
error['Message'])
batch_success = False
delete_batch.clear()
return batch_success
for item in sorted(to_delete):
if not do_it:
log.info('would delete: %s', item)
elif len(delete_batch) == 1000:
# We can only delete batches of 1000 keys at a time.
success &= delete_current_batch()
else:
delete_batch.append({'Key': item})
# Delete the last batch of keys as well.
if do_it and delete_batch:
success &= delete_current_batch()
return success
def load_all_symlinks(s3c, bucket: str, enabled_archs: 'Sequence[str]') \
-> 'tuple[SymlinkMapping, SymlinkMapping]':
'''Load the symlink mapping for all discovered packages.'''
symlinks: 'SymlinkMapping' = {}
packages: 'SymlinkMapping' = {}
for arch_path in enabled_archs:
for package in list_packages(s3c, bucket, arch_path):
mapping = load_symlink_mapping(s3c, bucket, arch_path, package)
for basename, target in mapping.items():
# The basename includes the architecture (as .$arch.tar.gz
# suffix), so it's safe to merge these dicts for multiple
# architectures.
symlinks[basename] = target
packages[basename] = package
return symlinks, packages
def list_packages(s3c, bucket: str, arch_path: str) -> 'Iterable[str]':
'''Generate package names found under the given architecture prefix.'''
for prefix, _, _ in get_hierarchy(s3c, bucket, arch_path, recursive=False):
if prefix.endswith('/'):
package = os.path.basename(prefix.rstrip('/'))
if package not in {'store', 'dist', 'dist-direct', 'dist-runtime'}:
yield package
def load_symlink_mapping(s3c, bucket: str, arch_path: str, package: str) \
-> 'SymlinkMapping':
'''Create a mapping from symlink basenames to tarballs in the store.'''
log = logging.getLogger(__name__)
mapping: 'SymlinkMapping' = {}
manifest_key = f'{arch_path}{package}.manifest'
try:
log.debug('fetching object s3://%s/%s', bucket, manifest_key)
manifest = s3c.get_object(Bucket=bucket, Key=manifest_key)['Body'] \
.read().decode('utf-8')
except ClientError as exc:
# If the manifest doesn't exist, ignore the error and fetch symlinks
# individually below.
if exc.response['Error']['Code'] != 'NoSuchKey':
log.fatal('got unknown error response from S3: %r', exc.response)
raise ValueError('could not fetch %s' % manifest_key) from exc
else:
for line in manifest.splitlines():
basename, valid, target = line.partition('\t')
if not valid:
continue
mapping[basename] = \
normalize_symlink_target(f'{arch_path}{package}/_', target)
# Fetch any leftover symlinks not listed in the manifest.
for key, _, _ in get_hierarchy(s3c, bucket, f'{arch_path}{package}/',
recursive=False):
if key.endswith('/'):
continue
basename = os.path.basename(key)
if basename in mapping or not basename.endswith('.tar.gz'):
continue
log.debug('fetching object s3://%s/%s', bucket, key)
target = s3c.get_object(Bucket=bucket, Key=key)['Body'] \
.read().decode('utf-8').rstrip('\n')
mapping[basename] = normalize_symlink_target(key, target)
return mapping
def get_hierarchy(s3c, bucket: str, prefix: str, recursive: bool = True) \
-> 'Iterable[tuple[str, datetime | None, int]]':
'''Retrieve all objects under the specified prefix, caching the answer.
If recursive=False is given, include common subprefixes in the output.
These can be identified by their trailing slash.
'''
log = logging.getLogger(__name__)
log.debug('listing keys under s3://%s/%s (recursive=%r)',
bucket, prefix, recursive)
kwargs = {} if recursive else {'Delimiter': '/'}
for page in s3c.get_paginator('list_objects_v2') \
.paginate(Bucket=bucket, Prefix=prefix, **kwargs):
for item in page.get('Contents', ()):
yield (item['Key'], item['LastModified'], item['Size'])
for subprefix in page.get('CommonPrefixes', ()):
yield (subprefix['Prefix'], None, 0)
def normalize_symlink_target(symlink_key: str, target: str) -> str:
'''Turn the target of a symlink pointing into the store into an S3 key.'''
if target.startswith('TARS/'):
return target
if not target.startswith('../../'):
target = '../../' + target
return os.path.normpath(os.path.dirname(symlink_key) + '/' + target)
def format_byte_size(size_bytes: int) -> str:
'''Make a number of bytes into a human-readable file size.'''
if size_bytes < 1024:
return f'{size_bytes:d} B'
kbytes = size_bytes / 1024
prefixes = 'KMGTPEZY'
for i, prefix in enumerate(prefixes):
prefix_size = kbytes / 1024 ** i
if abs(prefix_size) < 1024:
return f'{prefix_size:.1f} {prefix}iB'
return f'{kbytes / 1024 ** len(prefixes):.1f} {prefixes[-1]}iB'
def setup_logger(verbose: bool) -> None:
'''Set up the logger for this module, and silence boto3.'''
logging.getLogger('boto3').setLevel(logging.WARNING)
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG if verbose else logging.INFO)
handler = logging.StreamHandler()
handler.setLevel(0) # the level set above still applies
log.addHandler(handler)
def parse_args() -> Namespace:
'''Parse command-line arguments.'''
parser = ArgumentParser(description=__doc__, epilog='''\
Warning: the way -p/--package is defined, it can cause false positives,
e.g. specifying "-p O2" leads to O2-customization tarballs being considered
for deletion as well (but not e.g. O2Suite due to the "-" added by this
script to the end of specified package names).
''')
parser.add_argument(
'-v', '--verbose', action='store_true', default=False,
help='show debug output')
parser.add_argument(
'-y', '--do-it', action='store_true', default=False,
help='actually delete packages (without this, only print which '
'objects would be deleted)')
parser.add_argument(
'-u', '--endpoint-url', default='https://s3.cern.ch', metavar='URL',
help='S3 endpoint base URL (default %(default)s)')
parser.add_argument(
'-b', '--bucket', default='alibuild-repo',
help='S3 bucket to clean up (default %(default)s). '
'The script expects a TARS/<arch>/... hierarchy inside the bucket.')
parser.add_argument(
'-d', '--max-age', type=int, default=7,
help='delete packages older than MAX_AGE days (default %(default)s)')
parser.add_argument(
'-a', '--architecture', metavar='PATTERN', default=[],
action='append', dest='architecture_patterns',
help='architecture to clean up -- can be specified multiple times; '
'the default is to include all architectures; %(metavar)s is matched '
'against architectures using fnmatch (i.e. glob-style patterns)')
parser.add_argument(
'packages', metavar='PACKAGE', nargs='+',
help='package name to clean up -- can be specified multiple times; '
'tarballs with names beginning with "%(metavar)s-" will be '
'considered for deletion')
return parser.parse_args()
if __name__ == '__main__':
try:
sys.exit(main(parse_args()))
except (KeyboardInterrupt, BrokenPipeError):
pass