Module uploadcare.uploadcare

Expand source code
#!/usr/bin/env python
# coding: utf-8

import hashlib
import hmac
import json
import mimetypes
import time
from pathlib import Path
from typing import Optional, Union

import dateparser
import requests
from requests.structures import CaseInsensitiveDict

from .exceptions import (InvalidDatetimeString, MissingExpireKwarg,
                        MissingSecretKey)


class UploadCare:
    """Python wrapper for UploadCare's Upload API

    Attributes:
        pub_key (str): Your public key.
        secret_key (Optional[str]): Your secret key.
        api_url (Optional[str]): API URL.
    """

    def __init__(self,
                 pub_key: str,
                 secret_key: Optional[str] = None,
                 api_url: Optional[str] = 'https://upload.uploadcare.com'):
        """Initializes the client.

        Args:
            pub_key (str): Your public key.
            secret_key (Optional[str]): Your secret key.
            api_url (Optional[str]): API URL. Defaults to
                'https://upload.uploadcare.com'.
        """
        self.pub_key = pub_key
        self.secret_key = secret_key
        self.api_url = api_url

    def generate_secure_signature(secret_key, expire):
        """Generates a signature to be sent alongside a secure upload request

        Note:
            Source: 
            https://uploadcare.com/docs/security/secure-uploads/#make-signature

        Args:
            secret_key (str): The secret key for your Uploadcare project.
                expire (int): A UNIX timestamp indicating the time when the
                signature will expire.

        Returns:
            str: A signature string.

        Raises:
            ValueError: If the secret key is not a string.
            TypeError: If the expire argument is not an integer.
        """
        k, m = secret_key, str(expire).encode('utf-8')
        if not isinstance(k, (bytes, bytearray)):
            k = k.encode('utf-8')
        return hmac.new(k, m, hashlib.sha256).hexdigest()

    def _secure_expire_signature(
            self, expire: Optional[Union[float, int, str]]) -> dict:
        if not isinstance(expire, (float, int, str)):
            raise ValueError(
                '`expire` value must be a float or a string representing date '
                'and/or time in a recognizably valid format.')

        if not self.secret_key:
            raise MissingSecretKey

        if isinstance(expire, str):
            expire = dateparser.parse(expire)
            if not expire:
                raise InvalidDatetimeString(expire)
            expire = time.mktime(expire.timetuple())
        if expire - time.time() < 0:
            raise ValueError('Expire timestamp cannot be in the past!')

        expire = int(expire)
        signature = generate_secure_signature(self.secret_key, expire)
        return {'expire': expire, 'signature': signature}

    def _check_response(self, resp):
        if resp.status_code != 200:
            if '`signature` is required' in resp.text and not self.secret_key:
                raise MissingSecretKey
            raise ConnectionError(resp.text)
        else:
            return resp.json()

    def _input_identity(self, _input) -> tuple:
        is_url = True
        endpoint = f'{self.api_url}/from_url/'
        if Path(_input).exists():
            is_url = False
            endpoint = f'{self.api_url}/base/'
        else:
            if not _input.startswith('http'):
                raise ValueError(
                    'Input is neither an existing file or a valid URL!')
        return is_url, endpoint

    def check_status(self, token) -> tuple:
        """Check the status of a file upload.

        Args:
            token (str): The token returned by the upload endpoint.

        Returns:
            tuple: A tuple containing the filename and uuid of the uploaded
            file.

        Raises:
            ConnectionError: If the status is 'error' or 'unknown'.
        """
        status_endpoint = f'{self.api_url}/from_url/status/'

        while True:
            status_res = requests.post(status_endpoint, data={'token': token})
            status = status_res.json()['status']
            if status == 'success':
                break
            elif status in ['error', 'unknown']:
                raise ConnectionError(status_res.text)
            time.sleep(0.5)

        wait_res = status_res.json()
        return wait_res['filename'], wait_res['uuid']

    def upload(self,
               _input: str,
               store: Union[int, str] = 'auto',
               metadata: Optional[dict] = None,
               expire: Optional[Union[float, int, str]] = None,
               **kwargs) -> str:
        """Uploads a file to Uploadcare.

        Args:
            _input (str): A path to a file or a URL.
            store (Union[int, str], optional): Whether to store the file.
                Defaults to 'auto'.
            metadata (Optional[dict], optional): Metadata to be attached to
                the file. Defaults to None.
            expire (Optional[Union[float, int, str]], optional): Expiration time
                for the upload. Defaults to None.
            **kwargs: Additional arguments to be passed to the API.

        Returns:
            str: The URL of the uploaded file.

        Raises:
            MissingExpireKwarg: If the secret key is set but the expire kwarg
                is not.
        """
        if self.secret_key and not expire:
            raise MissingExpireKwarg

        fname = Path(_input).name

        is_url, endpoint = self._input_identity(_input)

        if not is_url:
            with open(_input, 'rb') as f:
                fdata = f.read()

        pub_k = 'pub_key' if is_url else 'UPLOADCARE_PUB_KEY'

        data = {pub_k: self.pub_key, 'UPLOADCARE_STORE': str(store), **kwargs}

        if is_url:
            data.update({'source_url': _input})

        if metadata:
            for k, v in metadata.items():
                data.update({f'metadata[{k}]': v})

        if expire:
            data.update(self._secure_expire_signature(expire))

        content_type = mimetypes.guess_type(fname)[0]

        if is_url:
            res = requests.post(endpoint, data=data)
        else:
            res = requests.post(endpoint,
                                data=data,
                                files={'file': (fname, fdata, content_type)})

        res_json = self._check_response(res)

        if not is_url:
            _uuid = res_json['file']
        elif res_json.get('token'):
            token = res_json['token']
            fname, _uuid = self.check_status(token)
        else:
            fname, _uuid = res_json['filename'], res_json['uuid']

        return f'https://ucarecdn.com/{_uuid}/{fname}'

    def info(self, _input: str, pretty: bool = False):
        """Get information about a file.

        Args:
            _input (str): The file id or url of the file.
            pretty (bool): If True, returns a pretty printed json.

        Returns:
            dict: A dictionary containing information about the file.
        """
        endpoint = f'{self.api_url}/info/'

        if _input.startswith('http'):
            _uuid = Path(_input).parts[-2]
        else:
            _uuid = _input
        params = {'file_id': _uuid, 'pub_key': self.pub_key}

        res = requests.get(endpoint, params=params)
        res_json = self._check_response(res)

        if pretty:
            return json.dumps(res_json, indent=4)
        return res_json

    def start_multipart(self,
                        filename: str,
                        size: int,
                        expire: Optional[Union[float, int, str]] = None,
                        **kwargs):
        """Starts a multipart upload.

        Args:
            filename (str): The name of the file to be uploaded.
            size (int): The size of the file in bytes.
            expire (Optional[Union[float, int, str]]): The time when the upload
                will expire. If not provided, the upload will never expire.
                Can be a number of seconds since the epoch, a datetime object,
                or a string in a valid date/time format.
            **kwargs: Additional parameters to be passed to the server.

        Returns:
            dict: A dictionary containing the upload URL and the upload ID.

        Raises:
            MissingExpireKwarg: If the secret key is set and the expire argument
                is not provided.
            UploadcareException: If the response from the server is not
                successful.
        """
        if self.secret_key and not expire:
            raise MissingExpireKwarg

        endpoint = f'{self.api_url}/multipart/start/'
        data = {
            'UPLOADCARE_PUB_KEY': self.pub_key,
            'filename': filename,
            'size': size,
            **kwargs
        }

        if expire:
            data.update(self._secure_expire_signature(expire))

        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def upload_parts(presigned_url_x: str, content_type: str, **kwargs):
        """Uploads parts to a presigned url.

        Args:
            presigned_url_x: The presigned url to upload to.
            content_type: The content type of the file.

        Returns:
            A dict containing the response from the server.
        """
        endpoint = f'{self.api_url}/{presigned_url_x}'
        headers = CaseInsensitiveDict()
        headers['Content-type'] = content_type
        res = requests.put(endpoint)
        return self._check_response(res)

    def complete_multipart(self, uuid):
        """Complete multipart upload.
    
        Args:
            uuid (str): UUID of the file to complete.
            
        Returns:
            dict: A dictionary containing the response from the server.
        """
        endpoint = f'{self.api_url}/multipart/complete/'
        data = {'UPLOADCARE_PUB_KEY': self.pub_key, 'uuid': uuid}
        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def create_group(self,
                     files: list,
                     expire: Optional[Union[float, int, str]] = None,
                     **kwargs):
        """Creates a group of files.

        Args:
            files (list): A list of file objects.
            expire (Optional[Union[float, int, str]]): The time in seconds until
                the group expires.
            **kwargs: Additional keyword arguments to pass to the API.

        Returns:
            dict: The response from the API.

        Raises:
            MissingExpireKwarg: If the secret key is set and the expire kwarg is
                not.
        """

        if self.secret_key and not expire:
            raise MissingExpireKwarg

        endpoint = f'{self.api_url}/group/'
        data = {'pub_key': self.pub_key, **kwargs}
        for n, file in enumerate(files):
            data.update({f'files[{n}]': file})

        if expire:
            data.update(self._secure_expire_signature(expire))

        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def group_info(self, group_id: str):
        """Returns information about a files group.

        Args:
            group_id (str): The id of the group to get info for.
        Returns:
            dict: A dictionary containing the group's info.
        """
        endpoint = f'{self.api_url}/group/info/'
        params = {'pub_key': self.pub_key, 'group_id': group_id}
        res = requests.get(endpoint, params=params)
        return self._check_response(res)

Classes

class UploadCare (pub_key: str, secret_key: Optional[str] = None, api_url: Optional[str] = 'https://upload.uploadcare.com')

Python wrapper for UploadCare's Upload API

Attributes

pub_key : str
Your public key.
secret_key : Optional[str]
Your secret key.
api_url : Optional[str]
API URL.

Initializes the client.

Args

pub_key : str
Your public key.
secret_key : Optional[str]
Your secret key.
api_url : Optional[str]
API URL. Defaults to 'https://upload.uploadcare.com'.
Expand source code
class UploadCare:
    """Python wrapper for UploadCare's Upload API

    Attributes:
        pub_key (str): Your public key.
        secret_key (Optional[str]): Your secret key.
        api_url (Optional[str]): API URL.
    """

    def __init__(self,
                 pub_key: str,
                 secret_key: Optional[str] = None,
                 api_url: Optional[str] = 'https://upload.uploadcare.com'):
        """Initializes the client.

        Args:
            pub_key (str): Your public key.
            secret_key (Optional[str]): Your secret key.
            api_url (Optional[str]): API URL. Defaults to
                'https://upload.uploadcare.com'.
        """
        self.pub_key = pub_key
        self.secret_key = secret_key
        self.api_url = api_url

    def generate_secure_signature(secret_key, expire):
        """Generates a signature to be sent alongside a secure upload request

        Note:
            Source: 
            https://uploadcare.com/docs/security/secure-uploads/#make-signature

        Args:
            secret_key (str): The secret key for your Uploadcare project.
                expire (int): A UNIX timestamp indicating the time when the
                signature will expire.

        Returns:
            str: A signature string.

        Raises:
            ValueError: If the secret key is not a string.
            TypeError: If the expire argument is not an integer.
        """
        k, m = secret_key, str(expire).encode('utf-8')
        if not isinstance(k, (bytes, bytearray)):
            k = k.encode('utf-8')
        return hmac.new(k, m, hashlib.sha256).hexdigest()

    def _secure_expire_signature(
            self, expire: Optional[Union[float, int, str]]) -> dict:
        if not isinstance(expire, (float, int, str)):
            raise ValueError(
                '`expire` value must be a float or a string representing date '
                'and/or time in a recognizably valid format.')

        if not self.secret_key:
            raise MissingSecretKey

        if isinstance(expire, str):
            expire = dateparser.parse(expire)
            if not expire:
                raise InvalidDatetimeString(expire)
            expire = time.mktime(expire.timetuple())
        if expire - time.time() < 0:
            raise ValueError('Expire timestamp cannot be in the past!')

        expire = int(expire)
        signature = generate_secure_signature(self.secret_key, expire)
        return {'expire': expire, 'signature': signature}

    def _check_response(self, resp):
        if resp.status_code != 200:
            if '`signature` is required' in resp.text and not self.secret_key:
                raise MissingSecretKey
            raise ConnectionError(resp.text)
        else:
            return resp.json()

    def _input_identity(self, _input) -> tuple:
        is_url = True
        endpoint = f'{self.api_url}/from_url/'
        if Path(_input).exists():
            is_url = False
            endpoint = f'{self.api_url}/base/'
        else:
            if not _input.startswith('http'):
                raise ValueError(
                    'Input is neither an existing file or a valid URL!')
        return is_url, endpoint

    def check_status(self, token) -> tuple:
        """Check the status of a file upload.

        Args:
            token (str): The token returned by the upload endpoint.

        Returns:
            tuple: A tuple containing the filename and uuid of the uploaded
            file.

        Raises:
            ConnectionError: If the status is 'error' or 'unknown'.
        """
        status_endpoint = f'{self.api_url}/from_url/status/'

        while True:
            status_res = requests.post(status_endpoint, data={'token': token})
            status = status_res.json()['status']
            if status == 'success':
                break
            elif status in ['error', 'unknown']:
                raise ConnectionError(status_res.text)
            time.sleep(0.5)

        wait_res = status_res.json()
        return wait_res['filename'], wait_res['uuid']

    def upload(self,
               _input: str,
               store: Union[int, str] = 'auto',
               metadata: Optional[dict] = None,
               expire: Optional[Union[float, int, str]] = None,
               **kwargs) -> str:
        """Uploads a file to Uploadcare.

        Args:
            _input (str): A path to a file or a URL.
            store (Union[int, str], optional): Whether to store the file.
                Defaults to 'auto'.
            metadata (Optional[dict], optional): Metadata to be attached to
                the file. Defaults to None.
            expire (Optional[Union[float, int, str]], optional): Expiration time
                for the upload. Defaults to None.
            **kwargs: Additional arguments to be passed to the API.

        Returns:
            str: The URL of the uploaded file.

        Raises:
            MissingExpireKwarg: If the secret key is set but the expire kwarg
                is not.
        """
        if self.secret_key and not expire:
            raise MissingExpireKwarg

        fname = Path(_input).name

        is_url, endpoint = self._input_identity(_input)

        if not is_url:
            with open(_input, 'rb') as f:
                fdata = f.read()

        pub_k = 'pub_key' if is_url else 'UPLOADCARE_PUB_KEY'

        data = {pub_k: self.pub_key, 'UPLOADCARE_STORE': str(store), **kwargs}

        if is_url:
            data.update({'source_url': _input})

        if metadata:
            for k, v in metadata.items():
                data.update({f'metadata[{k}]': v})

        if expire:
            data.update(self._secure_expire_signature(expire))

        content_type = mimetypes.guess_type(fname)[0]

        if is_url:
            res = requests.post(endpoint, data=data)
        else:
            res = requests.post(endpoint,
                                data=data,
                                files={'file': (fname, fdata, content_type)})

        res_json = self._check_response(res)

        if not is_url:
            _uuid = res_json['file']
        elif res_json.get('token'):
            token = res_json['token']
            fname, _uuid = self.check_status(token)
        else:
            fname, _uuid = res_json['filename'], res_json['uuid']

        return f'https://ucarecdn.com/{_uuid}/{fname}'

    def info(self, _input: str, pretty: bool = False):
        """Get information about a file.

        Args:
            _input (str): The file id or url of the file.
            pretty (bool): If True, returns a pretty printed json.

        Returns:
            dict: A dictionary containing information about the file.
        """
        endpoint = f'{self.api_url}/info/'

        if _input.startswith('http'):
            _uuid = Path(_input).parts[-2]
        else:
            _uuid = _input
        params = {'file_id': _uuid, 'pub_key': self.pub_key}

        res = requests.get(endpoint, params=params)
        res_json = self._check_response(res)

        if pretty:
            return json.dumps(res_json, indent=4)
        return res_json

    def start_multipart(self,
                        filename: str,
                        size: int,
                        expire: Optional[Union[float, int, str]] = None,
                        **kwargs):
        """Starts a multipart upload.

        Args:
            filename (str): The name of the file to be uploaded.
            size (int): The size of the file in bytes.
            expire (Optional[Union[float, int, str]]): The time when the upload
                will expire. If not provided, the upload will never expire.
                Can be a number of seconds since the epoch, a datetime object,
                or a string in a valid date/time format.
            **kwargs: Additional parameters to be passed to the server.

        Returns:
            dict: A dictionary containing the upload URL and the upload ID.

        Raises:
            MissingExpireKwarg: If the secret key is set and the expire argument
                is not provided.
            UploadcareException: If the response from the server is not
                successful.
        """
        if self.secret_key and not expire:
            raise MissingExpireKwarg

        endpoint = f'{self.api_url}/multipart/start/'
        data = {
            'UPLOADCARE_PUB_KEY': self.pub_key,
            'filename': filename,
            'size': size,
            **kwargs
        }

        if expire:
            data.update(self._secure_expire_signature(expire))

        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def upload_parts(presigned_url_x: str, content_type: str, **kwargs):
        """Uploads parts to a presigned url.

        Args:
            presigned_url_x: The presigned url to upload to.
            content_type: The content type of the file.

        Returns:
            A dict containing the response from the server.
        """
        endpoint = f'{self.api_url}/{presigned_url_x}'
        headers = CaseInsensitiveDict()
        headers['Content-type'] = content_type
        res = requests.put(endpoint)
        return self._check_response(res)

    def complete_multipart(self, uuid):
        """Complete multipart upload.
    
        Args:
            uuid (str): UUID of the file to complete.
            
        Returns:
            dict: A dictionary containing the response from the server.
        """
        endpoint = f'{self.api_url}/multipart/complete/'
        data = {'UPLOADCARE_PUB_KEY': self.pub_key, 'uuid': uuid}
        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def create_group(self,
                     files: list,
                     expire: Optional[Union[float, int, str]] = None,
                     **kwargs):
        """Creates a group of files.

        Args:
            files (list): A list of file objects.
            expire (Optional[Union[float, int, str]]): The time in seconds until
                the group expires.
            **kwargs: Additional keyword arguments to pass to the API.

        Returns:
            dict: The response from the API.

        Raises:
            MissingExpireKwarg: If the secret key is set and the expire kwarg is
                not.
        """

        if self.secret_key and not expire:
            raise MissingExpireKwarg

        endpoint = f'{self.api_url}/group/'
        data = {'pub_key': self.pub_key, **kwargs}
        for n, file in enumerate(files):
            data.update({f'files[{n}]': file})

        if expire:
            data.update(self._secure_expire_signature(expire))

        res = requests.post(endpoint, data=data)
        return self._check_response(res)

    def group_info(self, group_id: str):
        """Returns information about a files group.

        Args:
            group_id (str): The id of the group to get info for.
        Returns:
            dict: A dictionary containing the group's info.
        """
        endpoint = f'{self.api_url}/group/info/'
        params = {'pub_key': self.pub_key, 'group_id': group_id}
        res = requests.get(endpoint, params=params)
        return self._check_response(res)

Methods

def check_status(self, token) ‑> tuple

Check the status of a file upload.

Args

token : str
The token returned by the upload endpoint.

Returns

tuple
A tuple containing the filename and uuid of the uploaded

file.

Raises

ConnectionError
If the status is 'error' or 'unknown'.
Expand source code
def check_status(self, token) -> tuple:
    """Check the status of a file upload.

    Args:
        token (str): The token returned by the upload endpoint.

    Returns:
        tuple: A tuple containing the filename and uuid of the uploaded
        file.

    Raises:
        ConnectionError: If the status is 'error' or 'unknown'.
    """
    status_endpoint = f'{self.api_url}/from_url/status/'

    while True:
        status_res = requests.post(status_endpoint, data={'token': token})
        status = status_res.json()['status']
        if status == 'success':
            break
        elif status in ['error', 'unknown']:
            raise ConnectionError(status_res.text)
        time.sleep(0.5)

    wait_res = status_res.json()
    return wait_res['filename'], wait_res['uuid']
def complete_multipart(self, uuid)

Complete multipart upload.

Args

uuid : str
UUID of the file to complete.

Returns

dict
A dictionary containing the response from the server.
Expand source code
def complete_multipart(self, uuid):
    """Complete multipart upload.

    Args:
        uuid (str): UUID of the file to complete.
        
    Returns:
        dict: A dictionary containing the response from the server.
    """
    endpoint = f'{self.api_url}/multipart/complete/'
    data = {'UPLOADCARE_PUB_KEY': self.pub_key, 'uuid': uuid}
    res = requests.post(endpoint, data=data)
    return self._check_response(res)
def create_group(self, files: list, expire: Union[float, int, str, None] = None, **kwargs)

Creates a group of files.

Args

files : list
A list of file objects.
expire : Optional[Union[float, int, str]]
The time in seconds until the group expires.
**kwargs
Additional keyword arguments to pass to the API.

Returns

dict
The response from the API.

Raises

MissingExpireKwarg
If the secret key is set and the expire kwarg is not.
Expand source code
def create_group(self,
                 files: list,
                 expire: Optional[Union[float, int, str]] = None,
                 **kwargs):
    """Creates a group of files.

    Args:
        files (list): A list of file objects.
        expire (Optional[Union[float, int, str]]): The time in seconds until
            the group expires.
        **kwargs: Additional keyword arguments to pass to the API.

    Returns:
        dict: The response from the API.

    Raises:
        MissingExpireKwarg: If the secret key is set and the expire kwarg is
            not.
    """

    if self.secret_key and not expire:
        raise MissingExpireKwarg

    endpoint = f'{self.api_url}/group/'
    data = {'pub_key': self.pub_key, **kwargs}
    for n, file in enumerate(files):
        data.update({f'files[{n}]': file})

    if expire:
        data.update(self._secure_expire_signature(expire))

    res = requests.post(endpoint, data=data)
    return self._check_response(res)
def generate_secure_signature(secret_key, expire)

Generates a signature to be sent alongside a secure upload request

Note

Source: https://uploadcare.com/docs/security/secure-uploads/#make-signature

Args

secret_key : str
The secret key for your Uploadcare project. expire (int): A UNIX timestamp indicating the time when the signature will expire.

Returns

str
A signature string.

Raises

ValueError
If the secret key is not a string.
TypeError
If the expire argument is not an integer.
Expand source code
def generate_secure_signature(secret_key, expire):
    """Generates a signature to be sent alongside a secure upload request

    Note:
        Source: 
        https://uploadcare.com/docs/security/secure-uploads/#make-signature

    Args:
        secret_key (str): The secret key for your Uploadcare project.
            expire (int): A UNIX timestamp indicating the time when the
            signature will expire.

    Returns:
        str: A signature string.

    Raises:
        ValueError: If the secret key is not a string.
        TypeError: If the expire argument is not an integer.
    """
    k, m = secret_key, str(expire).encode('utf-8')
    if not isinstance(k, (bytes, bytearray)):
        k = k.encode('utf-8')
    return hmac.new(k, m, hashlib.sha256).hexdigest()
def group_info(self, group_id: str)

Returns information about a files group.

Args

group_id : str
The id of the group to get info for.

Returns

dict
A dictionary containing the group's info.
Expand source code
def group_info(self, group_id: str):
    """Returns information about a files group.

    Args:
        group_id (str): The id of the group to get info for.
    Returns:
        dict: A dictionary containing the group's info.
    """
    endpoint = f'{self.api_url}/group/info/'
    params = {'pub_key': self.pub_key, 'group_id': group_id}
    res = requests.get(endpoint, params=params)
    return self._check_response(res)
def info(self, _input: str, pretty: bool = False)

Get information about a file.

Args

_input : str
The file id or url of the file.
pretty : bool
If True, returns a pretty printed json.

Returns

dict
A dictionary containing information about the file.
Expand source code
def info(self, _input: str, pretty: bool = False):
    """Get information about a file.

    Args:
        _input (str): The file id or url of the file.
        pretty (bool): If True, returns a pretty printed json.

    Returns:
        dict: A dictionary containing information about the file.
    """
    endpoint = f'{self.api_url}/info/'

    if _input.startswith('http'):
        _uuid = Path(_input).parts[-2]
    else:
        _uuid = _input
    params = {'file_id': _uuid, 'pub_key': self.pub_key}

    res = requests.get(endpoint, params=params)
    res_json = self._check_response(res)

    if pretty:
        return json.dumps(res_json, indent=4)
    return res_json
def start_multipart(self, filename: str, size: int, expire: Union[float, int, str, None] = None, **kwargs)

Starts a multipart upload.

Args

filename : str
The name of the file to be uploaded.
size : int
The size of the file in bytes.
expire : Optional[Union[float, int, str]]
The time when the upload will expire. If not provided, the upload will never expire. Can be a number of seconds since the epoch, a datetime object, or a string in a valid date/time format.
**kwargs
Additional parameters to be passed to the server.

Returns

dict
A dictionary containing the upload URL and the upload ID.

Raises

MissingExpireKwarg
If the secret key is set and the expire argument is not provided.
UploadcareException
If the response from the server is not successful.
Expand source code
def start_multipart(self,
                    filename: str,
                    size: int,
                    expire: Optional[Union[float, int, str]] = None,
                    **kwargs):
    """Starts a multipart upload.

    Args:
        filename (str): The name of the file to be uploaded.
        size (int): The size of the file in bytes.
        expire (Optional[Union[float, int, str]]): The time when the upload
            will expire. If not provided, the upload will never expire.
            Can be a number of seconds since the epoch, a datetime object,
            or a string in a valid date/time format.
        **kwargs: Additional parameters to be passed to the server.

    Returns:
        dict: A dictionary containing the upload URL and the upload ID.

    Raises:
        MissingExpireKwarg: If the secret key is set and the expire argument
            is not provided.
        UploadcareException: If the response from the server is not
            successful.
    """
    if self.secret_key and not expire:
        raise MissingExpireKwarg

    endpoint = f'{self.api_url}/multipart/start/'
    data = {
        'UPLOADCARE_PUB_KEY': self.pub_key,
        'filename': filename,
        'size': size,
        **kwargs
    }

    if expire:
        data.update(self._secure_expire_signature(expire))

    res = requests.post(endpoint, data=data)
    return self._check_response(res)
def upload(self, _input: str, store: Union[int, str] = 'auto', metadata: Optional[dict] = None, expire: Union[float, int, str, None] = None, **kwargs) ‑> str

Uploads a file to Uploadcare.

Args

_input : str
A path to a file or a URL.
store : Union[int, str], optional
Whether to store the file. Defaults to 'auto'.
metadata : Optional[dict], optional
Metadata to be attached to the file. Defaults to None.
expire : Optional[Union[float, int, str]], optional
Expiration time for the upload. Defaults to None.
**kwargs
Additional arguments to be passed to the API.

Returns

str
The URL of the uploaded file.

Raises

MissingExpireKwarg
If the secret key is set but the expire kwarg is not.
Expand source code
def upload(self,
           _input: str,
           store: Union[int, str] = 'auto',
           metadata: Optional[dict] = None,
           expire: Optional[Union[float, int, str]] = None,
           **kwargs) -> str:
    """Uploads a file to Uploadcare.

    Args:
        _input (str): A path to a file or a URL.
        store (Union[int, str], optional): Whether to store the file.
            Defaults to 'auto'.
        metadata (Optional[dict], optional): Metadata to be attached to
            the file. Defaults to None.
        expire (Optional[Union[float, int, str]], optional): Expiration time
            for the upload. Defaults to None.
        **kwargs: Additional arguments to be passed to the API.

    Returns:
        str: The URL of the uploaded file.

    Raises:
        MissingExpireKwarg: If the secret key is set but the expire kwarg
            is not.
    """
    if self.secret_key and not expire:
        raise MissingExpireKwarg

    fname = Path(_input).name

    is_url, endpoint = self._input_identity(_input)

    if not is_url:
        with open(_input, 'rb') as f:
            fdata = f.read()

    pub_k = 'pub_key' if is_url else 'UPLOADCARE_PUB_KEY'

    data = {pub_k: self.pub_key, 'UPLOADCARE_STORE': str(store), **kwargs}

    if is_url:
        data.update({'source_url': _input})

    if metadata:
        for k, v in metadata.items():
            data.update({f'metadata[{k}]': v})

    if expire:
        data.update(self._secure_expire_signature(expire))

    content_type = mimetypes.guess_type(fname)[0]

    if is_url:
        res = requests.post(endpoint, data=data)
    else:
        res = requests.post(endpoint,
                            data=data,
                            files={'file': (fname, fdata, content_type)})

    res_json = self._check_response(res)

    if not is_url:
        _uuid = res_json['file']
    elif res_json.get('token'):
        token = res_json['token']
        fname, _uuid = self.check_status(token)
    else:
        fname, _uuid = res_json['filename'], res_json['uuid']

    return f'https://ucarecdn.com/{_uuid}/{fname}'
def upload_parts(presigned_url_x: str, content_type: str, **kwargs)

Uploads parts to a presigned url.

Args

presigned_url_x
The presigned url to upload to.
content_type
The content type of the file.

Returns

A dict containing the response from the server.

Expand source code
def upload_parts(presigned_url_x: str, content_type: str, **kwargs):
    """Uploads parts to a presigned url.

    Args:
        presigned_url_x: The presigned url to upload to.
        content_type: The content type of the file.

    Returns:
        A dict containing the response from the server.
    """
    endpoint = f'{self.api_url}/{presigned_url_x}'
    headers = CaseInsensitiveDict()
    headers['Content-type'] = content_type
    res = requests.put(endpoint)
    return self._check_response(res)