1
2
3
4
5
6
7
8
9 """Classes representing TLS messages."""
10
11 from .utils.compat import *
12 from .utils.cryptomath import *
13 from .errors import *
14 from .utils.codec import *
15 from .constants import *
16 from .x509 import X509
17 from .x509certchain import X509CertChain
18 from .utils.tackwrapper import *
19
22 self.type = 0
23 self.version = (0,0)
24 self.length = 0
25 self.ssl2 = False
26
28 self.type = type
29 self.version = version
30 self.length = length
31 return self
32
34 w = Writer()
35 w.add(self.type, 1)
36 w.add(self.version[0], 1)
37 w.add(self.version[1], 1)
38 w.add(self.length, 2)
39 return w.bytes
40
42 self.type = p.get(1)
43 self.version = (p.get(1), p.get(1))
44 self.length = p.get(2)
45 self.ssl2 = False
46 return self
47
50 self.type = 0
51 self.version = (0,0)
52 self.length = 0
53 self.ssl2 = True
54
63
64
67 self.contentType = ContentType.alert
68 self.level = 0
69 self.description = 0
70
72 self.level = level
73 self.description = description
74 return self
75
82
84 w = Writer()
85 w.add(self.level, 1)
86 w.add(self.description, 1)
87 return w.bytes
88
89
94
def postWrite(self, w):
    """Prepend a handshake header to an already-serialized message body.

    The header consists of ``self.handshakeType`` added with size 1 and the
    length of ``w.bytes`` added with size 3 (presumably a one-byte type and
    a three-byte length -- confirm against ``Writer.add``).

    @param w: writer whose ``bytes`` attribute holds the message body.
    @return: the header bytes followed by ``w.bytes``.
    """
    body = w.bytes
    header = Writer()
    header.add(self.handshakeType, 1)
    header.add(len(body), 3)
    return header.bytes + body
100
115
def create(self, version, random, session_id, cipher_suites,
           certificate_types=None, srpUsername=None,
           tack=False, supports_npn=False, serverName=None):
    """Populate this message from the given values and return ``self``.

    ``compression_methods`` is always set to ``[0]``.  ``srp_username`` and
    ``server_name`` are only assigned when a truthy ``srpUsername`` /
    ``serverName`` is supplied; in that case the text is stored as a
    UTF-8 encoded bytearray.  Otherwise those attributes are left untouched.

    @return: ``self``, so calls can be chained.
    """
    self.client_version, self.random = version, random
    self.session_id, self.cipher_suites = session_id, cipher_suites
    self.certificate_types = certificate_types
    self.compression_methods = [0]
    if srpUsername:
        self.srp_username = bytearray(srpUsername, "utf-8")
    self.tack, self.supports_npn = tack, supports_npn
    if serverName:
        self.server_name = bytearray(serverName, "utf-8")
    return self
132
190
192 w = Writer()
193 w.add(self.client_version[0], 1)
194 w.add(self.client_version[1], 1)
195 w.addFixSeq(self.random, 1)
196 w.addVarSeq(self.session_id, 1, 1)
197 w.addVarSeq(self.cipher_suites, 2, 2)
198 w.addVarSeq(self.compression_methods, 1, 1)
199
200 w2 = Writer()
201 if self.certificate_types and self.certificate_types != \
202 [CertificateType.x509]:
203 w2.add(ExtensionType.cert_type, 2)
204 w2.add(len(self.certificate_types)+1, 2)
205 w2.addVarSeq(self.certificate_types, 1, 1)
206 if self.srp_username:
207 w2.add(ExtensionType.srp, 2)
208 w2.add(len(self.srp_username)+1, 2)
209 w2.addVarSeq(self.srp_username, 1, 1)
210 if self.supports_npn:
211 w2.add(ExtensionType.supports_npn, 2)
212 w2.add(0, 2)
213 if self.server_name:
214 w2.add(ExtensionType.server_name, 2)
215 w2.add(len(self.server_name)+5, 2)
216 w2.add(len(self.server_name)+3, 2)
217 w2.add(NameType.host_name, 1)
218 w2.addVarSeq(self.server_name, 1, 2)
219 if self.tack:
220 w2.add(ExtensionType.tack, 2)
221 w2.add(0, 2)
222 if len(w2.bytes):
223 w.add(len(w2.bytes), 2)
224 w.bytes += w2.bytes
225 return self.postWrite(w)
226
230
232 return 'Cannot encode a list of next protocols because it contains an element with invalid length %d. Element lengths must be 0 < x < 256' % self.length
233
236 HandshakeMsg.__init__(self, HandshakeType.server_hello)
237 self.server_version = (0,0)
238 self.random = bytearray(32)
239 self.session_id = bytearray(0)
240 self.cipher_suite = 0
241 self.certificate_type = CertificateType.x509
242 self.compression_method = 0
243 self.tackExt = None
244 self.next_protos_advertised = None
245 self.next_protos = None
246
def create(self, version, random, session_id, cipher_suite,
           certificate_type, tackExt, next_protos_advertised):
    """Populate this message from the given values and return ``self``.

    Every argument is stored on the matching attribute (``version`` goes to
    ``server_version``); ``compression_method`` is always set to 0.

    @return: ``self``, so calls can be chained.
    """
    self.server_version, self.random = version, random
    self.session_id, self.cipher_suite = session_id, cipher_suite
    self.certificate_type = certificate_type
    self.compression_method = 0
    self.tackExt, self.next_protos_advertised = tackExt, next_protos_advertised
    return self
258
285
287 protos = []
288 while True:
289 if len(b) == 0:
290 break
291 l = b[0]
292 b = b[1:]
293 if len(b) < l:
294 raise BadNextProtos(len(b))
295 protos.append(b[:l])
296 b = b[l:]
297 return protos
298
300 b = bytearray()
301 for e in self.next_protos_advertised:
302 if len(e) > 255 or len(e) == 0:
303 raise BadNextProtos(len(e))
304 b += bytearray( [len(e)] ) + bytearray(e)
305 return b
306
308 w = Writer()
309 w.add(self.server_version[0], 1)
310 w.add(self.server_version[1], 1)
311 w.addFixSeq(self.random, 1)
312 w.addVarSeq(self.session_id, 1, 1)
313 w.add(self.cipher_suite, 2)
314 w.add(self.compression_method, 1)
315
316 w2 = Writer()
317 if self.certificate_type and self.certificate_type != \
318 CertificateType.x509:
319 w2.add(ExtensionType.cert_type, 2)
320 w2.add(1, 2)
321 w2.add(self.certificate_type, 1)
322 if self.tackExt:
323 b = self.tackExt.serialize()
324 w2.add(ExtensionType.tack, 2)
325 w2.add(len(b), 2)
326 w2.bytes += b
327 if self.next_protos_advertised is not None:
328 encoded_next_protos_advertised = self.__next_protos_encoded()
329 w2.add(ExtensionType.supports_npn, 2)
330 w2.add(len(encoded_next_protos_advertised), 2)
331 w2.addFixSeq(encoded_next_protos_advertised, 1)
332 if len(w2.bytes):
333 w.add(len(w2.bytes), 2)
334 w.bytes += w2.bytes
335 return self.postWrite(w)
336
337
343
345 self.certChain = certChain
346 return self
347
367
369 w = Writer()
370 if self.certificateType == CertificateType.x509:
371 chainLength = 0
372 if self.certChain:
373 certificate_list = self.certChain.x509List
374 else:
375 certificate_list = []
376
377 for cert in certificate_list:
378 bytes = cert.writeBytes()
379 chainLength += len(bytes)+3
380
381 w.add(chainLength, 3)
382 for cert in certificate_list:
383 bytes = cert.writeBytes()
384 w.addVarSeq(bytes, 1, 3)
385 else:
386 raise AssertionError()
387 return self.postWrite(w)
388
396
def create(self, certificate_types, certificate_authorities):
    """Store the certificate types and authorities, then return ``self``.

    @return: ``self``, so calls can be chained.
    """
    self.certificate_types, self.certificate_authorities = (
        certificate_types, certificate_authorities)
    return self
401
403 p.startLengthCheck(3)
404 self.certificate_types = p.getVarList(1, 1)
405 ca_list_length = p.get(2)
406 index = 0
407 self.certificate_authorities = []
408 while index != ca_list_length:
409 ca_bytes = p.getVarBytes(2)
410 self.certificate_authorities.append(ca_bytes)
411 index += len(ca_bytes)+2
412 p.stopLengthCheck()
413 return self
414
416 w = Writer()
417 w.addVarSeq(self.certificate_types, 1, 1)
418 caLength = 0
419
420 for ca_dn in self.certificate_authorities:
421 caLength += len(ca_dn)+2
422 w.add(caLength, 2)
423
424 for ca_dn in self.certificate_authorities:
425 w.addVarSeq(ca_dn, 1, 2)
426 return self.postWrite(w)
427
430 HandshakeMsg.__init__(self, HandshakeType.server_key_exchange)
431 self.cipherSuite = cipherSuite
432 self.srp_N = 0
433 self.srp_g = 0
434 self.srp_s = bytearray(0)
435 self.srp_B = 0
436
437 self.dh_p = 0
438 self.dh_g = 0
439 self.dh_Ys = 0
440 self.signature = bytearray(0)
441
def createSRP(self, srp_N, srp_g, srp_s, srp_B):
    """Store the four SRP values on this message and return ``self``.

    @return: ``self``, so calls can be chained.
    """
    self.srp_N, self.srp_g = srp_N, srp_g
    self.srp_s, self.srp_B = srp_s, srp_B
    return self
448
450 self.dh_p = dh_p
451 self.dh_g = dh_g
452 self.dh_Ys = dh_Ys
453 return self
454
470
472 w = Writer()
473 if self.cipherSuite in CipherSuite.srpAllSuites:
474 w.addVarSeq(numberToByteArray(self.srp_N), 1, 2)
475 w.addVarSeq(numberToByteArray(self.srp_g), 1, 2)
476 w.addVarSeq(self.srp_s, 1, 1)
477 w.addVarSeq(numberToByteArray(self.srp_B), 1, 2)
478 if self.cipherSuite in CipherSuite.srpCertSuites:
479 w.addVarSeq(self.signature, 1, 2)
480 elif self.cipherSuite in CipherSuite.anonSuites:
481 w.addVarSeq(numberToByteArray(self.dh_p), 1, 2)
482 w.addVarSeq(numberToByteArray(self.dh_g), 1, 2)
483 w.addVarSeq(numberToByteArray(self.dh_Ys), 1, 2)
484 if self.cipherSuite in []:
485 w.addVarSeq(self.signature, 1, 2)
486 return self.postWrite(w)
487
def hash(self, clientRandom, serverRandom):
    """Return MD5 + SHA1 over both randoms and this message's serialization.

    ``self.cipherSuite`` is set to None while ``self.write()`` runs and is
    restored afterwards, even if serialization or hashing raises.

    NOTE(review): while ``cipherSuite`` is None, any suite-membership
    branches inside ``write()`` would be skipped -- confirm that the key
    exchange parameters are actually included in the hashed data.

    @return: ``MD5(data) + SHA1(data)`` for the concatenated input.
    """
    savedSuite = self.cipherSuite
    self.cipherSuite = None
    try:
        # Drop the first four bytes of write()'s output (presumably the
        # handshake header) before hashing.
        payload = clientRandom + serverRandom + self.write()[4:]
        digest = MD5(payload) + SHA1(payload)
    finally:
        self.cipherSuite = savedSuite
    return digest
496
512
514 - def __init__(self, cipherSuite, version=None):
520
522 self.srp_A = srp_A
523 return self
524
def createRSA(self, encryptedPreMasterSecret):
    """Store the encrypted premaster secret and return ``self``.

    @return: ``self``, so calls can be chained.
    """
    setattr(self, "encryptedPreMasterSecret", encryptedPreMasterSecret)
    return self
528
530 self.dh_Yc = dh_Yc
531 return self
532
551
553 w = Writer()
554 if self.cipherSuite in CipherSuite.srpAllSuites:
555 w.addVarSeq(numberToByteArray(self.srp_A), 1, 2)
556 elif self.cipherSuite in CipherSuite.certSuites:
557 if self.version in ((3,1), (3,2)):
558 w.addVarSeq(self.encryptedPreMasterSecret, 1, 2)
559 elif self.version == (3,0):
560 w.addFixSeq(self.encryptedPreMasterSecret, 1)
561 else:
562 raise AssertionError()
563 elif self.cipherSuite in CipherSuite.anonSuites:
564 w.addVarSeq(numberToByteArray(self.dh_Yc), 1, 2)
565 else:
566 raise AssertionError()
567 return self.postWrite(w)
568
573
575 self.signature = signature
576 return self
577
583
588
593
595 self.type = 1
596 return self
597
603
605 w = Writer()
606 w.add(self.type,1)
607 return w.bytes
608
609
614
def create(self, next_proto):
    """Store the selected next protocol and return ``self``.

    @return: ``self``, so calls can be chained.
    """
    setattr(self, "next_proto", next_proto)
    return self
618
625
def write(self, trial=False):
    """Serialize this message: the protocol name followed by zero padding.

    The padding length is ``32 - ((len(self.next_proto) + 2) % 32)``; the
    ``+ 2`` presumably accounts for the two one-byte length prefixes --
    confirm against ``Writer.addVarSeq``.  ``trial`` is accepted but not
    used by this method.

    @return: the result of ``self.postWrite`` applied to the body writer.
    """
    proto = self.next_proto
    padding = bytearray(32 - ((len(proto) + 2) % 32))
    body = Writer()
    body.addVarSeq(proto, 1, 1)
    body.addVarSeq(padding, 1, 1)
    return self.postWrite(body)
632
638
def create(self, verify_data):
    """Store the verify data and return ``self``.

    @return: ``self``, so calls can be chained.
    """
    setattr(self, "verify_data", verify_data)
    return self
642
653
658
663
665 self.bytes = bytes
666 return self
667
672
674 self.bytes = p.bytes
675 return self
676
679