def get_header(self, name: str, default: Optional[str] = None) -> Optional[str]:
    """Look up a header of the wrapped request.

    Arguments:
        name: Name of the header to look up.
        default: Value handed back when the request's header mapping
            has no entry for `name`.

    Returns:
        Whatever the header mapping's ``get()`` yields for `name`,
        falling back to `default`.
    """
    request_headers = self.request.headers
    return request_headers.get(name, default)
+
+
def error(self, msg: str, status: int = 400) -> HTTPException:
    """Build an HTTP error for the given message.

    The exception is only constructed and returned here, not raised;
    raising it is left to the caller.

    Arguments:
        msg: Text used as the `detail` of the exception.
        status: HTTP status code of the error (400 unless given).

    Returns:
        An HTTPException whose 'content-type' header is taken from
        `self.content_type`.
    """
    response_headers = {'content-type': self.content_type}
    return HTTPException(status, detail=msg, headers=response_headers)
+
+
def create_response(self, status: int, output: str, num_results: int) -> Response:
    """Create the response object for a finished request.

    Arguments:
        status: HTTP status code of the response.
        output: Content passed on to the Response constructor.
        num_results: Result count; stored on `request.state` as
            `num_results` before the response is built.

    Returns:
        A Response using `self.content_type` as its media type.
    """
    # Record the result count on the request first, exactly as before
    # the response object comes into existence.
    self.request.state.num_results = num_results

    response = Response(output,
                        status_code=status,
                        media_type=self.content_type)
    return response
+
+
def base_uri(self) -> str:
    """Return the base URI under which the current request was served.

    The URI is assembled from scheme, host name and port of the request
    URL followed by the `root_path` entry of the request scope. The port
    is omitted when the URL carries none or when it is the default port
    of the scheme (80 for http, 443 for https).

    Returns:
        The URI as ``scheme://host[:port]root_path``. IPv6 hosts are
        enclosed in square brackets.
    """
    url = self.request.url
    scheme = url.scheme
    host = url.hostname
    port = url.port
    root = self.request.scope['root_path']

    # A colon inside the host can only come from an IPv6 literal whose
    # brackets have been stripped by the URL parser. Without restoring
    # them the address is indistinguishable from a host:port pair and
    # the resulting URI is invalid.
    if host is not None and ':' in host and not host.startswith('['):
        host = f"[{host}]"

    default_ports = {'http': 80, 'https': 443}
    if port is None or default_ports.get(scheme) == port:
        return f"{scheme}://{host}{root}"

    return f"{scheme}://{host}:{port}{root}"
+
+
def config(self) -> Configuration:
    """Return the configuration of the API object.

    The API object is read from the state of the application that the
    wrapped request belongs to (`request.app.state.API`).
    """
    api = self.request.app.state.API
    return cast(Configuration, api.config)
+
+
def _wrap_endpoint(func: api_impl.EndpointFunc)\
        -> Callable[[Request], Coroutine[Any, Any, Response]]:
    """Adapt an API endpoint function to a coroutine taking a request.

    Arguments:
        func: Endpoint function. It is awaited with two arguments: the
            API object found in the application state and the request
            wrapped in a ParamWrapper.

    Returns:
        An async callable that accepts a single Request and returns the
        awaited result of `func`.
    """
    async def _callback(request: Request) -> Response:
        api = request.app.state.API
        params = ParamWrapper(request)
        result = await func(api, params)
        # cast() only informs the type checker; the result is passed
        # through unchanged.
        return cast(Response, result)

    return _callback