anylearn.interfaces.resource.resource 源代码

from datetime import datetime
import os
from typing import Optional, Union

from anylearn.interfaces.base import BaseObject
from anylearn.interfaces.resource.resource_uploader import ResourceUploader
from anylearn.interfaces.resource.resource_downloader import ResourceDownloader
from anylearn.utils.errors import AnyLearnException
from anylearn.utils.api import (
    url_base,
    get_with_token,
    post_with_token,
)


[文档]class Resource(BaseObject): """ Attributes ---------- id 资源的唯一标识符,自动生成,由类名前四位+uuid1生成的编码中后28个有效位(小写字母和数字)组成(自动生成) name 资源的名称(长度1~50) description 资源描述(长度最大255) state 资源状态 public 数据集是否为公开(默认为False) upload_time 资源上传时间 filename 下一步中会被分片上传的资源的完整名(包括扩展名) is_zipfile 是否为zip文件 file_path 资源文件路径 size 资源文件大小 creator_id 创建者ID node_id 节点ID owner 资源的所有者,以逗号分隔的这些用户的ID拼成的字符串,无多余空格 load_detail 初始化时是否加载详情 """ """具体资源信息配置""" _fields = { # 资源创建/更新请求包体中必须包含且不能为空的字段 'required': { 'create': ['name', 'filename'], 'update': ['id', 'name'], }, # 资源创建/更新请求包体中包含的所有字段 'payload': { 'create': ['name', 'description', 'public', 'owner', 'filename'], 'update': ['id', 'name', 'description', 'public', 'owner'], }, }
[文档] def __init__(self, id: Optional[str]=None, name: Optional[str]=None, description: Optional[str]=None, state: Optional[int]=None, public=False, upload_time: Optional[Union[datetime, str]]=None, filename: Optional[str]=None, is_zipfile: Optional[int]=None, file_path: Optional[str]=None, size: Optional[str]=None, creator_id: Optional[str]=None, node_id: Optional[str]=None, owner: Optional[list]=None, load_detail=False): """ Parameters ---------- id 资源的唯一标识符,自动生成,由类名前四位+uuid1生成的编码中后28个有效位(小写字母和数字)组成(自动生成) name 资源的名称(长度1~50) description 资源描述(长度最大255) state 资源状态 public 数据集是否为公开(默认为False) upload_time 资源上传时间 filename 下一步中会被分片上传的资源的完整名(包括扩展名) is_zipfile 是否为zip文件 file_path 资源文件路径 size 资源文件大小 creator_id 创建者ID node_id 节点ID owner 资源的所有者,以逗号分隔的这些用户的ID拼成的字符串,无多余空格 load_detail 初始化时是否加载详情 """ self.name = name self.description = description self.state = state self.public = public self.upload_time = upload_time self.filename = filename self.is_zipfile = is_zipfile self.file_path = file_path self.size = size self.creator_id = creator_id self.node_id = node_id self.owner = owner super().__init__(id=id, load_detail=load_detail)
def _payload_create(self): payload = super()._payload_create() payload['public'] = int(payload['public']) return payload def _payload_update(self): payload = super()._payload_update() payload['public'] = int(payload['public']) return payload
[文档] @classmethod def list_dir(cls, resource_id): """ 文件列表查询接口 :param resource_id: :obj:`str` 文件ID :return: .. code-block:: json { "vgg_ssd300_tianyuan.yaml": { "name": "vgg_ssd300_tianyuan.yaml", "type": "file" } } """ assert resource_id return get_with_token(f"{url_base()}/resource/listdir", params={ 'file_id': resource_id })
[文档] @classmethod def upload_file(cls, resource_id: str, file_path: str, uploader: Optional[ResourceUploader]=None, chunk_size: int=2048000): """ 对指定路径的本地文件进行分割并使用aiohttp异步上传 Parameters ---------- resource_id : :obj:`str` 资源ID。 file_path : :obj:`str` 文件路径。 uploader : :obj:`ResourceUploader` 可以使用SDK中的AsyncResourceUploader,也可以自定义实现ResourceUploader。 chunk_size : :obj:`int` 文件分割大小,默认2048000。 Returns ------- bool True or False """ assert resource_id, file_path # 读取选定文件的内容并根据chunk_size进行切割 file_size = os.path.getsize(file_path) n_chunks = (file_size // chunk_size) + 1 with open(file_path, 'rb') as f: chunks = [f.read(chunk_size) for i in range(n_chunks)] # 执行异步上传 uploader.run(resource_id=resource_id, chunks=chunks) # 告知后端上传结束 res = post_with_token(f"{url_base()}/resource/upload_finish", data={ 'file_id': resource_id }) return not not res
[文档] @classmethod def download_file(cls, resource_id: str, save_path: str, polling: Union[float, int]=5, downloader: Optional[ResourceDownloader]=None, ): """ 把服务器资源使用aiohttp异步下载到本地指定的文件夹 Parameters ---------- resource_id : :obj:`str` 资源ID。 save_path : :obj:`str` 保存路径。 downloader : :obj:`ResourceDownloader` 可以使用SDK中的AsyncResourceDownloader,也可以自定义实现ResourceDownloader。 polling : :obj:`float, int` 下载前要先压缩文件,轮询查看文件有没有压缩完的时间间隔,单位:秒。默认值5 Returns ------- str 文件名。 """ assert resource_id, save_path if not os.path.exists(save_path): raise AnyLearnException(f"保存路径{save_path}不存在") # 执行异步下载 return downloader.run(resource_id=resource_id, # type: ignore save_path=save_path, polling=polling, )
[文档]class ResourceState: """ 资源状态标识: - 1(CREATED)表示已创建 - 2(UPLOADING)表示上传中 - 3(READY)表示就绪 - -1(DELETED)表示已删除 - -2(ERROR)表示出错 """ CREATED = 1 UPLOADING = 2 READY = 3 DELETED = -1 ERROR = -2