from abc import ABC, abstractmethod
import asyncio
import aiohttp
import requests
from tqdm import tqdm
from anylearn.config import AnylearnConfig
from anylearn.utils import logger
from anylearn.utils.api import url_base


class ResourceUploader(ABC):
    """
    Resource upload tool interface.
    """

    @abstractmethod
    def run(self, resource_id: str, chunks: list):
        """
        Perform the resource upload. Custom uploaders must implement this method.

        Parameters
        ----------
        resource_id
            ID of the resource in the backend
        chunks
            List of file content chunks produced by splitting the file
        """
        raise NotImplementedError
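

# A minimal sketch of a custom uploader implementing this interface, for
# illustration only (not part of the original module). It assumes chunks are
# bytes and simply writes them to a local directory instead of posting them
# to the backend.
class _ExampleLocalUploader(ResourceUploader):
    def run(self, resource_id: str, chunks: list):
        import os
        os.makedirs(resource_id, exist_ok=True)
        for i, chunk in enumerate(chunks):
            with open(os.path.join(resource_id, f"chunk_{i}"), "wb") as f:
                f.write(chunk)
        return True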


class AsyncResourceUploader(ResourceUploader):
    """
    Asynchronous resource uploader.
    """

    def __init__(self, on_start=None, on_progress=None):
        # Optional callbacks:
        #   on_start(total)          -- called once before uploading starts
        #   on_progress(done, total) -- called after each chunk has been uploaded
        self.__counter = 0
        self.__total = 0
        self.__on_start = on_start
        self.__on_progress = on_progress
        super().__init__()

    def run(self, resource_id: str, chunks: list):
        """
        Perform the resource upload by running the asynchronous upload coroutine.
        """
        return asyncio.run(self.__run(resource_id=resource_id,
                                      chunks=chunks))

    async def __run(self, resource_id: str, chunks: list):
        """Upload all chunks concurrently within a single aiohttp session.

        Args:
            resource_id: ID of the resource in the backend
            chunks: List of file content chunks produced by splitting the file

        Returns:
            bool: True if every chunk was uploaded successfully

        Raises:
            aiohttp.ClientResponseError: Raised by aiohttp (by default) when an
                upload request fails
        """
        self.__counter = 0
        self.__total = len(chunks)
        if callable(self.__on_start):
            self.__on_start(self.__total)
        headers = {'Authorization': f"Bearer {AnylearnConfig.token}"}
        async with aiohttp.ClientSession(headers=headers) as session:
            # Fire one upload coroutine per chunk and run them concurrently
            tasks = [self.__do_upload(session=session,
                                      resource_id=resource_id,
                                      chunk=chunk,
                                      chunk_index=i)
                     for (i, chunk) in enumerate(chunks)]
            res = await asyncio.gather(*tasks)
        return all(res)

    async def __do_upload(self,
                          session: aiohttp.ClientSession,
                          resource_id: str,
                          chunk: bytes,
                          chunk_index: int):
        # POST a single chunk to the backend; raise_for_status=True makes
        # aiohttp raise ClientResponseError on HTTP error statuses
        res = await session.request(method="POST",
                                    url=f"{url_base()}/resource/upload",
                                    raise_for_status=True,
                                    data={
                                        'file_id': resource_id,
                                        'file': chunk,
                                        'chunk': str(chunk_index),
                                    })
        # Count completed chunks and report progress to the callback
        self.__counter += 1
        if callable(self.__on_progress):
            self.__on_progress(self.__counter, self.__total)
        return res.ok
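

# Usage sketch (assumption, for illustration only): split a local file into
# fixed-size byte chunks and upload them with progress callbacks. The 2 MiB
# chunk size and the chunking scheme are illustrative, not mandated by this
# module.
def _example_async_upload(resource_id: str, file_path: str,
                          chunk_size: int = 2 * 1024 * 1024) -> bool:
    with open(file_path, "rb") as f:
        content = f.read()
    chunks = [content[i:i + chunk_size]
              for i in range(0, len(content), chunk_size)]
    uploader = AsyncResourceUploader(
        on_start=lambda total: logger.info(f"Uploading {total} chunks"),
        on_progress=lambda done, total: logger.info(f"Uploaded {done}/{total}"),
    )
    return uploader.run(resource_id=resource_id, chunks=chunks)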


class SyncResourceUploader(ResourceUploader):
    """
    Synchronous resource uploader that posts chunks one at a time,
    showing a tqdm progress bar.
    """

    def run(self, resource_id: str, chunks: list):
        for i, chunk in enumerate(tqdm(chunks)):
            url = f"{url_base()}/resource/upload"
            headers = {'Authorization': f"Bearer {AnylearnConfig.token}"}
            files = {'file': chunk}
            data = {'file_id': resource_id, 'chunk': str(i)}
            res = requests.post(url, headers=headers, files=files, data=data)
            res.raise_for_status()
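

# Usage sketch (assumption, for illustration only): the synchronous uploader
# takes the same resource_id/chunks arguments and blocks until every chunk has
# been posted; tqdm prints the progress bar itself, so no callbacks are needed.
# The chunking here mirrors the async sketch above and is not mandated by this
# module.
def _example_sync_upload(resource_id: str, file_path: str,
                         chunk_size: int = 2 * 1024 * 1024) -> None:
    with open(file_path, "rb") as f:
        content = f.read()
    chunks = [content[i:i + chunk_size]
              for i in range(0, len(content), chunk_size)]
    SyncResourceUploader().run(resource_id=resource_id, chunks=chunks)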