Source code for gerrit.utils.requester

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Author: Jialiang Shi
import urllib.parse as urlparse
from requests import Session
from requests.adapters import HTTPAdapter
from gerrit.utils.exceptions import (
    NotAllowedError,
    ValidationError,
    AuthError,
    UnauthorizedError,
    ConflictError,
    ClientError,
    ServerError,
)


[docs] class Requester: """ A class which carries out HTTP requests. You can replace this class with one of your own implementation if you require some other way to access Gerrit. This default class can handle simple authentication only. """ VALID_STATUS_CODES = [ 200, ] AUTH_COOKIE = None def __init__(self, **kwargs): """ :param kwargs: """ timeout = 10 base_url = kwargs.get("base_url") self.base_scheme = urlparse.urlsplit(base_url).scheme if base_url else None self.username = kwargs.get("username") self.password = kwargs.get("password") self.ssl_verify = kwargs.get("ssl_verify") self.cert = kwargs.get("cert") self.timeout = kwargs.get("timeout", timeout) self.session = Session() self.max_retries = kwargs.get("max_retries") if self.max_retries is not None: retry_adapter = HTTPAdapter(max_retries=self.max_retries) self.session.mount("http://", retry_adapter) self.session.mount("https://", retry_adapter) def _update_url_scheme(self, url): """ Updates scheme of given url to the one used in Gerrit base_url. """ if self.base_scheme and not url.startswith(f"{self.base_scheme}://"): url_split = urlparse.urlsplit(url) url = urlparse.urlunsplit( [ self.base_scheme, url_split.netloc, url_split.path, url_split.query, url_split.fragment, ] ) return url
[docs] def get_request_dict( self, params=None, data=None, json=None, headers=None, **kwargs ): """ :param params: :param data: :param json: :param headers: :param kwargs: :return: """ request_kwargs = kwargs if self.username and self.password: request_kwargs["auth"] = (self.username, self.password) if params: if not isinstance(params, dict): raise ValueError(f"Params must be a dict, got {repr(params)}") request_kwargs["params"] = params if headers: if not isinstance(headers, dict): raise ValueError(f"headers must be a dict, got {repr(headers)}") request_kwargs["headers"] = headers if self.AUTH_COOKIE: currentheaders = request_kwargs.get("headers", {}) currentheaders.update({"Cookie": self.AUTH_COOKIE}) request_kwargs["headers"] = currentheaders request_kwargs["verify"] = self.ssl_verify request_kwargs["cert"] = self.cert if data and json: raise ValueError("Cannot use data and json together") if data: request_kwargs["data"] = data if json: request_kwargs["json"] = json request_kwargs["timeout"] = self.timeout return request_kwargs
[docs] def get( self, url, params=None, headers=None, allow_redirects=True, stream=False, raise_for_status: bool = True, **kwargs, ): """ :param url: :param params: :param headers: :param allow_redirects: :param stream: :param raise_for_status: :param kwargs: :return: """ request_kwargs = self.get_request_dict( params=params, headers=headers, allow_redirects=allow_redirects, stream=stream, **kwargs, ) response = self.session.get(self._update_url_scheme(url), **request_kwargs) if raise_for_status: self.confirm_status(response) return response
[docs] def post( self, url, params=None, data=None, json=None, files=None, headers=None, allow_redirects=True, raise_for_status: bool = True, **kwargs, ): """ :param url: :param params: :param data: :param json: :param files: :param headers: :param allow_redirects: :param raise_for_status: :param kwargs: :return: """ request_kwargs = self.get_request_dict( params=params, data=data, json=json, files=files, headers=headers, allow_redirects=allow_redirects, **kwargs, ) response = self.session.post(self._update_url_scheme(url), **request_kwargs) if raise_for_status: self.confirm_status(response) return response
[docs] def put( self, url, params=None, data=None, json=None, files=None, headers=None, allow_redirects=True, raise_for_status: bool = True, **kwargs, ): """ :param url: :param params: :param data: :param json: :param files: :param headers: :param allow_redirects: :param raise_for_status: :param kwargs: :return: """ request_kwargs = self.get_request_dict( params=params, data=data, json=json, files=files, headers=headers, allow_redirects=allow_redirects, **kwargs, ) response = self.session.put(self._update_url_scheme(url), **request_kwargs) if raise_for_status: self.confirm_status(response) return response
[docs] def delete( self, url, headers=None, allow_redirects=True, raise_for_status: bool = True, **kwargs, ): """ :param url: :param headers: :param allow_redirects: :param raise_for_status: :param kwargs: :return: """ request_kwargs = self.get_request_dict( headers=headers, allow_redirects=allow_redirects, **kwargs ) response = self.session.delete(self._update_url_scheme(url), **request_kwargs) if raise_for_status: self.confirm_status(response) return response
[docs] @staticmethod def confirm_status(res): # pylint: disable=too-many-branches """ check response status code :param res: :return: """ http_error_msg = "" if isinstance(res.reason, bytes): # We attempt to decode utf-8 first because some servers # choose to localize their reason strings. If the string # isn't utf-8, we fall back to iso-8859-1 for all other # encodings. (See PR #3538) try: reason = res.reason.decode("utf-8") except UnicodeDecodeError: reason = res.reason.decode("iso-8859-1") else: reason = res.reason if 400 <= res.status_code < 500: http_error_msg = ( f"{res.status_code} Client Error: {reason} for url: {res.url}" ) elif 500 <= res.status_code < 600: http_error_msg = ( f"{res.status_code} Server Error: {reason} for url: {res.url}" ) if not http_error_msg: return if res.status_code == 400: # Validation error raise ValidationError(http_error_msg) elif res.status_code == 401: # Unauthorized error raise UnauthorizedError(http_error_msg) elif res.status_code == 403: # Auth error raise AuthError(http_error_msg) elif res.status_code == 404: # Not Found res.raise_for_status() elif res.status_code == 405: # Method Not Allowed raise NotAllowedError(http_error_msg) elif res.status_code == 409: # Conflict raise ConflictError(http_error_msg) elif res.status_code < 500: # Other 4xx, generic client error raise ClientError(http_error_msg) else: # 5xx is server error raise ServerError(http_error_msg)