
Parallel Processing on Celery Cluster


You have two hosts: algo (IP: 192.168.100.1) and boms. boms can ssh to algo with ssh 192.168.100.1.

You want to run some tasks (Python functions) on every core of every CPU on both hosts.

In this demo, host algo acts as the broker server.

Hello World

First install RabbitMQ on both hosts: sudo apt install rabbitmq-server. (Strictly, only the broker host algo needs it, but installing on both is harmless.)

Then configure a RabbitMQ user account on the server algo:

sudo rabbitmqctl add_user leo zhangjingg
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_permissions -p myvhost leo ".*" ".*" ".*"
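
Before starting any workers, you can sanity-check the account with Kombu, the messaging library that ships with Celery; a minimal sketch, using the same credentials as below (it raises if the user, password, or vhost is wrong):

from kombu import Connection

# Same URL the Celery app below will use as its broker.
with Connection('pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost') as conn:
    conn.connect()
    print('broker reachable')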

Install Celery and start workers on both hosts:

conda create -n celery python=3.6 ipython
source activate celery
conda install -c conda-forge celery
mkdir celeryEx && cd celeryEx   # "take celeryEx" if you use oh-my-zsh
cat << EOF > tasks.py
from celery import Celery

app = Celery('tasks', backend='rpc://',
             broker='pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost')

@app.task
def add(x, y):
    return x + y
EOF

# Run each worker in its own terminal; both commands block.
# A worker forks one process per CPU core by default, which is
# exactly what we want here.
celery -A tasks worker --loglevel=info --hostname=worker1@%h
celery -A tasks worker --loglevel=info --hostname=worker2@%h

ipython
import celery
from tasks import add
result = add.delay(2, 3)    # send the task to the broker; returns an AsyncResult
result.get()                # blocks until a worker returns 5
celery.current_app.control.inspect().ping()          # check which workers are alive
celery.current_app.control.inspect().stats().keys()  # list the worker names

You can see each task is executed by any one of the four workers (we started two workers with different names on each host).
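
Note that calling add(2, 3) directly still runs the function locally in the client process; only .delay() goes through the broker. And get() blocks, so if you would rather poll, the AsyncResult handle supports that too:

from tasks import add

result = add.delay(2, 3)
result.ready()          # False until some worker has finished the task
result.get(timeout=10)  # 5; raises if no worker answers within 10 seconds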

The key point is the broker parameter of the Celery class, which tells every client and worker where the RabbitMQ server lives.
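
The URL packs everything into one string, in the standard transport://user:password@host:port/vhost layout:

# transport :// user : password   @ broker host    : port / vhost
#   pyamqp  :// leo  : zhangjingg @ 192.168.100.1  : 5672 / myvhost
broker = 'pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost'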

Ref:

  • Using RabbitMQ
  • Monitoring and Management Guide

Serial vs Parallel

Source code

pdser.py:

import numpy as np
import pandas as pd

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    # Left Riemann sum of f over [a, b] with N rectangles.
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

def dfsum(size):
    # Integrate f over a random interval for each of the `size` rows.
    df = pd.DataFrame({'a': np.random.randn(size),
                       'b': np.random.randn(size),
                       'N': np.random.randint(100, 1000, size),
                       'x': 'x'})
    return sum(df.apply(lambda x: integrate_f(x['a'], x['b'], x['N']), axis=1))


if __name__ == "__main__":
    res = []

    def serint(times):
        for i in range(times):
            res.append(dfsum(10000))
        return sum(res)

    print('serial sum = %s' % serint(80))
    print('serial list: %s' % res[0:5])
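
integrate_f is a left Riemann sum of f(x) = x(x - 1), whose antiderivative is x**3 / 3 - x**2 / 2, so you can sanity-check it against the closed form; a quick sketch (the exact helper is ad hoc, not part of the demo):

from pdser import integrate_f

def exact(a, b):
    # Antiderivative of x * (x - 1) is x**3 / 3 - x**2 / 2.
    F = lambda x: x**3 / 3 - x**2 / 2
    return F(b) - F(a)

print(integrate_f(0.0, 1.0, 1000))  # approximately -0.16666
print(exact(0.0, 1.0))              # -1/6 = -0.16666...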

pdpara.py:

from celery import Celery
from pdser import dfsum

app = Celery('pdpara', backend='rpc://',
             broker='pyamqp://leo:zhangjingg@192.168.100.1:5672/myvhost')

@app.task
def paraint():
    # One unit of work, identical to one iteration of the serial loop.
    return dfsum(10000)

pararun.py:

from pdpara import paraint

times = 80
pack = []
for i in range(times):
    ares = paraint.delay()   # returns an AsyncResult immediately
    pack.append(ares)

print('all tasks submitted')

res = [ares.get() for ares in pack]   # block until every result arrives
print('parallel list: %s' % res[0:5])
print('parallel sum = %s' % sum(res))
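
Celery's group primitive expresses the same fan-out-then-collect pattern more compactly; an equivalent sketch for the same 80 tasks:

from celery import group
from pdpara import paraint

job = group(paraint.s() for i in range(80))  # .s() builds a task signature
result = job.apply_async()                   # submit all 80 tasks at once
print('parallel sum = %s' % sum(result.get()))  # waits for every worker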

Submit tasks

  1. Copy the source files above to both boms and algo;

  2. Start a worker on both hosts (see the registration check after this list): celery -A pdpara worker --loglevel=debug --hostname=worker1@%h

  3. Run the parallel version: time python pararun.py
     Output: python pararun.py 0.56s user 0.05s system 2% cpu 25.741 total

  4. Run the serial version: time python pdser.py
     Output: python pdser.py 80.46s user 0.04s system 99% cpu 1:20.51 total
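
Before timing anything, it is worth confirming that workers on both hosts actually registered the task; one way to check, from any host with the same sources:

from pdpara import app

# Maps each worker name, e.g. 'worker1@algo' and 'worker1@boms',
# to the list of task names it knows; 'pdpara.paraint' should
# appear under both.
print(app.control.inspect().registered())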

The parallel version is about 3x faster in wall-clock time (25.7 s vs 80.5 s). Also note the CPU column: the serial run pins one core of the client at 99%, while pararun.py itself uses only 2%, because the real work happens inside the workers spread across both hosts.


