-
Notifications
You must be signed in to change notification settings - Fork 0
/
darkspark.py
709 lines (546 loc) · 25.1 KB
/
darkspark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
"""
DarkSpark Searchable Symmetric Encryption library
==================================================
[Searchable symmetric encryption](SSE)
* SSE is a symmetric-key encryption scheme that allows one to search a collection of [encrypted documents](ED) without the ability to decrypt them.
* - Encrypt a collection of documents D=(D_1, ..., D_n)
* - Each document D_i \subseteq W is viewed as a set of keywords from a [keyword space](W).
* - Given the [encryption key](K) and a keyword w \in W, the SSE generates a [search token](tk) with which the ED can be searched for keyword w.
* - The result of the search is the subset of ED which contains the keyword w.
A static SSE scheme consists of three algorithms (SETUP, TOKEN, SEARCH) that work as follows:
* SETUP takes as input a security parameter k and a document collection D and outputs a symmetric key K and an encrypted document collection ED.
* TOKEN takes as input the symmetric key K and a keyword w and outputs a search token tk.
* SEARCH takes as input the encrypted document collection ED and a search token tk and outputs a set of encrypted documents R \subseteq ED.
The SSE scheme is used by a client and an untrusted server as follows.
* The client encrypts its data collection using the SETUP algorithm which returns a secret key K and an encrypted document collection ED.
* The client keeps K secret and sends ED to the untrusted server.
* To search for a keyword w, the client runs the TOKEN algorithm on K and w to generate a search token tk which it sends to the server.
* The server runs SEARCH with ED and tk and returns the resulting encrypted documents back to the client.
"""
import secrets
import json
from typing import List, Tuple, Iterable
from dataclasses import dataclass, field
from enum import Enum
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding, hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding as asym_padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.exceptions import InvalidSignature
class KeywordSpace(Enum):
    """Keyword space W.

    ``ALPHABET`` is the single enum member; the characters that make up the
    keyword space are stored in its ``value``.
    """

    ALPHABET = "abcdefghijklmnopqrstuvwxyz"

    @classmethod
    def get_random_keyword(cls) -> str:
        """Return a random keyword (one character) from the keyword space W."""
        # ``cls.ALPHABET`` is an Enum member, not a str: it has no length and
        # cannot be indexed, so ``secrets.choice`` must be given its value.
        return secrets.choice(cls.ALPHABET.value)

    @classmethod
    def get_random_keywords(cls, n: int = 1) -> List[str]:
        """Return a list of ``n`` random keywords from the keyword space W.

        Keywords are drawn independently, so the list may contain duplicates.
        ``n`` defaults to 1 so that the method can also be called without
        arguments, e.g. when it is used as a dataclass ``default_factory``.
        """
        return [cls.get_random_keyword() for _ in range(n)]
@dataclass
class Document:
    r"""Document D_i \subseteq W, stored as a list of keywords.

    When no keywords are given, a single random keyword is generated.

    Raises:
        ValueError: if any keyword is not one of the characters of
            ``KeywordSpace.ALPHABET``.
    """

    # The dataclass machinery calls the factory with no arguments, while
    # ``get_random_keywords`` takes a count, so the count is passed explicitly.
    keywords: List[str] = field(
        default_factory=lambda: KeywordSpace.get_random_keywords(1)
    )

    def __post_init__(self):
        """Ensure that all keywords are in the keyword space W."""
        # ``KeywordSpace.ALPHABET`` is an Enum member and does not support
        # ``in``; the characters live in its ``value``.  Testing against a set
        # of characters (rather than the string) means only single letters are
        # accepted, not substrings such as "abc" or the empty string.
        keyword_space = set(KeywordSpace.ALPHABET.value)
        for keyword in self.keywords:
            if keyword not in keyword_space:
                raise ValueError(f"{keyword} is not in the keyword space")

    def __repr__(self):
        return f"Document({self.keywords})"

    def __str__(self):
        return f"{self.keywords}"
@dataclass
class DocumentCollection:
    """Document collection D=(D_1, ..., D_n).

    Raises:
        ValueError: if two documents hold the same keyword list.
    """

    documents: List[Document] = field(default_factory=list)

    def __post_init__(self):
        """Ensure that all documents are unique."""
        # Document is a plain (non-frozen) dataclass with value equality, which
        # makes it unhashable, so ``set(self.documents)`` raises TypeError.
        # Compare hashable keyword tuples instead; like dataclass equality,
        # this treats keyword order as significant.
        seen = set()
        for document in self.documents:
            fingerprint = tuple(document.keywords)
            if fingerprint in seen:
                raise ValueError("All documents must be unique")
            seen.add(fingerprint)

    def __repr__(self):
        return f"DocumentCollection({self.documents})"

    def __str__(self):
        return f"{self.documents}"
@dataclass(frozen=True)
class EncryptedDocument:
    """Encrypted document ED_i: a ciphertext and its initialization vector.

    Instances are immutable; equality and hashing are based on both
    ``ciphertext`` and ``iv``, so documents can be stored in sets.

    Raises:
        ValueError: if ``ciphertext`` is empty.
    """

    ciphertext: bytes
    iv: bytes

    def __post_init__(self):
        """Ensure that the ciphertext is not empty."""
        if not self.ciphertext:
            raise ValueError("Ciphertext cannot be empty")

    def __repr__(self):
        return f"EncryptedDocument({self.ciphertext})"

    def __str__(self):
        return f"{self.ciphertext}"

    @classmethod
    def from_json(cls, json_string: str) -> "EncryptedDocument":
        """Return an encrypted document from a json string.

        The string must encode an object with hex-encoded ``"ciphertext"`` and
        ``"iv"`` entries, as produced by :meth:`to_json`.
        """
        payload = json.loads(json_string)
        return cls(
            bytes.fromhex(payload["ciphertext"]),
            bytes.fromhex(payload["iv"]),
        )

    def to_json(self) -> str:
        """Return a json string with the hex-encoded ciphertext and iv."""
        return json.dumps({"ciphertext": self.ciphertext.hex(), "iv": self.iv.hex()})
@dataclass
class EncryptedDocumentCollection:
    """Encrypted document collection ED=(ED_1, ..., ED_n).

    Raises:
        ValueError: if two documents share the same ciphertext and iv.
    """

    encrypted_documents: List[EncryptedDocument] = field(default_factory=list)

    def __post_init__(self):
        """Ensure that all encrypted documents are unique."""
        # Deduplicate on the (ciphertext, iv) byte pair rather than on the
        # document objects, so the check does not depend on the documents
        # being hashable.
        seen = set()
        for document in self.encrypted_documents:
            fingerprint = (document.ciphertext, document.iv)
            if fingerprint in seen:
                raise ValueError("All encrypted documents must be unique")
            seen.add(fingerprint)

    def __len__(self) -> int:
        """Return the number of encrypted documents in the collection."""
        return len(self.encrypted_documents)

    def __iter__(self) -> Iterable[EncryptedDocument]:
        """Return an iterator over the encrypted documents in the collection."""
        return iter(self.encrypted_documents)

    def __repr__(self) -> str:
        """Return a string representation of the encrypted document collection."""
        return f"EncryptedDocumentCollection({list(self)})"

    def __str__(self) -> str:
        """Return a string representation of the encrypted document collection."""
        return f"EncryptedDocumentCollection({list(self)})"

    @classmethod
    def from_json(cls, json_string: str) -> "EncryptedDocumentCollection":
        """Return an encrypted document collection from a json string.

        The string must encode an object with a ``"docs"`` list.  Each entry
        may be either a JSON string (the format written by :meth:`to_json`) or
        a nested JSON object describing one document.
        """
        payload = json.loads(json_string)
        documents = []
        for entry in payload["docs"]:
            # to_json() stores each document as its own JSON string; encoding
            # such an entry again would hand EncryptedDocument.from_json a
            # quoted string instead of an object.
            raw = entry if isinstance(entry, str) else json.dumps(entry)
            documents.append(EncryptedDocument.from_json(raw))
        return cls(documents)

    def to_json(self) -> str:
        """Return a json string representation of the encrypted document collection.

        The result is an object whose ``"docs"`` list holds the JSON string of
        each document, in collection order.
        """
        return json.dumps({"docs": [doc.to_json() for doc in self]})
class Encryption:
    """Base class for symmetric-key encryption schemes.

    The base class only stores the key; ``encrypt`` and ``decrypt`` raise
    ``NotImplementedError`` and are meant to be overridden.
    """

    def __init__(self, key: bytes):
        """Remember ``key`` for use by the concrete scheme."""
        self._key = key

    def encrypt(self, document: Document) -> EncryptedDocument:
        """Encrypt ``document``; not implemented in the base class."""
        raise NotImplementedError()

    def decrypt(self, encrypted_document: EncryptedDocument) -> Document:
        """Decrypt ``encrypted_document``; not implemented in the base class."""
        raise NotImplementedError()
class AESEncryption(Encryption):
    """AES-CBC document encryption with PKCS7 padding.

    Every call to :meth:`encrypt` draws a fresh random IV, which is returned
    inside the ``EncryptedDocument`` and read back by :meth:`decrypt`.
    """

    _IV_SIZE = 16  # CBC needs an IV of one AES block (128 bits)

    def __init__(self, key: bytes):
        """Initialize an AES encryption scheme with a key."""
        super().__init__(key)
        self._backend = default_backend()
        # Built here so that an unusable key is rejected at construction time
        # rather than on the first encrypt/decrypt call.
        self._algorithm = algorithms.AES(self._key)

    def _cipher_for(self, iv: bytes) -> Cipher:
        """Return an AES-CBC cipher bound to ``iv``."""
        return Cipher(self._algorithm, modes.CBC(iv), self._backend)

    def encrypt(self, document: Document) -> EncryptedDocument:
        """Return ``document`` encrypted under a freshly generated IV.

        The plaintext is the JSON encoding of ``document.keywords``.
        """
        # The IV is kept in a local: the encryptor object does not expose it,
        # and reusing one fixed IV would make equal documents encrypt equally.
        iv = secrets.token_bytes(self._IV_SIZE)
        encryptor = self._cipher_for(iv).encryptor()
        padder = padding.PKCS7(128).padder()
        plaintext = json.dumps(document.keywords).encode()
        padded_data = padder.update(plaintext) + padder.finalize()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()
        return EncryptedDocument(ciphertext, iv)

    def decrypt(self, encrypted_document: EncryptedDocument) -> Document:
        """Return the document recovered from ``encrypted_document``.

        Decryption uses the IV stored alongside the ciphertext.
        """
        decryptor = self._cipher_for(encrypted_document.iv).decryptor()
        unpadder = padding.PKCS7(128).unpadder()
        padded_data = (
            decryptor.update(encrypted_document.ciphertext) + decryptor.finalize()
        )
        data = unpadder.update(padded_data) + unpadder.finalize()
        return Document(json.loads(data))
@dataclass
class SearchToken:
    """Search token tk: a thin wrapper around the raw token bytes."""

    token: bytes = field(default_factory=bytes)

    def __post_init__(self):
        """Reject a token that is empty (including the default value)."""
        if self.token:
            return
        raise ValueError("Search token cannot be empty")

    def __repr__(self):
        return f"SearchToken({self.token})"

    def __str__(self):
        return f"{self.token}"

    @classmethod
    def from_json(cls, json_string: str) -> "SearchToken":
        """Build a search token from a json object with a hex ``"token"`` entry."""
        hex_token = json.loads(json_string)["token"]
        return cls(bytes.fromhex(hex_token))

    def to_json(self) -> str:
        """Serialize the token as a json object with a hex ``"token"`` entry."""
        payload = {"token": self.token.hex()}
        return json.dumps(payload)
class SearchTokenGenerator:
    """Base class for search token generators.

    Only the key is stored here; ``generate_search_token`` raises
    ``NotImplementedError`` and is meant to be overridden.
    """

    def __init__(self, key: bytes):
        """Remember ``key`` for use by the concrete generator."""
        self._key = key

    def generate_search_token(self, keyword: str) -> SearchToken:
        """Produce a token for ``keyword``; not implemented in the base class."""
        raise NotImplementedError()
class AESSearchTokenGenerator(SearchTokenGenerator):
    """Search token generator built on AES-CBC with an all-zero IV.

    Because the IV is fixed, the same key and keyword always give the same
    token.
    """

    def __init__(self, key: bytes):
        """Set up the AES-CBC cipher used to turn keywords into tokens."""
        super().__init__(key)
        self._backend = default_backend()
        zero_iv = b"\x00" * 16
        self._cipher = Cipher(
            algorithms.AES(self._key), modes.CBC(zero_iv), self._backend
        )

    def generate_search_token(self, keyword: str) -> SearchToken:
        """Return the token for ``keyword``.

        The keyword is encoded, PKCS7-padded to the AES block size and
        encrypted; the ciphertext is the token.
        """
        pkcs7 = padding.PKCS7(128).padder()
        block_aligned = pkcs7.update(keyword.encode()) + pkcs7.finalize()
        aes = self._cipher.encryptor()
        return SearchToken(aes.update(block_aligned) + aes.finalize())
class Search:
    """Base class for search schemes.

    Only the key is stored here; ``search`` raises ``NotImplementedError``
    and is meant to be overridden.
    """

    def __init__(self, key: bytes):
        """Remember ``key`` for use by the concrete search scheme."""
        self._key = key

    def search(
        self,
        encrypted_document_collection: EncryptedDocumentCollection,
        search_token: SearchToken,
    ) -> List[EncryptedDocument]:
        """Find the documents matching ``search_token``; not implemented here."""
        raise NotImplementedError()
class AESSearch(Search):
    """Search scheme that decrypts the token and every document with AES-CBC.

    The token is decrypted with an all-zero IV (the IV used by
    ``AESSearchTokenGenerator``); each document is decrypted with the IV stored
    in it.
    """

    def __init__(self, key: bytes):
        """Initialize an AES search scheme with a key."""
        super().__init__(key)
        self._backend = default_backend()
        self._algorithm = algorithms.AES(self._key)
        # Cipher for search tokens only; documents carry their own IV.
        self._cipher = Cipher(self._algorithm, modes.CBC(b"\x00" * 16), self._backend)

    def search(
        self,
        encrypted_document_collection: EncryptedDocumentCollection,
        search_token: SearchToken,
    ) -> List[EncryptedDocument]:
        """Return the encrypted documents whose keywords contain the keyword.

        The keyword is recovered by decrypting and unpadding ``search_token``.
        """
        decryptor = self._cipher.decryptor()
        unpadder = padding.PKCS7(128).unpadder()
        padded_data = decryptor.update(search_token.token) + decryptor.finalize()
        # Document keywords are str, so the recovered bytes must be decoded
        # before comparing; comparing bytes to str would never match.
        keyword = (unpadder.update(padded_data) + unpadder.finalize()).decode()
        return [
            doc
            for doc in encrypted_document_collection
            # Document itself is not a container; its keyword list is.
            if keyword in self._decrypt(doc).keywords
        ]

    def _decrypt(self, encrypted_document: EncryptedDocument) -> Document:
        """Return the document recovered from ``encrypted_document``."""
        cipher = Cipher(
            self._algorithm, modes.CBC(encrypted_document.iv), self._backend
        )
        decryptor = cipher.decryptor()
        unpadder = padding.PKCS7(128).unpadder()
        padded_data = (
            decryptor.update(encrypted_document.ciphertext) + decryptor.finalize()
        )
        data = unpadder.update(padded_data) + unpadder.finalize()
        return Document(json.loads(data))
class SigningKey:
    """A private/public key pair held as PEM-encoded bytes."""

    def __init__(self, private_key: bytes, public_key: bytes):
        """Initialize a signing key with a private key and a public key.

        Both arguments are PEM-encoded bytes.
        """
        self._private_key = private_key
        self._public_key = public_key

    def __repr__(self) -> str:
        """Return a string representation that omits the private key."""
        # The private PEM is deliberately left out so that logging or printing
        # a key object does not leak it; use to_pem() to export it.
        return f"SigningKey(<private key redacted>, {self._public_key})"

    def __str__(self) -> str:
        """Return a string representation that omits the private key."""
        return self.__repr__()

    @property
    def private_key(self) -> bytes:
        """Return the private key of the signing key."""
        return self._private_key

    @property
    def public_key(self) -> bytes:
        """Return the public key of the signing key."""
        return self._public_key

    @classmethod
    def _from_key_objects(cls, private_key, public_key) -> "SigningKey":
        """Serialize loaded key objects to PEM and wrap them in a SigningKey.

        The private key is written unencrypted in PKCS8 format, the public key
        in SubjectPublicKeyInfo format.
        """
        return cls(
            private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            ),
            public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            ),
        )

    @classmethod
    def generate(cls) -> "SigningKey":
        """Return a new signing key backed by a fresh 2048-bit RSA key pair."""
        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        return cls._from_key_objects(private_key, private_key.public_key())

    @classmethod
    def from_pem(cls, private_pem: str, public_pem: str) -> "SigningKey":
        """Return a signing key from a private pem and a public pem.

        Both PEM strings are parsed and re-serialized, so malformed input is
        rejected here.  The private PEM must not be password protected.
        """
        private_key = serialization.load_pem_private_key(
            private_pem.encode(), password=None, backend=default_backend()
        )
        public_key = serialization.load_pem_public_key(
            public_pem.encode(), backend=default_backend()
        )
        return cls._from_key_objects(private_key, public_key)

    def to_pem(self) -> Tuple[str, str]:
        """Return the private pem and the public pem, in that order, as str."""
        return (self._private_key.decode(), self._public_key.decode())
class Signing:
    """Base class for signing schemes.

    Only the signing key is stored here; ``sign`` and ``verify`` raise
    ``NotImplementedError`` and are meant to be overridden.
    """

    def __init__(self, signing_key: SigningKey):
        """Remember ``signing_key`` for use by the concrete scheme."""
        self._signing_key = signing_key

    def sign(self, data: bytes) -> bytes:
        """Sign ``data``; not implemented in the base class."""
        raise NotImplementedError()

    def verify(self, data: bytes, signature: bytes) -> bool:
        """Check ``signature`` against ``data``; not implemented in the base class."""
        raise NotImplementedError()
class RSASigning(Signing):
    """RSA signing scheme using PSS padding with SHA-256."""

    def __init__(self, signing_key: SigningKey):
        """Initialize an RSA signing scheme with a signing key.

        The PEM bytes held by ``signing_key`` are loaded into key objects once,
        here, so that sign/verify do not have to parse them on every call.
        """
        super().__init__(signing_key)
        self._private_key = serialization.load_pem_private_key(
            self._signing_key.private_key, password=None, backend=default_backend()
        )
        self._public_key = serialization.load_pem_public_key(
            self._signing_key.public_key, backend=default_backend()
        )

    @staticmethod
    def _pss():
        """Return the PSS padding shared by sign and verify."""
        # PSS and MGF1 live in the asymmetric padding module (imported as
        # ``asym_padding``); the plain ``padding`` name in this file is the
        # symmetric block padding module and has neither.
        return asym_padding.PSS(
            mgf=asym_padding.MGF1(hashes.SHA256()),
            salt_length=asym_padding.PSS.MAX_LENGTH,
        )

    def sign(self, data: bytes) -> bytes:
        """Return a signature of the data."""
        return self._private_key.sign(data, self._pss(), hashes.SHA256())

    def verify(self, data: bytes, signature: bytes) -> bool:
        """Return True if the signature is valid for the data, else False."""
        try:
            self._public_key.verify(signature, data, self._pss(), hashes.SHA256())
        except InvalidSignature:
            return False
        return True
class KeyDerivationFunction:
    """Password-based key derivation with a fixed salt (PBKDF2-HMAC-SHA256)."""

    def __init__(self, salt: bytes):
        """Store the salt applied to every derivation."""
        self.salt = salt

    def derive(self, password: bytes, length: int) -> bytes:
        """Return ``length`` bytes of key material derived from ``password``.

        A new PBKDF2 instance with 100000 iterations is created for each call.
        """
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=length,
            salt=self.salt,
            iterations=100000,
            backend=default_backend(),
        ).derive(password)
class PublicKeyInfrastructure:
    """Holds an RSA key pair and signs/verifies with PSS padding and SHA-256."""

    def __init__(self, private_key: rsa.RSAPrivateKey, public_key: rsa.RSAPublicKey):
        """Store the key objects used by ``sign`` and ``verify``."""
        self.private_key = private_key
        self.public_key = public_key

    def sign(self, message: bytes) -> bytes:
        """Return the signature of ``message`` made with the private key."""
        pss = asym_padding.PSS(
            mgf=asym_padding.MGF1(hashes.SHA256()),
            salt_length=asym_padding.PSS.MAX_LENGTH,
        )
        return self.private_key.sign(message, pss, hashes.SHA256())

    def verify(self, message: bytes, signature: bytes) -> bool:
        """Return True when ``signature`` matches ``message``, False otherwise."""
        pss = asym_padding.PSS(
            mgf=asym_padding.MGF1(hashes.SHA256()),
            salt_length=asym_padding.PSS.MAX_LENGTH,
        )
        try:
            self.public_key.verify(signature, message, pss, hashes.SHA256())
        except InvalidSignature:
            return False
        else:
            return True
class SSE:
    """A static searchable symmetric encryption scheme.

    The three algorithms (setup, token, search) are delegated to the
    encryption scheme, search token generator and search scheme given at
    construction time.
    """

    def __init__(
        self,
        encryption: Encryption,
        search_token_generator: SearchTokenGenerator,
        search: Search,
    ):
        """Initialize the scheme with an encryption scheme, a search token generator, and a search scheme."""
        self._encryption = encryption
        self._search_token_generator = search_token_generator
        self._search = search

    def setup(
        self, document_collection: List[Document]
    ) -> Tuple[bytes, EncryptedDocumentCollection]:
        """Return a secret key and an encrypted document collection.

        The returned key is the one held by the encryption scheme, i.e. the
        key the collection was actually encrypted under.
        """
        encrypted_document_collection = EncryptedDocumentCollection(
            [self._encryption.encrypt(doc) for doc in document_collection]
        )
        # Returning freshly generated random bytes would hand the caller a key
        # unrelated to the ciphertexts.  Fall back to random bytes only for an
        # encryption object that does not expose a ``_key`` attribute.
        key = getattr(self._encryption, "_key", None)
        if key is None:
            key = secrets.token_bytes(32)
        return (key, encrypted_document_collection)

    def token(self, key: bytes, keyword: str) -> SearchToken:
        """Return a search token for a keyword.

        ``key`` is accepted for interface symmetry but is not used: the token
        is produced by the generator supplied at construction time, which
        holds its own key.
        """
        return self._search_token_generator.generate_search_token(keyword)

    def search(
        self,
        encrypted_document_collection: EncryptedDocumentCollection,
        search_token: SearchToken,
    ) -> List[EncryptedDocument]:
        """Return a list of encrypted documents that contain the keyword."""
        return self._search.search(encrypted_document_collection, search_token)
class EncryptedSSE:
    """Password-based SSE front end whose inputs and outputs are JSON strings.

    Keys are derived from a password with the supplied key derivation
    function, documents are encrypted with ``AESEncryption``, tokens are made
    with ``AESSearchTokenGenerator`` and searches run through ``AESSearch``.
    """

    def __init__(
        self,
        sse: SSE,
        key_derivation_function: KeyDerivationFunction,
        signing: Signing,
        public_key_infrastructure: PublicKeyInfrastructure,
    ):
        """Initialize the scheme from an SSE instance, a key derivation function, a signing scheme and a public key infrastructure."""
        self._encryption = sse._encryption
        self._search_token_generator = sse._search_token_generator
        self._search = sse._search
        self._key_derivation_function = key_derivation_function
        self._signing = signing
        self._public_key_infrastructure = public_key_infrastructure

    def _derive_secret_key(self, password: bytes) -> bytes:
        """Return the 32-byte secret key for ``password``.

        The password is first stretched into a shared secret, which is then
        stretched again into the secret key.  When ``password`` is None a
        random key is returned instead; such a key cannot be reproduced by a
        later call, so anything produced with it will not match anything else.
        """
        if password is None:
            return secrets.token_bytes(32)
        shared_secret = self._key_derivation_function.derive(
            password=password, length=32
        )
        return self._key_derivation_function.derive(password=shared_secret, length=32)

    def setup(self, password: bytes, document_collection: List[Document]) -> str:
        """Encrypt and sign ``document_collection``.

        Returns:
            A json object with two entries: ``"encrypted_document_collection"``
            (the collection's own json string, suitable for ``search`` and
            ``verify``) and ``"signature"`` (the hex-encoded signature of that
            string's bytes).
        """
        encryption = AESEncryption(key=self._derive_secret_key(password))
        encrypted_document_collection = EncryptedDocumentCollection(
            [encryption.encrypt(doc) for doc in document_collection]
        )
        collection_json = encrypted_document_collection.to_json()
        signature = self._signing.sign(collection_json.encode())
        # json.dumps cannot serialize the collection object or raw bytes, so
        # the collection is embedded as its json string and the signature as
        # hex.
        return json.dumps(
            {
                "encrypted_document_collection": collection_json,
                "signature": signature.hex(),
            }
        )

    def token(self, password: bytes, keyword: str) -> str:
        """Return the json string of the search token for ``keyword``.

        The string is the token's own json form, which is what ``search``
        parses.
        """
        search_token_generator = AESSearchTokenGenerator(
            key=self._derive_secret_key(password)
        )
        search_token = search_token_generator.generate_search_token(keyword)
        return search_token.to_json()

    def search(self, password: bytes, token: str, encrypted_documents: str) -> str:
        """Return the json string of the encrypted documents that contain the keyword.

        Args:
            password: password the search key is derived from.
            token: json string of a search token, as returned by ``token``.
            encrypted_documents: json string of an encrypted document
                collection.

        Returns:
            A json object whose ``"encrypted_documents"`` list holds the json
            string of each matching document.
        """
        search = AESSearch(key=self._derive_secret_key(password))
        encrypted_document_collection = EncryptedDocumentCollection.from_json(
            encrypted_documents
        )
        search_token = SearchToken.from_json(token)
        matches = search.search(encrypted_document_collection, search_token)
        # EncryptedDocument objects are not json-serializable; emit each one
        # in its own json form.
        return json.dumps({"encrypted_documents": [doc.to_json() for doc in matches]})

    def verify(self, encrypted_documents: str, signature: bytes) -> bool:
        """Return True if the signature is valid for the encrypted documents.

        The collection is parsed and re-serialized before checking, so the
        signature is verified against the collection's canonical json form.
        """
        # NOTE(review): setup() signs with ``self._signing`` while verification
        # goes through the public key infrastructure; this only succeeds when
        # both hold the same key pair and padding - confirm against callers.
        encrypted_document_collection = EncryptedDocumentCollection.from_json(
            encrypted_documents
        )
        return self._public_key_infrastructure.verify(
            encrypted_document_collection.to_json().encode(), signature
        )