Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

""" 

Python HKP protocol client implementation based on current draft spec 

http://tools.ietf.org/html/draft-shaw-openpgp-hkp-00 

 

Taken from:  

https://github.com/dgladkov/python-hkp/blob/master/hkp/client.py 

""" 

from datetime import datetime 

import requests 

try: 

import urllib.parse as parse 

except ImportError: 

import urlparse as parse 

import hkp4py.utils as utils 

 

__all__ = ['Key', 'Identity', 'KeyServer', 'HTTPClientError'] 

 

# Loosely taken from RFC2440 (http://tools.ietf.org/html/rfc2440#section-9.1) 

ALGORITHMS = { 

0: 'unknown', 

1: 'RSA (Encrypt or Sign)', 

2: 'RSA Encrypt-Only', 

3: 'RSA Sign-Only', 

16: 'Elgamal (Encrypt-Only)', 

17: 'DSA (Digital Signature Standard)', 

18: 'Elliptic Curve', 

19: 'ECDSA', 

20: 'Elgamal (Encrypt or Sign)', 

21: 'Reserved for Diffie-Hellman', 

22: 'EdDSA', 

} 

 

 

class HTTPClientError(Exception): 

"""Thrown by Keyserver.search if any 4xx Error Code is reported.""" 

pass 

 

 

class Key(object): 

""" 

Public key object. 

""" 

 

_begin_header = '-----BEGIN PGP PUBLIC KEY BLOCK-----' 

_end_header = '-----END PGP PUBLIC KEY BLOCK-----' 

 

def __init__(self, 

host, 

port, 

keyid, 

algo, 

keylen, 

creation_date, 

expiration_date, 

flags, 

session=None): 

""" 

Takes keyserver host and port used to look up ASCII armored key, and 

data as it is present in search query result. 

""" 

self.host = host 

self.port = port 

self.keyid = keyid 

algo = int(algo) 

self.algo = ALGORITHMS.get(algo, algo) 

self.key_length = int(keylen) 

self.creation_date = datetime.fromtimestamp(int(creation_date)) 

self.session = session 

 

if expiration_date: 

self.expiration_date = datetime.fromtimestamp(int(expiration_date)) 

else: 

self.expiration_date = None 

 

self.revoked = self.disabled = self.expired = False 

if 'r' in flags: 

self.revoked = True 

78 ↛ 79line 78 didn't jump to line 79, because the condition on line 78 was never true if 'd' in flags: 

self.disabled = True 

80 ↛ 81line 80 didn't jump to line 81, because the condition on line 80 was never true if 'e' in flags: 

self.expired = True 

 

self.identities = [] 

 

def __repr__(self): 

return 'Key {} {}'.format(self.keyid, self.algo) 

 

def __str__(self): 

return repr(self) 

 

@utils.cached_property 

def key(self): 

return self.retrieve() 

 

@utils.cached_property 

def key_blob(self): 

return self.retrieve(blob=True) 

 

def retrieve(self, nm=False, blob=False): 

""" 

Retrieve public key from keyserver and strip off any enclosing HTML. 

""" 

opts = ( 

('mr', True), 

('nm', nm), 

) 

 

keyid = self.keyid 

params = { 

'search': keyid.startswith('0x') and keyid or '0x{}'.format(keyid), 

'op': 'get', 

'options': ','.join(name for name, val in opts if val), 

} 

request_url = '{}:{}/pks/lookup'.format(self.host, self.port) 

response = self.session.get(request_url, params=params) 

116 ↛ 130line 116 didn't jump to line 130, because the condition on line 116 was never false if response.ok: 

# strip off enclosing text or HTML. According to RFC headers MUST be 

# always preserved, so we rely on them 

response = response.text 

key = response.split(self._begin_header)[1].split( 

self._end_header)[0] 

key = '{}{}{}'.format(self._begin_header, key, self._end_header) 

123 ↛ 128line 123 didn't jump to line 128, because the condition on line 123 was never false if blob: 

# cannot use requests.content because of potential html 

# provided by keyserver. (see above comment) 

return bytes(key.encode("utf-8")) 

else: 

return key 

else: 

return None 

 

 

class Identity(object): 

""" 

Key owner's identity. Constructor takes data as it is present in search 

query result. 

""" 

def __init__(self, uid, creation_date, expiration_date, flags): 

self.uid = parse.unquote(uid) 

 

141 ↛ 144line 141 didn't jump to line 144, because the condition on line 141 was never false if creation_date: 

self.creation_date = datetime.fromtimestamp(int(creation_date)) 

else: 

self.creation_date = None 

 

146 ↛ 147line 146 didn't jump to line 147, because the condition on line 146 was never true if expiration_date: 

self.expiration_date = datetime.fromtimestamp(int(expiration_date)) 

else: 

self.expiration_date = None 

 

self.revoked = self.disabled = self.expired = False 

 

153 ↛ 154line 153 didn't jump to line 154, because the condition on line 153 was never true if 'r' in flags: 

self.revoked = True 

155 ↛ 156line 155 didn't jump to line 156, because the condition on line 155 was never true if 'd' in flags: 

self.disabled = True 

157 ↛ 158line 157 didn't jump to line 158, because the condition on line 157 was never true if 'e' in flags: 

self.expired = True 

 

def __repr__(self): 

return 'Identity {}'.format(self.uid) 

 

def __str__(self): 

return repr(self) 

 

 

class KeyServer(object): 

""" 

Keyserver object used for search queries. 

""" 

def __init__(self, 

host, 

port=11371, 

proxies=None, 

headers=None, 

verify=True): 

177 ↛ 183line 177 didn't jump to line 183, because the condition on line 177 was never false if host.startswith('hkp://') or host.startswith('hkps://'): 

host = host.replace("hkp", "http", 1) 

if host.startswith('https'): 

180 ↛ 184line 180 didn't jump to line 184, because the condition on line 180 was never false if port == 11371: 

port = 443 

else: 

raise Exception("Unsupported protocol, hkp|hkps are supported.") 

self.host = host 

self.port = port 

# Buildup Session 

self.session = requests.session() 

self.session.headers = headers 

self.session.proxies = proxies 

190 ↛ 191line 190 didn't jump to line 191, because the condition on line 190 was never true if host.endswith("hkps.pool.sks-keyservers.net"): 

verify = utils.ca().pem 

self.session.verify = verify 

 

def __parse_index(self, response): 

""" 

Parse machine readable index response. 

""" 

lines = response.splitlines()[1:] 

result, key = [], None 

 

for line in iter(lines): 

items = line.split(':') 

if 'pub' in items[0]: 

key = Key(self.host, 

self.port, 

*items[1:], 

session=self.session) 

result.append(key) 

if 'uid' in items[0] and key: 

key.identities.append(Identity(*items[1:])) 

return result 

 

def search(self, query, exact=False, nm=False): 

""" 

Searches for given query, returns list of key objects. 

""" 

opts = ( 

('mr', True), 

('nm', nm), 

) 

 

params = { 

'op': 'index', 

'options': ','.join(name for name, val in opts if val), 

'search': query, 

'exact': exact and 'on' or 'off', 

} 

 

request_url = '{}:{}/pks/lookup'.format(self.host, self.port) 

response = self.session.get(request_url, params=params) 

if response.ok: 

response = response.text 

elif response.status_code == requests.codes.not_found: 

return None 

235 ↛ 236line 235 didn't jump to line 236, because the condition on line 235 was never true elif response.status_code >= 200 and response.status_code < 300: 

return None 

elif response.status_code >= 400 and response.status_code < 500: 

# Distinguish between client errors and server errors / redirects 

raise HTTPClientError 

else: 

raise Exception('{}\nRequest URL: {}\nResponse:\n{}'.format( 

response.status_code, response.request.url, response.text)) 

return self.__parse_index(response) 

 

def add(self, key): 

""" 

Upload key to the keyserver. 

""" 

request_url = '{}:{}/pks/add'.format(self.host, self.port) 

data = {'keytext': key} 

response = self.session.post(request_url, data=data) 

response.raise_for_status()