From f6dde5b949eefc244b4ce9bc866604b7e24d9fbf Mon Sep 17 00:00:00 2001 From: Szeberenyi Imre <szebi@iit.bme.hu> Date: Sun, 11 Feb 2024 18:06:16 +0100 Subject: [PATCH] broken connection + inotify fix --- agentcelery.py | 1 + agentdriver.py | 1 + protocol.py | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/agentcelery.py b/agentcelery.py index e4268ad..1c043d5 100644 --- a/agentcelery.py +++ b/agentcelery.py @@ -16,6 +16,7 @@ AMQP_URI = getenv('AMQP_URI') celery = Celery('agent', broker=AMQP_URI) celery.conf.update(CELERY_RESULT_BACKEND='amqp', CELERY_TASK_RESULT_EXPIRES=300, + CELERYD_PREFETCH_MULTIPLIER=32, CELERY_QUEUES=(Queue(HOSTNAME + '.agent', Exchange('agent', type='direct'), routing_key='agent'), )) diff --git a/agentdriver.py b/agentdriver.py index 94ee072..da7b616 100644 --- a/agentdriver.py +++ b/agentdriver.py @@ -26,6 +26,7 @@ Worker.install_platform_tweaks = install_platform_tweaks def reactor_started(): + logger.info("reactor_started") reactor.running_tasks = {} reactor.ended_tasks = {} for f in listdir(SOCKET_DIR): diff --git a/protocol.py b/protocol.py index 49f1022..946af7c 100644 --- a/protocol.py +++ b/protocol.py @@ -1,11 +1,12 @@ #!/usr/bin/env python -from twisted.internet import protocol, reactor +from twisted.internet import protocol, reactor, inotify import pickle import logging import time import struct from os import getenv +import gc from utils import SerialLineReceiverBase @@ -15,9 +16,16 @@ logger = logging.getLogger() reactor.connections = {} +def numObjsByName(name): + num = 0 + for ob in gc.get_objects(): + if isinstance(ob, name): + num += 1 + return num class GraphiteClientProtocol(protocol.Protocol): def connectionMade(self): + logger.info("Monitor connection %s", self.name) timestamp = time.time() data_list = [] for key, value in self.data.items(): @@ -41,18 +49,37 @@ class GraphiteClientFactory(protocol.ClientFactory): def inotify_handler(self, file, mask): + if file.basename().startswith('cloud'): + return vm = file.basename().replace('vio-', '') logger.info('inotify: %s (%s)', vm, file.path) - for conn in reactor.connections.get(vm, []): - if file.path == conn.transport.addr: - return + if mask: + logger.info("event %s (%s) on %s" % (', '.join(inotify.humanReadableMask(mask)), mask, file)) + if vm in reactor.running_tasks: + for addr in reactor.running_tasks[vm].get('started', None).copy() : + if file.path == addr: + if mask and mask == inotify.IN_DELETE and reactor.running_tasks[vm]['started'][addr]: + _p = reactor.running_tasks[vm]['started'][addr] + logger.info("DELETE %s", _p) + logger.info('NumOBJSerialLineReceiverFactory1: %s' , numObjsByName(SerialLineReceiverFactory)) + del _p + reactor.running_tasks[vm]['started'].pop(addr) + logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory)) + logger.info('reacror_running1 %s', reactor.running_tasks) + return + elif reactor.running_tasks[vm]['started'][addr]: + return serial = SerialLineReceiverFactory(vm) logger.info("connecting to %s (%s)", vm, file.path) - reactor.connectUNIX(file.path, serial) + ic = reactor.connectUNIX(file.path, serial, 10) + logger.info('IConnector state: %s', ic.state) + logger.info('reacror_running2 %s', reactor.running_tasks) +# logger.info('NumOBJSerialLineReceiverFactory2: %s' , numObjsByName(SerialLineReceiverFactory)) class SerialLineReceiver(SerialLineReceiverBase): def send_to_graphite(self, data): + logger.info("Send_TO_Graphite") client = GraphiteClientFactory() client.protocol.data = data client.protocol.name = self.factory.vm @@ -61,6 +88,7 @@ class SerialLineReceiver(SerialLineReceiverBase): client) def handle_command(self, command, args): + logger.info('serial_command: %s %s', command, args) if command == 'agent_stopped': agent_stopped.apply_async(queue='localhost.man', args=(self.factory.vm, )) @@ -77,6 +105,7 @@ class SerialLineReceiver(SerialLineReceiverBase): args=args) def handle_response(self, response, args): + logger.info('handle_reponse: %s %s', response, args) vm = self.factory.vm if response == 'status': self.send_to_graphite(args) @@ -89,17 +118,25 @@ class SerialLineReceiver(SerialLineReceiverBase): reactor.ended_tasks[vm][uuid] = args event.set() + def connectionMade(self): - logger.info("connected to %s (%s)", self.factory.vm, - self.transport.addr) + logger.info("connected to %s (%s)", self.factory.vm, self.transport.addr) + logger.info("reactor connections: %s", reactor.connections) if self.factory.vm not in reactor.connections: reactor.connections[self.factory.vm] = set() + logger.info("reactor connections factory: %s", reactor.connections[self.factory.vm]) reactor.connections[self.factory.vm].add(self) + logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver)) def connectionLost(self, reason): - logger.info("disconnected from %s (%s)", self.factory.vm, - self.transport.addr) + logger.info("disconnected from %s (%s)", self.factory.vm, self.transport.addr) reactor.connections[self.factory.vm].remove(self) + vm = self.factory.vm +# for addr in reactor.running_tasks[vm].get('started', None): +# if addr == self.transport.addr : +# reactor.running_tasks[vm]['started'][addr] = None + logger.info("active connetions: %s", reactor.running_tasks[vm]) + class SerialLineReceiverFactory(protocol.ClientFactory): @@ -111,3 +148,36 @@ class SerialLineReceiverFactory(protocol.ClientFactory): reactor.running_tasks[vm] = {} if vm not in reactor.ended_tasks: reactor.ended_tasks[vm] = {} + + def startedConnecting(self, connector): + vm = self.vm + addr = connector.address + logger.info("startedConnecting to %s (%s)", vm, addr) + logger.info("started connetions: %s", reactor.running_tasks[vm]) + if not reactor.running_tasks[vm].get('started', None): + reactor.running_tasks[vm]['started'] = {} + reactor.running_tasks[vm]['started'][addr] = self + logger.info('NumOBJSerialLineReceiverFactory: %s' , numObjsByName(SerialLineReceiverFactory)) + logger.info('NumOBJSerialLineReceiver: %s' , numObjsByName(SerialLineReceiver)) + logger.info("NumConnetions: %s", reactor.running_tasks) + + def clientrConnectionLost(self, connector): + vm = self.vm + addr = connector.address + logger.info("clientConnectionLost with %s (%s)", vm, addr) + for _addr in reactor.running_tasks[vm].get('started', None): + if _addr == addr: + reactor.running_tasks[vm]['started'].pop(addr) + logger.info("active connetions: %s", reactor.running_tasks[vm]) + + def clientrConnectionFailed(self, connector, reason): + vm = self.vm + addr = connector.address + logger.info("clientConnectionFailed with %s (%s)", vm, connector.addr) + for _addr in reactor.running_tasks[vm].get('started', None): + if _addr == addr: + reactor.running_tasks[vm]['started'].pop(addr) + logger.info("active connetions: %s", reactor.running_tasks[vm]) + + + -- libgit2 0.26.0