Skip to content

Commit

Permalink
Merge pull request #517 from gstarovo/point_extension
Browse files Browse the repository at this point in the history
EC point format extension
  • Loading branch information
tomato42 authored Dec 17, 2024
2 parents 4e16574 + c1b7ff2 commit 412a531
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ coverage.xml
pylint_report.txt
build/
docs/_build/
htmlcov/
htmlcov/
1 change: 1 addition & 0 deletions scripts/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def printGoodConnection(connection, seconds):
if connection.server_cert_compression_algo:
print(" Server compression algorithm used: {0}".format(
connection.server_cert_compression_algo))
print(" Session used ec point format extension: {0}".format(connection.session.ec_point_format))

def printExporter(connection, expLabel, expLength):
if expLabel is None:
Expand Down
Empty file removed test
Empty file.
149 changes: 147 additions & 2 deletions tests/tlstest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from xmlrpc import client as xmlrpclib
import ssl
from tlslite import *
from tlslite.constants import KeyUpdateMessageType, SignatureScheme
from tlslite.constants import KeyUpdateMessageType, ECPointFormat, SignatureScheme

try:
from tack.structures.Tack import Tack
Expand Down Expand Up @@ -303,6 +303,77 @@ def connect():

test_no += 1

print("Test {0} - client compressed/uncompressed - uncompressed, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.ec_point_format == ECPointFormat.uncompressed
connection.close()

test_no += 1

print("Test {0} - client compressed - compressed, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
settings.ec_point_formats = [ECPointFormat.ansiX962_compressed_prime, ECPointFormat.uncompressed]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.ec_point_format == ECPointFormat.ansiX962_compressed_prime
connection.close()

test_no += 1

print("Test {0} - client missing uncompressed - error, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.ec_point_formats = [ECPointFormat.ansiX962_compressed_prime]
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
try:
connection.handshakeClientCert(settings=settings)
assert False
except ValueError as e:
assert "Uncompressed EC point format is not provided" in str(e)
except TLSAbruptCloseError as e:
pass
connection.close()

test_no += 1

print("Test {0} - client comppressed char2 - error, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.ec_point_formats = [ECPointFormat.ansiX962_compressed_char2]
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
try:
connection.handshakeClientCert(settings=settings)
assert False
except ValueError as e:
assert "Unknown EC point format provided: ['ansiX962_compressed_char2']" in str(e)
except TLSAbruptCloseError as e:
pass
connection.close()

test_no += 1

print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
Expand Down Expand Up @@ -2220,6 +2291,79 @@ def connect():

test_no += 1

print("Test {0} - server uncompressed ec format - uncompressed, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
settings.ec_point_formats = [ECPointFormat.uncompressed]
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
testConnServer(connection)
assert connection.session.ec_point_format == ECPointFormat.uncompressed
connection.close()

test_no += 1

print("Test {0} - server compressed ec format - compressed, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
testConnServer(connection)
assert connection.session.ec_point_format == ECPointFormat.ansiX962_compressed_prime
connection.close()

test_no +=1

print("Test {0} - server missing uncompressed in client - error, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
try:
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert False
except ValueError as e:
assert "Uncompressed EC point format is not provided" in str(e)
except TLSAbruptCloseError as e:
pass
connection.close()

test_no +=1

print("Test {0} - client compressed char2 - error, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1", "x25519", "x448"]
settings.keyShares = ["secp256r1"]
try:
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert False
except ValueError as e:
assert "Unknown EC point format provided: [2]" in str(e)
except TLSAbruptCloseError as e:
pass
connection.close()

test_no +=1

print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
Expand Down Expand Up @@ -3509,7 +3653,7 @@ def heartbeat_response_check(message):
assert synchro.recv(1) == b'R'
connection.close()

test_no += 1
test_no +=1

print("Tests {0}-{1} - XMLRPXC server".format(test_no, test_no + 2))

Expand Down Expand Up @@ -3542,6 +3686,7 @@ def add(self, x, y): return x + y

synchro.close()
synchroSocket.close()

test_no += 2

print("Test succeeded")
Expand Down
19 changes: 17 additions & 2 deletions tlslite/handshakesettings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

"""Class for setting handshake parameters."""

from .constants import CertificateType
from .constants import CertificateType, ECPointFormat
from .utils import cryptomath
from .utils import cipherfactory
from .utils.compat import ecdsaAllCurves, int_types, ML_KEM_AVAILABLE
Expand Down Expand Up @@ -74,7 +74,8 @@
TICKET_CIPHERS = ["chacha20-poly1305", "aes256gcm", "aes128gcm", "aes128ccm",
"aes128ccm_8", "aes256ccm", "aes256ccm_8"]
PSK_MODES = ["psk_dhe_ke", "psk_ke"]

EC_POINT_FORMATS = [ECPointFormat.ansiX962_compressed_prime,
ECPointFormat.uncompressed]
ALL_COMPRESSION_ALGOS_SEND = ["zlib"]
if compression_algo_impls["brotli_compress"]:
ALL_COMPRESSION_ALGOS_SEND.append('brotli')
Expand Down Expand Up @@ -395,6 +396,10 @@ class HandshakeSettings(object):
option is for when a certificate was received/decompressed by this
peer.
:vartype ec_point_formats: list
:ivar ec_point_formats: Enabled point format extension for
elliptic curves.
"""

def _init_key_settings(self):
Expand Down Expand Up @@ -442,6 +447,7 @@ def _init_misc_extensions(self):
# resumed connections (as tickets are single-use in TLS 1.3
self.ticket_count = 2
self.record_size_limit = 2**14 + 1 # TLS 1.3 includes content type
self.ec_point_formats = list(EC_POINT_FORMATS)

# Certificate compression
self.certificate_compression_send = list(ALL_COMPRESSION_ALGOS_SEND)
Expand Down Expand Up @@ -652,6 +658,14 @@ def _sanityCheckExtensions(other):
not 64 <= other.record_size_limit <= 2**14 + 1:
raise ValueError("record_size_limit cannot exceed 2**14+1 bytes")

bad_ec_ext = [ECPointFormat.toStr(rep) for rep in other.ec_point_formats if
rep not in EC_POINT_FORMATS]
if bad_ec_ext:
raise ValueError("Unknown EC point format provided: "
"{0}".format(bad_ec_ext))
if ECPointFormat.uncompressed not in other.ec_point_formats:
raise ValueError("Uncompressed EC point format is not provided")

HandshakeSettings._sanityCheckEMSExtension(other)

if other.certificate_compression_send:
Expand Down Expand Up @@ -746,6 +760,7 @@ def _copy_extension_settings(self, other):
other.sendFallbackSCSV = self.sendFallbackSCSV
other.useEncryptThenMAC = self.useEncryptThenMAC
other.usePaddingExtension = self.usePaddingExtension
other.ec_point_formats = self.ec_point_formats
# session tickets
other.padding_cb = self.padding_cb
other.ticketKeys = self.ticketKeys
Expand Down
Loading

0 comments on commit 412a531

Please sign in to comment.