地图
地图

Flask实践

1
2
3
4
5
6
7
8
9
10
11
12
13
开发环境
Python 3.7.2
Flask 1.0.2

本机
macOS 10.14.3

远程
Linux version 4.14.77-81.59.amzn2.x86_64

其他软件
Atom Editor 1.34
Postman 6.7.2

学完了Python,就开始Flask实践。


准备工作

  1. 虚拟环境

    在虚拟环境安装私有包,保持全局的干净整洁。

    新建一个文件夹

    创建虚拟环境:

    python3 -m venv venv

  2. 激活虚拟环境

    source venv/bin/activate

    激活虚拟环境后会修改命令提示符,加入环境名(venv) $,在虚拟环境中使用python3命令,调用的是虚拟环境中的解释器。

  3. 安装Python包

    使用pip命令安装Python包,pip3 install XXX

    使用pip3 freeze查看安装了哪些包,生成requirements.txt文件,pip3 freeze >requirements.txt

    安装所有需求包,pip3 install -r requirements.txt

    安装Flask,pip3 install flask


基本结构

  1. 初始化

    1
    2
    from flask import Flask
    app = Flask(__name__)


  1. 路由和视图

    处理URL和函数之间关系的程序称为路由,在Flask中定义路由的最简便方式,是使用应用实例提供的app.route装饰器。

    1
    2
    3
    @app.route('/')
    def index():
    return 'Hello World!'

    app.add_url_rule()方法,接受3个参数,URL、端点名、视图函数。

    1
    2
    3
    4
    def index():
    return 'Hello World!'

    app.add_url_rule('/', 'index', index)


  1. 一个完整的应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# hello.py

from flask import Flask

app = Flask(__name__)


@app.route('/user/<name>')
def user(name):
return 'Hello, {}!'.format(name)


if __name__ == '__main__':
app.run(debug=True)

在虚拟环境的终端中,输入python3 hello.py启动Flask。


GET和POST

使用浏览器(Safari 12)和Postman(6.7.2)来测试各种GET和POST。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route('/', methods=['get']) # 一个简单的get请求
def hello_world():
return 'Hello World!'


def is_number(s): # 判断字符串是否为数字的函数
try:
float(s)
return True
except ValueError:
pass

try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass

return False


@app.route('/api', methods=['get']) # 通过get请求判断"http://127.0.0.1:5000/api?number=X"中X是奇数还是偶数
# 使用浏览器访问网址
def api():
s = request.args.get('number')

if is_number(s): # 判断是否是数字
if float(s) % 2 == 0: # 判断是否是偶数,如果不加float会报错,not all arguments converted during string formatting
return "偶数"
elif float(s) % 2 != 0: # 判断是否是奇数
return "奇数"
else:
return "数据错误"


@app.route('/', methods=['post']) # 通过post请求和json数据判断number是奇数还是偶数
# 利用Postman发送post请求
def response():
data = request.get_json()
# 从请求的包体里取出数据并格式化为dict格式
print(data)

s = data.get('number')

if is_number(s): # 判断是否是数字
if s % 2 == 0: # 判断是否是偶数
return jsonify({"msg": "偶数"})
elif s % 2 != 0: # 判断是否是奇数
return jsonify({"msg": "奇数"})
else:
return jsonify({"msg": "数据错误"})


if __name__ == '__main__':
app.run(debug=True)


用类视图改写上面的函数视图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from flask import Flask, request, jsonify
from flask.views import MethodView

app = Flask(__name__)


def is_number(s): # 判断字符串是否为数字
try:
float(s)
return True
except ValueError:
pass

try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass

return False


class UserView(MethodView):
methods = ['GET', 'POST']

def dispatch_request(self):
if request.method == 'GET': # 当客户端通过get方法进行访问的时候
s = request.args.get('number')
if is_number(s): # 判断是否是数字
if float(s) % 2 == 0: # 判断是否是偶数,如果不加float会报错,not all arguments converted during string formatting
return "偶数"
elif float(s) % 2 != 0: # 判断是否是奇数
return "奇数"
else:
return "数据错误"

elif request.method == 'POST': # 当客户端通过post方法进行访问的时候
data = request.get_json()
print(data)
print(type(data))

s = data.get('number')
if is_number(s): # 判断是否是数字
if s % 2 == 0: # 判断是否是偶数
return jsonify({"msg": "偶数"})
elif s % 2 != 0: # 判断是否是奇数
return jsonify({"msg": "奇数"})
else:
return jsonify({"msg": "数据错误"})


user_view = UserView.as_view('user')
# 将GET /user 请求绑定到UserView.get()方法上
app.add_url_rule('/user', view_func=user_view, methods=['GET', ])
# 将POST /user 请求绑定到UserView.post()方法上
app.add_url_rule('/user', view_func=user_view, methods=['POST', ])


if __name__ == '__main__':
app.run(debug=True)


Flask的数据库基础操作

使用flask-sqlalchemy操作mysql实现增删改查。

安装Python包:

1
2
pip3 install flask-sqlalchemy
pip3 install PyMySQL

具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# 注意使用的不同请求

from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:密码@localhost:3306/test?charset=utf8'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

mydb = SQLAlchemy(app)


# 模型
class Movie(mydb.Model): # 创建一个表
__tablename__ = 'movies'
id = mydb.Column(mydb.Integer, primary_key=True)
name = mydb.Column(mydb.String(100), unique=True)
date = mydb.Column(mydb.Integer)
star = mydb.Column(mydb.Float, nullable=True)

def __repr__(self):
return '<Name %r>' % self.name


# 获取列表
@app.route('/users', methods=['GET'])
def getMovies():
data = Movie.query.all()
datas = []
for movies in data:
datas.append({'id': movies.id, 'name': movies.name, 'date': movies.date, 'star': movies.star})
return jsonify(data=datas)


# 添加数据
@app.route('/user', methods=['POST'])
def addMovie():

adict = request.get_json()
name = adict.get('name')
date = adict.get('date')
star = adict.get('star')

print(name, date, star)

movies = Movie(name=name, date=date, star=star)
try:
mydb.session.add(movies)
mydb.session.commit()
except Exception as e:
print(e)
mydb.session.rollback()
mydb.session.flush()
userId = movies.id
if (movies.id is None):
result = {'msg': '添加失败'}
return jsonify(data=result)

data = Movie.query.filter_by(id=userId).first()
result = {'id': movies.id, 'name': movies.name, 'date': movies.date, 'star': movies.star}
return jsonify(data=result)


# 获取单条数据
@app.route('/user/<int:userId>', methods=['GET'])
def getMovie(userId):
movies = Movie.query.filter_by(id=userId).first()
if (movies is None):
result = {'msg': 'None'}
else:
result = {'id': movies.id, 'name': movies.name, 'date': movies.date, 'star': movies.star}
return jsonify(data=result)


# 修改数据
@app.route('/user/<int:userId>', methods=['PATCH'])
def updateMovie(userId):

adict = request.get_json()
name = adict.get('name')
date = adict.get('date')
star = adict.get('star')

try:
movies = Movie.query.filter_by(id=userId).first()
if (movies is None):
result = {'msg': '找不到要修改的记录'}
return jsonify(data=result)
else:
movies.name = name
movies.date = date
movies.star = star
mydb.session.commit()
except:
mydb.session.rollback()
mydb.session.flush()
userId = movies.id
data = Movie.query.filter_by(id=userId).first()
result = {'id': movies.id, 'name': movies.name, 'date': movies.date, 'star': movies.star}
return jsonify(data=result)


# # 删除数据
# @app.route('/user/<int:userId>', methods=['DELETE'])
# def deleteMovie(userId):
# Movie.query.filter_by(id=userId).delete()
# mydb.session.commit()
# return getMovies()

# 删除数据
@app.route('/user/delete', methods=['POST'])
def deleteMovie():
adict = request.get_json()
id = adict.get('id')
Movie.query.filter_by(id=id).delete()
mydb.session.commit()
return getMovies()


if __name__ == '__main__':
# mydb.drop_all()
# mydb.create_all()
app.run(debug=True)


Flask连接企业微信的自建应用

  1. 发送消息给自建应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@app.route('/wx', methods=['post'])
def sendMsg():
data = {
"touser": ('|').join(self.userid.split(',')),
"toparty": ('|').join(self.partid.split(',')),
# "toparty":int(self.partid),
"msgtype": "text",
"agentid": self.agent_id,
"text": {
"content": request.get_json().get("content")
},
"safe": 0
}
print(data.get('text'))

requests.post(self.send_msg_url.format(self.access_token), json=data)
return 'success'


  1. 接收应用的消息,并回复相同的内容

首先进行回调配置,改写腾讯官方的WXBizMsgCrypt.py工具,导入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# 将原版本改写成Python3版

#!/usr/bin/env python
# -*- encoding:utf-8 -*-

""" 对企业微信发送给企业后台的消息加解密示例代码.
@copyright: Copyright (c) 1998-2014 Tencent Inc.

"""
# ------------------------------------------------------------------------

import base64
import string
import random
import hashlib
import time
import struct
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import json
import ierror

# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')

# import importlib
# importlib.reload(sys)


"""
关于Crypto.Cipher模块,ImportError: No module named 'Crypto'解决方案
请到官方网站 https://www.dlitz.net/software/pycrypto/ 下载pycrypto。
下载后,按照README中的“Installation”小节的提示进行pycrypto安装。
"""


class FormatException(Exception):
pass


def throw_exception(message, exception_class=FormatException):
"""my define raise exception function"""
raise exception_class(message)


class SHA1:
"""计算企业微信的消息签名接口"""

def getSHA1(self, token, timestamp, nonce, encrypt):
"""用SHA1算法生成安全签名
@param token: 票据
@param timestamp: 时间戳
@param encrypt: 密文
@param nonce: 随机字符串
@return: 安全签名
"""

sortlist = [token, timestamp, nonce, encrypt]
sortlist.sort()
sha = hashlib.sha1()
sha.update("".join(sortlist).encode("utf-8"))
return ierror.WXBizMsgCrypt_OK, sha.hexdigest()

# try:
# sortlist = [token, timestamp, nonce, encrypt]
# sortlist.sort()
# sha = hashlib.sha1()
# sha.update("".join(sortlist))
# return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
# except Exception as e:
# print(e)
# return ierror.WXBizMsgCrypt_ComputeSignature_Error, None


class JsonParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""

# json消息模板
AES_TEXT_RESPONSE_TEMPLATE = '''{
"Encrypt": "%(msg_encrypt)s",
"MsgSignature": "%(msg_signaturet)s",
"TimeStamp": %(timestamp)s,
"Nonce": "%(nonce)s"
}'''

def extract(self, jsontext):
"""提取出json数据包中的加密消息
@param jsontext: 待提取的json字符串
@return: 提取出的加密消息字符串
"""
try:
json_dict = json.loads(jsontext)
return ierror.WXBizMsgCrypt_OK, json_dict['Encrypt']
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None

def generate(self, encrypt, signature, timestamp, nonce):
"""生成json消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的json字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_json = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_json


class XMLParse:
"""提供提取消息格式中的密文及生成回复消息格式的接口"""

# xml消息模板
AES_TEXT_RESPONSE_TEMPLATE = """<xml>
<Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
<MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
<TimeStamp>%(timestamp)s</TimeStamp>
<Nonce><![CDATA[%(nonce)s]]></Nonce>
</xml>"""

def extract(self, xmltext):
"""提取出xml数据包中的加密消息
@param xmltext: 待提取的xml字符串
@return: 提取出的加密消息字符串
"""
try:
xml_tree = ET.fromstring(xmltext)
encrypt = xml_tree.find("Encrypt")
return ierror.WXBizMsgCrypt_OK, encrypt.text
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_ParseXml_Error, None, None

def generate(self, encrypt, signature, timestamp, nonce):
"""生成xml消息
@param encrypt: 加密后的消息密文
@param signature: 安全签名
@param timestamp: 时间戳
@param nonce: 随机字符串
@return: 生成的xml字符串
"""
resp_dict = {
'msg_encrypt': encrypt,
'msg_signaturet': signature,
'timestamp': timestamp,
'nonce': nonce,
}
resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
return resp_xml


class PKCS7Encoder():
"""提供基于PKCS7算法的加解密接口"""

block_size = 32

def encode(self, text):
""" 对需要加密的明文进行填充补位
@param text: 需要进行填充补位操作的明文
@return: 补齐明文字符串
"""
text_length = len(text)
# 计算需要填充的位数
amount_to_pad = self.block_size - (text_length % self.block_size)
if amount_to_pad == 0:
amount_to_pad = self.block_size
# 获得补位所用的字符
pad = chr(amount_to_pad)
return text + pad * amount_to_pad

def decode(self, decrypted):
"""删除解密后明文的补位字符
@param decrypted: 解密后的明文
@return: 删除补位字符后的明文
"""
# pad = ord(decrypted[-1])
pad = decrypted[-1]
if pad < 1 or pad > 32:
pad = 0
return decrypted[:-pad]


class Prpcrypt(object):
"""提供接收和推送给企业微信消息的加解密接口"""

def __init__(self, key):

# self.key = base64.b64decode(key+"=")
self.key = key
# 设置加解密模式为AES的CBC模式
self.mode = AES.MODE_CBC

def encrypt(self, text, receiveid):
"""对明文进行加密
@param text: 需要加密的明文
@return: 加密得到的字符串
"""
# 16位随机字符串添加到明文开头
text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + text + receiveid
# 使用自定义的填充方式对明文进行补位填充
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# 加密
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# 使用BASE64对加密后的字符串进行编码
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_EncryptAES_Error, None

def decrypt(self, text, receiveid):
"""对解密后的明文进行补位删除
@param text: 密文
@return: 删除填充补位后的明文
"""
try:
cryptor = AES.new(self.key, self.mode, self.key[:16])
# 使用BASE64对密文进行解码,然后AES-CBC解密
plain_text = cryptor.decrypt(base64.b64decode(text))
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_DecryptAES_Error, None
try:
# pad = ord(plain_text[-1])
pad = plain_text[-1]
# 去掉补位字符串
# pkcs7 = PKCS7Encoder()
# plain_text = pkcs7.encode(plain_text)
# 去除16位随机字符串
content = plain_text[16:-pad]
json_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
json_content = content[4: json_len+4]
from_receiveid = content[json_len+4:].decode("utf8")
except Exception as e:
print(e)
return ierror.WXBizMsgCrypt_IllegalBuffer, None
if from_receiveid != receiveid:
print("receiveid not match")
print(from_receiveid)
return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
return 0, json_content

def get_random_str(self):
""" 随机生成16位字符串
@return: 16位字符串
"""
rule = string.letters + string.digits
str = random.sample(rule, 16)
return "".join(str)


class WXBizMsgCrypt(object):
# 构造函数
def __init__(self,sToken,sEncodingAESKey,sReceiveId):
try:
self.key = base64.b64decode(sEncodingAESKey+"=")
assert len(self.key) == 32
except:
throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
# return ierror.WXBizMsgCrypt_IllegalAesKey,None
self.m_sToken = sToken
self.m_sReceiveId = sReceiveId

# 验证URL
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sEchoStr: 随机串,对应URL参数的echostr
# @param sReplyEchoStr: 解密之后的echostr,当return返回0时有效
# @return:成功0,失败返回对应的错误码

def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, sEchoStr)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.m_sReceiveId)
return ret,sReplyEchoStr

def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
# 将企业回复用户的消息加密打包
# @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
# @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
# @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
# sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
# return:成功0,sEncryptMsg,失败返回对应的错误码None
pc = Prpcrypt(self.key)
ret,encrypt = pc.encrypt(sReplyMsg, self.m_sReceiveId)
if ret != 0:
return ret,None
if timestamp is None:
timestamp = str(int(time.time()))
# 生成安全签名
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.m_sToken, timestamp, sNonce, encrypt)
if ret != 0:
return ret,None
xmlParse = XMLParse()
return ret,xmlParse.generate(encrypt, signature, timestamp, sNonce)

def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
# 检验消息的真实性,并且获取解密后的明文
# @param sMsgSignature: 签名串,对应URL参数的msg_signature
# @param sTimeStamp: 时间戳,对应URL参数的timestamp
# @param sNonce: 随机串,对应URL参数的nonce
# @param sPostData: 密文,对应POST请求的数据
# xml_content: 解密后的原文,当return返回0时有效
# @return: 成功0,失败返回对应的错误码
# 验证安全签名
xmlParse = XMLParse()
ret, encrypt = xmlParse.extract(sPostData)
if ret != 0:
return ret, None
sha1 = SHA1()
ret,signature = sha1.getSHA1(self.m_sToken, sTimeStamp, sNonce, encrypt)
if ret != 0:
return ret, None
if not signature == sMsgSignature:
return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
pc = Prpcrypt(self.key)
ret, xml_content = pc.decrypt(encrypt,self.m_sReceiveId)
return ret, xml_content


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from flask import Flask, request
import requests
import sys
import json
from configparser import ConfigParser
from WXBizMsgCrypt import WXBizMsgCrypt
import xml.etree.cElementTree as ET


class WeChatMsg():
def __init__(self, host='0.0.0.0', port=8000):
config = ConfigParser()
config.read('config.py', encoding='utf-8')
# 定义服务器监听地址和端口
self.host = host
self.port = port
#
self.sCorpID = config['common']['corpid']
self.agent_secret = config['appconfig']['secret']
self.agent_id = config['appconfig']['agentid']
self.userid = config['appconfig']['userid']
self.partid = config['appconfig']['partid']
self.get_access_token_url = config['urlconfig']['get_access_token_url']
self.send_msg_url = config['urlconfig']['send_msg_url']

# 转发接收消息的应用信息配置
self.sToken = config['recmsg']['Token']
self.sEncodingAESKey = config['recmsg']['EncodingAESKey']

# 获取access_token
self.access_token = json.loads(requests.get(self.get_access_token_url.format(self.sCorpID,self.agent_secret)).content)['access_token']

def _send_text_msg(self, content, userid):
data = {
"touser": userid,
"toparty": ('|').join(self.partid.split(',')),
# "toparty":int(self.partid),
"msgtype": "text",
"agentid": self.agent_id,
"text": {
"content": content
},
"safe": 0
}
response = requests.post(self.send_msg_url.format(self.access_token), json.dumps(data))
print(response.text)
result_msg = json.loads(response.content)['errmsg']
return result_msg

def server_run(self):
app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def index():
wxcpt = WXBizMsgCrypt(self.sToken, self.sEncodingAESKey, self.sCorpID)
# 获取url验证时微信发送的相关参数
sVerifyMsgSig = request.args.get('msg_signature')
sVerifyTimeStamp = request.args.get('timestamp')
sVerifyNonce = request.args.get('nonce')
sVerifyEchoStr = request.args.get('echostr')
print(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)

# 验证url
if request.method == 'GET':
ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
print(type(ret))
print(type(sEchoStr))

if (ret != 0):
print("ERR: VerifyURL ret:" + str(ret))
sys.exit(1)
return sEchoStr

# 接收客户端消息
if request.method == 'POST':
sReqMsgSig = sVerifyMsgSig
sReqTimeStamp = sVerifyTimeStamp
sReqNonce = sVerifyNonce
sReqData = request.data
print('sReqData=', sReqData, '\n')

ret, sMsg = wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
print(ret, sMsg, '\n')
if (ret != 0):
print("ERR: DecryptMsg ret: " + str(ret))
sys.exit(1)
# 解析发送的内容并打印

print(type(sMsg))
string = str(sMsg,'utf-8')
print(string)

# 转成字符串索引分片
# i = string.index('<Content>')
# j = string.index(r'</Content>')
# print(i, j)
# print("收到消息:", string[i+18:j-3])

# 解析XML
root = ET.fromstring(string) # 调用fromstring(), 直接返回解析树的根元素
for child in root:
# 第二层节点的标签名称和属性
print(child.tag,":", child.text)

# 索引获取
# print(root.tag, ":", root.attrib)
# print('收到消息:', root[4].text)

# 直接查找
content = root.find("Content").text
print('\n收到消息:', content)

# xml_tree = ET.fromstring(sMsg)
# print('xml_tree is ', xml_tree)

# 自动回复相同消息给客户端
access_token = json.loads(requests.get(self.get_access_token_url.format(self.sCorpID, self.agent_secret)).content)['access_token']
print(access_token)

userid = root.find("FromUserName").text
print(userid)
result = self._send_text_msg(content, userid)
return result


if __name__ == '__main__':
wechatserver = WeChatMsg()
wechatserver.server_run()


  1. 对通讯录进行增删改查
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from flask import Flask, request, jsonify
import requests
import sys
import json
from configparser import ConfigParser
from WXBizMsgCrypt import WXBizMsgCrypt


class WeChattxl():
def __init__(self, host='0.0.0.0', port=8000):
config = ConfigParser()
config.read('config.py', encoding='utf-8')
# 定义服务器监听地址和端口
self.host = host
self.port = port
#
self.sCorpID = config['common']['corpid']
self.partid = config['appconfig']['partid']
self.txlsecret = config['appconfig']['txlsecret']

self.get_access_token_url = config['urlconfig']['get_access_token_url']
self.user_simplelist_url = config['urlconfig']['user_simplelist_url']
self.user_list_url = config['urlconfig']['user_list_url']
self.get_user_url = config['urlconfig']['get_user_url']
self.create_user_url = config['urlconfig']['create_user_url']
self.update_user_url = config['urlconfig']['update_user_url']
self.delete_user_url = config['urlconfig']['delete_user_url']

# 应用信息配置
self.sToken = config['recmsg']['Token']
self.sEncodingAESKey = config['recmsg']['EncodingAESKey']

# 获取access_token
self.access_token = json.loads(requests.get(self.get_access_token_url.format(self.sCorpID,self.txlsecret)).content)['access_token']

def server_run(self):
app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def index():
wxcpt = WXBizMsgCrypt(self.sToken, self.sEncodingAESKey, self.sCorpID)
# 获取url验证时微信发送的相关参数
sVerifyMsgSig = request.args.get('msg_signature')
sVerifyTimeStamp = request.args.get('timestamp')
sVerifyNonce = request.args.get('nonce')
sVerifyEchoStr = request.args.get('echostr')
print(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)

# 验证url
if request.method == 'GET':
ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
print(type(ret))
print(type(sEchoStr))

if (ret != 0):
print("ERR: VerifyURL ret:" + str(ret))
sys.exit(1)
return sEchoStr

# if request.method == 'POST':
# return 'success'

'''
获取成员列表 127.0.0.1:8000/user get
成员详细信息 /users get
创建成员 /create post
{
"userid": userid, #必填
"name": name, #必填
"mobile": mobile, #选填,使用默认值
"email": email, #选填,使用默认值
"position": position, #选填,使用默认值
"department": self.partid #不填
}
读取成员 /get?id=[userid] get
更新成员 /update post
{
"userid": userid, #必填
"name": name, #选填
"mobile": mobile, #选填
"email": email, #选填
"position": position, #选填
"department": self.partid #不填
}
删除成员 /delete?id=[userid] get
'''

@app.route('/user', methods=['get']) # 获取成员列表
def user():
url = self.user_simplelist_url.format(self.access_token, self.partid)
r = requests.get(url)
return jsonify(data=r.json())

@app.route('/users', methods=['get']) # 获取成员详细信息
def users():
url = self.user_list_url.format(self.access_token, self.partid)
r = requests.get(url)
return jsonify(data=r.json())

@app.route('/create', methods=['post']) # 创建成员
def create():
data = request.get_json()
print(data)

userid = data.get('userid') # 必须,方便数据的录入,使用两位数字
name = data.get('name') # 必须

mobile = data.get('mobile') # 可选
email = data.get('email') # 可选
position = data.get('position') # 可选
if mobile is None:
mobile = '185000000'+userid # 方便录入,用userid直接拼接成手机号
if email is None:
email = userid+'@ianna.com' # 方便录入,邮箱也用userid拼接而成
if position is None:
position = 'Worker'

print(mobile, email, position)

data = {
"userid": userid,
"name": name,
"mobile": mobile,
"email": email,
"position": position,
"department": self.partid
}
print(data)

r = requests.post(self.create_user_url.format(self.access_token), json.dumps(data))
print(r.text)
result_msg = json.loads(r.content)['errmsg']
return result_msg

@app.route('/get', methods=['get']) # 读取成员
def get():
userid = request.args.get('id')
url = self.get_user_url.format(self.access_token, userid)
r = requests.get(url)
return jsonify(data=r.json())

@app.route('/update', methods=['post']) # 更新成员
def update():
'''
data = {
"userid": userid,
"name": name,
"mobile": mobile,
"email": email,
"position": position,
"department": self.partid
}
'''

data = request.get_json()
data.update({"department": self.partid})
print(data)

r = requests.post(self.update_user_url.format(self.access_token), json.dumps(data))
print(r.text)
result_msg = json.loads(r.content)['errmsg']
return result_msg

@app.route('/delete', methods=['get']) # 删除成员
def delete():
userid = request.args.get('id')
url = self.delete_user_url.format(self.access_token, userid)
r = requests.get(url)
result_msg = json.loads(r.content)['errmsg']
return result_msg

app.run(host=self.host, port=self.port, debug=True)


if __name__ == '__main__':
wechatserver = WeChattxl()
wechatserver.server_run()