import logging
import re

from django.http import get_host
from django.utils.html import escape
from django.utils.translation import ugettext as _
from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
from openid.consumer.discover import DiscoveryFailure
from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
from openid.extensions.sreg import SRegRequest, SRegResponse
from openid.yadis import xri

from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
import settings
from store import OsqaOpenIDStore
17
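class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
    """Base authentication consumer that runs an OpenID login via python-openid.

    ``prepare_authentication_request`` starts the flow and returns the URL the
    browser is redirected to; ``process_authentication_request`` handles the
    return leg and yields the identifier reported for the user.
    """

    # Maps the forum's data type names to the Attribute Exchange schema URIs
    # requested from the provider. A value may also be a (schema, alias) tuple.
    # The attribute keeps its original spelling; it may be referenced elsewhere.
    dataype2ax_schema = {
        'username': 'http://axschema.org/namePerson/friendly',
        'email': 'http://axschema.org/contact/email',
        'web': 'http://axschema.org/contact/web/default',
        #'firstname': 'http://axschema.org/namePerson/first',
        #'lastname': 'http://axschema.org/namePerson/last',
        'birthdate': 'http://axschema.org/birthDate',
    }

    def get_user_url(self, request):
        """Return the identifier submitted in the ``openid_identifier`` field.

        Raises NotImplementedError when the POST data carries no such field.
        """
        try:
            return request.POST['openid_identifier']
        except KeyError:
            # Only a missing field is translated; other errors now propagate
            # instead of being hidden behind NotImplementedError.
            raise NotImplementedError()

    def prepare_authentication_request(self, request, redirect_to):
        """Begin the OpenID flow and return the provider redirect URL.

        ``redirect_to`` is the return_to location; a value without an
        http:// or https:// prefix is made absolute with the request host.
        Raises InvalidAuthentication when i-names are disallowed by
        ``OPENID_DISALLOW_INAMES`` or when discovery fails.
        """
        # The original test read "not A or B", so https:// URLs were prefixed
        # with the host a second time. A tuple checks both schemes at once.
        if not redirect_to.startswith(('http://', 'https://')):
            redirect_to = get_url_host(request) + redirect_to

        user_url = self.get_user_url(request)

        if xri.identifierScheme(user_url) == 'XRI' and getattr(
            settings, 'OPENID_DISALLOW_INAMES', False
        ):
            raise InvalidAuthentication('i-names are not supported')

        consumer = Consumer(request.session, OsqaOpenIDStore())

        # NOTE(review): user_url comes straight from the login form and
        # begin() runs discovery on it, which presumably means outbound
        # fetches from this server; confirm the fetcher cannot reach
        # internal-only addresses.
        try:
            auth_request = consumer.begin(user_url)
        except DiscoveryFailure:
            raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))

        #sreg = getattr(settings, 'OPENID_SREG', False)

        #if sreg:
        #    s = SRegRequest()
        #    for sarg in sreg:
        #        if sarg.lower().lstrip() == "policy_url":
        #            s.policy_url = sreg[sarg]
        #        else:
        #            for v in sreg[sarg].split(','):
        #                s.requestField(field_name=v.lower().lstrip(), required=(sarg.lower().lstrip() == "required"))
        #    auth_request.addExtension(s)

        #auth_request.addExtension(SRegRequest(required=['email']))

        if request.session.get('force_email_request', True):
            # Ask for every mapped attribute once, flagged as required.
            axr = AXFetchRequest()
            for data_type, schema in self.dataype2ax_schema.items():
                if isinstance(schema, tuple):
                    axr.add(AttrInfo(schema[0], 1, True, schema[1]))
                else:
                    axr.add(AttrInfo(schema, 1, True, data_type))

            auth_request.addExtension(axr)

        # Without OPENID_TRUST_ROOT the trust root falls back to the host
        # taken from the current request.
        trust_root = getattr(
            settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
        )

        return auth_request.redirectURL(trust_root, redirect_to)

    def process_authentication_request(self, request):
        """Complete the OpenID flow and return the identity for the user.

        Raises InvalidAuthentication for every status other than SUCCESS.
        """
        consumer = Consumer(request.session, OsqaOpenIDStore())

        query_dict = dict(
            (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()
        )

        url = get_url_host(request) + request.path
        openid_response = consumer.complete(query_dict, url)

        if openid_response.status == SUCCESS:
            if request.session.get('force_email_request', True):
                try:
                    ax = AXFetchResponse.fromSuccessResponse(openid_response)

                    axargs = ax.getExtensionArgs()

                    ax_schema2data_type = dict(
                        (s, t) for t, s in self.dataype2ax_schema.items()
                    )

                    # Computed but not consumed yet; the values stored below
                    # are still placeholders.
                    available_types = dict(
                        (ax_schema2data_type[s], re.sub(r'^type\.', '', n))
                        for n, s in axargs.items() if s in ax_schema2data_type
                    )

                    #available_data = dict([
                    #    (t, axargs["value.%s.1" % s]) for t, s in available_types.items()
                    #])

                    #email = ax.getExtensionArgs()['value.ext0.1']
                    #username = ax.getExtensionArgs()['value.ext0.2']

                    request.session['auth_consumer_data'] = {
                        'email': '',
                        'username': ''
                    }

                except Exception:
                    # Best effort: a broken or missing AX answer must not fail
                    # the login. Log it rather than print to stdout.
                    logging.exception('Could not read the AX data of an OpenID response')

            # NOTE(review): this is the raw openid.identity query parameter,
            # not the claimed identifier python-openid verified during
            # complete() (openid_response.identity_url). The two can differ
            # when delegation is in use, so keying accounts on this value
            # deserves a review. It is flagged rather than changed because
            # switching would alter the key of already stored accounts.
            return request.GET['openid.identity']
        elif openid_response.status == CANCEL:
            raise InvalidAuthentication(_('The OpenId authentication request was canceled'))
        elif openid_response.status == FAILURE:
            # Guard against a failure response that carries no message.
            raise InvalidAuthentication(
                _('The OpenId authentication failed: ') + (openid_response.message or '')
            )
        elif openid_response.status == SETUP_NEEDED:
            raise InvalidAuthentication(_('Setup needed'))
        else:
            raise InvalidAuthentication(_('The OpenId authentication failed with an unknown status: ') + openid_response.status)

    def get_user_data(self, key):
        """Return extra user data for ``key``; this base class has none."""
        return {}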
141
def get_url_host(request):
    """Return the ``scheme://host`` prefix for the current request.

    The scheme follows ``request.is_secure()``; the host is whatever
    ``get_host(request)`` reports, passed through ``escape``.
    """
    # NOTE(review): escape() is HTML escaping, not host validation. The host
    # presumably originates from the client-supplied Host header; confirm it
    # is checked against the expected site names before this value feeds the
    # OpenID return_to and trust root.
    scheme = 'https' if request.is_secure() else 'http'
    return '%s://%s' % (scheme, escape(get_host(request)))
149
def get_full_url(request):
    """Return the absolute URL of the request: host prefix plus full path."""
    host_prefix = get_url_host(request)
    return host_prefix + request.get_full_path()