Commit b40e6ffa by Szeberényi Imre

python3.9

parent f79359f8
# from twisted.internet.defer import Deferred # from twisted.internet.defer import Deferred
from twisted.internet import reactor # threads from twisted.internet import reactor # threads
from celery.result import TimeoutError
from celery import Celery from celery import Celery
import serializers
from celery.result import TimeoutError
from kombu import Queue, Exchange from kombu import Queue, Exchange
from os import getenv from os import getenv
from socket import gethostname from socket import gethostname
...@@ -17,8 +18,10 @@ HOSTNAME = gethostname().split('.')[0] ...@@ -17,8 +18,10 @@ HOSTNAME = gethostname().split('.')[0]
AMQP_URI = getenv('AMQP_URI') AMQP_URI = getenv('AMQP_URI')
celery = Celery('agent', broker=AMQP_URI) celery = Celery('agent', broker=AMQP_URI)
celery.conf.update(CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=300, celery.config_from_object('celeryconfig')
celery.conf.update(
CELERYD_PREFETCH_MULTIPLIER=32, CELERYD_PREFETCH_MULTIPLIER=32,
CELERY_QUEUES=(Queue(HOSTNAME + '.agent', CELERY_QUEUES=(Queue(HOSTNAME + '.agent',
Exchange('agent', type='direct'), Exchange('agent', type='direct'),
...@@ -137,21 +140,3 @@ def change_ip(vm, interfaces, dns): ...@@ -137,21 +140,3 @@ def change_ip(vm, interfaces, dns):
send_command(vm, command='change_ip', interfaces=interfaces, dns=dns) send_command(vm, command='change_ip', interfaces=interfaces, dns=dns)
@celery.task(name='vm.tasks.local_agent_tasks.renew')
def renew(vm):
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm):
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm):
print(vm)
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm):
print(vm)
## from twisted.internet.defer import Deferred
#from twisted.internet import reactor # threads
#from celery.result import TimeoutError
from celery import Celery
import serializers
from kombu import Queue, Exchange
from os import getenv
#from socket import gethostname
#from threading import Event
import logging
from celery.utils.log import get_task_logger
#from celery.signals import after_setup_task_logger
#from celery.app.log import TaskFormatter
logger = get_task_logger(__name__)
QUEUE_NAME = "localhots.man"
AMQP_URI = getenv('AMQP_URI')
celery = Celery('agentman', broker=AMQP_URI)
celery.config_from_object('celeryconfig')
celery.conf.update(
CELERY_QUEUES=(Queue(QUEUE_NAME, Exchange('manager', type='direct'),
routing_key='manager'), ),
task_protocol = 1, # Celery 3 compatibility
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='UTC',
enable_utc=True,)
@celery.task(name='vm.tasks.local_agent_tasks.renew')
def renew(vm):
pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_started')
def agent_started(vm, version, system):
pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_stopped')
def agent_stopped(vm):
pass
@celery.task(name='vm.tasks.local_agent_tasks.agent_ok')
def agent_ok(vm):
pass
result_backend = 'amqp://'
task_result_expires = 300
result_expires = 300
timezone = 'utc'
enable_utc = True
accept_content = ['json', 'pickle_v2']
task_serializer = 'json'
result_serializer = 'pickle_v2'
task_store_errors_even_if_ignored = True
...@@ -8,7 +8,7 @@ import struct ...@@ -8,7 +8,7 @@ import struct
from os import getenv from os import getenv
import gc import gc
from utils import SerialLineReceiverBase from utils import SerialLineReceiverBase
from agentcelery import agent_started, agent_stopped, renew from agentmancelery import agent_started, agent_stopped, renew
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
...@@ -95,8 +95,30 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -95,8 +95,30 @@ class SerialLineReceiver(SerialLineReceiverBase):
elif command == 'agent_started': elif command == 'agent_started':
version = args.get('version', None) version = args.get('version', None)
system = args.get('system', None) system = args.get('system', None)
agent_started.apply_async(queue='localhost.man', from kombu import Exchange, Queue
args=(self.factory.vm, version, system)) st = agent_started.apply_async(
args=(self.factory.vm, version, system),
exchange='manager',
routing_key='manager',
serializer='json',
declare=[Queue('localhost.man', Exchange('manager','direct'),
routing_key='manager')],
)
# st = agent_started.apply_async(
# args=(self.factory.vm, version, system),
# queue='localhost.man',
# exchange='manager',
# routing_key='manager',
# serializer='json',
# )
logger.debug("apply_async %r", st.id)
logger.debug("apply_st %r", st.status)
time.sleep(2)
logger.debug("apply_st2 %r", st.status)
time.sleep(8)
logger.debug("apply_st3 %r", st.status)
elif command == 'renew': elif command == 'renew':
renew.apply_async(queue='localhost.man', renew.apply_async(queue='localhost.man',
args=(self.factory.vm, )) args=(self.factory.vm, ))
...@@ -105,7 +127,7 @@ class SerialLineReceiver(SerialLineReceiverBase): ...@@ -105,7 +127,7 @@ class SerialLineReceiver(SerialLineReceiverBase):
args=args) args=args)
def handle_response(self, response, args): def handle_response(self, response, args):
logger.info('handle_reponse: %s %s', response, args) logger.info('handle_response: %s %s', response, args)
vm = self.factory.vm vm = self.factory.vm
if response == 'status': if response == 'status':
self.send_to_graphite(args) self.send_to_graphite(args)
......
celery==3.1.17 celery==4.4.7
#Twisted==22.10.0
Twisted==20.3.0 Twisted==20.3.0
threadpool==1.3.2 threadpool==1.3.2
setuptools>=58,<70
pip<24
# serializers.py
from kombu.serialization import register
import pickle
def pickle_v2_dumps(obj):
return pickle.dumps(obj, protocol=2)
def pickle_v2_loads(s):
return pickle.loads(s)
register(
'pickle_v2',
pickle_v2_dumps,
pickle_v2_loads,
content_type='application/x-pickle',
content_encoding='binary',
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment