# Source code for anylearn.interfaces.resource.resource_uploader

from abc import ABC, abstractmethod
import asyncio

import aiohttp
import requests
from tqdm import tqdm

from anylearn.config import AnylearnConfig
from anylearn.utils import logger
from anylearn.utils.api import url_base


class ResourceUploader(ABC):
    """
    Interface for resource upload tools.
    """

    @abstractmethod
    def run(self, resource_id: str, chunks: list):
        """
        Perform the resource upload. Custom uploaders must implement
        this method.

        Parameters
        ----------
        resource_id
            ID of the resource.
        chunks
            List of file content pieces obtained after splitting the file.
        """
        raise NotImplementedError
class AsyncResourceUploader(ResourceUploader):
    """
    Asynchronous resource uploader.

    Sends every chunk of a resource as its own POST request, scheduling
    all requests concurrently on one ``aiohttp`` session, and optionally
    reports progress through user-supplied callbacks.

    Parameters
    ----------
    on_start
        Optional callable invoked once, before any request is sent, with
        the total number of chunks.
    on_progress
        Optional callable invoked after each chunk request completes with
        ``(uploaded_count, total_count)``.
    """

    def __init__(self, on_start=None, on_progress=None):
        self.__counter = 0
        self.__total = 0
        self.__on_start = on_start
        self.__on_progress = on_progress
        super().__init__()

    def run(self, resource_id: str, chunks: list):
        """
        Perform the resource upload.

        Blocks until all chunk requests have finished.

        Parameters
        ----------
        resource_id
            ID of the resource.
        chunks
            List of file content pieces obtained after splitting the file.

        Returns
        -------
        bool
            Whether every chunk request reported success.
        """
        # NOTE: asyncio.run() starts its own event loop, so this method
        # cannot be called from code already running inside an event loop.
        return asyncio.run(self.__run(resource_id=resource_id, chunks=chunks))

    async def __run(self, resource_id: str, chunks: list):
        """Run the asynchronous upload requests.

        Args:
            resource_id: Backend resource ID.
            chunks: List of file content pieces obtained after splitting.

        Returns:
            bool: Whether the upload succeeded.

        Raises:
            ClientResponseError: Raised by aiohttp (by default) when an
                error occurs during the upload.
        """
        # Reset progress state so the same uploader can be reused.
        self.__counter = 0
        self.__total = len(chunks)
        if callable(self.__on_start):
            self.__on_start(self.__total)
        headers = {'Authorization': f"Bearer {AnylearnConfig.token}"}
        async with aiohttp.ClientSession(headers=headers) as session:
            tasks = [
                self.__do_upload(
                    session=session,
                    resource_id=resource_id,
                    chunk=chunk,
                    chunk_index=i,
                )
                for (i, chunk) in enumerate(chunks)
            ]
            res = await asyncio.gather(*tasks)
        return all(res)

    async def __do_upload(self,
                          session: aiohttp.ClientSession,
                          resource_id: str,
                          chunk: str,
                          chunk_index: int):
        """Upload a single chunk and report progress.

        Args:
            session: Shared aiohttp session carrying the auth header.
            resource_id: Backend resource ID, sent as ``file_id``.
            chunk: Content of this chunk, sent as ``file``.
            chunk_index: Position of the chunk, sent as ``chunk``.

        Returns:
            bool: ``res.ok`` of the response.

        Raises:
            ClientResponseError: On an HTTP error status, because the
                request is made with ``raise_for_status=True``.
        """
        # Use the request as an async context manager so the response (and
        # its pooled connection) is released as soon as we are done with it
        # instead of lingering until the whole session is closed.
        async with session.request(
            method="POST",
            url=f"{url_base()}/resource/upload",
            raise_for_status=True,
            data={
                'file_id': resource_id,
                'file': chunk,
                'chunk': str(chunk_index),
            },
        ) as res:
            self.__counter += 1
            if callable(self.__on_progress):
                self.__on_progress(self.__counter, self.__total)
            return res.ok
class SyncResourceUploader(ResourceUploader):
    """
    Synchronous resource uploader.

    Sends the chunks of a resource one after another with ``requests``,
    showing a ``tqdm`` progress bar over the chunk list.
    """

    def run(self, resource_id: str, chunks: list):
        """
        Perform the resource upload.

        Parameters
        ----------
        resource_id
            ID of the resource.
        chunks
            List of file content pieces obtained after splitting the file.

        Returns
        -------
        bool
            ``True`` once every chunk has been uploaded, matching the
            boolean result of :class:`AsyncResourceUploader`.

        Raises
        ------
        requests.HTTPError
            Raised by ``raise_for_status`` when a chunk request returns an
            HTTP error status; remaining chunks are not sent.
        """
        # URL and auth header are the same for every chunk: build them once.
        url = f"{url_base()}/resource/upload"
        headers = {'Authorization': f"Bearer {AnylearnConfig.token}"}
        for i, chunk in enumerate(tqdm(chunks)):
            res = requests.post(
                url,
                headers=headers,
                files={'file': chunk},
                data={'file_id': resource_id, 'chunk': str(i)},
            )
            res.raise_for_status()
        return True