]> git.openstreetmap.org Git - osqa.git/blob - forum_modules/openidauth/consumer.py
4c3818cbc41c47c6fa43e2cfe6b20dcc551a003f
[osqa.git] / forum_modules / openidauth / consumer.py
1 import re
2
3 from django.utils.html import escape
4 from django.http import get_host
5
6 from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
7 import settings
8
9 from openid.yadis import xri
10 from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
11 from openid.consumer.discover import DiscoveryFailure
12 from openid.extensions.sreg import SRegRequest, SRegResponse
13 from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
14 from django.utils.translation import ugettext as _
15
16 from store import OsqaOpenIDStore
17
18 class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
19
20     dataype2ax_schema = {
21         #'username': 'http://axschema.org/namePerson/friendly',
22         'email': 'http://axschema.org/contact/email',
23         #'web': 'http://axschema.org/contact/web/default',
24         #'firstname': 'http://axschema.org/namePerson/first',
25         #'lastname': 'http://axschema.org/namePerson/last',
26         #'birthdate': 'http://axschema.org/birthDate',
27     }
28
29     sreg_attributes = {
30         "required": {
31             "email": "email",
32             "nickname": "username"
33         }
34     }
35
36     def get_user_url(self, request):
37         try:
38             return request.POST['openid_identifier']
39         except:
40             raise NotImplementedError()
41
42     def prepare_authentication_request(self, request, redirect_to):
43         if not redirect_to.startswith('http://') or redirect_to.startswith('https://'):
44                     redirect_to =  get_url_host(request) + redirect_to
45
46         user_url = self.get_user_url(request)
47
48         if xri.identifierScheme(user_url) == 'XRI' and getattr(
49             settings, 'OPENID_DISALLOW_INAMES', False
50         ):
51             raise InvalidAuthentication('i-names are not supported')
52
53         consumer = Consumer(request.session, OsqaOpenIDStore())
54
55         try:
56             auth_request = consumer.begin(user_url)
57         except DiscoveryFailure:
58             raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))
59
60         sreg = getattr(self, 'sreg_attributes', False)
61
62         if sreg:
63             s = SRegRequest()
64
65             for k, attr_dic in sreg.items():
66                 if k == "policy_url":
67                     s.policy_url = attr_dic
68                     continue
69
70                 for attr_name in attr_dic.keys():
71                     s.requestField(field_name=attr_name, required=(k == "required"))
72
73             auth_request.addExtension(s)
74
75         ax_schema = getattr(self, 'dataype2ax_schema', False)
76
77         if ax_schema and request.session.get('force_email_request', True):
78             axr = AXFetchRequest()
79             for data_type, schema in ax_schema.items():
80                 if isinstance(schema, tuple):
81                     axr.add(AttrInfo(schema[0], 1, True, schema[1]))
82                 else:
83                     axr.add(AttrInfo(schema, 1, True, data_type))
84
85             auth_request.addExtension(axr)
86
87         trust_root = getattr(
88             settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
89         )
90
91         return auth_request.redirectURL(trust_root, redirect_to)
92
93     def process_authentication_request(self, request):
94         consumer = Consumer(request.session, OsqaOpenIDStore())
95
96         query_dict = dict([
97             (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()
98         ])
99
100         #for i in query_dict.items():
101             #print "%s : %s" % i
102
103         url = get_url_host(request) + request.path
104         openid_response = consumer.complete(query_dict, url)
105
106         if openid_response.status == SUCCESS:
107
108             consumer_data = {}
109
110             sreg_attrs = getattr(self, 'sreg_attributes', False)
111
112             if sreg_attrs:
113                 sreg_response = SRegResponse.fromSuccessResponse(openid_response)
114
115                 all_attrs = {}
116                 [all_attrs.update(d) for k,d in sreg_attrs.items() if k != "policy_url"]
117
118                 for attr_name, local_name in all_attrs.items():
119                     if attr_name in sreg_response:
120                         consumer_data[local_name] = sreg_response[attr_name]
121
122             ax_schema = getattr(self, 'dataype2ax_schema', False)
123
124             if ax_schema:
125                 ax = AXFetchResponse.fromSuccessResponse(openid_response)
126
127                 axargs = ax.getExtensionArgs()
128
129                 ax_schema2data_type = dict([(s, t) for t, s in ax_schema.items()])
130
131                 available_types = dict([
132                     (ax_schema2data_type[s], re.sub('^type\.', '', n))
133                     for n, s in axargs.items() if s in ax_schema2data_type
134                 ])
135
136                 for t, s in available_types.items():
137                     if not t in consumer_data:
138                         consumer_data[t] = axargs["value.%s.1" % s]
139                     
140             request.session['auth_consumer_data'] = consumer_data
141
142
143             return request.GET['openid.identity']
144         elif openid_response.status == CANCEL:
145             raise InvalidAuthentication(_('The OpenId authentication request was canceled'))
146         elif openid_response.status == FAILURE:
147             raise InvalidAuthentication(_('The OpenId authentication failed: ') + openid_response.message)
148         elif openid_response.status == SETUP_NEEDED:
149             raise InvalidAuthentication(_('Setup needed'))
150         else:
151             raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)
152
153     def get_user_data(self, key):
154         return {}
155
156 def get_url_host(request):
157     if request.is_secure():
158         protocol = 'https'
159     else:
160         protocol = 'http'
161     host = escape(get_host(request))
162     return '%s://%s' % (protocol, host)
163
164 def get_full_url(request):
165     return get_url_host(request) + request.get_full_path()