Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

import gpg 

import logging 

from django.conf import settings 

from enum import IntEnum 

 

from vendor.hkp4py import HTTPClientError, KeyServer 

 

logging.getLogger(__name__) 

logging.getLogger().setLevel(logging.DEBUG) 

 

DEFAULT_KEYSERVERS = ['hkps://keys.openpgp.org'] 

 

ERRMSG_EXPIRED = 'Key is expired' 

ERRMSG_REVOKED = 'Key is revoked' 

ERRMSG_AMBIGUOUS = 'Key ID has been found multiple times on the same keyserver.' 

ERRMSG_INVALID = 'Key ID is invalid or too short.' 

ERRMSG_NOTFOUND = 'Key not found.' 

ERRMSG_NOTIMPORTED = 'Key could not be imported. Maybe it has no UIDs?' 

 

 

class KeyRefreshStatus(IntEnum): 

NA = 0 # Not refreshed yet, and no local key retrievable 

CURRENT = 1 # Key has been updated from keyservers after being instantiated 

STORED = 2 # Key has been found in local keyring 

SRVERR = 3 # All keyservers have returned a Server Error (unavailable) 

AMBIGUOUS = 4 # Key ID is ambiguous on at least one server 

INVALID = 5 # Search for Key ID has been considered an invalid request on at least one server, Key has not been found on other servers. 

IMPORTERR = 6 # Key found, but could not be imported. Likely missing UIDs 

 

 

class KeyStatus(IntEnum): 

NA = 0 # No Key available 

REVOKED = 1 # Key is considered revoked by GPG 

EXPIRED = 2 # Key is considered expired by GPG 

AVLBL = 3 # Key is available 

INVALID = 4 # Key may be available but should not be used 

 

 

class ServerUnavailableError(Exception): 

"""Raised by retrieve_key if search throws an exception.""" 

pass 

 

 

class AmbiguousKeyIDError(Exception): 

"""Raised by retrieve_key if search returns multiple results for the same keyid""" 

pass 

 

 

class InvalidRequestError(Exception): 

"""Raised by retrieve_key if search returns a HTTP Client Error.""" 

pass 

 

 

class NoDefaultSignatureKeyError(Exception): 

"""Raised when encrypting a message to a public key without specifying a Signature (private) key.""" 

pass 

 

 

class EncryptionKeyInvalidError(Exception): 

"""Raised when encrypting a message to a public key with an invalid, revoked, expired or unavailable key.""" 

pass 

 

 

class EncryptionGenericError(Exception): 

"""Raised when any unexpected exception is raised by GPGME during encryption.""" 

pass 

 

 

class SignatureGenericError(Exception): 

"""Raised when any unexpected exception is raised by GPGME during signing.""" 

pass 

 

 

class SignatureKeyInvalidError(Exception): 

"""Raised when attempting to sign a message with an invalid, revoked, expired or unavailable key.""" 

pass 

 

 

def retrieve_key(keyid, keyserver): 

""" Low-Level function handling retrieval of keyid from keyserver. 

Please note, that the result is untrusted, and there is no proof yet 

that the key corresponds to a given keyid! """ 

server = KeyServer(keyserver) 

keys = None 

candidate = None 

# The legacy keyservers require key IDs and fingerprints to start with 0x 

if not keyid.startswith('0x'): 

keyid = '0x' + keyid 

try: 

keys = server.search(keyid) 

except HTTPClientError as ex: 

logging.error('Client Error {}'.format(ex)) 

raise InvalidRequestError 

except Exception as ex: 

logging.error('Server Error {}'.format(ex)) 

raise ServerUnavailableError 

 

# If a short key id has been supplied, a keyserver 

# could respond with more than one ID (https://evil32.com). 

# 

# The most prudent thing to do in this case is not to use the 

# key at all, and request a unique identifier 

if keys is not None: 

if len(keys) == 1: 

candidate = keys[0] 

106 ↛ 109line 106 didn't jump to line 109, because the condition on line 106 was never false elif keys != []: 

raise AmbiguousKeyIDError 

 

return candidate 

 

 

def sign_message(msg, signer=None): 

""" Utility function to create signed plaintext message.""" 

with gpg.Context(armor=True) as ctx: 

ctx.set_engine_info(gpg.constants.protocol.OpenPGP, 

home_dir=settings.GPG_HOME) 

117 ↛ 118line 117 didn't jump to line 118, because the condition on line 117 was never true if signer is None: 

if settings.GPG_HOST_USER is not None: 

signer = settings.GPG_HOST_USER 

else: 

logging.error( 

'Could not find default signature key. Check your settings.' 

) 

raise NoDefaultSignatureKeyError 

try: 

signing_key = ctx.get_key(signer, secret=True) 

ctx.signers = [signing_key] 

except gpg.errors.KeyNotFound as ex: 

logging.error('Could not find signature key: {}'.format(ex)) 

raise NoDefaultSignatureKeyError 

except gpg.errors.GPGMEError as ex: 

logging.error('GPG error with signature key: {}'.format(ex)) 

raise NoDefaultSignatureKeyError 

 

try: 

136 ↛ 139line 136 didn't jump to line 139, because the condition on line 136 was never false if isinstance(msg, str): 

msg = msg.encode() 

 

signed_msg, result = ctx.sign(msg, 

mode=gpg.constants.sig.mode.CLEAR) 

return (signed_msg, result) 

except Exception as ex: 

# Fail loudly! 

raise SignatureGenericError('Signing Failed: {}'.format(ex)) 

 

 

class PublicKey(object): 

def __init__(self, keyid): 

self.keyid = keyid 

self.ctx = gpg.Context(armor=True) 

self.ctx.set_engine_info(gpg.constants.protocol.OpenPGP, 

home_dir=settings.GPG_HOME) 

self.ctx.set_keylist_mode(gpg.constants.keylist.mode.LOCAL) 

self.key = None 

self.keystatus = KeyStatus.NA 

self.refresh_status = KeyRefreshStatus.NA 

self._access_keyring() 

if self.key is not None: 

self.refresh_status = KeyRefreshStatus.STORED 

 

def __del__(self): 

del self.ctx 

return 

 

def _access_keyring(self): 

# Check for key in PublicKeyring 

try: 

self.key = self.ctx.get_key(self.keyid) 

169 ↛ 185line 169 didn't jump to line 185, because the condition on line 169 was never false if self.key is not None: 

if self.key.revoked: 

self.keystatus = KeyStatus.REVOKED 

elif self.key.expired: 

self.keystatus = KeyStatus.EXPIRED 

174 ↛ 175line 174 didn't jump to line 175, because the condition on line 174 was never true elif self.key.disabled: 

self.keystatus = KeyStatus.NA 

else: 

177 ↛ 180line 177 didn't jump to line 180, because the condition on line 177 was never false if self.refresh_status != KeyRefreshStatus.AMBIGUOUS: 

self.keystatus = KeyStatus.AVLBL 

else: 

self.keystatus = KeyStatus.INVALID 

logging.debug( 

'Key {0} found in keyring, with status {1}'.format( 

self.keyid, self.keystatus)) 

else: 

logging.debug('No key in keyring for ID {}'.format(self.keyid)) 

186 ↛ 189line 186 didn't jump to line 189 except gpg.errors.KeyNotFound: 

logging.debug('No key in keyring for ID {}'.format(self.keyid)) 

self.keystatus = KeyStatus.NA 

except gpg.errors.GPGMEError as ex: 

logging.error( 

'GPG error while reading key {0} from keyring: {1}'.format( 

self.keyid, ex)) 

self.keystatus = KeyStatus.NA 

return 

 

def store(armored_key): 

raise NotImplementedError("TODO: Implement manual storage of keys") 

 

def refresh(self): 

self.refresh_status = KeyRefreshStatus.NA 

 

202 ↛ 203line 202 didn't jump to line 203, because the condition on line 202 was never true if settings.GPG_KEYSERVERS is None or settings.GPG_KEYSERVERS == []: 

keyservers = DEFAULT_KEYSERVERS 

else: 

keyservers = settings.GPG_KEYSERVERS 

 

if self.key is not None: 

self.refresh_status = KeyRefreshStatus.STORED 

 

for keyserver in keyservers: 

try: 

key = retrieve_key(self.keyid, keyserver) 

if key is not None: 

# We have a key - import it and let GPG decide on the 

# Update logic. 

# Note that as of now, gpg as distributed by Debian 

# will also import keys without a user id attached, 

# but unpatched versions of gpg don't. This introduces 

# the possibility that revocations without user id 

# may not be processed by gpg if the system is installed on 

# a different Linux distribution or on a *BSD! 

import_result = self.ctx.key_import(key.key_blob) 

223 ↛ 261line 223 didn't jump to line 261, because the condition on line 223 was never false if import_result is not None: 

224 ↛ 239line 224 didn't jump to line 239, because the condition on line 224 was never false if hasattr(import_result, "considered"): 

logging.debug(""" 

Key {0} found on {1}. 

New Revocations: {2} 

Unchanged: {3} 

""".format(self.keyid, keyserver, 

import_result.new_revocations, 

import_result.unchanged)) 

232 ↛ 261line 232 didn't jump to line 261, because the condition on line 232 was never false if self.refresh_status != KeyRefreshStatus.AMBIGUOUS: 

# Importing the key failed. Most likely a broken key or missing UIDs 

if import_result.no_user_id == 1 and not self.refresh_status == KeyRefreshStatus.CURRENT: 

self.refresh_status = KeyRefreshStatus.IMPORTERR 

else: 

self.refresh_status = KeyRefreshStatus.CURRENT 

else: 

logging.warning( 

'Import of key {0} from {1} failed'.format( 

self.keyid, keyserver)) 

except ServerUnavailableError: 

if self.refresh_status != KeyRefreshStatus.CURRENT and self.refresh_status != KeyRefreshStatus.AMBIGUOUS: 

# Only set a server error status if the key has not yet been found 

# on a different server. 

self.refresh_status = KeyRefreshStatus.SRVERR 

except InvalidRequestError: 

if self.refresh_status == KeyRefreshStatus.NA or self.refresh_status == KeyRefreshStatus.SRVERR: 

# If the key was already found on another server, or was 

# ambiguous on another server, those states should have precedence. 

251 ↛ 261line 251 didn't jump to line 261, because the condition on line 251 was never false if self.refresh_status != KeyRefreshStatus.AMBIGUOUS: 

self.refresh_status = KeyRefreshStatus.INVALID 

logging.error( 

'Search for Key {0} was considered an invalid request by {1}' 

.format(self.keyid, keyserver)) 

except AmbiguousKeyIDError: 

self.refresh_status = KeyRefreshStatus.AMBIGUOUS 

logging.error('Key {0} has multiple results on {1}'.format( 

self.keyid, keyserver)) 

# Look into Keyring to set the Key Status 

self._access_keyring() 

 

def get_error_msg(self): 

msg = '' 

if self.keystatus == KeyStatus.EXPIRED: 

msg = ERRMSG_EXPIRED 

elif self.keystatus == KeyStatus.REVOKED: 

msg = ERRMSG_REVOKED 

269 ↛ 270line 269 didn't jump to line 270, because the condition on line 269 was never true elif self.keystatus == KeyStatus.INVALID: 

if self.refresh_status == KeyRefreshStatus.AMBIGUOUS: 

msg = ERRMSG_AMBIGUOUS 

else: 

msg = ERRMSG_INVALID 

274 ↛ 284line 274 didn't jump to line 284, because the condition on line 274 was never false elif self.keystatus == KeyStatus.NA: 

if self.refresh_status == KeyRefreshStatus.INVALID: 

msg = ERRMSG_INVALID 

elif self.refresh_status == KeyRefreshStatus.AMBIGUOUS: 

msg = ERRMSG_AMBIGUOUS 

elif self.refresh_status == KeyRefreshStatus.IMPORTERR: 

msg = ERRMSG_NOTIMPORTED 

else: 

msg = ERRMSG_NOTFOUND 

else: 

logging.error( 

"Implementation Error: Not all possible key states are explicitly handled" 

) 

return msg 

 

def encrypt_to(self, msg, signer=None): 

290 ↛ 291line 290 didn't jump to line 291, because the condition on line 290 was never true if signer is None: 

if settings.GPG_HOST_USER is not None: 

signer = settings.GPG_HOST_USER 

else: 

logging.error( 

'Could not find default signature key. Check your settings.' 

) 

raise NoDefaultSignatureKeyError 

try: 

signing_key = self.ctx.get_key(signer) 

self.ctx.signers = [signing_key] 

except gpg.errors.KeyNotFound as ex: 

logging.error('Could not find signature key: {}'.format(ex)) 

raise NoDefaultSignatureKeyError 

except gpg.errors.GPGMEError as ex: 

logging.error('GPG error with signature key: {}'.format(ex)) 

raise NoDefaultSignatureKeyError 

 

308 ↛ 309line 308 didn't jump to line 309, because the condition on line 308 was never true if self.keystatus != KeyStatus.AVLBL: 

raise EncryptionKeyInvalidError 

 

try: 

312 ↛ 314line 312 didn't jump to line 314, because the condition on line 312 was never false if isinstance(msg, str): 

msg = msg.encode() 

ciphertext, result, sign_result = self.ctx.encrypt( 

msg, 

recipients=[signing_key, self.key], 

sign=True, 

always_trust=True) 

return (ciphertext, result, sign_result) 

except gpg.errors.InvalidRecipients as ex: 

raise EncryptionKeyInvalidError(ex) 

except gpg.errors.InvalidSigners as ex: 

raise SignatureKeyInvalidError(ex) 

except Exception as ex: 

raise EncryptionGenericError(ex)