123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # -*- coding: utf-8 -*-
- """
- 知识图谱
- """
- import re
- import sys
- import math
- import time
- from .base import AipBase
- from .base import base64
- from .base import json
- from .base import urlencode
- from .base import quote
- class AipKg(AipBase):
- """
- 知识图谱
- """
- __createTaskUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_create'
- __updateTaskUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_update'
- __taskInfoUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_info'
- __taskQueryUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_query'
- __taskStartUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_start'
- __taskStatusUrl = 'https://aip.baidubce.com/rest/2.0/kg/v1/pie/task_status'
-
- def createTask(self, name, template_content, input_mapping_file, output_file, url_pattern, options=None):
- """
- 创建任务
- """
- options = options or {}
- data = {}
- data['name'] = name
- data['template_content'] = template_content
- data['input_mapping_file'] = input_mapping_file
- data['output_file'] = output_file
- data['url_pattern'] = url_pattern
- data.update(options)
- return self._request(self.__createTaskUrl, data)
-
- def updateTask(self, id, options=None):
- """
- 更新任务
- """
- options = options or {}
- data = {}
- data['id'] = id
- data.update(options)
- return self._request(self.__updateTaskUrl, data)
-
- def getTaskInfo(self, id, options=None):
- """
- 获取任务详情
- """
- options = options or {}
- data = {}
- data['id'] = id
- data.update(options)
- return self._request(self.__taskInfoUrl, data)
-
- def getUserTasks(self, options=None):
- """
- 以分页的方式查询当前用户所有的任务信息
- """
- options = options or {}
- data = {}
- data.update(options)
- return self._request(self.__taskQueryUrl, data)
-
- def startTask(self, id, options=None):
- """
- 启动任务
- """
- options = options or {}
- data = {}
- data['id'] = id
- data.update(options)
- return self._request(self.__taskStartUrl, data)
-
- def getTaskStatus(self, id, options=None):
- """
- 查询任务状态
- """
- options = options or {}
- data = {}
- data['id'] = id
- data.update(options)
- return self._request(self.__taskStatusUrl, data)
-
|