Source code for requests.sessions


This module provides a Session object to manage and persist settings across
requests (cookies, auth, proxies).
import os
import sys
import time
from collections import OrderedDict
from datetime import timedelta

from ._internal_utils import to_native_string
from .adapters import HTTPAdapter
from .auth import _basic_auth_str
from .compat import Mapping, cookielib, urljoin, urlparse
from .cookies import (
from .exceptions import (
from .hooks import default_hooks, dispatch_hook

# formerly defined here, reexposed here for backward compatibility
from .models import (  # noqa: F401
from .status_codes import codes
from .structures import CaseInsensitiveDict
from .utils import (  # noqa: F401

# Preferred clock, based on which one is more accurate on a given system.
if sys.platform == "win32":
    preferred_clock = time.perf_counter
    preferred_clock = time.time

def merge_setting(request_setting, session_setting, dict_class=OrderedDict):
    """Determines appropriate setting for a given request, taking into account
    the explicit setting on that request, and the setting in the session. If a
    setting is a dictionary, they will be merged together using `dict_class`

    if session_setting is None:
        return request_setting

    if request_setting is None:
        return session_setting

    # Bypass if not a dictionary (e.g. verify)
    if not (
        isinstance(session_setting, Mapping) and isinstance(request_setting, Mapping)
        return request_setting

    merged_setting = dict_class(to_key_val_list(session_setting))

    # Remove keys that are set to None. Extract keys first to avoid altering
    # the dictionary during iteration.
    none_keys = [k for (k, v) in merged_setting.items() if v is None]
    for key in none_keys:
        del merged_setting[key]

    return merged_setting

def merge_hooks(request_hooks, session_hooks, dict_class=OrderedDict):
    """Properly merges both requests and session hooks.

    This is necessary because when request_hooks == {'response': []}, the
    merge breaks Session hooks entirely.
    if session_hooks is None or session_hooks.get("response") == []:
        return request_hooks

    if request_hooks is None or request_hooks.get("response") == []:
        return session_hooks

    return merge_setting(request_hooks, session_hooks, dict_class)

class SessionRedirectMixin:
    def get_redirect_target(self, resp):
        """Receives a Response. Returns a redirect URI or ``None``"""
        # Due to the nature of how requests processes redirects this method will
        # be called at least once upon the original response and at least twice
        # on each subsequent redirect response (if any).
        # If a custom mixin is used to handle this logic, it may be advantageous
        # to cache the redirect location onto the response object as a private
        # attribute.
        if resp.is_redirect:
            location = resp.headers["location"]
            # Currently the underlying http module on py3 decode headers
            # in latin1, but empirical evidence suggests that latin1 is very
            # rarely used with non-ASCII characters in HTTP headers.
            # It is more likely to get UTF8 header rather than latin1.
            # This causes incorrect handling of UTF8 encoded location headers.
            # To solve this, we re-encode the location in latin1.
            location = location.encode("latin1")
            return to_native_string(location, "utf8")
        return None

    def should_strip_auth(self, old_url, new_url):
        """Decide whether Authorization header should be removed when redirecting"""
        old_parsed = urlparse(old_url)
        new_parsed = urlparse(new_url)
        if old_parsed.hostname != new_parsed.hostname:
            return True
        # Special case: allow http -> https redirect when using the standard
        # ports. This isn't specified by RFC 7235, but is kept to avoid
        # breaking backwards compatibility with older versions of requests
        # that allowed any redirects on the same host.
        if (
            old_parsed.scheme == "http"
            and old_parsed.port in (80, None)
            and new_parsed.scheme == "https"
            and new_parsed.port in (443, None)
            return False

        # Handle default port usage corresponding to scheme.
        changed_port = old_parsed.port != new_parsed.port
        changed_scheme = old_parsed.scheme != new_parsed.scheme
        default_port = (DEFAULT_PORTS.get(old_parsed.scheme, None), None)
        if (
            not changed_scheme
            and old_parsed.port in default_port
            and new_parsed.port in default_port
            return False

        # Standard case: root URI must match
        return changed_port or changed_scheme

    def resolve_redirects(
        """Receives a Response. Returns a generator of Responses or Requests."""

        hist = []  # keep track of history

        url = self.get_redirect_target(resp)
        previous_fragment = urlparse(req.url).fragment
        while url:
            prepared_request = req.copy()

            # Update history and keep track of redirects.
            # resp.history must ignore the original request in this loop
            resp.history = hist[1:]

                resp.content  # Consume socket so it can be released
            except (ChunkedEncodingError, ContentDecodingError, RuntimeError):

            if len(resp.history) >= self.max_redirects:
                raise TooManyRedirects(
                    f"Exceeded {self.max_redirects} redirects.", response=resp

            # Release the connection back into the pool.

            # Handle redirection without scheme (see: RFC 1808 Section 4)
            if url.startswith("//"):
                parsed_rurl = urlparse(resp.url)
                url = ":".join([to_native_string(parsed_rurl.scheme), url])

            # Normalize url case and attach previous fragment if needed (RFC 7231 7.1.2)
            parsed = urlparse(url)
            if parsed.fragment == "" and previous_fragment:
                parsed = parsed._replace(fragment=previous_fragment)
            elif parsed.fragment:
                previous_fragment = parsed.fragment
            url = parsed.geturl()

            # Facilitate relative 'location' headers, as allowed by RFC 7231.
            # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
            # Compliant with RFC3986, we percent encode the url.
            if not parsed.netloc:
                url = urljoin(resp.url, requote_uri(url))
                url = requote_uri(url)

            prepared_request.url = to_native_string(url)

            self.rebuild_method(prepared_request, resp)

            if resp.status_code not in (
                purged_headers = ("Content-Length", "Content-Type", "Transfer-Encoding")
                for header in purged_headers:
                    prepared_request.headers.pop(header, None)
                prepared_request.body = None

            headers = prepared_request.headers
            headers.pop("Cookie", None)

            # Extract any cookies sent on the response to the cookiejar
            # in the new request. Because we've mutated our copied prepared
            # request, use the old one that we haven't yet touched.
            extract_cookies_to_jar(prepared_request._cookies, req, resp.raw)
            merge_cookies(prepared_request._cookies, self.cookies)

            # Rebuild auth and proxy information.
            proxies = self.rebuild_proxies(prepared_request, proxies)
            self.rebuild_auth(prepared_request, resp)

            # A failed tell() sets `_body_position` to `object()`. This non-None
            # value ensures `rewindable` will be True, allowing us to raise an
            # UnrewindableBodyError, instead of hanging the connection.
            rewindable = prepared_request._body_position is not None and (
                "Content-Length" in headers or "Transfer-Encoding" in headers

            # Attempt to rewind consumed file-like object.
            if rewindable:

            # Override the original request.
            req = prepared_request

            if yield_requests:
                yield req

                resp = self.send(

                extract_cookies_to_jar(self.cookies, prepared_request, resp.raw)

                # extract redirect url, if any, for the next loop
                url = self.get_redirect_target(resp)
                yield resp

    def rebuild_auth(self, prepared_request, response):
        """When being redirected we may want to strip authentication from the
        request to avoid leaking credentials. This method intelligently removes
        and reapplies authentication where possible to avoid credential loss.
        headers = prepared_request.headers
        url = prepared_request.url

        if "Authorization" in headers and self.should_strip_auth(
            response.request.url, url
            # If we get redirected to a new host, we should strip out any
            # authentication headers.
            del headers["Authorization"]

        # .netrc might have more auth for us on our new host.
        new_auth = get_netrc_auth(url) if self.trust_env else None
        if new_auth is not None:

    def rebuild_proxies(self, prepared_request, proxies):
        """This method re-evaluates the proxy configuration by considering the
        environment variables. If we are redirected to a URL covered by
        NO_PROXY, we strip the proxy configuration. Otherwise, we set missing
        proxy keys for this URL (in case they were stripped by a previous

        This method also replaces the Proxy-Authorization header where

        :rtype: dict
        headers = prepared_request.headers
        scheme = urlparse(prepared_request.url).scheme
        new_proxies = resolve_proxies(prepared_request, proxies, self.trust_env)

        if "Proxy-Authorization" in headers:
            del headers["Proxy-Authorization"]

            username, password = get_auth_from_url(new_proxies[scheme])
        except KeyError:
            username, password = None, None

        # urllib3 handles proxy authorization for us in the standard adapter.
        # Avoid appending this to TLS tunneled requests where it may be leaked.
        if not scheme.startswith('https') and username and password:
            headers["Proxy-Authorization"] = _basic_auth_str(username, password)

        return new_proxies

    def rebuild_method(self, prepared_request, response):
        """When being redirected we may want to change the method of the request
        based on certain specs or browser behavior.
        method = prepared_request.method

        if response.status_code == codes.see_other and method != "HEAD":
            method = "GET"

        # Do what the browsers do, despite standards...
        # First, turn 302s into GETs.
        if response.status_code == codes.found and method != "HEAD":
            method = "GET"

        # Second, if a POST is responded to with a 301, turn it into a GET.
        # This bizarre behaviour is explained in Issue 1704.
        if response.status_code == codes.moved and method == "POST":
            method = "GET"

        prepared_request.method = method

[docs]class Session(SessionRedirectMixin): """A Requests session. Provides cookie persistence, connection-pooling, and configuration. Basic Usage:: >>> import requests >>> s = requests.Session() >>> s.get('') <Response [200]> Or as a context manager:: >>> with requests.Session() as s: ... s.get('') <Response [200]> """ __attrs__ = [ "headers", "cookies", "auth", "proxies", "hooks", "params", "verify", "cert", "adapters", "stream", "trust_env", "max_redirects", ] def __init__(self): #: A case-insensitive dictionary of headers to be sent on each #: :class:`Request <Request>` sent from this #: :class:`Session <Session>`. self.headers = default_headers() #: Default Authentication tuple or object to attach to #: :class:`Request <Request>`. self.auth = None #: Dictionary mapping protocol or protocol and host to the URL of the proxy #: (e.g. {'http': '', '': ''}) to #: be used on each :class:`Request <Request>`. self.proxies = {} #: Event-handling hooks. self.hooks = default_hooks() #: Dictionary of querystring data to attach to each #: :class:`Request <Request>`. The dictionary values may be lists for #: representing multivalued query parameters. self.params = {} #: Stream response content default. = False #: SSL Verification default. #: Defaults to `True`, requiring requests to verify the TLS certificate at the #: remote end. #: If verify is set to `False`, requests will accept any TLS certificate #: presented by the server, and will ignore hostname mismatches and/or #: expired certificates, which will make your application vulnerable to #: man-in-the-middle (MitM) attacks. #: Only set this to `False` for testing. self.verify = True #: SSL client certificate default, if String, path to ssl client #: cert file (.pem). If Tuple, ('cert', 'key') pair. self.cert = None #: Maximum number of redirects allowed. If the request exceeds this #: limit, a :class:`TooManyRedirects` exception is raised. #: This defaults to requests.models.DEFAULT_REDIRECT_LIMIT, which is #: 30. self.max_redirects = DEFAULT_REDIRECT_LIMIT #: Trust environment settings for proxy configuration, default #: authentication and similar. self.trust_env = True #: A CookieJar containing all currently outstanding cookies set on this #: session. By default it is a #: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`, but #: may be any other ``cookielib.CookieJar`` compatible object. self.cookies = cookiejar_from_dict({}) # Default connection adapters. self.adapters = OrderedDict() self.mount("https://", HTTPAdapter()) self.mount("http://", HTTPAdapter()) def __enter__(self): return self def __exit__(self, *args): self.close()
[docs] def prepare_request(self, request): """Constructs a :class:`PreparedRequest <PreparedRequest>` for transmission and returns it. The :class:`PreparedRequest` has settings merged from the :class:`Request <Request>` instance and those of the :class:`Session`. :param request: :class:`Request` instance to prepare with this session's settings. :rtype: requests.PreparedRequest """ cookies = request.cookies or {} # Bootstrap CookieJar. if not isinstance(cookies, cookielib.CookieJar): cookies = cookiejar_from_dict(cookies) # Merge with session cookies merged_cookies = merge_cookies( merge_cookies(RequestsCookieJar(), self.cookies), cookies ) # Set environment's basic authentication if not explicitly set. auth = request.auth if self.trust_env and not auth and not self.auth: auth = get_netrc_auth(request.url) p = PreparedRequest() p.prepare( method=request.method.upper(), url=request.url, files=request.files,, json=request.json, headers=merge_setting( request.headers, self.headers, dict_class=CaseInsensitiveDict ), params=merge_setting(request.params, self.params), auth=merge_setting(auth, self.auth), cookies=merged_cookies, hooks=merge_hooks(request.hooks, self.hooks), ) return p
[docs] def request( self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None, json=None, ): """Constructs a :class:`Request <Request>`, prepares it and sends it. Returns :class:`Response <Response>` object. :param method: method for the new :class:`Request` object. :param url: URL for the new :class:`Request` object. :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param json: (optional) json to send in the body of the :class:`Request`. :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`. :param cookies: (optional) Dict or CookieJar object to send with the :class:`Request`. :param files: (optional) Dictionary of ``'filename': file-like-objects`` for multipart encoding upload. :param auth: (optional) Auth tuple or callable to enable Basic/Digest/Custom HTTP Auth. :param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple. :type timeout: float or tuple :param allow_redirects: (optional) Set to True by default. :type allow_redirects: bool :param proxies: (optional) Dictionary mapping protocol or protocol and hostname to the URL of the proxy. :param stream: (optional) whether to immediately download the response content. Defaults to ``False``. :param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True``. When set to ``False``, requests will accept any TLS certificate presented by the server, and will ignore hostname mismatches and/or expired certificates, which will make your application vulnerable to man-in-the-middle (MitM) attacks. Setting verify to ``False`` may be useful during local development or testing. :param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair. :rtype: requests.Response """ # Create the Request. req = Request( method=method.upper(), url=url, headers=headers, files=files, data=data or {}, json=json, params=params or {}, auth=auth, cookies=cookies, hooks=hooks, ) prep = self.prepare_request(req) proxies = proxies or {} settings = self.merge_environment_settings( prep.url, proxies, stream, verify, cert ) # Send the request. send_kwargs = { "timeout": timeout, "allow_redirects": allow_redirects, } send_kwargs.update(settings) resp = self.send(prep, **send_kwargs) return resp
[docs] def get(self, url, **kwargs): r"""Sends a GET request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ kwargs.setdefault("allow_redirects", True) return self.request("GET", url, **kwargs)
[docs] def options(self, url, **kwargs): r"""Sends a OPTIONS request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ kwargs.setdefault("allow_redirects", True) return self.request("OPTIONS", url, **kwargs)
[docs] def head(self, url, **kwargs): r"""Sends a HEAD request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ kwargs.setdefault("allow_redirects", False) return self.request("HEAD", url, **kwargs)
[docs] def post(self, url, data=None, json=None, **kwargs): r"""Sends a POST request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param json: (optional) json to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return self.request("POST", url, data=data, json=json, **kwargs)
[docs] def put(self, url, data=None, **kwargs): r"""Sends a PUT request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return self.request("PUT", url, data=data, **kwargs)
[docs] def patch(self, url, data=None, **kwargs): r"""Sends a PATCH request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return self.request("PATCH", url, data=data, **kwargs)
[docs] def delete(self, url, **kwargs): r"""Sends a DELETE request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """ return self.request("DELETE", url, **kwargs)
[docs] def send(self, request, **kwargs): """Send a given PreparedRequest. :rtype: requests.Response """ # Set defaults that the hooks can utilize to ensure they always have # the correct parameters to reproduce the previous request. kwargs.setdefault("stream", kwargs.setdefault("verify", self.verify) kwargs.setdefault("cert", self.cert) if "proxies" not in kwargs: kwargs["proxies"] = resolve_proxies(request, self.proxies, self.trust_env) # It's possible that users might accidentally send a Request object. # Guard against that specific failure case. if isinstance(request, Request): raise ValueError("You can only send PreparedRequests.") # Set up variables needed for resolve_redirects and dispatching of hooks allow_redirects = kwargs.pop("allow_redirects", True) stream = kwargs.get("stream") hooks = request.hooks # Get the appropriate adapter to use adapter = self.get_adapter(url=request.url) # Start time (approximately) of the request start = preferred_clock() # Send the request r = adapter.send(request, **kwargs) # Total elapsed time of the request (approximately) elapsed = preferred_clock() - start r.elapsed = timedelta(seconds=elapsed) # Response manipulation hooks r = dispatch_hook("response", hooks, r, **kwargs) # Persist cookies if r.history: # If the hooks create history then we want those cookies too for resp in r.history: extract_cookies_to_jar(self.cookies, resp.request, resp.raw) extract_cookies_to_jar(self.cookies, request, r.raw) # Resolve redirects if allowed. if allow_redirects: # Redirect resolving generator. gen = self.resolve_redirects(r, request, **kwargs) history = [resp for resp in gen] else: history = [] # Shuffle things around if there's history. if history: # Insert the first (original) request at the start history.insert(0, r) # Get the last request made r = history.pop() r.history = history # If redirects aren't being followed, store the response on the Request for if not allow_redirects: try: r._next = next( self.resolve_redirects(r, request, yield_requests=True, **kwargs) ) except StopIteration: pass if not stream: r.content return r
[docs] def merge_environment_settings(self, url, proxies, stream, verify, cert): """ Check the environment and merge it with some settings. :rtype: dict """ # Gather clues from the surrounding environment. if self.trust_env: # Set environment's proxies. no_proxy = proxies.get("no_proxy") if proxies is not None else None env_proxies = get_environ_proxies(url, no_proxy=no_proxy) for (k, v) in env_proxies.items(): proxies.setdefault(k, v) # Look for requests environment configuration # and be compatible with cURL. if verify is True or verify is None: verify = ( os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("CURL_CA_BUNDLE") or verify ) # Merge all the kwargs. proxies = merge_setting(proxies, self.proxies) stream = merge_setting(stream, verify = merge_setting(verify, self.verify) cert = merge_setting(cert, self.cert) return {"proxies": proxies, "stream": stream, "verify": verify, "cert": cert}
[docs] def get_adapter(self, url): """ Returns the appropriate connection adapter for the given URL. :rtype: requests.adapters.BaseAdapter """ for (prefix, adapter) in self.adapters.items(): if url.lower().startswith(prefix.lower()): return adapter # Nothing matches :-/ raise InvalidSchema(f"No connection adapters were found for {url!r}")
[docs] def close(self): """Closes all adapters and as such the session""" for v in self.adapters.values(): v.close()
[docs] def mount(self, prefix, adapter): """Registers a connection adapter to a prefix. Adapters are sorted in descending order by prefix length. """ self.adapters[prefix] = adapter keys_to_move = [k for k in self.adapters if len(k) < len(prefix)] for key in keys_to_move: self.adapters[key] = self.adapters.pop(key)
def __getstate__(self): state = {attr: getattr(self, attr, None) for attr in self.__attrs__} return state def __setstate__(self, state): for attr, value in state.items(): setattr(self, attr, value)
def session(): """ Returns a :class:`Session` for context-management. .. deprecated:: 1.0.0 This method has been deprecated since version 1.0.0 and is only kept for backwards compatibility. New code should use :class:`~requests.sessions.Session` to create a session. This may be removed at a future date. :rtype: Session """ return Session()