git.openstreetmap.org Git - osqa.git/blob - forum_modules/openidauth/consumer.py
Merge branch 'threadsafe_requestholder' into update_django
[osqa.git] / forum_modules / openidauth / consumer.py
1 # -*- coding: utf-8 -*-
2
3 import re
4
5 from django.utils.encoding import smart_unicode
6 from django.utils.html import escape
7
8 from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
9 import settings
10
11 from openid.yadis import xri
12 from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
13 from openid.consumer.discover import DiscoveryFailure
14 from openid.extensions.sreg import SRegRequest, SRegResponse
15 from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
16 from django.utils.translation import ugettext as _
17
18 from store import OsqaOpenIDStore
19
class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
    """Authentication consumer that logs users in through an OpenID provider.

    ``prepare_authentication_request`` starts the OpenID exchange and returns
    the provider URL to redirect to. ``process_authentication_request``
    validates the provider's answer, stores any profile data it sent in the
    session under ``auth_consumer_data`` and returns the OpenID identity.

    Subclasses may override ``get_user_url`` and the ``sreg_attributes`` /
    ``dataype2ax_schema`` tables. Setting either table to a false value
    disables the corresponding extension.
    """

    # Local data type -> Attribute Exchange (AX) schema URI requested for it.
    # A value may also be a ``(schema_uri, alias)`` tuple.
    # The attribute name is misspelled ("dataype"), but it is looked up under
    # exactly this name, so it has to stay as it is.
    dataype2ax_schema = {
        'username': 'http://axschema.org/namePerson/friendly',
        'email': 'http://axschema.org/contact/email',
        #'web': 'http://axschema.org/contact/web/default',
        #'firstname': 'http://axschema.org/namePerson/first',
        #'lastname': 'http://axschema.org/namePerson/last',
        #'birthdate': 'http://axschema.org/birthDate',
    }

    # Simple Registration (SReg) fields to request, grouped by requirement
    # level: {group: {sreg field name: local data type}}. Only the group named
    # "required" is requested as required. A "policy_url" key, if present,
    # holds the policy URL instead of a field mapping.
    sreg_attributes = {
        "required": {
            "email": "email",
            "nickname": "username",
            "fullname": "real_name"
        }
    }

    def get_user_url(self, request):
        """Return the OpenID identifier the user submitted.

        Reads the ``openid_identifier`` POST field. Raises NotImplementedError
        when the field is missing.
        """
        try:
            return request.POST['openid_identifier']
        except KeyError:
            # Only a missing field means "no identifier"; any other error is
            # left to propagate instead of being masked.
            raise NotImplementedError()

    def prepare_authentication_request(self, request, redirect_to):
        """Start the OpenID exchange and return the provider redirect URL.

        ``redirect_to`` is the return URL handed to the provider; when it is
        not absolute, the current request's scheme and host are prepended.

        Raises InvalidAuthentication when the identifier is an XRI and
        ``settings.OPENID_DISALLOW_INAMES`` is true, or when discovery fails.
        """
        # Both schemes must be tested inside one negation: ``not a or b``
        # would also prefix URLs that are already absolute https:// URLs.
        is_absolute = (
            redirect_to.startswith('http://') or redirect_to.startswith('https://')
        )
        if not is_absolute:
            redirect_to = get_url_host(request) + redirect_to

        user_url = self.get_user_url(request)

        if xri.identifierScheme(user_url) == 'XRI' and getattr(
            settings, 'OPENID_DISALLOW_INAMES', False
        ):
            raise InvalidAuthentication('i-names are not supported')

        consumer = Consumer(request.session, OsqaOpenIDStore())

        try:
            auth_request = consumer.begin(user_url)
        except DiscoveryFailure:
            raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))

        sreg = getattr(self, 'sreg_attributes', False)
        if sreg:
            auth_request.addExtension(self._build_sreg_request(sreg))

        ax_schema = getattr(self, 'dataype2ax_schema', False)
        # The AX request is skipped when the session explicitly sets
        # ``force_email_request`` to a false value.
        if ax_schema and request.session.get('force_email_request', True):
            auth_request.addExtension(self._build_ax_request(ax_schema))

        trust_root = getattr(
            settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
        )

        return auth_request.redirectURL(trust_root, redirect_to)

    def _build_sreg_request(self, sreg):
        """Build the SReg request described by the ``sreg`` table."""
        sreg_request = SRegRequest()

        for group, fields in sreg.items():
            if group == "policy_url":
                sreg_request.policy_url = fields
                continue

            for field_name in fields:
                sreg_request.requestField(
                    field_name=field_name, required=(group == "required")
                )

        return sreg_request

    def _build_ax_request(self, ax_schema):
        """Build the AX fetch request asking for every entry of ``ax_schema``."""
        ax_request = AXFetchRequest()

        for data_type, schema in ax_schema.items():
            if isinstance(schema, tuple):
                uri, alias = schema[0], schema[1]
            else:
                uri, alias = schema, data_type
            ax_request.add(AttrInfo(uri, required=True, alias=alias))

        return ax_request

    def process_authentication_request(self, request):
        """Validate the provider's response and return the OpenID identity.

        On success, the profile data received through SReg and AX is stored
        in ``request.session['auth_consumer_data']`` (SReg values take
        precedence over AX values) and the ``openid.identity`` query
        parameter is returned.

        Raises InvalidAuthentication when the request was cancelled, failed,
        needs setup, or ended with an unrecognised status.
        """
        consumer = Consumer(request.session, OsqaOpenIDStore())

        query_dict = dict(
            (smart_unicode(k), smart_unicode(v)) for k, v in request.GET.items()
        )

        url = get_url_host(request) + request.path
        openid_response = consumer.complete(query_dict, url)
        status = openid_response.status

        if status == SUCCESS:
            consumer_data = {}

            sreg_attrs = getattr(self, 'sreg_attributes', False)
            if sreg_attrs:
                self._read_sreg_data(openid_response, sreg_attrs, consumer_data)

            ax_schema = getattr(self, 'dataype2ax_schema', False)
            if ax_schema:
                self._read_ax_data(openid_response, ax_schema, consumer_data)

            request.session['auth_consumer_data'] = consumer_data

            return request.GET['openid.identity']

        if status == CANCEL:
            raise InvalidAuthentication(_('The OpenId authentication request was canceled'))

        if status == FAILURE:
            # The failure message may be absent; fall back to an empty string
            # so the user still gets an InvalidAuthentication, not a TypeError.
            raise InvalidAuthentication(
                _('The OpenId authentication failed: ')
                + smart_unicode(openid_response.message or '')
            )

        if status == SETUP_NEEDED:
            raise InvalidAuthentication(_('Setup needed'))

        raise InvalidAuthentication(
            _('The OpenId authentication failed with an unknown status: ')
            + smart_unicode(status)
        )

    def _read_sreg_data(self, openid_response, sreg_attrs, consumer_data):
        """Copy the SReg values present in the response into ``consumer_data``."""
        sreg_response = SRegResponse.fromSuccessResponse(openid_response)
        if not sreg_response:
            return

        # Flatten every group (except the policy URL entry) into a single
        # {sreg field name: local data type} mapping.
        local_names = {}
        for group, fields in sreg_attrs.items():
            if group != "policy_url":
                local_names.update(fields)

        for attr_name, local_name in local_names.items():
            if attr_name in sreg_response:
                consumer_data[local_name] = sreg_response[attr_name]

    def _read_ax_data(self, openid_response, ax_schema, consumer_data):
        """Fill ``consumer_data`` with AX values for data types not yet set."""
        # NOTE(review): the second argument is passed as False; in
        # python-openid this appears to be the ``signed`` flag, meaning
        # unsigned AX values are accepted as well. Confirm this is intended.
        ax_response = AXFetchResponse.fromSuccessResponse(openid_response, False)
        if not ax_response:
            return

        ax_args = ax_response.getExtensionArgs()

        # Schema URI -> local data type. Tuple entries are (uri, alias); only
        # the URI identifies the attribute in the response.
        schema2data_type = {}
        for data_type, schema in ax_schema.items():
            if isinstance(schema, tuple):
                schema = schema[0]
            schema2data_type[schema] = data_type

        # The response declares "type.<alias>" = <schema URI>; collect the
        # alias the provider used for each data type we know about.
        type_prefix = 'type.'
        alias_by_type = {}
        for arg_name, arg_value in ax_args.items():
            if arg_name.startswith(type_prefix) and arg_value in schema2data_type:
                alias_by_type[schema2data_type[arg_value]] = arg_name[len(type_prefix):]

        for data_type, alias in alias_by_type.items():
            if data_type in consumer_data:
                # A value already obtained through SReg wins.
                continue
            value = ax_args.get("value.%s.1" % alias, None)
            if value:
                consumer_data[data_type] = value

    def get_user_data(self, key):
        """Return extra user data for ``key``; this implementation returns an empty dict."""
        return {}
161
def get_url_host(request):
    """Return ``<scheme>://<host>`` for the given request.

    The scheme is ``https`` when ``request.is_secure()`` is true and ``http``
    otherwise; the host is ``request.get_host()`` passed through ``escape``.
    """
    scheme = 'http'
    if request.is_secure():
        scheme = 'https'
    return '%s://%s' % (scheme, escape(request.get_host()))
169
def get_full_url(request):
    """Return the request's scheme and host followed by its full path."""
    origin = get_url_host(request)
    full_path = request.get_full_path()
    return origin + full_path