#!/usr/bin/env python
"""Bridge between per-VM serial sockets, Celery tasks and Graphite.

UNIX sockets reported by inotify are connected with a
``SerialLineReceiverFactory``; commands received over the serial line are
forwarded to Celery tasks and ``status`` responses are pushed to Graphite.

NOTE(review): ``reactor.running_tasks`` and ``reactor.ended_tasks`` are used
throughout but are not initialised in this module -- presumably the code that
starts the reactor sets them up; confirm.
"""
from twisted.internet import protocol, reactor, inotify
import pickle
import logging
import time
import struct
from os import getenv
import gc

from utils import SerialLineReceiverBase
from agentcelery import agent_started, agent_stopped, renew
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# vm name -> set of currently connected SerialLineReceiver instances.
reactor.connections = {}


def numObjsByName(name):
    """Return how many GC-tracked objects are instances of ``name``.

    Despite the parameter's name it must be a class (or tuple of classes),
    because it is passed straight to ``isinstance``.  Used only for the
    diagnostic object-count log lines below.
    """
    return sum(1 for ob in gc.get_objects() if isinstance(ob, name))


class GraphiteClientProtocol(protocol.Protocol):
    """One-shot protocol: send a single batch of metrics, then disconnect.

    Reads ``self.name`` (the VM name) and ``self.data`` (a mapping whose dict
    values are flattened into ``agent.<name>.<key>.<subkey>`` metrics).  Both
    are normally set per instance by ``GraphiteClientFactory.buildProtocol``.
    """

    def connectionMade(self):
        """Serialise ``self.data`` and write it as one length-prefixed pickle."""
        logger.info("Monitor connection %s", self.name)
        timestamp = time.time()
        data_list = []
        for key, value in self.data.items():
            # Only nested dicts carry metrics; scalar entries are ignored.
            if not isinstance(value, dict):
                continue
            for k, v in value.items():
                data_list.append(('agent.%s.%s.%s' % (self.name, key, k),
                                  (timestamp, float(v))))
        payload = pickle.dumps(data_list)
        # 4-byte big-endian length header followed by the pickled list.
        header = struct.pack("!L", len(payload))
        self.transport.write(header + payload)
        self.transport.loseConnection()
        logger.debug('s: %s', self.data)
        logger.info("Monitor info from: %s", self.name)


class GraphiteClientFactory(protocol.ClientFactory):
    """Factory that hands its metric batch to each protocol it builds.

    The batch is kept on the factory and copied onto the protocol *instance*.
    Storing it on the shared ``GraphiteClientProtocol`` class (the previous
    approach) let a second status report overwrite the first one while its
    TCP connection was still being established.
    """

    protocol = GraphiteClientProtocol

    def __init__(self, name=None, data=None):
        """Remember the VM ``name`` and metric ``data`` for this connection.

        Both default to ``None`` so the no-argument constructor keeps working;
        in that case the protocol falls back to whatever attributes were set
        on the protocol class.
        """
        self.name = name
        self.data = data

    def buildProtocol(self, addr):
        """Build the protocol and attach this factory's name and data to it."""
        proto = protocol.ClientFactory.buildProtocol(self, addr)
        if self.data is not None:
            proto.name = self.name
            proto.data = self.data
        return proto


def inotify_handler(self, file, mask):
    """inotify callback: connect to, or forget, the serial socket of a VM.

    ``self`` is the first positional argument supplied by the notifier and is
    unused.  ``file`` is a FilePath; files whose basename starts with
    ``cloud`` are ignored, otherwise the VM name is the basename with
    ``vio-`` removed.

    * On ``IN_DELETE`` the factory registered for the path (if any) is dropped
      and no connection is attempted -- the socket no longer exists.
    * If a factory is already registered for the path nothing happens.
    * Otherwise a new ``SerialLineReceiverFactory`` is connected to the path
      with a 10 second timeout.
    """
    path = file.asTextMode(encoding='utf-8')
    if path.basename().startswith('cloud'):
        return
    vm = path.basename().replace('vio-', '')
    logger.info('inotify: %s (%s)', vm, path.path)
    if mask:
        logger.info("event %s (%s) on %s",
                    ', '.join(inotify.humanReadableMask(mask)), mask, path)

    # running_tasks[vm] may exist without a 'started' map (it also holds
    # per-uuid entries), so never assume the key is present.
    started = reactor.running_tasks.get(vm, {}).get('started') or {}
    registered = started.get(path.path)

    if mask == inotify.IN_DELETE:
        if registered:
            logger.info("DELETE %s", registered)
            logger.info('NumOBJSerialLineReceiverFactory1: %s',
                        numObjsByName(SerialLineReceiverFactory))
            started.pop(path.path)
            logger.info('NumOBJSerialLineReceiverFactory2: %s',
                        numObjsByName(SerialLineReceiverFactory))
            logger.info('reacror_running1 %s', reactor.running_tasks)
        # Never try to connect to a socket that has just been removed.
        return

    if registered:
        # A connection attempt for this socket is already tracked.
        return

    serial = SerialLineReceiverFactory(vm)
    logger.info("connecting to %s (%s)", vm, path.path)
    ic = reactor.connectUNIX(path.path, serial, 10)
    logger.info('IConnector state: %s', ic.state)
    logger.info('reacror_running2 %s', reactor.running_tasks)


class SerialLineReceiver(SerialLineReceiverBase):
    """Serial-line protocol for one VM; ``self.factory.vm`` names the VM."""

    def send_to_graphite(self, data):
        """Open a short-lived TCP connection that ships ``data`` to Graphite.

        Host and port come from ``GRAPHITE_HOST`` / ``GRAPHITE_PORT``
        (defaults ``127.0.0.1`` and ``2004``).  Non-dict payloads are skipped
        because the sender can only serialise mappings.
        """
        logger.info("Send_TO_Graphite")
        if not isinstance(data, dict):
            logger.warning("ignoring non-dict status from %s: %r",
                           self.factory.vm, data)
            return
        client = GraphiteClientFactory(name=self.factory.vm, data=data)
        reactor.connectTCP(getenv('GRAPHITE_HOST', '127.0.0.1'),
                           int(getenv('GRAPHITE_PORT', '2004')),
                           client)

    def handle_command(self, command, args):
        """Dispatch a command received from the VM.

        ``agent_stopped``, ``agent_started`` and ``renew`` are queued as
        Celery tasks on ``localhost.man``; ``ping`` is answered with ``pong``.
        Any other command is only logged.
        """
        logger.info('serial_command: %s %s', command, args)
        vm = self.factory.vm
        if command == 'agent_stopped':
            agent_stopped.apply_async(queue='localhost.man', args=(vm, ))
        elif command == 'agent_started':
            version = args.get('version', None)
            system = args.get('system', None)
            agent_started.apply_async(queue='localhost.man',
                                      args=(vm, version, system))
        elif command == 'renew':
            renew.apply_async(queue='localhost.man', args=(vm, ))
        elif command == 'ping':
            self.send_response(response='pong', args=args)

    def handle_response(self, response, args):
        """Handle a response received from the VM.

        ``status`` responses go to Graphite.  Any other response carrying a
        ``uuid`` is stored in ``reactor.ended_tasks`` and the waiting entry in
        ``reactor.running_tasks`` is signalled via ``.set()`` (presumably an
        Event-like object created by the task that issued the request).
        """
        logger.info('handle_reponse: %s %s', response, args)
        vm = self.factory.vm
        if response == 'status':
            self.send_to_graphite(args)
            return
        uuid = args.get('uuid', None)
        if not uuid:
            return
        event = reactor.running_tasks[vm].get(uuid, None)
        if event:
            reactor.ended_tasks[vm][uuid] = args
            event.set()

    def connectionMade(self):
        """Register this connection in ``reactor.connections``."""
        vm = self.factory.vm
        logger.info("connected to %s (%s)", vm, self.transport.addr)
        logger.info("reactor connections: %s", reactor.connections)
        if vm not in reactor.connections:
            reactor.connections[vm] = set()
        logger.info("reactor connections factory: %s",
                    reactor.connections[vm])
        reactor.connections[vm].add(self)
        logger.info('NumOBJSerialLineReceiver: %s',
                    numObjsByName(SerialLineReceiver))

    def connectionLost(self, reason):
        """Remove this connection from ``reactor.connections``.

        The per-address ``started`` bookkeeping is cleaned up by the factory's
        ``clientConnectionLost``, not here.
        """
        vm = self.factory.vm
        logger.info("disconnected from %s (%s)", vm, self.transport.addr)
        reactor.connections[vm].remove(self)
        logger.info("active connetions: %s", reactor.running_tasks[vm])


class SerialLineReceiverFactory(protocol.ClientFactory):
    """Client factory for the serial socket of a single VM.

    While a connection attempt or connection is alive the factory is recorded
    in ``reactor.running_tasks[vm]['started'][address]``; ``inotify_handler``
    consults that map to avoid connecting twice to the same socket.
    """

    protocol = SerialLineReceiver

    def __init__(self, vm):
        """Remember ``vm`` and make sure its task maps exist."""
        self.vm = vm
        if vm not in reactor.running_tasks:
            reactor.running_tasks[vm] = {}
        if vm not in reactor.ended_tasks:
            reactor.ended_tasks[vm] = {}

    def startedConnecting(self, connector):
        """Record this factory as the one handling ``connector.address``."""
        vm = self.vm
        addr = connector.address
        logger.info("startedConnecting to %s (%s)", vm, addr)
        logger.info("started connetions: %s", reactor.running_tasks[vm])
        if not reactor.running_tasks[vm].get('started', None):
            reactor.running_tasks[vm]['started'] = {}
        reactor.running_tasks[vm]['started'][addr] = self
        logger.info('NumOBJSerialLineReceiverFactory: %s',
                    numObjsByName(SerialLineReceiverFactory))
        logger.info('NumOBJSerialLineReceiver: %s',
                    numObjsByName(SerialLineReceiver))
        logger.info("NumConnetions: %s", reactor.running_tasks)

    def _forget_started(self, addr):
        """Drop the ``started`` entry for ``addr`` if it still points at us.

        A newer factory may already have been registered for the same address
        (socket deleted and recreated); that entry must be left alone.
        """
        started = reactor.running_tasks.get(self.vm, {}).get('started')
        if started and started.get(addr) is self:
            started.pop(addr)

    def clientConnectionLost(self, connector, reason=None):
        """Twisted callback: an established connection was closed."""
        vm = self.vm
        addr = connector.address
        logger.info("clientConnectionLost with %s (%s)", vm, addr)
        self._forget_started(addr)
        logger.info("active connetions: %s", reactor.running_tasks[vm])

    def clientConnectionFailed(self, connector, reason=None):
        """Twisted callback: the connection attempt did not succeed."""
        vm = self.vm
        addr = connector.address
        logger.info("clientConnectionFailed with %s (%s)", vm, addr)
        self._forget_started(addr)
        logger.info("active connetions: %s", reactor.running_tasks[vm])

    # The callbacks used to be defined under these misspelled names, which
    # Twisted never invokes.  Kept as aliases for any caller relying on them.
    clientrConnectionLost = clientConnectionLost
    clientrConnectionFailed = clientConnectionFailed