请上传宽度大于 1200px,高度大于 164px 的封面图片
    调整图片尺寸与位置
    滚轮可以放大缩小图片尺寸,按住图片拖动可调整位置,多余的会自动被裁剪掉
取消
dreamer(uid:402522)
职业资格认证:尚未取得认证
V5版API的python类(开源,希望大家共同完善)
简道云API基本可以让简道云平台拥有无限的可能,也能充分实现开发者的思路和想法,让简道云平台更智能、更体贴、更个性。 V5版API是一个非常大的且很实用的更新,我这几天快速更新了V5版python语言的类,并且更新了原有的api代码,这真是一个大的工作量。 由于简道云用户多为简代码编程爱好者,时间、精力、能力都会受到限制,所以想着大家共享,共同完善。把我整理的类发出来,大家共同提建议,共同完善。 class APIUtils: WEBSITE = "https://api.jiandaoyun.com" RETRY_IF_LIMITED = True # 构造函数 def __init__(self, api_key): self.url_get_apps = APIUtils.WEBSITE + '/api/v5/app/list' # 用户应用查询接口 self.url_get_forms = APIUtils.WEBSITE + '/api/v5/app/entry/list' # 用户表单查询接口 self.url_get_widgets = APIUtils.WEBSITE + '/api/v5/app/entry/widget/list' # 表单字段查询接口 self.url_get_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/list' # 查询多条数据接口 self.url_retrieve_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/get' # 查询单条数据接口 self.url_update_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/update' # 修改单条数据接口 self.url_data_batch_update = APIUtils.WEBSITE + '/api/v5/app/entry/data/batch_update' # 更新多条数据接口 self.url_create_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/create' # 新建单条数据接口 self.url_batch_create_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/batch_create' # 新建多条数据接口 self.url_delete_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/delete' # 删除单条数据接口 self.url_batch_delete_data = APIUtils.WEBSITE + '/api/v5/app/entry/data/batch_delete' # 删除多条数据接口 self.url_get_upload_token = APIUtils.WEBSITE + '/api/v5/app/entry/file/get_upload_token' # 获取文件上传凭证和上传地址接口 self.url_workflow_approval_comments = APIUtils.WEBSITE + '/api/v1/app/{}/entry/{}/data/{}/approval_comments' # 该接口用来获取单条表单流程数据的审批意见 self.url_workflow_query = APIUtils.WEBSITE + '/api/v1/process/task/query' # 该接口用来获取用户当前待办信息 self.url_workflow_get = APIUtils.WEBSITE + '/api/v1/workflow/instance/get' # 该接口用来查询流程实例信息 self.url_workflow_close = APIUtils.WEBSITE + '/api/v1/workflow/instance/close' # 该接口用于管理员结束当前流程实例 self.url_workflow_list = APIUtils.WEBSITE + '/api/v1/workflow/task/list' # 该接口用来获取用户当前待办信息 self.url_workflow_approve = APIUtils.WEBSITE + '/api/v1/workflow/task/approve' # 该接口用来提交流程待办 self.url_workflow_rollback = APIUtils.WEBSITE + '/api/v1/workflow/task/rollback' # 该接口用来回退流程待办 self.url_workflow_transfer = APIUtils.WEBSITE + '/api/v1/workflow/task/transfer' # 该接口用来转交流程待办 self.url_get_department_data = APIUtils.WEBSITE + '/api/v5/corp/department/list' # (递归)获取部门列表 self.url_create_department = APIUtils.WEBSITE + '/api/v5/corp/department/create' # 创建部门 self.url_delete_department = APIUtils.WEBSITE + '/api/v5/corp/department/delete' # 删除部门 self.url_update_department = APIUtils.WEBSITE + '/api/v5/corp/department/update' # 修改部门 self.url_get_deptno_by_integrateid = APIUtils.WEBSITE + '/api/v5/corp/department/dept_no/get' # 根据集成模式通讯录的部门ID获取部门编号 self.url_import_department = APIUtils.WEBSITE + '/api/v5/corp/department/import' # 全量导入部门接口 self.url_get_user_retrieve = APIUtils.WEBSITE + '/api/v5/corp/user/get' # 获取成员的信息 self.url_create_user = APIUtils.WEBSITE + '/api/v5/corp/user/create' # 添加成员接口 self.url_update_user = APIUtils.WEBSITE + '/api/v5/corp/user/update' # 修改成员 self.url_get_member_list = APIUtils.WEBSITE + '/api/v5/corp/department/user/list' # (递归)获取部门成员 self.url_delete_user = APIUtils.WEBSITE + '/api/v5/corp/user/delete' # 删除成员 self.url_batch_delete_user = APIUtils.WEBSITE + '/api/v5/corp/user/batch_delete' # 批量删除成员 self.url_import_user = APIUtils.WEBSITE + '/api/v5/corp/user/import' # 增量导入成员接口 self.url_role_list = APIUtils.WEBSITE + '/api/v5/corp/role/list' # 列出角色接口 self.url_role_create = APIUtils.WEBSITE + '/api/v5/corp/role/create' # 创建一个自建角色 self.url_role_update = APIUtils.WEBSITE + '/api/v5/corp/role/update' # 更新一个自建角色 self.url_role_delete = APIUtils.WEBSITE + '/api/v5/corp/role/delete' # 删除一个自建角色 self.url_role_user_list = APIUtils.WEBSITE + '/api/v5/corp/role/user/list' # 列出角色下的成员 self.url_role_add_membersh = APIUtils.WEBSITE + '/api/v5/corp/role/add_membersh' # 为自建角色批量添加成员 self.url_role_remove_members = APIUtils.WEBSITE + '/api/v5/corp/role/remove_members' # 为自建角色批量移除成员 self.url_role_group_list = APIUtils.WEBSITE + '/api/v5/corp/role_group/list' # 列出自建角色组 self.url_role_group_create = APIUtils.WEBSITE + '/api/v5/corp/role_group/create' # 创建自建角色组 self.url_role_group_update = APIUtils.WEBSITE + '/api/v5/corp/role_group/update' # 更新自建角色组 self.url_role_group_delete = APIUtils.WEBSITE + '/api/v5/corp/role_group/delete' # 删除自建角色组 self.url_guest_department_list = APIUtils.WEBSITE + '/api/v5/corp/guest/department/list' # 列出我连接的企业 self.url_guest_member_list = APIUtils.WEBSITE + '/api/v5/corp/guest/member_list' # 列出我连接的企业的对接人 self.url_guest_user_get = APIUtils.WEBSITE + '/api/v5/corp/guest/user/get' # 获取我连接的企业对接人的详细信息 self.api_key = api_key def get_req_header(self): # 带有认证信息的请求头 return { 'Authorization': 'Bearer ' + self.api_key, 'Content-Type': 'application/json;charset=utf-8' } def send_request(self, method, request_url, data): # 发送http请求 headers = self.get_req_header() if method == 'GET': res = requests.get(request_url, params=data, headers=headers, verify=False) if method == 'POST': res = requests.post(request_url, data=json.dumps(data), headers=headers, verify=False) result = res.json() # print("result:",result) print("res:",res) print("res.status_code:",res.status_code) if res.status_code >= 400: if result == 8303 and APIUtils.RETRY_IF_LIMITED: # 5s后重试 print("频率过高") time.sleep(8) return self.send_request(method, request_url, data) elif result == 4012 and APIUtils.RETRY_IF_LIMITED: # 5s后重试 print('操作失败,当前表单正在执行其他批量编辑任务,请稍后重试') time.sleep(8) return self.send_request(method, request_url, data) elif result == 1010: print("不存在此用户") pass else: raise Exception('请求错误!', result) else: return result '''应用接口''' def get_apps(self, limit, skip): # 获取应用 data = { "limit": limit, "skip": skip } result = self.send_request('POST', self.url_get_apps, data) return result '''表单和数据接口''' def get_forms(self, appId): # 获取表单 data = { "app_id": appId } result = self.send_request('POST', self.url_get_forms, data) return result def get_form_widgets(self, appId, entryId): # 获取表单字段 data = { "app_id": appId, "entry_id": entryId } result = self.send_request('POST', self.url_get_widgets, data) return result def get_retrieve_data(self, appId, entryId, dataId): # 检索一条数据 data = { "app_id": appId, "entry_id": entryId, 'data_id': dataId } result = self.send_request('POST', self.url_retrieve_data, data) return result def get_form_data(self, appId, entryId, dataId, limit, fields, data_filter): # 根据条件获取表单中的数据 result = self.send_request('POST', self.url_get_data, { "app_id": appId, "entry_id": entryId, 'data_id': dataId, 'limit': limit, 'fields': fields, 'filter': data_filter }) # print("这是get_form_data根据查询参数查到的简道云表单的数据:",result) return result def get_all_data(self, appId, entryId, fields, data_filter): # 获取表单中满足条件的所有数据 form_data = def get_next_page(dataId): # 递归取下一页数据 data = self.get_form_data(appId, entryId, dataId, 100, fields, data_filter) if data: for v in data: form_data.append(v) dataId = data get_next_page(dataId) get_next_page('') return form_data def get_all_dataId(self, appId, entryId, data_filter): # 获取表单中满足条件的所有data_id,这个是我根据示例修改的,也应该是最稳定的 form_data = # 递归取下一页数据 def get_next_page(dataId): data = self.get_form_data(appId, entryId, dataId, 100, None, data_filter) if data: for v in data: form_data.append(v) dataId = data get_next_page(dataId) get_next_page('') print("本次查询到的data_id条数为:", len(form_data)) return form_data def get_dataId(self, appId, entryId, data_filter): # 根据字段的值查询dataid form_dataId = result = self.get_form_data(appId, entryId, None, 100, None, data_filter) if result: for v in result: temp_dataId = v # print('v中的dataId:', temp_dataId) form_dataId.append(temp_dataId) # print('添加后的form_dataId:', form_dataId) print("这是根据查询参数查到的简道云表单的dataID:", form_dataId) return form_dataId # 根据关键字段获取数据,这段代码是调试原来搜索错误的时候,想运用这一个进行纠错的,先放这儿,不删除 def get_dataId_from_dataid_and_filter(self, appId, entryId, data_id, data_filter): # 根据字段的值查询dataid form_dataId = # print("这是get_dataId_from_dataid_and_filter根据查询参数查到的简道云表单的data_filter:", data_filter) result = self.get_form_data(appId, entryId, data_id, 100, None, data_filter) # print("这是get_dataId_from_dataid_and_filter根据查询参数查到的简道云表单的result:", result) if result: for v in result: temp_dataId = v # print('v中的dataId:', temp_dataId) form_dataId.append(temp_dataId) # print('添加后的form_dataId:', form_dataId) print("这是get_dataId_from_dataid_and_filter根据查询参数查到的简道云表单的dataID:", len(form_dataId)) return form_dataId def get_form_data_from_dataId(self, appId, entryId, dataId, limit): data = { "app_id": appId, "entry_id": entryId, 'data_id': dataId, 'limit': limit } result = self.send_request('POST', self.url_get_data, data) return result def create_data(self, appId, entryId, data): # 创建一条数据 transaction_id = self.get_transaction_id() data = { "app_id": appId, "entry_id": entryId, "transaction_id": transaction_id, 'data': data, "is_start_trigger": True } result = self.send_request('POST', self.url_create_data, data) return result def batch_create_data(self, appId, entryId, data_list): # 批量新增数据 transaction_id = self.get_transaction_id() print("进入到了批量新增数据") data = { "app_id": appId, "entry_id": entryId, "transaction_id": transaction_id, "data_list": data_list } result = self.send_request('POST', self.url_batch_create_data, data) return result def update_data(self, appId, entryId, dataId, data): # 更新数据 transaction_id = self.get_transaction_id() data = { "app_id": appId, "entry_id": entryId, "transaction_id": transaction_id, 'data_id': dataId, 'data': data, 'is_start_trigger': True } result = self.send_request('POST', self.url_update_data, data) return result def batch_update_data(self, appId, entryId, data_ids, data): # 批量修改数据 transaction_id = self.get_transaction_id() print("进入到了批量修改数据") data = { "app_id": appId, "entry_id": entryId, 'transaction_id': transaction_id, 'data_ids': data_ids, 'data': data } result = self.send_request('POST', self.url_data_batch_update, data) return result def delete_data(self, appId, entryId, dataId): # 删除数据 data = { "app_id": appId, "entry_id": entryId, 'data_id': dataId, "is_start_trigger": True } result = self.send_request('POST', self.url_delete_data, data) return result def batch_delete_data(self, appId, entryId, data_ids): # 删除数据 transaction_id = self.get_transaction_id() data = { "app_id": appId, "entry_id": entryId, 'transaction_id': transaction_id, 'data_ids': data_ids } result = self.send_request('POST', self.url_batch_delete_data, data) return result '''条件修改,先根据filter的值进行查找,再一一修改''' def update_from_filter(self, appId, entryId, data, data_filter): # 判断查找的结果是否为空,如果不为空,则修改 form_dataId = self.get_all_dataId(appId, entryId, data_filter) if form_dataId: n = 100 # 将列表每100个组成一个小列表 for i in range(0, len(form_dataId), n): dataIds = form_dataId # 形成小列表,按100 self.batch_update_data(appId, entryId, dataIds, data) # num_update = 0 # for dataId in form_dataId: # print(dataId) # self.update_data(appId, entryId, dataId, data) # num_update += 1 # print("本次修改的数据条数:", num_update) # return num_update # 返回修改的条数 '''修改或新增,先根据data_filter的值进行查找,有则修改,无则新增''' def update_or_create(self, appId, entryId, data, data_filter): # 判断查找的结果是否为空,如果不为空,则修改 form_dataId = self.get_all_dataId(appId, entryId, data_filter) if form_dataId: num_update = 0 for dataId in form_dataId: print(dataId) self.update_data(appId, entryId, dataId, data) num_update += 1 print("修改的数据数:", num_update) return num_update # 返回修改的条数 else: # self.create_data(data) result = self.create_data(appId, entryId, data) print('新增数据:', result) '''成员接口''' def get_user_retrieve(self, username): # 获取成员的信息 data = { "username": username } result = self.send_request('POST', self.url_get_user_retrieve, data) return result def get_user_department(self, username): # 获取成员的部门 data = { "username": username } result = self.send_request('POST', self.url_get_user_retrieve, data) return result def create_user(self, username, name, departments): # 添加成员接口 data = { "username": username, "name": name, "departments": departments } result = self.send_request('POST', self.url_create_user, data) return result def update_user_department_name(self, username, name, departments): # 更新成员部门和呢称 data = { "username": username, "name": name, "departments": departments } result = self.send_request('POST', self.url_update_user, data) print(result) if result is None: return else: return result def delete_user(self, username): # 删除成员 data = { "username": username } result = self.send_request('POST', self.url_delete_user, data) return result def batch_delete_user(self, usernames): # 批量删除成员 data = { "usernames": usernames } result = self.send_request('POST', self.url_batch_delete_user, data) return result def import_user(self, users): # 批量导入成员 data = { "users": users } result = self.send_request('POST', self.url_import_user, data) return result def get_member_list(self, department_id): # (递归)获取部门成员 data = { "dept_no": department_id, "has_child": True } result = self.send_request('POST', self.url_get_member_list, data) return result '''部门接口''' def get_department_list(self, department_id): # (递归)获取部门列表 data = { "dept_no": department_id, "has_child": True } result = self.send_request('POST', self.url_get_department_data, data) return result def create_department(self, data): # 创建部门 # data = { # "name": "研发部门", # "parent_no": 1, # "dept_no": 2 # } result = self.send_request('POST', self.url_create_department, data) return result def update_department(self, data): # 修改部门 # data = { # "name": "研发部门", # "parent_no": 1 # } result = self.send_request('POST', self.url_update_department, data) return result def delete_department(self, dept_no): # 删除部门 data = { "dept_no": dept_no } result = self.send_request('POST', self.url_delete_department, data) return result def get_deptno_by_integrateid(self, integrate_id): # 获取集成模式部门编号 data = { "integrate_id": integrate_id } result = self.send_request('POST', self.url_get_deptno_by_integrateid, data) return result def import_department(self, departments): # 全量导入部门 data = { "departments": departments } result = self.send_request('POST', self.url_import_department, data) return result '''角色接口''' def list_role(self, skip, limit): # 列出角色 data = { "skip": skip, "limit": limit, "has_internal": "Ture", "has_sync": "Ture" } result = self.send_request('POST', self.url_role_list, data) return result def create_role(self, group_no, name): # 只能创建内部自建角色。支持公共模式和集成模式 data = { "group_no": group_no, "name": name } result = self.send_request('POST', self.url_role_create, data) return result def update_role(self, data): # 对自建角色进行更新。支持公共模式和集成模式 # data = { # "role_no":2558, # "group_no": 2547, # "name": "研发部门" # } result = self.send_request('POST', self.url_role_update, data) return result def delete_role(self, role_no): # 删除一个自建角色 data = { "role_no": role_no } result = self.send_request('POST', self.url_role_delete, data) return result def list_user_role(self, skip, limit, role_no): # 删除一个自建角色 data = { "skip": skip, "limit": limit, "role_no": role_no } result = self.send_request('POST', self.url_role_user_list, data) return result def add_membersh_role(self, role_no, usernames): # 删除一个自建角色 data = { "role_no": role_no, "usernames": usernames } result = self.send_request('POST', self.url_role_add_membersh, data) return result def remove_members_role(self, role_no, usernames): # 将成员从角色中批量移除。支持公共模式和集成模式 data = { "role_no": role_no, "usernames": usernames } result = self.send_request('POST', self.url_role_remove_members, data) return result '''角色组接口''' def role_group_list(self, skip, limit): # 将角色组全部拉取出来。支持公共模式和集成模式。 data = { "skip": skip, "limit": limit, "has_internal": "Ture", "has_sync": "Ture" } result = self.send_request('POST', self.url_role_group_list, data) return result def role_group_create(self, name): # 通过接口新建角色组。支持公共模式和集成模式。 data = { "name": name } result = self.send_request('POST', self.url_role_group_create, data) return result def role_group_update(self, role_group_no, name): # 更新创建好的角色组信息。支持公共模式和集成模式。 data = { "role_group_no": role_group_no, "name": name } result = self.send_request('POST', self.url_role_group_update, data) return result def role_group_delete(self, role_group_no): # 更新创建好的角色组信息。支持公共模式和集成模式。 data = { "role_group_no": role_group_no } result = self.send_request('POST', self.url_role_group_delete, data) return result '''企业互联接口''' def guest_department_list(self, dept_no): # 能够获取所有外部部门的列表,dept_on空值,返回所有外部部门 data = { "dept_no": dept_no } result = self.send_request('POST', self.url_guest_department_list, data) return result def guest_member_list(self, dept_no): # 能够获取所有外部部门的列表,dept_on空值,返回全部 data = { "dept_no": dept_no } result = self.send_request('POST', self.url_guest_member_list, data) return result def guest_user_get(self, username): # 获取某个 对接人 的信息详细信息。 data = { "username": username } result = self.send_request('POST', self.url_guest_user_get, data) return result def get_transaction_id(self): import uuid uid = str(uuid.uuid1()) suid = ''.join(uid.split('-')) return suid
简道云字段为空值时的判断(python版)
书写简道云API时,时常对字段空值判断,今天统一做了一下测试,给大家分享: 1.时间字段为空值时 print(data) # 输出的值为None if data=="": print(1) elif data is None: print(2) # 输出的值为2  2.单行文本为空时 print(data) # 输出为空行 if data == "": print(1) elif data is None: print(2) # 输出的值为1  3.数字为空值时 print("shuzi:",data) # 输出为None if data == "": print(1) elif data is None: print(2) # 输出的值为2 4.成员单选为空值时 print("chengyuan_danxuan:",data) #输出为None if data == "": print(1) elif data is None: print(2) # 输出的值为2  5.地址字段为空值时 print("dizhi:",data) # 输出的值为{} if data: print("地址非空") elif not data: print("地址为空") # 输出的值为 地址为空  6.复选框组为空值时 print("fuxuan_kuangzu:",data) # 输出为 if data: print("复选框组非空") elif not data: print("复选框组为空") # 输出的值为 复选框组为空  7.成员多选为空值时 print("chengyuan_duoxuan:",data) # 输出为 if len(data)>0: print("成员多选非空") elif len(data)== 0: print("成员多选为空") # 输出的值为 成员多选为空  8.图片为空值时 print("tupian:",data) # 输出为 if data: print("图片非空") elif not data: print("图片为空") # 输出的值为 图片为空  9.定位为空值时 print("dingwei:",data) # 输出的值为{} if data: print("定位非空") elif not data: print("定位为空") # 输出的值为 定位为空
企业微信+微信+简道云+API:创建贴心实用的学生请假系统(二)之技术方案
上次假期,制作了一个学生请假系统,并写了一下实施思路,大家比较喜欢,一直想写技术细节,现在暑假又过了一半,对系统也做了修正,把写技术细节的愿望实现一下。下面的链接是上次的文章 企业微信+微信+简道云+API:创建贴心实用的学生请假系统 接下来是本次书写的技术方案。 01 学生的信息库建立 1.从企业微信家校通讯录导出学生的信息,获取student_userId。点击下图位置导出家校通讯录。   2.打开导出的通讯录,可以看到下图中的学生账号。     3.在简道云中建立数据表单,方便查看和修改。数据库中含学生的身份证号码,供以后家长绑订学生时使用。     4.建立mysql数据表单 5.把导出的student_userId和学生的学号信息精准对应,分别导入到简道云和mysql数据库中 6.简道云学生数据表单,新建数据推送,和mysql数据库表单实现实时同步 02 云家校主页 1.主页预览   2.表单属性设置及主页选项卡       3.打卡记录选项卡     4.请假记录选项卡       5.表单外链样式     03 家长绑订学生 1.微信绑订学生表单 (1)界面   (2)表单字段       (3)表单添加的智能助手       2.微信绑订学生数据库表单 (1)界面 (2)字段     3.删除我绑订的学生   (1)界面     (2)字段   (3)添加的智能助手 04 学生请假流程表单 1.界面   2.流程设定 3.新建数据推送 06 学校返校前上传核酸证明 1.界面 2.表单字段 3.添加智能助手 06 发送家长通知的代码 1.接收请假表单数据推送的网络路径代码 @jiandaoyun.route('/app1/', methods = )# 这一段接收数据的语句不用修改def hello_world():    # 从简道云表单根据A表单动作触发获取值    sj = json.loads(request.data)    # print(sj)    op = sj    data = sj    if op == "data_test": # 测试触发的这个语句        print("测试成功")        return '200'    else:        # 多线程进行数据处理程序        threading.Thread(target = Processing_data().from_to_defname, args = (op, data,)).start()        return 'success', 200 2.处理接收到学生请假表单推送的数据代码 class Processing_data(object):    def create_tables(self):        pass    def from_to_defname(self, op, data):        appId_entryId = data + data        # 学习调试—智能助手案例-A表        if appId_entryId == "5c44742d930c2878a14fcb885ec227cdd67f73000676c419":            self.main1(op, data)    def main1(self,op,data):        print("疫情防控——学生请假")        print("op:",op)        print("data:", data)        banzhuren_yijian=data        banzhuren_name=data        xuehao_xingming=data        qingjia_yuanyin=data        print(qingjia_yuanyin)        hehaizi_guanxi=data        print(hehaizi_guanxi)        jiazhang_shouji=data        print(jiazhang_shouji)        start_time=data        start_wubie=data        end_time=data        end_wubie=data        shifou_hesuan = data        fanxiao_shijian = data        print(start_time)        Jiaxiaogoutong().send_xuexiaotongzhi_xueshengqingjia(xuehao_xingming,op,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,start_wubie,end_time,end_wubie,banzhuren_name,banzhuren_yijian,shifou_hesuan,fanxiao_shijian)class Jiaxiaogoutong(): # 这个是家校沟通发送消息的功能    def send_xuexiaotongzhi_xueshengkaoqin(self,xuehaoxingming_list,wubie,chuqin_jilu,shangbaoren_T):        riqi = datetime.datetime.today().strftime("%Y-%m-%d")        sql_sel_stu_userid = "SELECT stu_userid FROM student_info WHERE xuehao_xingming = %s LIMIT 1"        print("xuehaoxingming_list:",xuehaoxingming_list)        for xuehao_xingming in xuehaoxingming_list:            userids=MySQLUtil().select_one_none(sql_sel_stu_userid, xuehao_xingming)            print("userids:",userids)            if userids is not None:                xingming = xuehao_xingming                WeChat().msg_text_school(xingming,userids,riqi,wubie,chuqin_jilu,shangbaoren_T)    def send_xuexiaotongzhi_xueshengqingjia(self,xuehao_xingming,op,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,start_wubie,end_time,end_wubie,banzhuren_name,banzhuren_yijian,shifou_hesuan,fanxiao_shijian):        riqi = datetime.datetime.today().strftime("%Y-%m-%d")        start_time=JdyFuctionUtils().get_local_time(start_time)        print(start_time)        start_time=start_time.strftime("%Y-%m-%d")+start_wubie        end_time=JdyFuctionUtils().get_local_time(end_time)        end_time=end_time.strftime("%Y-%m-%d")+end_wubie        sql_sel_stu_userid = "SELECT stu_userid FROM student_info WHERE xuehao_xingming = %s LIMIT 1"        userids=MySQLUtil().select_one_none(sql_sel_stu_userid, xuehao_xingming)        print("userids:",userids)        print(fanxiao_shijian)        if op == "data_create":            if userids is not None:                xingming = xuehao_xingming                WeChat().msg_text_school_qingjia_satrt(xingming,userids,riqi,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,end_time)        elif op == "data_update" and fanxiao_shijian is None:            if userids is not None:                xingming = xuehao_xingming                WeChat().msg_text_school_qingjia_end(xingming,userids,riqi,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,end_time,banzhuren_name,banzhuren_yijian,shifou_hesuan) 3.根据相关流程,发送家长通知的代码 # !/usr/bin/env python# -*- coding: utf-8 -*-'''1.这是处理学生请假,给家长发送通知的文件'''from flask import Flask, request, render_templateimport http.clientimport datetime,timefrom datetime import timedeltafrom apscheduler.schedulers.blocking import BlockingSchedulerimport requestsimport jsonimport threadingimport mathfrom wx_util.yiqingdaka import WX_ARGimport osimport syscurrent_directory = os.path.dirname(os.path.abspath(__file__))sys.path.append("../")'''这是企业微信的类'''class WeChat(object):    '''    下面是构造函数    1.成员函数的函数名可以自定义,但是,构造函数的函数名是固定的__init__    2.成员函数需要被手动调用,但是,构造函数在创建对象的过程中是自动被调用的对于同一个对象而言    3.成员函数可以被调用多次,但是,构造函数只能被调用一次    4.构造函数的特点:创建对象;给对象的成员变量赋值    5.构造函数也被称为构造器,当创建对象的时候第一个被自动调用的函数    '''    def __init__(self):        self.CORPID = WX_ARG.corpid # 企业ID,在管理后台获取        self.CORPSECRET = WX_ARG.corpsecret # 自建应用的Secret,每个自建应用里都有单独的secret        self.TOUSER = ""  # 接收者用户名,多个用户用|分割        self.CORPNAME = WX_ARG.corpname # 给应用起一个名字,方便在后面写入access_token时调用        self.access_token_filename=self.CORPNAME+'_access_token.conf'    '''获取我的企业微信中家校通讯录的Access Token'''    def _get_access_token(self):        url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'        values = {'corpid': self.CORPID,                  'corpsecret': self.CORPSECRET,                  }        req = requests.post(url, params=values)        data = json.loads(req.text)#转换为json格式        # print(data)        return data#返回data中的"access_token"    def get_access_token(self):        cur_time = time.time()#获取现在的时间        # print('执行的foxtable access_token')        try:            # print('执行获取access_token的try,读取文件内的Access_token,进行判断')            with open(current_directory +'/temp/'+self.access_token_filename+'', 'r') as f:#读取文件中的t即上次获取access token的时间和access token的值                t, access_token = f.read().split()#以空格为分格符,分别给t和access_token赋值                # print(t)                # print(access_token)            if 0 < cur_time - float(t) < 7260:#判断access token是否在有效期内,如果是,则继续使用access token                # print('access_token在有效范围内')                # print(access_token)                return access_token            else:                with open(current_directory +'/temp/'+self.access_token_filename+'', 'w') as f:#否则,先打开文件,权限可以编辑                    access_token = self._get_access_token()#获取access token                    f.write('\t'.join())#把获取到的当前时间和Access token写入文件                    return access_token#返回access token的值        except:#如果是第一次运行,文件中为空值,则运行此语句,获取access token的值            with open(current_directory +'/temp/'+self.access_token_filename+'', 'w') as f:                access_token = self._get_access_token()                cur_time = time.time()                f.write('\t'.join())                # print("执行获取access_token的except")                return access_token    def send_school(self,data_str):  # 发送学校通知        return self.post_request_send(WX_ARG.url_send_xuexiaotongzhi.format(self.get_access_token()), data_str)    def get_rquest(self,url):#所有的get请求        try:            conn = http.client.HTTPSConnection(WX_ARG.url)            conn.request('GET', url)            response = conn.getresponse()            # print(response)            arg = eval(response.read().decode())            # print(arg)            return arg        except:            # print(sys.exc_info())            return None        finally:            conn.close()    def post_request(self,url, data_str):#所有的post请求        # print("进入post环节")        try:            # print("进入try")            conn = http.client.HTTPSConnection(WX_ARG.url)            conn.request('POST', url, data_str)            response = conn.getresponse()            # print('response:', response)            arg = eval(response.read().decode())            # print('arg:', arg)            return arg        except:            # print("进入except")            print(sys.exc_info())            return None        finally:            conn.close()    def post_request_send(self,url, data_str):#所有的post请求        try:            conn = http.client.HTTPSConnection(WX_ARG.url)            print("进入尝试发送环节")            conn.request('POST', url, data_str)            response = conn.getresponse() #值为            print("发送后的返回值",response)            arg = eval(response.read().decode()) #转换后为{'errcode': 0, 'errmsg': 'ok', 'invaliduser': ''}            print(arg)            return arg        except:            print(sys.exc_info())#正常情况下为(None, None, None)0的值也为None            return None        finally:            conn.close()    def msg_text_school(self, xingming,userids,riqi,wubie,chuqin_jilu,shangbaoren_T):        send_values = {            "to_student_userid": ,            "msgtype" : "text",            "agentid": WX_ARG.corpagentld,            "text" : {                "content" : "您宝贝的课后服务考勤\n姓名:"+ xingming + "\n日期:"+ riqi + "\n午别:"+ wubie + "\n考勤:"+ chuqin_jilu + "\n教师:"+ shangbaoren_T +"\n\n点击可查看考勤详情\n"            }        }        # print("进入发送环节")        send_msges = (bytes(json.dumps(send_values), 'utf-8'))        self.send_school(send_msges)    def msg_text_school_qingjia_satrt(self,xingming,userids,riqi,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,end_time):        send_values = {            "to_student_userid": ,            "msgtype": "text",            "agentid": WX_ARG.corpagentld,            "text": {                "content":"" + xingming + "的家长发起了请假申请\n学生姓名:"+ xingming + "\n开始时间:"+ start_time + "\n结束时间:"+ end_time + "\n请假原因:"+ qingjia_yuanyin + "\n发起人:"+ xingming + "" + hehaizi_guanxi + "\n发起人手机:"+ jiazhang_shouji +"\n查看更多\n"            }        }        # print("进入发送环节")        send_msges = (bytes(json.dumps(send_values), 'utf-8'))        self.send_school(send_msges)    def msg_text_school_qingjia_end(self,xingming,userids,riqi,qingjia_yuanyin,hehaizi_guanxi,jiazhang_shouji,start_time,end_time,banzhuren_name,banzhuren_yijian,shifou_hesuan):        if shifou_hesuan == "是":            send_values = {                "to_student_userid": ,                "msgtype": "text",                "agentid": WX_ARG.corpagentld,                "text": {                    "content": ""+ xingming + "的请假审批意见\n班主任:"+ banzhuren_name + "\n班主任意见:"+ banzhuren_yijian +",您的学生返校前需要点击击本链接,提交核酸检测阴性证明\n学生姓名:"+ xingming + "\n假期开始:"+ start_time + "\n假期结束:"+ end_time + "\n请假原因:"+ qingjia_yuanyin + "\n发起人:"+ xingming + "" + hehaizi_guanxi + "\n发起人手机:"+ jiazhang_shouji +"\n"                }            }        else:            send_values = {                "to_student_userid": ,                "msgtype": "text",                "agentid": WX_ARG.corpagentld,                "text": {                    "content": ""+ xingming + "的请假审批意见\n班主任:" + banzhuren_name + "\n班主任意见:" + banzhuren_yijian + "\n\n学生姓名:" + xingming + "\n假期开始:" + start_time + "\n假期结束:" + end_time + "\n请假原因:" + qingjia_yuanyin + "\n发起人:" + xingming +""+ hehaizi_guanxi + "\n发起人手机:" + jiazhang_shouji + "\n查看更多\n"                }            }        # print("进入发送环节")        send_msges = (bytes(json.dumps(send_values), 'utf-8'))        self.send_school(send_msges) WX_ARG.py文件内容 # -*- coding: utf-8 -*-'''请假审批的应用修改需要的值,就在这个地方进行修改'''corpid = '*******'corpsecret = '******'corpname = '******'corpagentld = '*******' # 疫情打卡的id号url = 'qyapi.weixin.qq.com''''base最基本的链接'''url_send_xuexiaotongzhi = '/cgi-bin/externalcontact/message/send?access_token={}''''发送学校通知'''
个人成就
内容被浏览23,534
加入社区4年192天
返回顶部