From 3b828db2a2ce753889209b9ff10e55e03957af2d Mon Sep 17 00:00:00 2001 From: Bach Dániel <bd@ik.bme.hu> Date: Mon, 9 Jun 2014 20:24:30 +0200 Subject: [PATCH] handle responses from agent --- protocol.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/protocol.py b/protocol.py index 176c523..e12751b 100644 --- a/protocol.py +++ b/protocol.py @@ -5,7 +5,9 @@ import pickle import logging import time import struct +from threading import Event from os import getenv +from celery.result import TimeoutError from utils import SerialLineReceiverBase @@ -62,8 +64,9 @@ class SerialLineReceiver(SerialLineReceiverBase): agent_stopped.apply_async(queue='localhost.man', args=(self.factory.vm, )) if command == 'agent_started': + version = args.get('version', None) agent_started.apply_async(queue='localhost.man', - args=(self.factory.vm, )) + args=(self.factory.vm, version)) if command == 'ping': self.send_response(response='pong', args=args) @@ -71,6 +74,14 @@ class SerialLineReceiver(SerialLineReceiverBase): def handle_response(self, response, args): if response == 'status': self.send_to_graphite(args) + else: + uuid = args.get('uuid', None) + if not uuid: + return + event = self.factory.running_tasks.get(uuid, None) + if event: + self.factory.ended_tasks[uuid] = args + event.set() def connectionMade(self): logger.info("connected to %s" % self.factory.vm) @@ -80,9 +91,34 @@ class SerialLineReceiver(SerialLineReceiverBase): logger.info("disconnected from %s" % self.factory.vm) del reactor.connections[self.factory.vm] + def send_command(self, command, args, timeout=10.0, uuid=None): + if not uuid: + super(SerialLineReceiver, self).send_command(command, args) + return + + event = Event() + args['uuid'] = uuid + self.factory.running_tasks[uuid] = event + self.factory.ended_tasks[uuid] = None + + super(SerialLineReceiver, self).send_command(command, args) + + success = event.wait(timeout) + retval = self.factory.ended_tasks[uuid] + + del self.factory.ended_tasks[uuid] + del self.factory.running_tasks[uuid] + + if not success: + raise TimeoutError() + + return retval + class SerialLineReceiverFactory(protocol.ClientFactory): protocol = SerialLineReceiver def __init__(self, vm): self.vm = vm + self.running_tasks = {} + self.ended_tasks = {} -- libgit2 0.26.0