说明:
因部分代码涉及敏感信息,已做少量调整(如邮箱、授权码、数据库地址等均替换为占位内容),使用时请自行补充相关内容;另外,本篇内容建议仅作参考。
特别说明:部分代码基于网上内容修订。
补充:
导航:云函数&前端事件 内容集
试卷类综合函数(参考)
# -*- coding: utf8 -*-
import json
import requests
import random
class Factory:
    """Build the JSON payload for an exam-paper request.

    Two independent features are driven by the constructor arguments:

    * ``tkl`` / ``ctl`` (numeric strings): ``ctl`` question numbers are drawn
      at random, without repetition, from ``1..tkl``.
    * ``xx`` / ``da`` (strings): ``xx`` holds the options of each question and
      ``da`` the correct option texts. Questions are separated by ``###`` and
      options inside a question by ``^^^``. The options of every question are
      shuffled, lettered ``A``, ``B``, ... and the answers are translated to
      the letters they ended up with.

    Everything produced is collected in ``self.release`` and serialised by
    :meth:`Release`. The ``cs*`` keys are diagnostic values that are part of
    the emitted payload and are therefore preserved.
    """

    def __init__(self, tkl, ctl, xx, da):
        """Validate the arguments and run whichever features they enable.

        Args:
            tkl: Size of the question pool, as a string (may be empty).
            ctl: Number of questions to draw, as a string (may be empty).
            xx: ``###``/``^^^`` separated option texts (may be empty).
            da: ``###``/``^^^`` separated answer texts (may be empty).

        Malformed input (non-numeric or negative counts, an answer that is
        not among the options, fewer answer groups than questions) sets
        ``release["处理状态"]`` to ``"参数无效"`` instead of raising.
        """
        self.release = {}
        self.release["处理状态"] = "反馈正常"
        has_options = len(xx) > 0 and len(da) > 0
        has_counts = len(tkl) > 0 and len(ctl) > 0
        if not (has_options or has_counts):
            self.release["处理状态"] = "参数无效"

        if len(ctl) > 0:
            try:
                drawn = int(ctl)
                # An empty pool size falls back to the number of questions.
                pool = int(tkl) if len(tkl) > 0 else drawn
            except ValueError:
                self.release["处理状态"] = "参数无效"
            else:
                if drawn < 0:
                    # random.sample() cannot draw a negative amount.
                    self.release["处理状态"] = "参数无效"
                else:
                    # The pool can never be smaller than the draw.
                    self.tkl = max(pool, drawn)
                    self.ctl = drawn
                    self.ctxl()

        if has_options:
            self.release["cs1"] = "xxda"
            self.xx = [i for i in xx.split("###") if i != ""]
            self.da = [i for i in da.split("###") if i != ""]
            self.yy = self.xx
            try:
                self.sjxx()
            except (ValueError, IndexError):
                # ValueError: an answer text is not one of the options.
                # IndexError: fewer answer groups than questions.
                self.release["处理状态"] = "参数无效"

    def ctxl(self):
        """Store the sequential and the randomly drawn question numbers."""
        count = int(self.ctl)
        self.release["试卷题号"] = list(range(1, count + 1))
        self.release["按序编号"] = list(range(1, count + 1))
        self.release["随机编号"] = random.sample(
            range(1, int(self.tkl) + 1), count
        )

    def csxx(self):
        """Expose the parsed question list under the diagnostic key ``cs4``."""
        self.release["cs4"] = self.xx

    def sjxx(self):
        """Shuffle the options, then publish the answer letters and options."""
        self.yy = self.xxmx(self.xx)
        self.release["当前答案"] = [
            i for i in self.damx(self.yy).split("###") if i != ""
        ]
        self.yy = self.cznr(self.yy)
        self.release["当前选项"] = self.yy

    def xxmx(self, xx):
        """Shuffle the options of every question.

        Args:
            xx: List of questions, each a ``^^^`` separated option string.

        Returns:
            One string in which every option is followed by ``^^^`` and every
            question by ``###``.
        """
        self.release["cs2-1"] = xx
        pieces = []
        for x, question in enumerate(xx):
            self.release["cs2-2"] = x
            yy = [i for i in question.split("^^^") if i != ""]
            self.release["cs2-3"] = yy
            random.shuffle(yy)
            if yy:
                # Index of the last option processed for this question.
                self.release["cs2-4"] = len(yy) - 1
            pieces.append("".join(option + "^^^" for option in yy) + "###")
        self.release["cs2"] = "xxmx"
        return "".join(pieces)

    def damx(self, yy):
        """Translate the answer texts into option letters.

        Args:
            yy: Shuffled options in the format returned by :meth:`xxmx`.

        Returns:
            The sorted answer letters of every question, each group followed
            by ``###``.

        Raises:
            ValueError: An answer text is not among the question's options.
            IndexError: ``self.da`` has fewer groups than there are questions.
        """
        questions = [i for i in yy.split("###") if i != ""]
        pieces = []
        for x, question in enumerate(questions):
            options = [i for i in question.split("^^^") if i != ""]
            answers = [i for i in self.da[x].split("^^^") if i != ""]
            if answers:
                self.release["cs3-1"] = "damx"
            # 65 is ord("A"): option 0 -> "A", option 1 -> "B", ...
            letters = sorted(chr(options.index(a) + 65) for a in answers)
            pieces.append("".join(letters) + "###")
        self.release["cs3"] = "damx"
        return "".join(pieces)

    def cznr(self, x):
        """Render every question's options as lettered lines.

        Args:
            x: Shuffled options in the format returned by :meth:`xxmx`.

        Returns:
            One string per question, made of ``"A. text\\n"`` style lines.
        """
        rendered = []
        for question in x.split("###"):
            options = [i for i in question.split("^^^") if i != ""]
            text = "".join(
                chr(index + 65) + ". " + option + "\n"
                for index, option in enumerate(options)
            )
            if text:
                rendered.append(text)
        return rendered

    def Release(self):
        """Wrap ``self.release`` in the gateway response envelope."""
        return {
            "isBase64Encoded": False,
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(self.release)
        }
def main_handler(event, context):
    """Cloud-function entry point for the exam-paper helper.

    Reads ``tkl`` and ``ctl`` from the request headers and ``xx`` and ``da``
    from the query string, substituting an empty string for anything that is
    missing, and returns the response built by ``Factory``.

    Args:
        event: Gateway event; ``headers`` and ``queryString`` are read.
        context: Runtime context (unused).

    Returns:
        The response dict produced by ``Factory.Release()``.
    """
    # A missing or null section is treated as empty instead of raising.
    headers = event.get('headers') or {}
    query = event.get("queryString") or {}
    tkl = headers.get("tkl") or ""
    ctl = headers.get("ctl") or ""
    xx = query.get("xx") or ""
    da = query.get("da") or ""
    chuli = Factory(tkl, ctl, xx, da)
    return chuli.Release()
抖音解析函数(参考)
基于网络源码修订
# -*- coding: utf8 -*-
import json
import re
import requests
class Douyin:
    """Resolve a Douyin share link into its video and music metadata.

    Usage: construct with the share URL, call :meth:`request_vide` to fetch
    the metadata, then :meth:`response` to obtain the gateway response.
    """

    # Seconds to wait for each outbound HTTP request before giving up.
    _TIMEOUT = 10

    def __init__(self, url):
        """Store the share URL and initialise every result field to ""."""
        self.__url = url
        self.__aweme = "https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids="
        self.__title = ""
        self.__author = ""
        self.__mp3_title = ""
        self.__mp3_url = ""
        self.__mp4_url = ""
        self.__void_url = ""
        self.__str_id = ""

    @staticmethod
    def _extract_video_id(share_url):
        """Return the numeric id that follows ``video/`` in *share_url*.

        Raises:
            ValueError: The URL contains no ``video/<digits>`` segment.
        """
        match = re.search(r'video/(\d+)', share_url)
        if match is None:
            raise ValueError("no video id found in URL: " + share_url)
        return match.group(1)

    def __request(self):
        """Follow the share link's redirects and remember the video id."""
        share = requests.get(self.__url, timeout=self._TIMEOUT).url
        self.__str_id = self._extract_video_id(share)

    def request_vide(self):
        """Fetch the item info for the video and store the fields of interest.

        Raises:
            ValueError: No video id could be extracted from the share link.
        """
        headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36'}
        self.__request()
        dow_url = self.__aweme + self.__str_id
        print("dow_url------", dow_url)
        data = requests.get(dow_url, headers=headers, timeout=self._TIMEOUT).json()
        print("datel------", data)
        item = data['item_list'][0]
        self.__title = item['desc']
        self.__author = item['author']['nickname']
        self.__mp3_title = item['music']['title']
        self.__mp3_url = item['music']['play_url']['uri']
        video_url = str(item['video']['play_addr']['url_list'][0])
        # The code swaps the "playwm" path segment for "play";
        # presumably the non-watermarked variant -- confirm against the API.
        self.__void_url = video_url.replace("playwm", "play")

    def response(self):
        """Resolve the final video URL and build the gateway response."""
        # The play address redirects; the final URL is what gets returned.
        self.__mp4_url = requests.get(self.__void_url, timeout=self._TIMEOUT).url
        print("SELFMP4URL------", self.__mp4_url)
        return {
            "isBase64Encoded": False,
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({'mp3_title': self.__mp3_title, 'mp3_url': self.__mp3_url, 'video_url': self.__mp4_url, 'title': self.__title, 'author': self.__author})
        }
def main_handler(event, context):
    """Cloud-function entry point: resolve the share link given as ``cs``.

    Args:
        event: Gateway event; ``event['queryString']['cs']`` is the share URL.
        context: Runtime context (unused).

    Returns:
        The response dict produced by ``Douyin.response()``.
    """
    share_link = event['queryString']['cs']
    resolver = Douyin(share_link)
    resolver.request_vide()
    result = resolver.response()
    return result
群通知函数(参考)
注意事项:
1、在钉钉群或企业微信群中创建群机器人;
2、获取 webhook 等信息并妥善保管,一旦泄露会带来安全风险;
# 企业微信群通知
import requests
import json
def main_handler(event, context):
    """Post a text message to a WeCom (enterprise WeChat) group robot.

    Args:
        event: Gateway event; ``queryString.webhook`` is the robot URL and
            ``queryString.text`` the message, with ``###`` separating lines.
        context: Runtime context (unused).

    Returns:
        A gateway response whose JSON body is ``{"release": "ok"}``.
    """
    headers = {"Content-Type": "text/plain"}
    url = event['queryString']['webhook']
    ss = event['queryString']['text']
    # Split once; every segment is wrapped in a leading and trailing newline.
    s = "".join("\n" + part + "\n" for part in ss.split("###"))
    data = {
        "msgtype": "text",
        "text": {
            "content": s,
        }
    }
    # NOTE(review): the webhook URL is caller-supplied and the robot's reply
    # is not inspected, so "ok" only means the request was sent.
    requests.post(url, headers=headers, json=data, timeout=10)
    r = {"release": "ok"}
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(r)
    }
# 钉钉群通知
import requests
import json
def main_handler(event, context):
    """Post a text message to a DingTalk group robot.

    Args:
        event: Gateway event; ``queryString.webhook`` is the robot URL and
            ``queryString.text`` the message, with ``###`` separating lines.
        context: Runtime context (unused).

    Returns:
        A gateway response whose JSON body is ``{"release": "ok"}``.
    """
    headers = {'Content-Type': 'application/json'}
    url = event['queryString']['webhook']
    ss = event['queryString']['text']
    # Split once; every segment is wrapped in a leading and trailing newline.
    s = "".join("\n" + part + "\n" for part in ss.split("###"))
    # Fixed trailer; the source marks it as the robot's "keyword".
    s = s + "来自:微分享"
    data = {
        "msgtype": "text",
        "text": {
            "content": s,
        }
    }
    # NOTE(review): the webhook URL is caller-supplied and the robot's reply
    # is not inspected, so "ok" only means the request was sent.
    requests.post(url, headers=headers, json=data, timeout=10)
    r = {"release": "ok"}
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(r)
    }
邮件通知函数(参考)
参考资料:
https://www.52pojie.cn/thread-1286071-1-1.html
# -*- coding: utf8 -*-
import json
import requests
import urllib.parse
from smtplib import SMTP_SSL
from email.mime.text import MIMEText
def main_handler(event, context):
    """Send a plain-text e-mail through the QQ SMTP server.

    Args:
        event: Gateway event; ``queryString`` supplies the percent-encoded
            recipient ``m``, subject ``s`` and content ``c``. ``###`` in the
            content stands for a line break.
        context: Runtime context (unused).

    Returns:
        A gateway response whose JSON body is ``"success send"`` when the
        message was handed to the server and ``"error"`` when connecting,
        logging in or sending failed.
    """
    from smtplib import SMTPException

    # Configuration (placeholders -- fill in before use).
    mail_from = "xxxxxxx@qq.com"  # sender address
    pwd = "password"  # SMTP authorisation code

    # Message fields. Decode first, then expand "###", so that a
    # percent-encoded separator (%23%23%23) is recognised as well.
    query = event['queryString']
    mail_to = urllib.parse.unquote(query["m"])
    subject = urllib.parse.unquote(query["s"])
    content = urllib.parse.unquote(query["c"]).replace("###", "\n")
    print("mail_to:", mail_to)
    print("subject:", subject)
    print("content:", content)

    msg = MIMEText(content)
    msg['Subject'] = subject
    msg['From'] = mail_from
    msg['To'] = mail_to

    # The connection is opened only once the message is ready, and the
    # with-block closes it on success and on failure.
    try:
        with SMTP_SSL("smtp.qq.com") as ss:
            ss.login(mail_from, pwd)
            ss.sendmail(mail_from, mail_to, msg.as_string())
        r = "success send"
    except (SMTPException, OSError) as exc:
        print("send failed:", exc)
        r = "error"
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(r)
    }
外连数据库函数(参考)
参考资料:
https://www.cnblogs.com/jiahuasir/p/10728045.html
https://blog.csdn.net/qq_39905917/article/details/88981867
import json
import requests
import pymysql
def tianjia(Mn, Mt):
    """Insert one ``(myname, mytext)`` row.

    Args:
        Mn: Value for the ``myname`` column.
        Mt: Value for the ``mytext`` column.

    Returns:
        ``{"release": "添加成功", "myname": Mn}`` on success, otherwise
        ``{"release": "添加失败"}``.
    """
    r = {}
    # Connection settings are placeholders to be filled in before use.
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    sql = "insert into 数据表名(myname,mytext) values(%s,%s)"
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (Mn, Mt))
        db.commit()  # nothing is persisted until the transaction commits
        r["release"] = "添加成功"
        r["myname"] = Mn
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        print("failed", exc)
        db.rollback()
        r["release"] = "添加失败"
    finally:
        # Runs even when the rollback itself raises.
        db.close()
    return r
def gengxin(Mn, Mt):
    """Set ``mytext`` to *Mt* on the rows whose ``myname`` equals *Mn*.

    Args:
        Mn: Value of ``myname`` identifying the rows to update.
        Mt: New value for the ``mytext`` column.

    Returns:
        ``{"release": "更新成功", "mytext": Mt}`` when at least one row was
        changed, otherwise ``{"release": "更新失败"}``.
    """
    r = {}
    # Connection settings are placeholders to be filled in before use.
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    sql = "update 数据表名 set mytext = %s where myname = %s"
    try:
        with db.cursor() as cursor:
            changed = cursor.execute(sql, (Mt, Mn))
        if changed:
            db.commit()
            r["release"] = "更新成功"
            r["mytext"] = Mt
        else:
            # No row matched (or the value was already identical); always
            # give the caller a status instead of an empty dict.
            r["release"] = "更新失败"
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        print("failed", exc)
        db.rollback()
        r["release"] = "更新失败"
    finally:
        db.close()
    return r
def tiqu():
    """Fetch every row of the table.

    Returns:
        ``{"myname": [...], "mytext": [...], "release": "提取成功"}`` with the
        rows in reverse fetch order (both lists are empty for an empty
        table), or ``{"release": "提取失败"}`` when the query fails.
    """
    r = {}
    # Connection settings are placeholders to be filled in before use.
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    sql = "select * from 数据表名"
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            print("Count:", cursor.rowcount)  # total number of rows found
            # fetchall() is the only fetch: consuming a row beforehand would
            # silently drop it from the result.
            rows = cursor.fetchall()
        r["myname"] = [row["myname"] for row in reversed(rows)]
        r["mytext"] = [row["mytext"] for row in reversed(rows)]
        r["release"] = "提取成功"
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        print("failed", exc)
        r = {"release": "提取失败"}
    finally:
        db.close()
    return r
def chaxun(Mn):
    """Look up the ``mytext`` stored for *Mn*.

    Args:
        Mn: Value of ``myname`` to look up.

    Returns:
        ``{"mytext": <text>, "release": "查询成功"}`` when a row exists,
        otherwise ``{"release": "查询失败", "mytext": "未查询到此相关内容"}``.
    """
    r = {}
    # Connection settings are placeholders to be filled in before use.
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    # Let the database do the filtering instead of loading the whole table.
    sql = "select mytext from 数据表名 where myname = %s"
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (Mn,))
            rows = cursor.fetchall()
        if rows:
            # With duplicate names the last fetched row wins.
            r["mytext"] = rows[-1]["mytext"]
            r["release"] = "查询成功"
        else:
            r["release"] = "查询失败"
            r["mytext"] = "未查询到此相关内容"
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        print("failed", exc)
        r["release"] = "查询失败"
        r["mytext"] = "未查询到此相关内容"
    finally:
        db.close()
    return r
def shanchu(Mn):
    """Delete the rows whose ``myname`` equals *Mn*.

    Args:
        Mn: Value of ``myname`` identifying the rows to delete.

    Returns:
        ``{"release": "删除成功", "myname": Mn}`` when at least one row was
        deleted, otherwise ``{"release": "删除失败"}``.
    """
    r = {}
    # Connection settings are placeholders to be filled in before use.
    db = pymysql.connect(host='数据库地址', port=端口号,user='用户名',passwd='密码',db="数据库名",charset='utf8',cursorclass=pymysql.cursors.DictCursor)
    # The value is bound as a parameter: Mn comes from the request and must
    # never be concatenated into the statement (SQL injection).
    sql = "delete from 数据表名 where myname = %s"
    print("sql:", sql)
    try:
        with db.cursor() as cursor:
            deleted = cursor.execute(sql, (Mn,))
        if deleted:
            db.commit()
            r["release"] = "删除成功"
            r["myname"] = Mn
        else:
            # No row matched; always give the caller a status.
            r["release"] = "删除失败"
    except Exception as exc:
        # Best effort: report the failure to the caller instead of raising.
        print("failed", exc)
        db.rollback()
        r["release"] = "删除失败"
    finally:
        db.close()
    return r
def main_handler(event, context):
    """Cloud-function entry point for the external-database helpers.

    The operation is selected by the first keyword, in the order listed
    below, that occurs in ``queryString.cz``; ``mn`` and ``mt`` are read from
    the query string only by the operations that need them. When no keyword
    occurs the JSON body is an empty object.

    Args:
        event: Gateway event carrying ``queryString``.
        context: Runtime context (unused).

    Returns:
        A gateway response whose JSON body is the selected helper's result.
    """
    operation = event['queryString']["cz"]
    # Keyword -> deferred call; the arguments are looked up only when the
    # matching operation actually runs.
    dispatch = (
        ("添加", lambda: tianjia(event['queryString']['mn'], event['queryString']['mt'])),
        ("更新", lambda: gengxin(event['queryString']['mn'], event['queryString']['mt'])),
        ("提取", lambda: tiqu()),
        ("查询", lambda: chaxun(event['queryString']['mn'])),
        ("删除", lambda: shanchu(event['queryString']['mn'])),
    )
    outcome = {}
    for keyword, run in dispatch:
        if keyword in operation:
            outcome = run()
            break
    response = {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
    }
    response["body"] = json.dumps(outcome)
    return response
多功能函数(参考)
# -*- coding: utf8 -*-
import json
import random
import numpy
import urllib.parse
import requests
def main_handler(event, context):
    """Evaluate the Python expression passed as ``queryString.string``.

    SECURITY WARNING: the expression comes straight from the request and is
    passed to ``eval()``, so anyone who can call this function can execute
    arbitrary code in the runtime. Do not expose it without authentication
    and strict input restrictions.

    Args:
        event: Gateway event; ``queryString.string`` is the percent-encoded
            expression.
        context: Runtime context (unused).

    Returns:
        A gateway response. When the expression text contains ``requests``
        the body is ``{"string": "已发送"}``; otherwise it holds the result
        as ``release`` and its ``str()`` form as ``string``. A result that
        cannot be represented in JSON is reported by its ``str()`` form in
        both fields.
    """
    string = urllib.parse.unquote(event['queryString']['string'])
    # SECURITY(review): eval() on untrusted input -- see the docstring.
    if "requests" in string:
        # Evaluated for its side effect only; the result is not returned.
        eval(string)
        payload = {"string": "已发送"}
    else:
        release = eval(string)
        payload = {"release": release, "string": str(release)}
    try:
        body = json.dumps(payload)
    except (TypeError, ValueError):
        # The result (a set, an object, ...) has no JSON representation.
        payload["release"] = payload["string"]
        body = json.dumps(payload)
    return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": body
    }
都看到这里了
如果您觉得有用
赞一个呗
赏一个呗
偶会更有动力哈
每天参与论坛“摇一摇”活动可免费获得F豆
更多沟通交流可添加微信(zmlnow)
添加时请备注:简道云
(编辑于 2021-7-30 12:06)
|