You have two hosts: algo (IP: 192.168.100.1) and boms. boms can ssh to algo with ssh 192.168.100.1.
You want to run some tasks (some Python functions) using every core of every CPU on each host.
In this demo we use host algo as the server.
Hello World
First install RabbitMQ on both hosts:
sudo apt install rabbitmq-server
Then configure a RabbitMQ user account on the server algo:
sudo rabbitmqctl add_user leo zhangjingg
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_permissions -p myvhost leo ".*" ".*" ".*"
Install celery and start workers on both hosts:
conda create -n celery python=3.6 ipython
. activate celery
conda install -c conda-forge celery
take celeryEx  # oh-my-zsh helper, equivalent to: mkdir celeryEx && cd celeryEx
cat << EOF > tasks.py
from celery import Celery

app = Celery('tasks', backend='rpc://',
             broker='pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost')

@app.task
def add(x, y):
    return x + y
EOF
celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h
ipython
from tasks import add
result = add.delay(2, 3)
result.get()
import celery
celery.current_app.control.inspect().ping()
celery.current_app.control.inspect().stats().keys()
You can see each task is executed by any one of the four workers (we started two workers with different names on each host).
The key point is the broker parameter of the Celery class, which specifies the location of the RabbitMQ server.
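The broker URL is just a string with a fixed shape: transport, credentials, host, port, and vhost. As a minimal sketch (plain string assembly, not a Celery API; the credentials and host are the example values used above):

```python
def broker_url(user, password, host, port=5672, vhost="/", transport="pyamqp"):
    # Assemble an AMQP broker URL of the form
    # transport://user:password@host:port/vhost
    return f"{transport}://{user}:{password}@{host}:{port}/{vhost}"

url = broker_url("leo", "zhangjingg", "192.168.100.1", vhost="myvhost")
print(url)  # pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost
```

Keeping the URL in one place like this makes it easy to point every module (tasks.py, pdpara.py) at the same broker.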
Serial vs Parallel
Source codes
pdser.py:
import numpy as np
import pandas as pd

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

def dfsum(size):
    df = pd.DataFrame({'a': np.random.randn(size),
                       'b': np.random.randn(size),
                       'N': np.random.randint(100, 1000, (size)),
                       'x': 'x'})
    return sum(df.apply(lambda x: integrate_f(x['a'], x['b'], x['N']), axis=1))

if __name__ == "__main__":
    res = []

    def serint(times):
        for i in range(times):
            res.append(dfsum(10000))
        return sum(res)

    print('serial sum = %s' % serint(80))
    print('serial list: %s' % res[0:5])
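Since f(x) = x(x - 1) = x² - x, integrate_f is a left Riemann sum with a known exact limit, which gives a quick correctness check. A self-contained sketch using the same two functions as pdser.py:

```python
def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    # Left Riemann sum of f over [a, b] with N steps, as in pdser.py.
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

# Exact integral of x^2 - x over [0, 1] is 1/3 - 1/2 = -1/6.
approx = integrate_f(0, 1, 100_000)
print(abs(approx - (-1 / 6)) < 1e-4)  # True
```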
pdpara.py:
from celery import Celery
import numpy as np
import pandas as pd
from pdser import dfsum

app = Celery('pdpara', backend='rpc://',
             broker='pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost')

@app.task
def paraint():
    return dfsum(10000)
pararun.py:
from pdpara import paraint

times = 80
pack = []
for i in range(times):
    ares = paraint.delay()
    pack.append(ares)
print('tasks commit over')

res = list(map(lambda x: x.get(), pack))  # materialize the iterator so it can be printed and summed
print('parallel list: %s' % res)
print('parallel sum = %s' % sum(res))
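Note that in Python 3, map returns a lazy, single-use iterator: once it has been consumed (e.g. by printing the list of results), a later sum over it silently sees nothing. That is why the results should be materialized with list() before being reused. A minimal illustration of the pitfall:

```python
# map() in Python 3 yields a one-shot iterator, not a list.
results = map(lambda x: x * 2, [1, 2, 3])

first_pass = list(results)   # consumes the iterator
second_pass = list(results)  # already exhausted: nothing left

print(first_pass)   # [2, 4, 6]
print(second_pass)  # []
```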
Commit tasks
- Copy the above source files to both boms and algo;
- Start a worker on both hosts:
  celery -A pdpara worker --loglevel=debug --hostname=worker1@%h
- Run in parallel:
  time python pararun.py
  output: python pararun.py 0.56s user 0.05s system 2% cpu 25.741 total
- Run in serial:
  time python pdser.py
  output: python pdser.py 80.46s user 0.04s system 99% cpu 1:20.51 total
You can see the parallel version is about three times faster than the serial one in wall-clock time (25.7 s vs 80.5 s), since the work is spread across the workers on both hosts.
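The speedup follows directly from the two wall-clock totals reported by time above:

```python
serial_wall = 80.51    # seconds, from `time python pdser.py` (1:20.51 total)
parallel_wall = 25.741 # seconds, from `time python pararun.py`

speedup = serial_wall / parallel_wall
print(round(speedup, 1))  # ~3.1
```

Note the 2% CPU figure for pararun.py: the submitting process mostly waits while the remote workers do the computation, so almost all of the CPU time is spent on the worker hosts.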