1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
| import os import time import warnings from datetime import datetime from http.client import responses
import harbor_client import yaml from harbor_client.rest import ApiException
class DoHarbor(object):
def __init__(self): self.dirname = os.path.dirname(os.path.abspath(__file__)) self.configuration = harbor_client.Configuration() self.projectTemplate = {} self.templateProject = ""
def loadConfig(self): with open(os.path.join(self.dirname, "config", "HarborConfig.yaml"), encoding="UTF-8") as file: data = yaml.safe_load(file) self.configuration.host = data["host"] self.configuration.username = data["username"] self.configuration.password = data["password"] self.projectTemplate = data["project"] if self.projectTemplate["public"] == 'false': self.projectTemplate["public"] = False elif self.projectTemplate["public"] == 'true': self.projectTemplate["public"] = True self.projectTemplate["metadata"]["retention_id"] = None self.templateProject = data["templateProject"]
def getHealth(self): api_instance = harbor_client.HealthApi(harbor_client.ApiClient(self.configuration)) try: result = api_instance.get_health(async_req=True).get() anyParts = {k.name: k.status for k in result.components} anyParts["overall"] = result.status return anyParts except ApiException as e: return f"Error Reason: {e}"
def getProjects(self): api_instance = harbor_client.ProjectApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.list_projects(async_req=True).get() except ApiException as e: return f"Error Reason: {e}"
def getProjectIdOrName(self, project_name_or_id): ''' :param project_name_or_id: 项目的id或名称 :return: 项目的基本内容,可用于创建项目 ''' api_instance = harbor_client.ProjectApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.get_project(project_name_or_id, async_req=True).get().to_dict() except ApiException: return "-"
def cheackProjects(self, project_name): ''' :param project_name: 项目名 :return: NoneType if exists else "-" ''' api_instance = harbor_client.ProjectApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.head_project(project_name, async_req=True).get() except ApiException: return "-"
def createProject(self, project): api_instance = harbor_client.ProjectApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.create_project(project, async_req=True).get() except ApiException: return "-"
def listProjectMetadata(self, project_name_or_id): api_instance = harbor_client.ProjectMetadataApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.list_project_metadatas(project_name_or_id, async_req=True).get() except ApiException: return "-"
def listRepository(self, project_name): api_instance = harbor_client.RepositoryApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.list_repositories(project_name, async_req=True).get() except ApiException: return []
def updateRepository(self, project_name, repository): api_instance = harbor_client.RepositoryApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.update_repository(project_name,repository["name"], repository, async_req=True).get() except ApiException as e: return f"Error Reason: {e}"
def listArtifacts(self, project_name, repository_name): api_instance = harbor_client.ArtifactApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.list_artifacts(project_name, repository_name, async_req=True).get() except ApiException: return []
def deleteArtifact(self, project_name, repository_name, tag_name): api_instance = harbor_client.ArtifactApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.delete_artifact(project_name, repository_name, tag_name, async_req=True).get() except ApiException: return "-"
def getRetention(self, retention_id): api_instance = harbor_client.RetentionApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.get_retention(retention_id, async_req=True).get() except ApiException: return "-"
def getRetentionMetadata(self): warnings.warn("此方法用处不大,请忽视", DeprecationWarning) api_instance = harbor_client.RetentionApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.get_rentenition_metadata(async_req=True).get() except ApiException: return "-"
def createRetention(self, retention): api_instance = harbor_client.RetentionApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.create_retention(retention, async_req=True).get() except ApiException as e: return f"Error Reason: {e}"
def getRobotsInProjectIdOrName(self, project_name_or_id): api_instance = harbor_client.Robotv1Api(harbor_client.ApiClient(self.configuration)) try: return api_instance.list_robot_v1(project_name_or_id, async_req=True).get() except ApiException: return []
def getRobotsInProjectIdOrNameAndRobotId(self, project_name_or_id, robot_id): api_instance = harbor_client.Robotv1Api(harbor_client.ApiClient(self.configuration)) try: return api_instance.get_robot_by_idv1(project_name_or_id, robot_id, async_req=True).get() except ApiException: return "-"
def createRobotInProjectIdOrName(self, project_name_or_id, robot): api_instance = harbor_client.Robotv1Api(harbor_client.ApiClient(self.configuration)) try: return api_instance.create_robot_v1(project_name_or_id, robot, async_req=True).get() except ApiException: return "-"
def updateRobotInProjectIdOrNameAndRobotId(self, project_name_or_id, robot): warnings.warn("此方法无法真实修改机器人的过期时间,请忽视", DeprecationWarning) api_instance = harbor_client.Robotv1Api(harbor_client.ApiClient(self.configuration)) try: return api_instance.update_robot_v1(project_name_or_id, robot["id"], robot, async_req=True).get() except ApiException: return "-" def updateRobot(self,robot): api_instance = harbor_client.RobotApi(harbor_client.ApiClient(self.configuration)) try: return api_instance.update_robot(robot["id"], robot, async_req=True).get() except ApiException: return "-"
def deleteRobotInProjectIdOrNameAndRobotId(self, project_name_or_id, robot_id): api_instance = harbor_client.Robotv1Api(harbor_client.ApiClient(self.configuration)) try: return api_instance.delete_robot_v1(project_name_or_id, robot_id, async_req=True).get() except ApiException: return "-"
def saveRobotMessage(self, robot_id, robot_name, robot_secret): with open(os.path.join(self.dirname,"secrets",robot_name+".log"),"w+",encoding="utf-8") as file: file.write(f"robotId:{robot_id}\n") file.write(f"robotName:{robot_name}\n") file.write(f"robotSecret:{robot_secret}\n")
|