DarkMatter in Cyberspace
  • Home
  • Categories
  • Tags
  • Archives

基于Flask的Python微服务


收发JSON报文

下面基于Flask实现了最简单的JSON报文收发功能,收到请求后打印出内容, 重新组织后返回给客户端。 使用Flask的request.get_json()接收HTTP POST中的JSON报文, 使用json.dumps()发送JSON报文:

创建并启动服务:

cat << EOF > server.py
from flask import Flask, request
import json

app = Flask(__name__)

@app.route('/api/<user>/<uuid>', methods=['GET', 'POST'])
def add_message(user, uuid):
  content = request.get_json()
  print(content)
  return json.dumps({"user": user, "uuid": uuid})

if __name__ == '__main__':
  app.run(host= '0.0.0.0', port=3000, debug=True)
EOF

python server.py

用httpie和jq测试: http POST localhost:3000/api/chad/097aa543 head:='{"a1":"bb","c3":"dd"}' body=theBody | jq '.user'

一个简单的Python微服务实现

下面这个服务接收HTTP请求发送来的shell脚本,执行之,并根据用户需求返回控制台输出。 为了保证服务器的安全,生产环境中的这类服务需要校验客户端身份, 并为传入的命令设置白名单。

服务器接收的url包括mode和uuid两个字段,mode分exec和poll两种情况, 前者要求服务器开始执行Post body中的指令,后者查看当前运行指令的控制台输出。 uuid可以用于验证客户端合法性,例如客户端以"调用者+时间戳"并加密后生成uuid, 服务器验证调用者和时间戳的有效性。

服务端的配置项包括服务监听的端口以及返回控制台输出的行数。

创建并启动服务:

cat << EOF > server.py
from flask import Flask, request
import json
from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue

PORT = 3000
CMD_LINE_NUM = 3

app = Flask(__name__)
output_q = Queue(CMD_LINE_NUM)
cmd_q = []


def save_newest_output(p, queue, cmds):
    while p.poll() is None:
        line = p.stdout.readline().decode().strip()  # blocking read
        if queue.full():
            queue.get()   # abandon old output silently
        queue.put(line)
    cmds.clear()


@app.route('/api/<mode>/<uuid>', methods=['GET', 'POST'])
def run_cmd(mode, uuid):
    if mode == 'exec':
        if len(cmd_q) > 0:
            return json.dumps({"mode": mode, "status": "busy"})
        cmd = request.get_json()['cmd']
        proc = Popen(cmd, shell=True, stdout=PIPE)
        cmd_q.append(proc)
        t = Thread(target=save_newest_output, args=(proc, output_q, cmd_q))
        t.daemon = True
        t.start()
        return json.dumps({"mode": mode, "status": "accepted"})
    if mode == 'poll' and len(cmd_q) > 0:
        lines = []
        while not output_q.empty():
            line = output_q.get_nowait()
            lines.append(line)
        return json.dumps({"mode": mode,
                           "status": "running",
                           "output": '\n'.join(lines)
                           })
    return json.dumps({"mode": mode, "status": "finished"})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=PORT, debug=True)
EOF
python server.py

Shell脚本在一个单独的进程中运行,并由一个独立的Thread读取它的输出, 保证HTTP服务不会被阻塞。

测试客户端:

cat << EOF > test.py 
from time import sleep
for i in range(28, 49):
    print(i, flush=True)
    sleep(2)
EOF
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py'    # accepted
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py'    # busy
http POST localhost:3000/api/poll/097aa543         # realtime output: 30, 31, 32
sleep 10
http POST localhost:3000/api/poll/097aa543         # realtime output: 37, 38, 39
sleep 40
http POST localhost:3000/api/poll/097aa543         # status: finished
http POST localhost:3000/api/exec/097aa543 cmd='python3 test.py'    # accepted

为了保证输出不会被缓存,需要在print函数中增加flush=True,只有Python 3支持, 所以运行测试脚本时需要用python3 test.py.

在Python 3.4.3, Flask 0.10.1上测试通过。

服务管理

这里使用Supervisor管理Python进程, 相比于Monit, start-stop-daemon等工具,它的优点是不依赖pidfile,配置方便, 提供日志输出,文档写得好。



Published

Apr 29, 2016

Last Updated

Apr 29, 2016

Category

Tech

Tags

  • flask 2
  • json 7
  • microservice 1
  • python 136
  • subprocess 2
  • supervisor 1

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor