]> git.openstreetmap.org Git - osqa.git/blob - forum_modules/openidauth/consumer.py
17d1378f53dc3248c800f70a1532e2fd36dd097e
[osqa.git] / forum_modules / openidauth / consumer.py
1 from django.utils.html import escape\r
2 from django.http import get_host\r
3 \r
4 from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication\r
5 import settings\r
6 \r
7 from openid.yadis import xri\r
8 from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED\r
9 from openid.consumer.discover import DiscoveryFailure\r
10 from openid.extensions.sreg import SRegRequest, SRegResponse\r
11 from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse\r
12 from django.utils.translation import ugettext as _\r
13 \r
14 from store import OsqaOpenIDStore\r
15 \r
16 class OpenIdAbstractAuthConsumer(AuthenticationConsumer):\r
17 \r
18     def get_user_url(self, request):\r
19         try:\r
20             return request.POST['openid_identifier']\r
21         except:\r
22             raise NotImplementedError()\r
23 \r
24     def prepare_authentication_request(self, request, redirect_to):\r
25         if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):\r
26                     redirect_to =  get_url_host(request) + redirect_to\r
27 \r
28         user_url = self.get_user_url(request)\r
29 \r
30         if xri.identifierScheme(user_url) == 'XRI' and getattr(\r
31             settings, 'OPENID_DISALLOW_INAMES', False\r
32         ):\r
33             raise InvalidAuthentication('i-names are not supported')\r
34 \r
35         consumer = Consumer(request.session, OsqaOpenIDStore())\r
36 \r
37         try:\r
38             auth_request = consumer.begin(user_url)\r
39         except DiscoveryFailure:\r
40             raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))\r
41 \r
42         #sreg = getattr(settings, 'OPENID_SREG', False)\r
43 \r
44         #if sreg:\r
45         #    s = SRegRequest()\r
46         #    for sarg in sreg:\r
47         #        if sarg.lower().lstrip() == "policy_url":\r
48         #            s.policy_url = sreg[sarg]\r
49         #        else:\r
50         #            for v in sreg[sarg].split(','):\r
51         #                s.requestField(field_name=v.lower().lstrip(), required=(sarg.lower().lstrip() == "required"))\r
52         #    auth_request.addExtension(s)\r
53 \r
54         #auth_request.addExtension(SRegRequest(required=['email']))\r
55 \r
56         if request.session.get('force_email_request', True):\r
57             axr = AXFetchRequest()\r
58             axr.add(AttrInfo("http://axschema.org/contact/email", 1, True, "email"))\r
59             auth_request.addExtension(axr)\r
60 \r
61         trust_root = getattr(\r
62             settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'\r
63         )\r
64 \r
65 \r
66         return auth_request.redirectURL(trust_root, redirect_to)\r
67 \r
68     def process_authentication_request(self, request):\r
69         consumer = Consumer(request.session, OsqaOpenIDStore())\r
70 \r
71         query_dict = dict([\r
72             (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()\r
73         ])\r
74 \r
75         #for i in query_dict.items():\r
76         #    print "%s : %s" % i\r
77 \r
78         url = get_url_host(request) + request.path\r
79         openid_response = consumer.complete(query_dict, url)\r
80 \r
81         if openid_response.status == SUCCESS:\r
82             if request.session.get('force_email_request', True):\r
83                 try:\r
84                     ax = AXFetchResponse.fromSuccessResponse(openid_response)\r
85                     email = ax.getExtensionArgs()['value.ext0.1']\r
86                     request.session['auth_email_request'] = email\r
87                 except Exception, e:\r
88                     pass\r
89 \r
90             return request.GET['openid.identity']\r
91         elif openid_response.status == CANCEL:\r
92             raise InvalidAuthentication(_('The OpenId authentication request was canceled'))\r
93         elif openid_response.status == FAILURE:\r
94             raise InvalidAuthentication(_('The OpenId authentication failed: ') + openid_response.message)\r
95         elif openid_response.status == SETUP_NEEDED:\r
96             raise InvalidAuthentication(_('Setup needed'))\r
97         else:\r
98             raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)\r
99 \r
100     def get_user_data(self, key):\r
101         return {}\r
102 \r
103 def get_url_host(request):\r
104     if request.is_secure():\r
105         protocol = 'https'\r
106     else:\r
107         protocol = 'http'\r
108     host = escape(get_host(request))\r
109     return '%s://%s' % (protocol, host)\r
110 \r
111 def get_full_url(request):\r
112     return get_url_host(request) + request.get_full_path()