收发JSON报文
下面基于Flask实现了最简单的JSON报文收发功能,收到请求后打印出内容, 重新组织后返回给客户端。 使用Flask的request.get_json()接收HTTP POST中的JSON报文, 使用json.dumps()发送JSON报文:
创建并启动服务:
cat << EOF > server.py
from flask import Flask, request
import json
app = Flask(__name__)
@app.route('/api/<user>/<uuid>', methods=['GET', 'POST'])
def add_message(user, uuid):
content = request.get_json()
print(content)
return json.dumps({"user": user, "uuid": uuid})
if __name__ == '__main__':
app.run(host= '0.0.0.0', port=3000, debug=True)
EOF
python server.py
用httpie和jq测试:
http POST localhost:3000/api/chad/097aa543 head:='{"a1":"bb","c3":"dd"}' body=theBody | jq '.user'
一个简单的Python微服务实现
下面这个服务接收HTTP请求发送来的shell脚本,执行之,并根据用户需求返回控制台输出。 为了保证服务器的安全,生产环境中的这类服务需要校验客户端身份, 并为传入的命令设置白名单。
服务器接收的url包括mode和uuid两个字段,mode分exec和poll两种情况, 前者要求服务器开始执行Post body中的指令,后者查看当前运行指令的控制台输出。 uuid可以用于验证客户端合法性,例如客户端以"调用者+时间戳"并加密后生成uuid, 服务器验证调用者和时间戳的有效性。
服务端的配置项包括服务监听的端口以及返回控制台输出的行数。
创建并启动服务:
cat << EOF > server.py
from flask import Flask, request
import json
from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue
PORT = 3000
CMD_LINE_NUM = 3
app = Flask(__name__)
output_q = Queue(CMD_LINE_NUM)
cmd_q = []
def save_newest_output(p, queue, cmds):
while p.poll() is None:
line = p.stdout.readline().decode().strip() # blocking read
if queue.full():
queue.get() # abandon old output silently
queue.put(line)
cmds.clear()
@app.route('/api/<mode>/<uuid>', methods=['GET', 'POST'])
def run_cmd(mode, uuid):
if mode == 'exec':
if len(cmd_q) > 0:
return json.dumps({"mode": mode, "status": "busy"})
cmd = request.get_json()['cmd']
proc = Popen(cmd, shell=True, stdout=PIPE)
cmd_q.append(proc)
t = Thread(target=save_newest_output, args=(proc, output_q, cmd_q))
t.daemon = True
t.start()
return json.dumps({"mode": mode, "status": "accepted"})
if mode == 'poll' and len(cmd_q) > 0:
lines = []
while not output_q.empty():
line = output_q.get_nowait()
lines.append(line)
return json.dumps({"mode": mode,
"status": "running",
"output": '\n'.join(lines)
})
return json.dumps({"mode": mode, "status": "finished"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=PORT, debug=True)
EOF
python server.py
Shell脚本在一个单独的进程中运行,并由一个独立的Thread读取它的输出, 保证HTTP服务不会被阻塞。
测试客户端:
cat << EOF > test.py
from time import sleep
for i in range(28, 49):
print(i, flush=True)
sleep(2)
EOF
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py' # accepted
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py' # busy
http POST localhost:3000/api/poll/097aa543 # realtime output: 30, 31, 32
sleep 10
http POST localhost:3000/api/poll/097aa543 # realtime output: 37, 38, 39
sleep 40
http POST localhost:3000/api/poll/097aa543 # status: finished
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py' # accepted
为了保证输出不会被缓存,需要在print函数中增加flush=True
,只有Python 3支持,
所以运行测试脚本时需要用python3 test.py
.
在Python 3.4.3, Flask 0.10.1上测试通过。
服务管理
这里使用Supervisor管理Python进程, 相比于Monit, start-stop-daemon等工具,它的优点是不依赖pidfile,配置方便, 提供日志输出,文档写得好。