# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Support for resumable uploads.

Also supported here are simple (media) uploads and multipart
uploads that contain both metadata and a small file as payload.
"""


from google.resumable_media import _upload
from google.resumable_media.requests import _request_helpers


class SimpleUpload(_request_helpers.RequestsMixin, _upload.SimpleUpload):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the resource to be uploaded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_request(data, content_type)
        response = _request_helpers.http_request(
            transport,
            method,
            url,
            data=payload,
            headers=headers,
            retry_strategy=self._retry_strategy,
            timeout=timeout,
        )
        self._process_response(response)
        return response


class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload):
    """Upload a resource with metadata to a Google API.

    A **multipart** upload sends both metadata and the resource in a single
    (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c" and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        metadata,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the resource to be uploaded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_request(
            data, metadata, content_type
        )
        response = _request_helpers.http_request(
            transport,
            method,
            url,
            data=payload,
            headers=headers,
            retry_strategy=self._retry_strategy,
            timeout=timeout,
        )

        self._process_response(response)
        return response


class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    When constructing a resumable upload, only the resumable upload URL and
    the chunk size are required:

    .. testsetup:: resumable-constructor

       bucket = u'bucket-foo'

    .. doctest:: resumable-constructor

       >>> from google.resumable_media.requests import ResumableUpload
       >>>
       >>> url_template = (
       ...     u'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
       ...     u'uploadType=resumable')
       >>> upload_url = url_template.format(bucket=bucket)
       >>>
       >>> chunk_size = 3 * 1024 * 1024  # 3MB
       >>> upload = ResumableUpload(upload_url, chunk_size)

    When initiating an upload (via :meth:`initiate`), the caller is expected
    to pass the resource being uploaded as a file-like ``stream``. If the size
    of the resource is explicitly known, it can be passed in directly:

    .. testsetup:: resumable-explicit-size

       import os
       import tempfile

       import mock
       import requests
       from six.moves import http_client

       from google.resumable_media.requests import ResumableUpload

       upload_url = u'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       file_desc, filename = tempfile.mkstemp()
       os.close(file_desc)

       data = b'some bytes!'
       with open(filename, u'wb') as file_obj:
           file_obj.write(data)

       fake_response = requests.Response()
       fake_response.status_code = int(http_client.OK)
       fake_response._content = b''
       resumable_url = u'http://test.invalid?upload_id=7up'
       fake_response.headers[u'location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

    .. doctest:: resumable-explicit-size

       >>> import os
       >>>
       >>> upload.total_bytes is None
       True
       >>>
       >>> stream = open(filename, u'rb')
       >>> total_bytes = os.path.getsize(filename)
       >>> metadata = {u'name': filename}
       >>> response = upload.initiate(
       ...     transport, stream, metadata, u'text/plain',
       ...     total_bytes=total_bytes)
       >>> response
       <Response [200]>
       >>>
       >>> upload.total_bytes == total_bytes
       True

    .. testcleanup:: resumable-explicit-size

       os.remove(filename)

    If the stream is in a "final" state (i.e. it won't have any more bytes
    written to it), the total number of bytes can be determined implicitly
    from the ``stream`` itself:

    .. testsetup:: resumable-implicit-size

       import io

       import mock
       import requests
       from six.moves import http_client

       from google.resumable_media.requests import ResumableUpload

       upload_url = u'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http_client.OK)
       fake_response._content = b''
       resumable_url = u'http://test.invalid?upload_id=7up'
       fake_response.headers[u'location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       data = b'some MOAR bytes!'
       metadata = {u'name': u'some-file.jpg'}
       content_type = u'image/jpeg'

    .. doctest:: resumable-implicit-size

       >>> stream = io.BytesIO(data)
       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type)
       >>>
       >>> upload.total_bytes == len(data)
       True

    If the size of the resource is **unknown** when the upload is initiated,
    the ``stream_final`` argument can be used. This might occur if the
    resource is being dynamically created on the client (e.g. application
    logs). To use this argument:

    .. testsetup:: resumable-unknown-size

       import io

       import mock
       import requests
       from six.moves import http_client

       from google.resumable_media.requests import ResumableUpload

       upload_url = u'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http_client.OK)
       fake_response._content = b''
       resumable_url = u'http://test.invalid?upload_id=7up'
       fake_response.headers[u'location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       metadata = {u'name': u'some-file.jpg'}
       content_type = u'application/octet-stream'

       stream = io.BytesIO(b'data')

    .. doctest:: resumable-unknown-size

       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type,
       ...     stream_final=False)
       >>>
       >>> upload.total_bytes is None
       True

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These **will not** be sent with
            :meth:`transmit_next_chunk` or :meth:`recover` requests.
        checksum Optional([str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bites than the chunk
        size, the upload will be concluded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_initiate_request(
            stream,
            metadata,
            content_type,
            total_bytes=total_bytes,
            stream_final=stream_final,
        )
        response = _request_helpers.http_request(
            transport,
            method,
            url,
            data=payload,
            headers=headers,
            retry_strategy=self._retry_strategy,
            timeout=timeout,
        )
        self._process_initiate_response(response)
        return response

    def transmit_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        In the case of failure, an exception is thrown that preserves the
        failed response:

        .. testsetup:: bad-response

           import io

           import mock
           import requests
           from six.moves import http_client

           from google import resumable_media
           import google.resumable_media.requests.upload as upload_mod

           transport = mock.Mock(spec=['request'])
           fake_response = requests.Response()
           fake_response.status_code = int(http_client.BAD_REQUEST)
           transport.request.return_value = fake_response

           upload_url = u'http://test.invalid'
           upload = upload_mod.ResumableUpload(
               upload_url, resumable_media.UPLOAD_CHUNK_SIZE)
           # Fake that the upload has been initiate()-d
           data = b'data is here'
           upload._stream = io.BytesIO(data)
           upload._total_bytes = len(data)
           upload._resumable_url = u'http://test.invalid?upload_id=nope'

        .. doctest:: bad-response
           :options: +NORMALIZE_WHITESPACE

           >>> error = None
           >>> try:
           ...     upload.transmit_next_chunk(transport)
           ... except resumable_media.InvalidResponse as caught_exc:
           ...     error = caught_exc
           ...
           >>> error
           InvalidResponse('Request failed with status code', 400,
                           'Expected one of', <HTTPStatus.OK: 200>, 308)
           >>> error.response
           <Response [400]>

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308.
            ~google.resumable_media.common.DataCorruption: If this is the final
                chunk, a checksum validation was requested, and the checksum
                does not match or is not available.
        """
        method, url, payload, headers = self._prepare_request()
        response = _request_helpers.http_request(
            transport,
            method,
            url,
            data=payload,
            headers=headers,
            retry_strategy=self._retry_strategy,
            timeout=timeout,
        )
        self._process_response(response, len(payload))
        return response

    def recover(self, transport):
        """Recover from a failure.

        This method should be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_recover_request()
        # NOTE: We assume "payload is None" but pass it along anyway.
        response = _request_helpers.http_request(
            transport,
            method,
            url,
            data=payload,
            headers=headers,
            retry_strategy=self._retry_strategy,
        )
        self._process_recover_response(response)
        return response