Changeset 172
- Timestamp:
- 12/16/06 05:10:56 (2 years ago)
- Files:
-
- oss/httpauthfilter/example.py (added)
- oss/httpauthfilter/httpauth.py (modified) (10 diffs)
- oss/httpauthfilter/httpauthfilter.py (modified) (10 diffs)
- oss/httpauthfilter/static (added)
- oss/httpauthfilter/static/test.js (added)
- oss/httpauthfilter/test.py (added)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
oss/httpauthfilter/httpauth.py
r5 r172 22 22 __version__ = 1, 0, 0 23 23 __author__ = "Tiago Cogumbreiro <cogumbreiro@users.sf.net>" 24 __contributors__ = ['Sylvain Hellegouarch', 'Peter Russell'] 24 25 __credits__ = """ 25 26 Peter van Kampen for its recipe which implement most of Digest authentication: … … 39 40 this list of conditions and the following disclaimer in the documentation 40 41 and/or other materials provided with the distribution. 41 * Neither the name of Sylvain Hellegouarchnor the names of his contributors42 * Neither the name of Tiago Cogumbreiro nor the names of his contributors 42 43 may be used to endorse or promote products derived from this software 43 44 without specific prior written permission. … … 210 211 return _A1 (params_copy, password) 211 212 212 def _A1(params, password ):213 def _A1(params, password, password_is_hashed=False): 213 214 algorithm = params.get ("algorithm", MD5) 214 215 H = DIGEST_AUTH_ENCODERS[algorithm] 215 216 216 if algorithm == MD5: 217 217 # If the "algorithm" directive's value is "MD5" or is … … 221 221 222 222 elif algorithm == MD5_SESS: 223 224 223 # This is A1 if qop is set 225 224 # A1 = H( unq(username-value) ":" unq(realm-value) ":" passwd ) 226 225 # ":" unq(nonce-value) ":" unq(cnonce-value) 227 h_a1 = H ("%s:%s:%s" % (params["username"], params["realm"], password)) 226 if password_is_hashed: 227 h_a1 = password 228 else: 229 h_a1 = H ("%s:%s:%s" % (params["username"], params["realm"], 230 password)) 228 231 return "%s:%s:%s" % (h_a1, params["nonce"], params["cnonce"]) 229 232 … … 251 254 raise NotImplementedError ("The 'qop' method is unknown: %s" % qop) 252 255 253 def _computeDigestResponse(auth_map, password, method = "GET", A1 = None,**kwargs): 256 def _computeDigestResponse(auth_map, password, method = "GET", password_is_hashed=False, 257 A1 = None,**kwargs): 254 258 """ 255 259 Generates a response respecting the algorithm defined in RFC 2617 … … 268 272 if algorithm == MD5_SESS and A1 is not None: 269 273 H_A1 = H(A1) 274 elif password_is_hashed and algorithm == MD5: 275 H_A1 = password 270 276 else: 271 H_A1 = H(_A1(params, password ))277 H_A1 = H(_A1(params, password, password_is_hashed)) 272 278 273 279 if qop == "auth" or aop == "auth-int": … … 296 302 return KD(H_A1, request) 297 303 298 def _checkDigestResponse(auth_map, password, method = "GET", A1 = None,**kwargs): 304 def _checkDigestResponse(auth_map, password, method = "GET", password_is_hashed=False, A1 = None, 305 **kwargs): 299 306 """This function is used to verify the response given by the client when 300 307 he tries to authenticate. … … 305 312 """ 306 313 307 response = _computeDigestResponse(auth_map, password, method, A1,**kwargs) 314 response = _computeDigestResponse(auth_map, password, method, password_is_hashed, 315 A1,**kwargs) 308 316 309 317 return response == auth_map["response"] 310 318 311 def _checkBasicResponse (auth_map, password, method='GET', **kwargs): 312 return auth_map["password"] == password 313 319 def _checkBasicResponse (auth_map, password, method='GET', 320 password_is_hashed=False, **kwargs): 321 if password_is_hashed: 322 hash_components = [auth_map["username"].strip(), 323 auth_map["realm"].strip(), 324 auth_map["password"].strip()] 325 hashed = md5.new(':'.join(hash_components)).hexdigest() 326 return hashed == password 327 else: 328 return auth_map["password"] == password 329 314 330 AUTH_RESPONSES = { 315 331 "basic": _checkBasicResponse, … … 317 333 } 318 334 319 def checkResponse (auth_map, password, method = "GET", **kwargs):335 def checkResponse (auth_map, password, method = "GET", password_is_hashed=False, **kwargs): 320 336 """'checkResponse' compares the auth_map with the password and optionally 321 337 other arguments that each implementation might need. 322 338 339 If the password_is_hashed flag is set, then the password is stored 340 as an md5 hash. 341 323 342 If the response is of type 'Basic' then the function has the following 324 343 signature: 325 344 326 checkBasicResponse (auth_map, password) -> bool 345 checkBasicResponse (auth_map, password, 346 password_is_hashed=False) -> bool 327 347 328 348 If the response is of type 'Digest' then the function has the following 329 349 signature: 330 350 331 checkDigestResponse (auth_map, password, method = 'GET', A1 = None) -> bool 351 checkDigestResponse (auth_map, password, method = 'GET', A1 = None, 352 password_is_hashed=False) -> bool 332 353 333 354 The 'A1' argument is only used in MD5_SESS algorithm based responses. … … 336 357 global AUTH_RESPONSES 337 358 checker = AUTH_RESPONSES[auth_map["auth_scheme"]] 338 return checker (auth_map, password, method, **kwargs)359 return checker (auth_map, password, method, password_is_hashed,**kwargs) 339 360 oss/httpauthfilter/httpauthfilter.py
r52 r172 1 """ 1 # -*- coding: utf-8 -*- 2 3 __authors__ = ["Sylvain Hellegouarch (sh@defuze.org)"] 4 __contributors__ = ['Tiago Cogumbreiro <cogumbreiro@users.sf.net>', 'Peter Russell'] 5 __date__ = "2006/12/16" 6 __copyright__ = """ 2 7 Copyright (c) 2005, Sylvain Hellegouarch 3 8 Copyright (c) 2005, Tiago Cogumbreiro <cogumbreiro@users.sf.net> 9 """ 4 10 11 __license__ = """ 5 12 All rights reserved. 6 13 … … 30 37 31 38 """ 39 16/12/2006 - Added Peter Russell patch to support pre-hashed passwords 40 so that now you can store hashed values instead of clear text 41 - Should have also fixed the staticfilter case 32 42 10/04/2006 - Refactored the filter 33 43 30/12/2005 - Fixed to match lower cases attributes used by CherryPy … … 36 46 06/12/2005 - Updated to match CherryPy 2.2.0-beta revision 862 (Sylvain) 37 47 """ 48 49 # This version has been test against CherryPy 2.2.2 RC2 38 50 39 51 # … … 60 72 maxReplay - allows you to set how many times the same Digest authentification value can be used from a given user agent 61 73 defaults to 0 which dismiss the replay checker 62 retrieveUsersFunc - a function that returns a dictionnary of user:password (clear text) 74 retrieveUsersFunc - a function that returns a dictionnary of 75 user:password if storRealmsWithPasswords is False, or 76 user:{realm:password} if not. If storeHashedCredentials is 77 True, then password will be the hash of username:realm:password. 63 78 handy if you want to retrieve your users from a database for instance 64 79 cppasswordPath - path of the file maintaining the user:password tuples. 65 80 you can leave it to None when using retrieveUsersFunc 66 81 """ 67 def __init__(self, realm, privateKey, unauthorizedPath, maxReplay=0, retrieveUsersFunc=None, cppasswordPath=None): 82 def __init__(self, realm, privateKey, unauthorizedPath, 83 maxReplay=0, retrieveUsersFunc=None, 84 cppasswordPath=None, storeRealmsWithPasswords=False, 85 storeHashedCredentials=False): 68 86 self.cppasswordPath = cppasswordPath 69 87 self.cppasswordLastModified = None … … 76 94 if retrieveUsersFunc is None: 77 95 self.retrieveUsersFunc = self._loadAuthorizedUsers 96 self.storeHashedCredentials = storeHashedCredentials 97 if self.storeHashedCredentials == True: 98 self.storeRealmsWithPasswords = True 99 else: 100 self.storeRealmsWithPasswords = storeRealmsWithPasswords 78 101 79 102 def _loadAuthorizedUsers(self): … … 83 106 cppassword = file(self.cppasswordPath, 'rb') 84 107 for line in cppassword: 85 username, password = line.split(':') 86 self.users[username] = password.rstrip() 108 if self.storeRealmsWithPasswords: 109 username, realm, password = line.split(':') 110 if username in self.users: 111 self.users[username][realm] = password 112 else: 113 self.users[username] = {realm:password.rstrip()} 114 else: 115 username, password = line.split(':') 116 self.users[username] = password.rstrip() 87 117 cppassword.close() 88 118 return self.users … … 98 128 # Parse the header 99 129 ah = httpauth.parseAuthorization(cherrypy.request.headerMap['Authorization']) 100 101 130 # someone might try to trick us with a request we can't understand 102 131 # let's ditch the request … … 104 133 raise cherrypy.HTTPError(400, 'Bad Request') 105 134 else: 135 # If we are dealing with hashed credentials, we need 136 # the realm to be in the auth_map for basic authentication. 137 if self.storeHashedCredentials and not 'realm' in ah: 138 ah['realm'] = self.realm 139 106 140 # the nc token of the Digest authentification 107 141 # allows a server to check for attack replay by setting a … … 122 156 # Is the user authorized? 123 157 password = self.retrieveUsersFunc().get(ah["username"], None) 124 125 if httpauth.checkResponse(ah,password,method=cherrypy.request.method): 158 if self.storeRealmsWithPasswords and password: 159 password = password.get(self.realm, None) 160 method = cherrypy.request.method.upper() 161 if httpauth.checkResponse(ah, password, 162 method=method, 163 password_is_hashed=self.storeHashedCredentials): 126 164 # yeah the user is authorized 127 165 cherrypy.request.isAuthorized = True 128 if cherrypy.request.method.upper()in ["POST", "PUT"]:129 cherrypy.request.processRequestBody = True166 if method not in ["POST", "PUT"]: 167 cherrypy.request.processRequestBody = False 130 168 131 169 def before_main(self): 170 # this should make this filter to work fine with static content as well 171 if cherrypy.config.get('static_filter.on', False) is False: 172 cherrypy.request.execute_main = True 132 173 if not cherrypy.request.isAuthorized: 133 cherrypy.request.execute_main = True134 174 cherrypy.response.status = '401 Unauthorized' 135 175 cherrypy.request.object_path = self.unauthorizedPath … … 140 180 cherrypy.response.header_list.append(('WWW-Authenticate', httpauth.basicAuth(self.realm))) 141 181 142 if __name__ == '__main__':143 144 def retrieveAuthUsers():145 return {'test':'test'}146 147 class Son:148 def index(self):149 '''Protected by the filter above'''150 return "Hello son!"151 index.exposed = True152 153 def hello(self, name):154 return "Hello %s" % (name, )155 hello.exposed = True156 157 class Admin:158 _cp_filters = [ HttpAuthFilter(realm='localhost',159 privateKey='duh!',160 unauthorizedPath='/admin/unauthorized',161 retrieveUsersFunc=retrieveAuthUsers) ]162 163 def __init__(self):164 self.son = Son()165 166 def index(self):167 '''Only available once the user is authenticated'''168 return "Hello there!"169 index.exposed = True170 171 def unauthorized(self, *args, **kwargs):172 '''Must be declared at the same level where you defined the filter'''173 return "You are not authorized to access that page"174 unauthorized.exposed = True175 176 class Root:177 def __init__(self):178 self.admin = Admin()179 180 def index(self):181 return "hey how you doing?"182 index.exposed = True183 184 conf = {'global': { 'log_debug_info_filter.on': False,185 'autoreload.on': False,186 'server.log_to_screen': False,}}187 cherrypy.tree.mount(Root(), '/', conf=conf)188 189 from cherrypy.test import helper190 191 class HttpAuthFilter(helper.CPWebCase):192 193 def testBasicAuth(self):194 self.getPage('/admin/')195 self.assertStatus('401 Unauthorized')196 self.assertHeader("WWW-Authenticate", 'Basic realm="localhost"')197 self.assertBody("You are not authorized to access that page")198 199 # make sure we keep getting that response as long as we haven't200 # given the authentification token201 self.getPage('/admin/')202 self.assertStatus('401 Unauthorized')203 self.assertHeader("WWW-Authenticate", 'Basic realm="localhost"')204 205 # right show our ID206 self.getPage('/admin/', [('Authorization', 'Basic dGVzdDp0ZXN0')])207 self.assertStatus('200 OK')208 209 # should work also through the tree below the admin instance210 self.getPage('/admin/son/', [('Authorization', 'Basic dGVzdDp0ZXN0')])211 self.assertStatus('200 OK')212 213 # should forbid us again214 self.getPage('/admin/')215 self.assertStatus('401 Unauthorized')216 self.assertHeader("WWW-Authenticate", 'Basic realm="localhost"')217 self.assertBody("You are not authorized to access that page")218 219 def testDigestAuth(self):220 self.getPage('/admin/')221 self.assertStatus('401 Unauthorized')222 self.assertHeader("WWW-Authenticate")223 self.assertBody("You are not authorized to access that page")224 225 # the filter returns two WWW-Authenticate226 # the digest auth should be the first one though227 lowkey = "www-authenticate"228 value = None229 for k, v in self.headers:230 if k.lower() == lowkey:231 if v.startswith("Digest"):232 value = v233 break234 235 if value is None:236 self._handlewebError("Digest authentification scheme was not found")237 238 # the digest algorithm uses a timestamp239 # so we need to parse the response ourself because we can't hardcode240 # the expected value241 if value:242 value = value[7:]243 items = value.split(', ')244 tokens = {}245 for item in items:246 key, value = item.split('=')247 tokens[key.lower()] = value248 249 missing_msg = "%s is missing"250 bad_value_msg = "'%s' was expecting '%s' but found '%s'"251 nonce = None252 if 'realm' not in tokens:253 self._handlewebError(missing_msg % 'realm')254 elif tokens['realm'] != '"localhost"':255 self._handlewebError(bad_value_msg % ('realm', '"localhost"', tokens['realm']))256 if 'nonce' not in tokens:257 self._handlewebError(missing_msg % 'nonce')258 else:259 nonce = tokens['nonce'].strip('"')260 if 'algorithm' not in tokens:261 self._handlewebError(missing_msg % 'algorithm')262 elif tokens['algorithm'] != '"MD5"':263 self._handlewebError(bad_value_msg % ('algorithm', '"MD5"', tokens['algorithm']))264 if 'qop' not in tokens:265 self._handlewebError(missing_msg % 'qop')266 elif tokens['qop'] != '"auth"':267 self._handlewebError(bad_value_msg % ('qop', '"auth"', tokens['qop']))268 269 # now let's see if what270 base_auth = 'Digest username="test", realm="localhost", nonce="%s", uri="/admin/", algorithm=MD5, response="%s", qop=auth, nc=%s, cnonce="1522e61005789929"'271 272 auth = base_auth % (nonce, '', '00000001')273 274 params = httpauth.parseAuthorization(auth)275 response = httpauth._computeDigestResponse(params, 'test')276 277 auth = base_auth % (nonce, response, '00000001')278 self.getPage('/admin/', [('Authorization', auth)])279 self.assertStatus('200 OK')280 281 #helper.testmain()282 cherrypy.server.start()
