anylearn.interfaces.service.service 源代码

from datetime import datetime
import json
from os import path
import requests
from typing import Optional

from anylearn.config import AnylearnConfig
from anylearn.utils.api import url_base, get_with_token
from anylearn.utils.errors import AnyLearnException
from anylearn.interfaces.base import BaseObject
from .record import ServiceRecord, ServiceRecordState


[文档]class ServiceVisibility: """ 服务可见性标识: - 1(PRIVATE)表示仅创建者可见 - 2(PROTECTED)表示所有者可见 - 3(PUBLIC)表示公开 """ PRIVATE = 1 PROTECTED = 2 PUBLIC = 3
[文档]class ServiceState: """ 服务状态标识: - 0(CREATED)表示已创建 - 1(RUNNING)表示已部署运行中 - -1(DELETED)表示已删除 - -2(STOPPED)表示已停止 """ CREATED = 0 RUNNING = 1 DELETED = -1 STOPPED = -2
[文档]class Service(BaseObject): """ Attributes ---------- id 模型服务的唯一标识符,自动生成,由SERV+uuid1生成的编码中后28个有效位(小写字母和数字)组成(非空) name 模型服务的名称 description 模型服务描述(默认None,最大长度255) visibility 模型服务的可见性(默认值3) model_id 模型服务使用的模型ID address 模型服务运行地址 secret_key 模型服务秘钥 creator_id 创建者ID envs 环境变量 replicas 模型服务副本数量 state 状态 create_time 创建时间 owner 模型服务的所有者,以逗号分隔的这些用户的ID拼成的字符串,无多余空格 load_detail 初始化时是否加载详情 """ _fields = { # 资源创建/更新请求包体中必须包含且不能为空的字段 'required': { 'create': ['name', 'model_id'], 'update': ['id', 'name'], }, # 资源创建/更新请求包体中包含的所有字段 'payload': { 'create': ['name', 'description', 'visibility', 'model_id', 'envs', 'replicas', 'owner'], 'update': ['id', 'name', 'description', 'visibility', 'owner'], }, } """ 创建/更新对象时: - 必须包含且不能为空的字段 :obj:`_fields['required']` - 所有字段 :obj:`_fields['payload']` """
[文档] def __init__(self, id: Optional[str]=None, name: Optional[str]=None, description: Optional[str]=None, visibility=ServiceVisibility.PUBLIC, model_id: Optional[str]=None, address: Optional[str]=None, secret_key: Optional[str]=None, creator_id: Optional[str]=None, envs: Optional[str]=None, replicas=1, state: Optional[int]=None, create_time: Optional[datetime]=None, owner: Optional[list]=None, load_detail=False): """ Parameters ---------- id 模型服务的唯一标识符,自动生成,由SERV+uuid1生成的编码中后28个有效位(小写字母和数字)组成(非空) name 模型服务的名称 description 模型服务描述(默认None,最大长度255) visibility 模型服务的可见性(默认值3) model_id 模型服务使用的模型ID address 模型服务运行地址 secret_key 模型服务秘钥 creator_id 创建者ID envs 环境变量 replicas 模型服务副本数量 state 状态 create_time 创建时间 owner 模型服务的所有者,以逗号分隔的这些用户的ID拼成的字符串,无多余空格 load_detail 初始化时是否加载详情 """ self.name = name self.description = description self.state = state self.visibility = visibility self.creator_id = creator_id self.owner = owner self.model_id = model_id self.address = address self.secret_key = secret_key self.envs = envs self.replicas = replicas self.create_time = create_time super().__init__(id=id, load_detail=load_detail)
[文档] @classmethod def get_list(cls) -> list: """ 获取服务列表 Returns ------- List [Service] 服务对象的集合。 """ res = get_with_token(f"{url_base()}/service/list") if res is None or not isinstance(res, list): raise AnyLearnException("请求未能得到有效响应") return [ Service(id=s['id'], name=s['name'], description=s['description'], state=s['state'], visibility=s['visibility'], owner=s['owner'], model_id=s['model_id'], creator_id=s['creator_id'], address=s['address'], secret_key=s['secret_key'], create_time=s['create_time']) for s in res ]
[文档] def get_detail(self): """ 获取服务详细信息 - 对象属性 :obj:`id` 应为非空 Returns ------- Service 服务对象。 """ self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/query", params={'id': self.id}) if not res or not isinstance(res, list): raise AnyLearnException("请求未能得到有效响应") res = res[0] self.__init__(id=res['id'], name=res['name'], description=res['description'], visibility=res['visibility'], model_id=res['model_id'], address=res['address'], secret_key=res['secret_key'], creator_id=res['creator_id'], envs=res['envs'], replicas=res['replicas'], state=res['state'], create_time=res['create_time'], owner=res['owner'])
[文档] def scale(self, replicas: int): """ 模型服务伸缩接口 - 对象属性 :obj:`id` 应为非空 Parameters ---------- replicas : :obj:`int` 模型服务副本数量(replicas>=1)。 Returns ------- bool True or False """ if replicas < 1: raise AnyLearnException("服务副本数量不应小于1") self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/scale", params={ 'id': self.id, 'replicas': replicas, }) if not res or 'data' not in res: raise AnyLearnException("请求未能得到有效响应") self.replicas = replicas return res.get('data', None) == self.id
[文档] def stop(self): """ 模型服务停止接口 - 对象属性 :obj:`id` 应为非空 Returns ------- bool True or False """ self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/stop", params={ 'id': self.id, }) if not res or 'data' not in res: raise AnyLearnException("请求未能得到有效响应") self.state = ServiceState.STOPPED return res.get('data', None) == self.id
[文档] def restart(self): """ 模型服务重启接口 - 对象属性 :obj:`id` 应为非空 Returns ------- bool True or False """ self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/restart", params={ 'id': self.id, }) if not res or 'data' not in res: raise AnyLearnException("请求未能得到有效响应") self.state = ServiceState.RUNNING return res.get('data', None) == self.id
[文档] def get_deployment_status(self): """ 模型服务状态查询接口 - 对象属性 :obj:`id` 应为非空 :return: .. code-block:: json { "pod_names": [ "deployment-serv001", "deployment-serv002", "deployment-serv003" ], "workers": { "available_workers": 1, "replicas": 1, "unavailable_workers": 0 } } """ self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/status", params={ 'id': self.id, }) if not res or 'pods' not in res or 'workers' not in res: raise AnyLearnException("请求未能得到有效响应") res = { 'pod_names': [p['name'] for p in res['pods']], 'workers': res['workers'], } return res
[文档] def get_log(self, pod_name, limit=100, direction="init", offset=0, includes=False): """ 模型服务日志查询接口 - 对象属性 :obj:`id` 应为非空 :param pod_name: :obj:`str` 容器单位名称。 :param limit: :obj:`int` 日志条数上限(默认值100)。 :param direction: :obj:`str` 日志查询方向。 :param offset: :obj:`int` 日志查询索引。 :param includes: :obj:`bool` 是否包含指定索引记录本身。 :return: .. code-block:: json [ { "offset": 6554, "text": "[Xlearn-Serving SERV123 ] - 2020-11-26 01:55:23,660 - DEBUG - (worker:1) is waiting" } ] """ self._check_fields(required=['id']) res = get_with_token(f"{url_base()}/service/log", params={ 'id': self.id, 'pod_name': pod_name, 'limit': limit, 'direction': direction, 'index': offset, 'self': 1 if includes else 0, }) if not res: raise AnyLearnException("请求未能得到有效响应") return res
[文档] def get_records(self, page=1, size=20, load_detail=False): """ 模型服务运行结果查询接口 - 对象属性 :obj:`id` 应为非空 Parameters ---------- page: :obj:`sintr` 页面索引(默认值1)。 size: :obj:`int` 每页结果数量(默认值20)。 load_detail: :obj:`bool` 是否加载记录详情(默认为False)。 Returns ------- List [ServiceRecord] 服务运行记录结果的集合。 """ self._check_fields(required=['id']) ress = get_with_token(f"{url_base()}/service/record", params={ 'id': self.id, 'page_index': page, 'page_size': size, }) if not ress or not isinstance(ress, list): raise AnyLearnException("请求未能得到有效响应") return [ ServiceRecord(id=res['id'], service_id=res['service_id'], inference_data_file_id=res['file_id'], error=res['error'], state=res['state'], create_time=res['create_time'], finish_time=res['finish_time'], load_detail=load_detail) for res in ress ]
[文档] def predict_online(self, file_binary: str): """ 在线预测接口 Parameters ---------- file_binary: :obj:`str` 预测的文件。 Returns ------- ServiceRecord.result 结果示例: :obj:`[[0.06298828125, 0.11171875149011612, 0.44580078125, 0.35546875, 5, 0.9521484375]]` ServiceRecord 服务记录。 """ # Ensure service's address if not self.address: self.get_detail() # Call service's predict_online API url = lambda :AnylearnConfig.cluster_address + \ self.address + \ "/predict_online" res = requests.post(url(), files={'file': file_binary}) res.raise_for_status() res.encoding = "utf-8" res = res.json() if not res or 'data' not in res: raise AnyLearnException("请求未能得到有效响应") # Fetch prediction results record = ServiceRecord(id=res['data']) while record.state not in [ServiceRecordState.FINISHED, ServiceRecordState.FAILED]: record.get_detail() filename = record.inference_data_file_id if not record.result: return "", record results = json.loads(json.loads(record.result)[filename])['object'] return results, record
[文档] def _namespace(self): return "service"