1 from django.utils.html import escape
\r
2 from django.http import get_host
\r
4 from forum.authentication.base import AuthenticationConsumer, InvalidAuthentication
\r
7 from openid.yadis import xri
\r
8 from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE, SETUP_NEEDED
\r
9 from openid.consumer.discover import DiscoveryFailure
\r
10 from openid.extensions.sreg import SRegRequest, SRegResponse
\r
11 from openid.extensions.ax import FetchRequest as AXFetchRequest, AttrInfo, FetchResponse as AXFetchResponse
\r
12 from django.utils.translation import ugettext as _
\r
14 from store import OsqaOpenIDStore
\r
16 class OpenIdAbstractAuthConsumer(AuthenticationConsumer):
\r
def get_user_url(self, request):
    """Return the OpenID identifier submitted with the login form.

    The identifier is read from the ``openid_identifier`` field of
    ``request.POST``.

    Raises:
        NotImplementedError: if ``request.POST`` has no
            ``openid_identifier`` entry. Presumably a subclass that gets
            the identifier some other way overrides this method.
    """
    try:
        return request.POST['openid_identifier']
    except KeyError:
        # Only a missing field is translated; Django's
        # MultiValueDictKeyError is a KeyError subclass, so it lands here.
        # Any other failure is a real error and is allowed to propagate.
        raise NotImplementedError()
\r
def prepare_authentication_request(self, request, redirect_to):
    """Begin an OpenID authentication and return the provider redirect URL.

    Args:
        request: the current HTTP request. Its session is handed to the
            OpenID consumer and the identifier is obtained through
            ``self.get_user_url(request)``.
        redirect_to: URL the provider should send the user back to. A value
            that does not start with ``http://`` or ``https://`` is
            prefixed with ``get_url_host(request)``.

    Returns:
        The result of ``auth_request.redirectURL(trust_root, redirect_to)``.

    Raises:
        InvalidAuthentication: if the identifier is an XRI while the
            ``OPENID_DISALLOW_INAMES`` setting is truthy, or if discovery
            on the identifier fails.

    NOTE(review): ``settings`` is referenced here, but no import of it is
    visible among the imports reviewed -- confirm it is in module scope.
    """
    # Only relative URLs get the host prefix. The parentheses matter:
    # without them ``not a or b`` would also prefix absolute https:// URLs.
    if not (redirect_to.startswith('http://')
            or redirect_to.startswith('https://')):
        redirect_to = get_url_host(request) + redirect_to

    user_url = self.get_user_url(request)

    if xri.identifierScheme(user_url) == 'XRI' and getattr(
        settings, 'OPENID_DISALLOW_INAMES', False
    ):
        raise InvalidAuthentication('i-names are not supported')

    consumer = Consumer(request.session, OsqaOpenIDStore())

    try:
        auth_request = consumer.begin(user_url)
    except DiscoveryFailure:
        raise InvalidAuthentication(_('Sorry, but your input is not a valid OpenId'))

    # NOTE: an SReg-based request (driven by an ``OPENID_SREG`` setting, or
    # a plain ``SRegRequest(required=['email'])``) used to be sketched here
    # in commented-out form; it is disabled and the e-mail address is
    # requested through the AX extension below instead.

    # The session flag defaults to True, so the e-mail is requested unless
    # it has been explicitly switched off for this session.
    if request.session.get('force_email_request', True):
        axr = AXFetchRequest()
        # Positional arguments are (type_uri, count, required, alias):
        # one required value of the AX e-mail attribute, aliased "email".
        axr.add(AttrInfo("http://axschema.org/contact/email", 1, True, "email"))
        auth_request.addExtension(axr)

    # Fall back to this site's own root when no trust root is configured.
    trust_root = getattr(
        settings, 'OPENID_TRUST_ROOT', get_url_host(request) + '/'
    )

    return auth_request.redirectURL(trust_root, redirect_to)
\r
def process_authentication_request(self, request):
    """Complete an OpenID authentication from the provider's response.

    Args:
        request: the HTTP request the user came back with; the OpenID
            response is read from its GET parameters and the consumer
            state from its session.

    Returns:
        ``request.GET['openid.identity']`` when the response status is
        SUCCESS.

    Raises:
        InvalidAuthentication: for every other status (cancelled, failed,
            setup needed, or anything unrecognised).

    On success, and unless the session's ``force_email_request`` flag is
    falsy, the value found in the AX response is stored in
    ``request.session['auth_email_request']`` on a best-effort basis.
    """
    consumer = Consumer(request.session, OsqaOpenIDStore())

    # Hand the consumer UTF-8 encoded keys and values.
    query_dict = dict(
        (k.encode('utf8'), v.encode('utf8')) for k, v in request.GET.items()
    )

    url = get_url_host(request) + request.path
    openid_response = consumer.complete(query_dict, url)
    status = openid_response.status

    if status == SUCCESS:
        if request.session.get('force_email_request', True):
            try:
                ax = AXFetchResponse.fromSuccessResponse(openid_response)
                # NOTE(review): 'value.ext0.1' is presumably the first value
                # of the first attribute in the AX response -- confirm
                # against python-openid's FetchResponse.getExtensionArgs.
                email = ax.getExtensionArgs()['value.ext0.1']
                request.session['auth_email_request'] = email
            except Exception:
                # Deliberately best-effort: a provider that returns no AX
                # data (or an unexpected shape) must not fail the login.
                pass

        return request.GET['openid.identity']

    if status == CANCEL:
        raise InvalidAuthentication(_('The OpenId authentication request was canceled'))

    if status == FAILURE:
        # python-openid's FailureResponse.message defaults to None, which
        # cannot be concatenated to a string; fall back to an empty detail
        # so the caller still gets InvalidAuthentication, not TypeError.
        detail = openid_response.message or ''
        raise InvalidAuthentication(_('The OpenId authentication failed: ') + detail)

    if status == SETUP_NEEDED:
        raise InvalidAuthentication(_('Setup needed'))

    # Format rather than concatenate so a non-string status cannot mask the
    # authentication error with a TypeError.
    raise InvalidAuthentication(
        _('The OpenId authentication failed with an unknown status: ') + '%s' % (status,)
    )
\r
100 def get_user_data(self, key):
\r
def get_url_host(request):
    """Return the scheme and host of *request* as ``'<scheme>://<host>'``.

    The host is taken from ``get_host(request)`` and passed through
    ``escape``. The scheme depends on ``request.is_secure()``.

    NOTE(review): the two ``protocol`` assignments were missing from the
    reviewed source; 'https' for a secure request and 'http' otherwise is
    the reconstruction implied by the ``is_secure()`` test -- confirm.
    """
    if request.is_secure():
        protocol = 'https'
    else:
        protocol = 'http'
    host = escape(get_host(request))
    return '%s://%s' % (protocol, host)
\r
def get_full_url(request):
    """Return ``get_url_host(request)`` followed by ``request.get_full_path()``."""
    host_prefix = get_url_host(request)
    path_part = request.get_full_path()
    return host_prefix + path_part