mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-06-04 11:09:33 +00:00
Client ad server sharing same connection handler now
This commit is contained in:
@@ -4,6 +4,7 @@ import random
|
||||
import socket
|
||||
import struct
|
||||
import logging
|
||||
import collections
|
||||
import selectors2 as selectors
|
||||
import ConfigParser
|
||||
from contextlib import closing
|
||||
@@ -13,7 +14,7 @@ current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentfra
|
||||
parent_dir = os.path.dirname(current_dir)
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
from messaging_lib import Message
|
||||
import messaging_lib as messaging
|
||||
random.seed()
|
||||
|
||||
logging.basicConfig( # TODO all prints as logs
|
||||
@@ -29,6 +30,9 @@ class Client:
|
||||
def __init__(self, config_path="client_config.ini"):
|
||||
self.selector = selectors.DefaultSelector()
|
||||
self.client_socket = None
|
||||
|
||||
self.server_connection = messaging.ConnectionManager()
|
||||
|
||||
self.server_host = None
|
||||
self.server_port = None
|
||||
self.broadcast_port = None
|
||||
@@ -70,7 +74,7 @@ class Client:
|
||||
|
||||
self.client_id = self.config.get('PRIVATE', 'id')
|
||||
if self.client_id == 'default':
|
||||
client_id = 'copter' + str(random.randrange(9999)).zfill(4)
|
||||
self.client_id = 'copter' + str(random.randrange(9999)).zfill(4)
|
||||
#write_to_config('PRIVATE', 'id', client_id)
|
||||
elif self.client_id == '/hostname':
|
||||
self.client_id = socket.gethostname()
|
||||
@@ -105,11 +109,6 @@ class Client:
|
||||
logging.critical("Shutting down on keyboard interrupt")
|
||||
raise KeyboardInterrupt
|
||||
else:
|
||||
self.connected = True
|
||||
#self.client_socket.settimeout(None)
|
||||
self.client_socket.setblocking(False)
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
self.selector.register(self.client_socket, events, data=None)
|
||||
logging.info("Connection to server successful!")
|
||||
break
|
||||
|
||||
@@ -118,14 +117,20 @@ class Client:
|
||||
self.broadcast_bind()
|
||||
attempt_count = 0
|
||||
|
||||
def _connect(self):
|
||||
self.connected = True
|
||||
self.client_socket.setblocking(False)
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
self.selector.register(self.client_socket, events, data=None)
|
||||
|
||||
def broadcast_bind(self):
|
||||
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
broadcast_client.bind(("", self.broadcast_port))
|
||||
try:
|
||||
while True:
|
||||
data, addr = broadcast_client.recvfrom(1024)
|
||||
message = Message()
|
||||
data, addr = broadcast_client.recvfrom(self.BUFFER_SIZE)
|
||||
message = messaging.MessageManager()
|
||||
message.income_raw = data
|
||||
message.process_message()
|
||||
if message.content:
|
||||
@@ -141,39 +146,25 @@ class Client:
|
||||
finally:
|
||||
broadcast_client.close()
|
||||
|
||||
def service_connection(self, key, mask):
|
||||
sock = key.fileobj
|
||||
data = key.data
|
||||
if mask & selectors.EVENT_READ:
|
||||
recv_data = sock.recv(1024) # Should be ready to read
|
||||
if recv_data:
|
||||
print("received", repr(recv_data), "from connection", )
|
||||
if not recv_data:
|
||||
print("closing connection",)
|
||||
self.selector.unregister(sock)
|
||||
self.client_socket.close()
|
||||
if mask & selectors.EVENT_READ:
|
||||
pass
|
||||
|
||||
|
||||
def mainloop(self):
|
||||
#self.client_socket.send("104771313759739")
|
||||
try:
|
||||
while True:
|
||||
events = self.selector.select(timeout=1)
|
||||
if events:
|
||||
for key, mask in events:
|
||||
self.service_connection(key, mask)
|
||||
pass
|
||||
if key.data is None:
|
||||
connection = key.data
|
||||
connection.process_events(mask)
|
||||
|
||||
if not self.selector.get_map():
|
||||
logging.warning("No active connections left")
|
||||
logging.warning("No active connections left!")
|
||||
self.reconnect()
|
||||
except KeyboardInterrupt:
|
||||
print("caught keyboard interrupt, exiting")
|
||||
except (KeyboardInterrupt, errno.EINTR):
|
||||
logging.critical("Caught interrupt, exiting!")
|
||||
finally:
|
||||
self.selector.close()
|
||||
|
||||
# TODO class connection
|
||||
|
||||
if __name__ == "__main__":
|
||||
client = Client()
|
||||
|
||||
242
Server/server.py
242
Server/server.py
@@ -8,7 +8,7 @@ import selectors
|
||||
import collections
|
||||
import configparser
|
||||
|
||||
from messaging_lib import Message
|
||||
import messaging_lib as messaging
|
||||
|
||||
# All imports sorted in pyramid just because
|
||||
|
||||
@@ -23,9 +23,6 @@ logging.basicConfig( # TODO all prints as logs
|
||||
])
|
||||
|
||||
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
|
||||
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
|
||||
"callback", "callback_args", "callback_kwargs"
|
||||
])
|
||||
|
||||
|
||||
class Server:
|
||||
@@ -149,7 +146,7 @@ class Server:
|
||||
|
||||
def _ip_broadcast(self):
|
||||
logging.info("Broadcast sender thread started!")
|
||||
msg = Message.create_simple_message(
|
||||
msg = messaging.MessageManager.create_simple_message(
|
||||
"server_ip", {"host": self.ip, "port": str(self.port), "id": self.id})
|
||||
broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
@@ -178,7 +175,7 @@ class Server:
|
||||
try:
|
||||
while self.listener_thread_running.is_set():
|
||||
data, addr = broadcast_client.recvfrom(1024)
|
||||
message = Message()
|
||||
message = messaging.MessageManager()
|
||||
message.income_raw = data
|
||||
message.process_message()
|
||||
if message.content:
|
||||
@@ -202,7 +199,7 @@ class Server:
|
||||
def send_starttime(self, copter, dt=0):
|
||||
timenow = self.time_now()
|
||||
print('Now:', time.ctime(timenow), "+ dt =", dt)
|
||||
copter.send(Message.create_simple_message("starttime", {"time": str(timenow + dt)})) # TODO change start_on
|
||||
copter.send_message("starttime", {"time": str(timenow + dt)}) # TODO change start_on
|
||||
# TODO all commands as tasks with timestamp and priority
|
||||
|
||||
|
||||
@@ -224,9 +221,7 @@ def requires_any_connected(f):
|
||||
return wrapper
|
||||
|
||||
|
||||
class Client:
|
||||
resume_queue = True
|
||||
|
||||
class Client(messaging.ConnectionManager):
|
||||
clients = {}
|
||||
|
||||
on_connect = None # Use as callback functions
|
||||
@@ -234,48 +229,24 @@ class Client:
|
||||
on_disconnect = None
|
||||
|
||||
def __init__(self, ip):
|
||||
self.selector = None
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self._recv_buffer = b""
|
||||
self._send_buffer = b""
|
||||
|
||||
self._send_queue = collections.deque()
|
||||
self._received_queue = collections.deque()
|
||||
self._request_queue = collections.OrderedDict()
|
||||
|
||||
self._send_lock = threading.Lock()
|
||||
self._request_lock = threading.Lock()
|
||||
|
||||
super(Client, self).__init__()
|
||||
self.copter_id = None
|
||||
self.connected = False
|
||||
|
||||
self.clients[ip] = self
|
||||
|
||||
self.connected = False
|
||||
|
||||
def _set_selector_events_mask(self, mode):
|
||||
"""Set selector to listen for events: mode is 'r', 'w', 'rw'."""
|
||||
if mode == "r":
|
||||
events = selectors.EVENT_READ
|
||||
elif mode == "w":
|
||||
events = selectors.EVENT_WRITE
|
||||
elif mode == "rw":
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
else:
|
||||
raise ValueError("Invalid events mask mode {}.".format(mode))
|
||||
self.selector.modify(self.socket, events, data=self)
|
||||
@staticmethod
|
||||
def get_by_id(copter_id):
|
||||
for client in Client.clients.values():
|
||||
if client.copter_id == copter_id:
|
||||
return client
|
||||
|
||||
def connect(self, client_selector, client_socket, client_addr):
|
||||
logging.info("Client connected")
|
||||
if not Client.resume_queue:
|
||||
if not self.resume_queue:
|
||||
self._send_queue = collections.deque()
|
||||
|
||||
self.selector = client_selector
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
|
||||
self._set_selector_events_mask('rw')
|
||||
super(Client, self).connect(client_selector, client_socket, client_addr)
|
||||
|
||||
self.connected = True
|
||||
|
||||
@@ -291,175 +262,18 @@ class Client:
|
||||
Client.on_first_connect(self)
|
||||
|
||||
def close(self):
|
||||
logging.info("Closing connection to {}".format(self.addr))
|
||||
try:
|
||||
self.selector.unregister(self.socket)
|
||||
except AttributeError:
|
||||
pass
|
||||
except Exception as error:
|
||||
logging.error("{}: Error during selector unregistering: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.selector = None
|
||||
self.connected = False
|
||||
|
||||
try:
|
||||
self.socket.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
except OSError as error:
|
||||
logging.error("{}: Error during socket closing: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.socket = None
|
||||
if Client.on_disconnect:
|
||||
Client.on_disconnect(self)
|
||||
|
||||
def process_events(self, mask):
|
||||
print(self.socket, self.selector, mask)
|
||||
if mask & selectors.EVENT_READ:
|
||||
self.read()
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
self.write()
|
||||
super(Client, self).close()
|
||||
|
||||
with self._send_lock:
|
||||
if (not self._send_buffer) and self._send_queue:
|
||||
message = self._send_queue.popleft()
|
||||
self._send_buffer += message
|
||||
# self._set_selector_events_mask('rw')
|
||||
|
||||
def process_received(self):
|
||||
if self._received_queue:
|
||||
if self._received_queue[-1].content:
|
||||
message = self._received_queue.pop()
|
||||
message_type = message.jsonheader["message-type"]
|
||||
logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content))
|
||||
if message_type == "message":
|
||||
pass
|
||||
elif message_type == "response":
|
||||
request_id, requested_value = message.content["requst_id"], message.content["requested_value"]
|
||||
with self._request_lock:
|
||||
for key, value in self._request_queue.items():
|
||||
if (key == request_id) and (value.requested_value == requested_value):
|
||||
request = self._request_queue.pop(key)
|
||||
request.value = message.content["value"]
|
||||
logging.debug(
|
||||
"Request successfully closed with value {}".format(message.content["value"])
|
||||
)
|
||||
request.callback(request.value, *request.callback_args, **request.callback_kwargs)
|
||||
break
|
||||
else:
|
||||
logging.warning("Unexpected request response!")
|
||||
elif message_type == "request":
|
||||
pass
|
||||
|
||||
def read(self):
|
||||
self._read()
|
||||
if self._recv_buffer:
|
||||
if not self._received_queue or (self._received_queue[0].content is not None):
|
||||
self._received_queue.appendleft(Message())
|
||||
|
||||
self._received_queue[0].income_raw += self._recv_buffer
|
||||
self._received_queue[0].process_message()
|
||||
|
||||
if self._received_queue[0].content and self._received_queue[0].income_raw:
|
||||
self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer
|
||||
self._received_queue[0].income_raw = b''
|
||||
|
||||
self.process_received()
|
||||
|
||||
def write(self):
|
||||
self._write()
|
||||
# if not (self._send_buffer and self._send_queue):
|
||||
# self._set_selector_events_mask("r")
|
||||
|
||||
def _read(self):
|
||||
try:
|
||||
data = self.socket.recv(Server.BUFFER_SIZE)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
logging.debug("Received {} from {}".format(data, self.addr))
|
||||
else:
|
||||
logging.warning("Connection to {} lost!".format(self.addr))
|
||||
self.connected = False
|
||||
if not Client.resume_queue:
|
||||
self._recv_buffer = b''
|
||||
|
||||
if Client.on_disconnect:
|
||||
Client.on_disconnect(self)
|
||||
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
def _write(self):
|
||||
if self._send_buffer:
|
||||
try:
|
||||
sent = self.socket.send(self._send_buffer)
|
||||
except BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
except socket.error as error:
|
||||
logging.warning("Attempt to send message {} to {} failed due error: {}".format(
|
||||
self._send_buffer, self.addr, error))
|
||||
|
||||
if not Client.resume_queue:
|
||||
self._send_buffer = b''
|
||||
|
||||
raise error
|
||||
else:
|
||||
print(sent)
|
||||
logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
|
||||
self._send_buffer = self._send_buffer[sent:]
|
||||
print(self._send_buffer)
|
||||
time.sleep(0.1)
|
||||
|
||||
@requires_connect
|
||||
def send(self, *messages):
|
||||
for message in messages:
|
||||
with self._send_lock:
|
||||
self._send_queue.append(message)
|
||||
|
||||
def send_message(self, command, args=None):
|
||||
self.send(Message.create_simple_message(command, args))
|
||||
|
||||
@staticmethod
|
||||
@requires_any_connected
|
||||
def broadcast(message, force_all=False):
|
||||
for client in Client.clients.values():
|
||||
if client.connected or force_all:
|
||||
client.send(message)
|
||||
|
||||
@classmethod
|
||||
@requires_any_connected
|
||||
def broadcast_message(cls, command, args=None, force_all=False):
|
||||
cls.broadcast(Message.create_simple_message(command, args), force_all)
|
||||
|
||||
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
|
||||
callback_args=(), callback_kwargs: dict=None):
|
||||
if request_args is None:
|
||||
request_args = {}
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
request_id = str(random.randint(0, 9999)).zfill(4)
|
||||
with self._request_lock:
|
||||
self._request_queue[request_id] = PendingRequest(
|
||||
requested_value=requested_value,
|
||||
value=None,
|
||||
# expires_on=Server.time_now()+timeout,
|
||||
callback=callback,
|
||||
callback_args=callback_args,
|
||||
callback_kwargs=callback_kwargs,
|
||||
)
|
||||
self.send(Message.create_request(requested_value, request_id, request_args))
|
||||
|
||||
def send_file(self, filepath, dest_filepath): # clever_restart=False
|
||||
try:
|
||||
with open(filepath, 'rb') as file:
|
||||
data = file.read()
|
||||
except OSError as error:
|
||||
logging.warning("File can not be opened due error: ".format(error))
|
||||
else:
|
||||
logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
|
||||
self.send(Message.create_message(data, "binary", "filetransfer", "binary", {"path": dest_filepath}))
|
||||
def _send(self, data):
|
||||
super(Client, self)._send(data)
|
||||
logging.debug("Queued data to send: {}".format(data))
|
||||
|
||||
def send_config_options(self, *options: ConfigOption):
|
||||
logging.info("Sending config options: {} to {}".format(options, self.addr))
|
||||
@@ -471,10 +285,16 @@ class Client:
|
||||
self.send_message("config_reload")
|
||||
|
||||
@staticmethod
|
||||
def get_by_id(copter_id):
|
||||
for copter in Client.clients.values():
|
||||
if copter.copter_id == copter_id:
|
||||
return copter
|
||||
@requires_any_connected
|
||||
def broadcast(message, force_all=False):
|
||||
for client in Client.clients.values():
|
||||
if client.connected or force_all:
|
||||
client.send(message)
|
||||
|
||||
@classmethod
|
||||
@requires_any_connected
|
||||
def broadcast_message(cls, command, args=None, force_all=False):
|
||||
cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
259
messaging_lib.py
259
messaging_lib.py
@@ -2,9 +2,22 @@ import io
|
||||
import sys
|
||||
import json
|
||||
import struct
|
||||
import random
|
||||
import logging
|
||||
import threading
|
||||
import collections
|
||||
|
||||
try:
|
||||
import selectors
|
||||
except:
|
||||
import selectors2 as selectors
|
||||
|
||||
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
|
||||
"callback", "callback_args", "callback_kwargs"
|
||||
])
|
||||
|
||||
|
||||
class Message:
|
||||
class MessageManager:
|
||||
def __init__(self):
|
||||
self.income_raw = b""
|
||||
self._jsonheader_len = None
|
||||
@@ -62,6 +75,15 @@ class Message:
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "request")
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_response(cls, requested_value, request_id, value):
|
||||
contents = {"requested_value": requested_value,
|
||||
"requst_id": request_id,
|
||||
"value": value,
|
||||
}
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "response")
|
||||
return message
|
||||
|
||||
def _process_protoheader(self):
|
||||
header_len = 2
|
||||
if len(self.income_raw) >= header_len:
|
||||
@@ -106,3 +128,238 @@ class Message:
|
||||
if self.jsonheader:
|
||||
if self.content is None:
|
||||
self._process_content()
|
||||
|
||||
|
||||
def message_callback(string_command):
|
||||
def inner(f):
|
||||
ConnectionManager.requests_callbacks.update({string_command: f})
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
return inner
|
||||
|
||||
|
||||
class ConnectionManager(object):
|
||||
messages_callbacks = {}
|
||||
requests_callbacks = {}
|
||||
|
||||
def __init__(self):
|
||||
self.selector = None
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self.selector = None
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self._recv_buffer = b""
|
||||
self._send_buffer = b""
|
||||
|
||||
self._send_queue = collections.deque()
|
||||
self._received_queue = collections.deque()
|
||||
self._request_queue = collections.OrderedDict()
|
||||
|
||||
self._send_lock = threading.Lock()
|
||||
self._request_lock = threading.Lock()
|
||||
|
||||
self.BUFFER_SIZE = 1024
|
||||
self.resume_queue = True
|
||||
|
||||
def _set_selector_events_mask(self, mode):
|
||||
"""Set selector to listen for events: mode is 'r', 'w', 'rw'."""
|
||||
if mode == "r":
|
||||
events = selectors.EVENT_READ
|
||||
elif mode == "w":
|
||||
events = selectors.EVENT_WRITE
|
||||
elif mode == "rw":
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
else:
|
||||
raise ValueError("Invalid events mask mode {}.".format(mode))
|
||||
self.selector.modify(self.socket, events, data=self)
|
||||
|
||||
def connect(self, client_selector, client_socket, client_addr):
|
||||
self.selector = client_selector
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
|
||||
self._set_selector_events_mask('rw')
|
||||
|
||||
def close(self):
|
||||
logging.info("Closing connection to {}".format(self.addr))
|
||||
try:
|
||||
self.selector.unregister(self.socket)
|
||||
except AttributeError:
|
||||
pass
|
||||
except Exception as error:
|
||||
logging.error("{}: Error during selector unregistering: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.selector = None
|
||||
|
||||
try:
|
||||
self.socket.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
except OSError as error:
|
||||
logging.error("{}: Error during socket closing: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.socket = None
|
||||
|
||||
def process_events(self, mask):
|
||||
print(self.socket, self.selector, mask)
|
||||
if mask & selectors.EVENT_READ:
|
||||
self.read()
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
self.write()
|
||||
|
||||
def read(self):
|
||||
self._read()
|
||||
if self._recv_buffer:
|
||||
if not self._received_queue or (self._received_queue[0].content is not None):
|
||||
self._received_queue.appendleft(MessageManager())
|
||||
|
||||
self._received_queue[0].income_raw += self._recv_buffer
|
||||
self._received_queue[0].process_message()
|
||||
|
||||
if self._received_queue[0].content and self._received_queue[0].income_raw:
|
||||
self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer
|
||||
self._received_queue[0].income_raw = b''
|
||||
|
||||
self.process_received()
|
||||
|
||||
def _read(self):
|
||||
try:
|
||||
data = self.socket.recv(self.BUFFER_SIZE)
|
||||
except io.BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
logging.debug("Received {} from {}".format(data, self.addr))
|
||||
else:
|
||||
logging.warning("Connection to {} lost!".format(self.addr))
|
||||
if not self.resume_queue:
|
||||
self._recv_buffer = b''
|
||||
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
def process_received(self):
|
||||
if self._received_queue:
|
||||
if self._received_queue[-1].content:
|
||||
message = self._received_queue.pop()
|
||||
message_type = message.jsonheader["message-type"]
|
||||
logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content))
|
||||
if message_type == "message":
|
||||
self._process_message(message)
|
||||
elif message_type == "response":
|
||||
self._process_response(message)
|
||||
elif message_type == "request":
|
||||
self._process_request(message)
|
||||
elif message_type == "filetransfer":
|
||||
self._process_filetransfer(message)
|
||||
|
||||
def _process_message(self, message):
|
||||
command = message.content["command"]
|
||||
args = message.content["args"]
|
||||
self.messages_callbacks[command](**args)
|
||||
|
||||
def _process_request(self, message):
|
||||
command = message.content["requested_value"]
|
||||
request_id = message.content["request_id"]
|
||||
args = message.content["args"]
|
||||
value = self.requests_callbacks[command](**args)
|
||||
self._send_response(command, request_id, value)
|
||||
|
||||
def _process_response(self, message):
|
||||
request_id, requested_value = message.content["requst_id"], message.content["requested_value"]
|
||||
with self._request_lock:
|
||||
for key, value in self._request_queue.items():
|
||||
if (key == request_id) and (value.requested_value == requested_value):
|
||||
request = self._request_queue.pop(key)
|
||||
request.value = message.content["value"]
|
||||
logging.debug(
|
||||
"Request successfully closed with value {}".format(message.content["value"])
|
||||
)
|
||||
request.callback(request.value, *request.callback_args, **request.callback_kwargs)
|
||||
break
|
||||
else:
|
||||
logging.warning("Unexpected response!")
|
||||
|
||||
def _process_filetransfer(self, message):
|
||||
if message.jsonheader["content-type"] == "binary":
|
||||
filepath = message.jsonheader["filepath"]
|
||||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(message.content)
|
||||
except OSError as error:
|
||||
logging.warning("File can not be written due error: ".format(error))
|
||||
else:
|
||||
logging.info("File {} successfully received ".format(filepath))
|
||||
|
||||
def write(self):
|
||||
with self._send_lock:
|
||||
if (not self._send_buffer) and self._send_queue:
|
||||
message = self._send_queue.popleft()
|
||||
self._send_buffer += message
|
||||
if self._send_buffer:
|
||||
self._write()
|
||||
|
||||
def _write(self):
|
||||
try:
|
||||
sent = self.socket.send(self._send_buffer)
|
||||
except io.BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
except Exception as error:
|
||||
logging.warning("Attempt to send message {} to {} failed due error: {}".format(
|
||||
self._send_buffer, self.addr, error))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._send_buffer = b''
|
||||
|
||||
raise error
|
||||
else:
|
||||
logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
|
||||
self._send_buffer = self._send_buffer[sent:]
|
||||
|
||||
def _send(self, data):
|
||||
with self._send_lock:
|
||||
self._send_queue.append(data)
|
||||
|
||||
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
|
||||
callback_args=(), callback_kwargs=None):
|
||||
if request_args is None:
|
||||
request_args = {}
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
request_id = str(random.randint(0, 9999)).zfill(4)
|
||||
with self._request_lock:
|
||||
self._request_queue[request_id] = PendingRequest(
|
||||
requested_value=requested_value,
|
||||
value=None,
|
||||
# expires_on=Server.time_now()+timeout, #TODO
|
||||
callback=callback,
|
||||
callback_args=callback_args,
|
||||
callback_kwargs=callback_kwargs,
|
||||
)
|
||||
self._send(MessageManager.create_request(requested_value, request_id, request_args))
|
||||
|
||||
def send_message(self, command, args=None):
|
||||
self._send(MessageManager.create_simple_message(command, args))
|
||||
|
||||
def _send_response(self, requested_value, request_id, value):
|
||||
self._send(MessageManager.create_response(requested_value, request_id, value))
|
||||
|
||||
def send_file(self, filepath, dest_filepath): # clever_restart=False
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
except OSError as error:
|
||||
logging.warning("File can not be opened due error: ".format(error))
|
||||
else:
|
||||
logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
|
||||
self._send(MessageManager.create_message(
|
||||
data, "binary", "filetransfer", "binary", {"filepath": dest_filepath}
|
||||
))
|
||||
Reference in New Issue
Block a user