Sıradan bir Celery app

Celery ile basit bir uygulama yazalım, bu yazdığımız uygulama parametre olarak verdiğimiz bir değerin len() çıktısını geri döndürsün. O hâlde kodumuz aşağıdaki gibi olacaktır.

tasks.py

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery

app = Celery(
  'tasks',
  broker='redis://localhost:6379/0',
  backend = 'redis://localhost:6379/0'
)

@app.task
def length(s):
  return len(s)


Şimdi ise yeni bir terminalden python3 yorumlayıcısı açalım ve tasks dosyasındaki length() fonksiyonumuzu import edelim ve celery workerlarına bu işlevi çalıştırmasını söyleyelim. Bu esnada da celery workerlarını takip edelim bunun için celery -q -A tasks worker -l info şeklinde komut çalıştırmamız yeterli olacaktır.

celery -q -A tasks worker -l info

[2020-03-09 13:36:58,235: INFO/MainProcess] Connected to redis://localhost:6379/0
[2020-03-09 13:36:58,244: INFO/MainProcess] mingle: searching for neighbors
[2020-03-09 13:36:59,263: INFO/MainProcess] mingle: all alone
[2020-03-09 13:36:59,295: INFO/MainProcess] celery@eredotpkfr ready.
[2020-03-09 13:37:29,253: INFO/MainProcess] Received task: tasks.length[9bccfa35-a6e6-4e45-8163-d8132c7d05bc] 
[2020-03-09 13:37:29,256: INFO/ForkPoolWorker-8] Task tasks.length[9bccfa35-a6e6-4e45-8163-d8132c7d05bc] succeeded in 0.0028191699998387776s: 7


Çıktıya baktığımızda herhangi bir problem gözükmüyor, worker task’ı almış ve 0.002 saniye içerisinde sonuç döndürmüş. Döndürdüğü değer ise 7 fakat python3 yorumlayıcımıza baktığımızda!

python3

Python 3.7.5 (default, Nov 20 2019, 09:21:52) 
[GCC 9.2.1 20191008] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import length
>>> res = length.delay('python3')
>>> res.wait()
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
 File "/home/eredot_pkfr/.local/lib/python3.7/site-packages/celery/result.py", line 228, in get
  on_message=on_message,
 File "/home/eredot_pkfr/.local/lib/python3.7/site-packages/celery/backends/asynchronous.py", line 200, in wait_for_pending
  for _ in self._wait_for_pending(result, **kwargs):
 File "/home/eredot_pkfr/.local/lib/python3.7/site-packages/celery/backends/asynchronous.py", line 263, in _wait_for_pending
  self.on_wait_for_pending(result, timeout=timeout, **kwargs)
 File "/home/eredot_pkfr/.local/lib/python3.7/site-packages/celery/backends/redis.py", line 150, in on_wait_for_pending
  for meta in result._iter_meta(**kwargs):
TypeError: _iter_meta() got an unexpected keyword argument 'timeout'
>>> 


res = length.delay('python3') komutunu verdiğimizde celery worker işi yapmaya başladı ve bize bir sonuç döndürdü. Celery kütüphanesinde sonucu görmek için kullandığımız birkaç metod var bunlardan biri wait() metodudur, wait() metodu task’ın bitmesini bekler ve bittiğinde sonucu döndürür. wait() metodu yerine get() metoduda kullanılabilir, yukarıda wait() metodunu çağırdığımızda TypeError: _iter_meta() got an unexpected keyword argument 'timeout' hatası ile karşılaşıyoruz. Bu hatada _iter_meta() fonksiyonunun beklenmeyen bir ‘timeout’ argümanını aldığını söylüyor. Aslında hatanın sebebi şurada, wait() metodunu çağırdığımızda bize veriyi döndürmüyor ve hata veriyor fakat res.ready() şeklinde bir çıtı istediğimizde True değerini döndürüyor, veriyi alabiliriz şuanda hazır fakat wait() metodu bize veriyi getirmiyor! işlevini tam olarak yerine getiremiyor aynı şekilde get() metoduda veriyi getiremiyor ve hata veriyor. İş bittiği için ‘timeout’ yiyoruz aslında fakat hata direkt karşımıza bu şekilde çıkmıyor.
Normal şartlarda bu şekilde bir hata ile karşılaşılmaması lazım çünkü wait() metodu task’ın bitmesini bekler. wait() metodu görevini yerine getiremiyor ise biz kendimiz işlev bitesiye kadar beklemeyi seçebiliriz. Böyle bir durumda aşağıdaki alternatif yöntemleri uygulamak çözüm olacaktır wait() metodu da aslında aşağıdakilere benzer bir yöntem kullanır!

alternatives.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# Imports
from tasks import length
from time import sleep

# 0
res = length.delay('alternative_0')
while not res.status == 'SUCCESS':
  sleep(0.5)
# result.wait(timeout=None, interval=0.5) is the same \
# thing as the above code!
print(
  res.get(), # returns result
  res.id, # returns task id (str)
  res.status, # returns task status (str)
  res.info # returns task information
)
# 1
res = length.delay('alternative_1')
while not res.ready():
  sleep(0.3)

print(
  res.get(),
  res.task_id, # returns task id (str)
  res.ready(), # returns task is ready? (boolean)
  res.info
)
# 2
res = length.delay('alternative_2')
while not res.successful():
  sleep(0.1)

print(
  res.get(),
  res.id,
  res.successful(), # returns task is ready? (boolean)
  res.info
)

# OUTPUT

# 13 cdece656-372b-46c2-9956-5b6c8a296a7f SUCCESS 13
# 13 1634167e-9aee-42c3-8bd9-828b34525a99 True 13
# 13 f086e8f7-7a1b-42b6-98f3-362f43a1cbfd True 13