mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-26 07:07:58 +00:00
refactored communication, added example
This commit is contained in:
35
examples/code/protocol.py
Normal file
35
examples/code/protocol.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
import lib.protocols as p
|
||||
import lib.messages as messages
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler()
|
||||
])
|
||||
|
||||
async def server():
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
server = await loop.create_server(lambda: p.PeerProtocol(None, None), host='', port=8181)
|
||||
|
||||
await server.wait_closed()
|
||||
|
||||
async def client():
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
transport, protocol = await loop.create_connection(lambda: p.PeerProtocol(None, None),
|
||||
host='', port=8181)
|
||||
await protocol.send(messages.Request("greetings"))
|
||||
|
||||
await protocol.closed
|
||||
|
||||
async def main():
|
||||
s = asyncio.create_task(server())
|
||||
c = asyncio.create_task(client())
|
||||
await asyncio.gather(s, c)
|
||||
|
||||
asyncio.run(main(), debug=True)
|
||||
314
lib/connections.py
Normal file
314
lib/connections.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""
|
||||
`messaging` is an universal for server and clients module (running both on Python 2.7 and 3.6+). This module contains utility functions and classes implementing high level protocol for TCP socket communication.
|
||||
"""
|
||||
import os
|
||||
import random
|
||||
import asyncio
|
||||
# import aiofiles
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
import messages
|
||||
import exceptions
|
||||
from messages import MessageDecoder, Request
|
||||
from protocols import PeerProtocol
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CallbackManager:
|
||||
def __init__(self):
|
||||
self.action_callbacks = dict()
|
||||
self.request_callbacks = dict()
|
||||
|
||||
self.connected_callback = None
|
||||
self.disconnected_callback = None
|
||||
|
||||
@staticmethod
|
||||
def _register_function(d, key):
|
||||
def inner(f):
|
||||
if not callable(f):
|
||||
raise TypeError("f should be callable")
|
||||
d[key] = f
|
||||
logger.debug("Registered callback function {} for {}".format(f, key))
|
||||
return f
|
||||
|
||||
return inner
|
||||
|
||||
def action_callback(self, key):
|
||||
return self._register_function(self.action_callbacks, key)
|
||||
|
||||
def request_callback(self, key):
|
||||
return self._register_function(self.request_callbacks, key)
|
||||
|
||||
class temp:
|
||||
def _process_received(self, message: MessageDecoder):
|
||||
message_type = message.message_type
|
||||
# content = message.content if message.content_type != "binary" else message.content[:256]
|
||||
# logger.debug(f"Received message! Header: {message.jsonheader}, content: {content}")
|
||||
|
||||
def _process_message(self, message: MessageDecoder):
|
||||
if message.jsonheader["action"] == "filetransfer":
|
||||
self._write_file(message.jsonheader["filepath"], message.content)
|
||||
else:
|
||||
self._process_action(message)
|
||||
|
||||
def _process_action(self, message: MessageDecoder):
|
||||
action = message.header["action"]
|
||||
args = message.content["args"]
|
||||
kwargs = message.content["kwargs"]
|
||||
try:
|
||||
callback = self._callbacks.action_callbacks[action]
|
||||
except KeyError:
|
||||
logger.warning(f"Action '{action}' does not exist!")
|
||||
return
|
||||
|
||||
try:
|
||||
callback(self, *args, **kwargs)
|
||||
except Exception as error:
|
||||
logger.error("Error during action {} execution: {}".format(action, error))
|
||||
# traceback.print_exc()
|
||||
|
||||
def _process_request(self, message):
|
||||
requested_value = message.content["requested_value"]
|
||||
request_id = message.content["request_id"]
|
||||
args = message.content["args"]
|
||||
kwargs = message.content["kwargs"]
|
||||
|
||||
filetransfer = requested_value == "filetransfer"
|
||||
try:
|
||||
if filetransfer:
|
||||
value = self._read_file(kwargs["filepath"])
|
||||
else:
|
||||
try:
|
||||
callback = self._callbacks.request_callbacks[requested_value]
|
||||
except KeyError:
|
||||
logger.warning(f"Request '{requested_value}' does not exist!")
|
||||
raise
|
||||
|
||||
value = callback(self, *args, **kwargs)
|
||||
except Exception as error: # TODO send response error\cancel
|
||||
logger.error("Error during request {} processing: {}".format(requested_value, error))
|
||||
# self._send_status_response(error.message, error.args)
|
||||
else:
|
||||
self._send_response(requested_value, request_id, value, filetransfer)
|
||||
|
||||
@staticmethod
|
||||
def _read_file(filepath):
|
||||
with open(filepath, mode='rb') as f:
|
||||
return f.read()
|
||||
|
||||
def _write_file(self, filepath, content):
|
||||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(content)
|
||||
except OSError as error:
|
||||
logger.error("File {} can not be written due error: {}".format(filepath, error))
|
||||
else:
|
||||
logger.info("File {} successfully received ".format(filepath))
|
||||
# if self.whoami == "pi":
|
||||
# logger.info("Return rights to pi:pi after file transfer")
|
||||
# os.system("chown pi:pi {}".format(filepath))
|
||||
# process = await
|
||||
|
||||
# Sending api functions
|
||||
|
||||
class Connection:
|
||||
def __init__(self, callbacks):
|
||||
self.callbacks = callbacks
|
||||
|
||||
self.protocol: PeerProtocol = None
|
||||
self.transport = None
|
||||
|
||||
self._sent_requests = dict() # holds requests awaiting reply
|
||||
self._recv_requests = asyncio.Queue()
|
||||
|
||||
def connect(self, protocol: PeerProtocol, transport):
|
||||
self.protocol = protocol
|
||||
self.transport = transport
|
||||
|
||||
async def close(self):
|
||||
self.transport.close()
|
||||
await self.protocol.closed
|
||||
|
||||
async def send(self, request: Request):
|
||||
self._sent_requests[request.chain_id] = request
|
||||
|
||||
try:
|
||||
await self.protocol.send(request)
|
||||
response = await request.response
|
||||
except asyncio.TimeoutError:
|
||||
self._sent_requests.pop(request.chain_id, None)
|
||||
request.sent.cancel("Request timed out")
|
||||
request.response.set_exception(exceptions.ConnectionClosedError)
|
||||
raise
|
||||
else:
|
||||
return response
|
||||
|
||||
async def send_reply(self, response: messages.Response):
|
||||
await self.protocol.send(response)
|
||||
|
||||
async def receive(self):
|
||||
if self._recv_waiter is not None:
|
||||
raise RuntimeError("receive() is already being awaited")
|
||||
|
||||
self._recv_waiter = asyncio.create_task(self._recv_requests.get())
|
||||
|
||||
try:
|
||||
return await self._recv_waiter
|
||||
except asyncio.CancelledError:
|
||||
raise exceptions.ConnectionClosedError
|
||||
finally:
|
||||
self._recv_waiter = None
|
||||
|
||||
async def _receive(self):
|
||||
msg = await self.protocol.receive_message()
|
||||
if msg.message_type == messages.MessageTypes.RESPONSE:
|
||||
await self._process_response(msg)
|
||||
elif msg.message_type == messages.MessageTypes.REQUEST:
|
||||
pass
|
||||
|
||||
async def _process_response(self, msg: MessageDecoder):
|
||||
try:
|
||||
request = self._sent_requests[msg.chain_id]
|
||||
except KeyError:
|
||||
logger.error(f"Unknown response {msg} with id {msg.chain_id}")
|
||||
return
|
||||
|
||||
request.response.set_result(msg)
|
||||
|
||||
async def _process_request(self):
|
||||
pass
|
||||
|
||||
class LegacyConnectionManager:
|
||||
"""
|
||||
This class represents high-level protocol of TCP connection.
|
||||
|
||||
Attributes:
|
||||
addr (str): Address of the peer.
|
||||
buffer_size (int): Size of the sending/receiving buffer.
|
||||
resume_queue (bool): Whether to resume sending queue upon peer reconnection.
|
||||
resend_requests (bool): Whether to resend unanswered requests in queue to reconnected client.
|
||||
"""
|
||||
|
||||
def __init__(self, callbacks, whoami="computer"):
|
||||
"""
|
||||
Args:
|
||||
whoami (str, optional): What type of system the ConnectionManager is running on (`computer` or `pi`). Defaults to "computer".
|
||||
|
||||
Example:
|
||||
|
||||
```python
|
||||
connection = ConnectionManager(whoami)
|
||||
connection.connect(client_selector, client_socket, client_addr)
|
||||
```
|
||||
"""
|
||||
self.whoami = whoami
|
||||
|
||||
|
||||
def _process_response(self, message):
|
||||
request_id, requested_value = message.jsonheader["request_id"], message.jsonheader["requested_value"]
|
||||
|
||||
with self._request_lock:
|
||||
request = self._request_queue.pop(request_id, None)
|
||||
if (request is None) or (request.requested_value != requested_value):
|
||||
logger.warning("Unexpected response!")
|
||||
return
|
||||
|
||||
if requested_value == "filetransfer":
|
||||
value = True
|
||||
self._process_filetransfer(message.content, request.callback_kwargs["filepath"])
|
||||
logger.debug(
|
||||
"Request {} successfully closed with file bytes {}...".format(request, message.content[:256])
|
||||
)
|
||||
else:
|
||||
value = message.content["value"]
|
||||
logger.debug(
|
||||
"Request {} successfully closed with value {}".format(request, message.content["value"])
|
||||
)
|
||||
if request.callback is not None:
|
||||
try:
|
||||
request.callback(self, value, *request.callback_args, **request.callback_kwargs)
|
||||
except Exception as error:
|
||||
logger.error("Error during response {} processing: {}".format(request, error))
|
||||
else:
|
||||
logger.info("No callback were registered for response: {}".format(request))
|
||||
|
||||
def get_response(self, requested_value, callback, # timeout=30,
|
||||
request_args=(), request_kwargs=None,
|
||||
callback_args=(), callback_kwargs=None, ):
|
||||
"""
|
||||
Sends request to the client and adds it to the request queue. The callback will be called upon receiving the response (see example below).
|
||||
|
||||
Args:
|
||||
requested_value (string): Name of requested value.
|
||||
callback (function): Callable object (function, binded method, etc.) that would be called upon receiving response to this request or None.
|
||||
request_args (tuple, optional): Arguments for the request. Defaults to ().
|
||||
request_kwargs (dict, optional): Keyword arguments for the request. Defaults to None.
|
||||
callback_args (tuple, optional): Arguments for the callback. Defaults to ().
|
||||
callback_kwargs (dict, optional): Keyword arguments for the callback. Defaults to None.
|
||||
|
||||
Example of a callback:
|
||||
|
||||
```python
|
||||
def callback(client, value, *args, **kwargs):
|
||||
print(value, args, kwargs)
|
||||
```
|
||||
|
||||
First argument passed to callback function is an instance of `ConnectionManager`, representing connection by which the message was received.
|
||||
Second arguments is received value. Arguments and keyword arguments from response will also be passed.
|
||||
"""
|
||||
|
||||
def get_file(self, client_filepath, filepath=None, callback=None,
|
||||
callback_args=(), callback_kwargs=None, ):
|
||||
"""
|
||||
Requests file from peer located at `client_filepath`. Received file will be written to the `filepath` if specified.
|
||||
|
||||
Args:
|
||||
client_filepath (str): Path to file to retrieve from peer.
|
||||
filepath (str, optional): Path to write file upon receiving. If `None` - file will not be written. Defaults to None.
|
||||
callback (function): Callable object (function, binded method, etc.) that would be called upon receiving response to this request or None.
|
||||
callback_args (tuple, optional): Arguments for the callback. Defaults to ().
|
||||
callback_kwargs (dict, optional): Keyword arguments for the callback. Defaults to None.
|
||||
|
||||
"""
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
if filepath is None:
|
||||
filepath = os.path.split(client_filepath)[1]
|
||||
|
||||
request_kwargs = {"filepath": client_filepath}
|
||||
callback_kwargs.update({"filepath": filepath})
|
||||
|
||||
self.get_response("filetransfer", callback, request_kwargs=request_kwargs,
|
||||
callback_args=callback_args, callback_kwargs=callback_kwargs)
|
||||
|
||||
def _resend_requests(self):
|
||||
for request_id, request in self._request_queue.items(): # TODO filter
|
||||
if request.resend:
|
||||
self._send(Message.create_request(
|
||||
request.requested_value, request_id, request.request_kwargs.update(resend=request.resend))
|
||||
)
|
||||
request.resend = False
|
||||
|
||||
def _send_response(self, requested_value, request_id, value, filetransfer=False):
|
||||
self._send(Message.create_response(requested_value, request_id, value, filetransfer))
|
||||
|
||||
def send_file(self, filepath, dest_filepath): # clever_restart=False
|
||||
"""
|
||||
Sends to peer a file from `filepath` to write it on `dest_filepath`.
|
||||
|
||||
Args:
|
||||
filepath (str): Path of the file to send.
|
||||
dest_filepath (str): Path on peer where recieved file will be written to.
|
||||
"""
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
except OSError as error:
|
||||
logger.warning("File can not be opened due error: ".format(error))
|
||||
else:
|
||||
logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
|
||||
self._send(Message.create_message(data, "binary", "message",
|
||||
additional_headers={"action": "filetransfer",
|
||||
"filepath": dest_filepath}))
|
||||
5
lib/exceptions.py
Normal file
5
lib/exceptions.py
Normal file
@@ -0,0 +1,5 @@
|
||||
class CloverException(Exception):
|
||||
pass
|
||||
|
||||
class ConnectionClosedError(Exception):
|
||||
pass
|
||||
355
lib/messages.py
Normal file
355
lib/messages.py
Normal file
@@ -0,0 +1,355 @@
|
||||
import io
|
||||
import abc
|
||||
import json
|
||||
import uuid
|
||||
import copy
|
||||
import struct
|
||||
import random
|
||||
import asyncio
|
||||
import functools
|
||||
|
||||
random.seed()
|
||||
|
||||
class Codec(abc.ABC):
|
||||
def encode(self, data) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
def decode(self, data: bytes):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class BytesCodec(Codec):
|
||||
def encode(self, data: bytes) -> bytes:
|
||||
return data
|
||||
|
||||
def decode(self, data: bytes):
|
||||
return data
|
||||
|
||||
class JsonCodec(Codec):
|
||||
def __init__(self, encoding="utf-8"):
|
||||
self.encoding = encoding
|
||||
|
||||
def encode(self, data) -> bytes:
|
||||
return json.dumps(data, ensure_ascii=False).encode(self.encoding)
|
||||
|
||||
def decode(self, data):
|
||||
with io.TextIOWrapper(io.BytesIO(data), encoding=self.encoding, newline="") as tiow:
|
||||
obj = json.load(tiow)
|
||||
return obj
|
||||
|
||||
|
||||
default_codec = JsonCodec()
|
||||
|
||||
class ContentTypes:
|
||||
BINARY = "binary"
|
||||
ENCODED = "encoded"
|
||||
|
||||
class MessageTypes:
|
||||
REQUEST = "request"
|
||||
RESPONSE = "response"
|
||||
|
||||
class ResponseTypes:
|
||||
RESULT = "result"
|
||||
ERROR = "error"
|
||||
STATUS = "status"
|
||||
|
||||
class MessageDecoder:
|
||||
"""
|
||||
MessageManager class represents single incoming by TCP stream message and contains methods to decode and extract data from incoming data. It also contains static class methods for encoding various types of messages.
|
||||
|
||||
Messages in protocol implemented by this class consists of 3 parts:
|
||||
|
||||
* Fixed-length (2 bytes) protoheader - contains length of json header
|
||||
* header - contains information about message contents: length, encoding, type of message and contents, etc.
|
||||
* content - contains actual contents of message (json information, bytes, etc.)
|
||||
|
||||
|
||||
Attributes:
|
||||
header (dict): Headers dictionary with information about message encoding and purpose. Would be populated when receiving and processing of the json header will be completed.
|
||||
content (object): Would be populated when receiving and processing of the message will be completed. Defaults to None.
|
||||
"""
|
||||
|
||||
def __init__(self, data, codec=default_codec):
|
||||
"""
|
||||
```python
|
||||
message = MessageManager()
|
||||
```
|
||||
"""
|
||||
|
||||
self.codec = codec
|
||||
self._income_raw = None
|
||||
|
||||
self._header_len = None
|
||||
self.header = None
|
||||
|
||||
self.message_type = None
|
||||
self.content_type = None
|
||||
self.chain_id = None
|
||||
|
||||
self.content = None
|
||||
|
||||
self.set_buffer(data)
|
||||
|
||||
def _process_protoheader(self):
|
||||
header_len = 2
|
||||
if len(self._income_raw) >= header_len:
|
||||
self._header_len = struct.unpack(">H", self._income_raw[:header_len])[0]
|
||||
self._income_raw = self._income_raw[header_len:]
|
||||
|
||||
def _process_header(self):
|
||||
header_len = self._header_len
|
||||
if not len(self._income_raw) >= header_len:
|
||||
return
|
||||
self.header = self.codec.decode(self._income_raw[:header_len])
|
||||
self._income_raw = self._income_raw[header_len:]
|
||||
|
||||
for reqhdr in (
|
||||
"content-length",
|
||||
"content-type",
|
||||
"message-type",
|
||||
"chain-id",
|
||||
):
|
||||
if reqhdr not in self.header:
|
||||
raise ValueError('Missing required header {}'.format(reqhdr))
|
||||
|
||||
self.content_type = self.header["content-type"]
|
||||
self.message_type = self.header["message-type"]
|
||||
self.chain_id = self.header["chain-id"]
|
||||
|
||||
def _process_content(self):
|
||||
content_len = self.header["content-length"]
|
||||
if not len(self._income_raw) >= content_len:
|
||||
return
|
||||
data = self._income_raw[:content_len]
|
||||
self._income_raw = self._income_raw[content_len:]
|
||||
|
||||
if self.content_type == ContentTypes.ENCODED:
|
||||
self.content = self.codec.decode(data)
|
||||
elif self.content_type == ContentTypes.BINARY:
|
||||
self.content = data
|
||||
|
||||
def set_buffer(self, data):
|
||||
self._income_raw = memoryview(data)
|
||||
|
||||
def get_buffer(self):
|
||||
return bytearray(self._income_raw)
|
||||
|
||||
def reset_buffer(self):
|
||||
self._income_raw = None
|
||||
|
||||
def process_message(self):
|
||||
"""
|
||||
Attempts processing the message. Chunks of `income_raw` would be consumed as different parts of the message will be processed. The result of processing (body of the message) will be available at `content` and `jsonheader`.
|
||||
"""
|
||||
if self._header_len is None:
|
||||
self._process_protoheader()
|
||||
|
||||
if self._header_len is not None:
|
||||
if self.header is None:
|
||||
self._process_header()
|
||||
|
||||
if self.header:
|
||||
if not self.processed:
|
||||
self._process_content()
|
||||
|
||||
@property
|
||||
def processed(self):
|
||||
return self.content is not None
|
||||
|
||||
class MessageEncoder:
|
||||
def __init__(self, codec=default_codec):
|
||||
self.chain_id = None
|
||||
|
||||
self.codec = codec
|
||||
|
||||
def encode(self, *args, **kwargs):
|
||||
return self.encode_raw_message(*args, **kwargs)
|
||||
|
||||
def encode_raw_message(self, content: bytes, content_type, message_type, chain_id=None, additional_headers=None):
|
||||
"""Returns encoded message in bytes. It is recommended use other encoding functions for general purposes.
|
||||
Args:
|
||||
content (byte string): Content of the message.
|
||||
content_type (str): Type of the message content (json, bytes, etc.).
|
||||
message_type (str): Type of the message (action, request, etc.).
|
||||
chain_id:
|
||||
additional_headers (dict, optional): Optional dict argument, additional json headers of the message. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
|
||||
if chain_id is None:
|
||||
chain_id = uuid.uuid4().int
|
||||
elif self.chain_id is not None:
|
||||
chain_id = self.chain_id
|
||||
else:
|
||||
self.chain_id = chain_id
|
||||
|
||||
header = {
|
||||
"content-length": len(content),
|
||||
"content-type": content_type,
|
||||
"message-type": message_type,
|
||||
"chain-id": chain_id
|
||||
}
|
||||
if additional_headers:
|
||||
header.update(additional_headers)
|
||||
|
||||
header_bytes = self.codec.encode(header)
|
||||
protoheader = struct.pack(">H", len(header_bytes))
|
||||
message = protoheader + header_bytes + content
|
||||
|
||||
return message
|
||||
|
||||
def encode_message(self, content, message_type, chain_id=None, additional_headers=None):
|
||||
"""Returns encoded message with encoded content in bytes.
|
||||
|
||||
Args:
|
||||
content (object): Any object convertible to json, content of the message.
|
||||
additional_headers (dict, optional): Optional dict argument, additional json headers of the message. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
message = self.encode_raw_message(self.codec.encode(content), ContentTypes.ENCODED, message_type, chain_id,
|
||||
additional_headers=additional_headers)
|
||||
return message
|
||||
|
||||
# @classmethod
|
||||
# def create_action_message(cls, action, args=(), kwargs=None):
|
||||
# """
|
||||
# Returns encoded command with arguments as json-encoded message in bytes.
|
||||
#
|
||||
# Args:
|
||||
# action (str): action(command) to perform upon receiving. Should correspond with `action_string` of function registered in `message_callback()` on the peer.
|
||||
# args (tuple, optional): Arguments for the command. Defaults to ().
|
||||
# kwargs (dict, optional): Keyword arguments for the command. Defaults to None.
|
||||
#
|
||||
# Returns:
|
||||
# bytes: encoded message
|
||||
# """
|
||||
# if kwargs is None:
|
||||
# kwargs = {}
|
||||
# message = cls.create_json_message({"args": args, "kwargs": kwargs}, {"action": action, })
|
||||
# return message
|
||||
#
|
||||
# @classmethod
|
||||
# def create_response(cls, requested_value, request_id, value, filetransfer=False):
|
||||
# """ Returns encoded response to request in bytes.
|
||||
#
|
||||
# Args:
|
||||
# requested_value (str): name of requested value. Should correspond with received one.
|
||||
# request_id (int): unique ID of the request. Should correspond with received one.
|
||||
# value: returned value or bytes to send back.
|
||||
# filetransfer (bool, optional): Whether `value` of response contains file bytes or actual value.. Defaults to False.
|
||||
#
|
||||
# Returns:
|
||||
# bytes: encoded message
|
||||
# """
|
||||
# headers = {"requested_value": requested_value,
|
||||
# "request_id": request_id, # TODO status
|
||||
# }
|
||||
# if filetransfer:
|
||||
# contents = value
|
||||
# else:
|
||||
# contents = cls._json_encode({"value": value, })
|
||||
# message = cls.create_message(contents, "binary" if filetransfer else "json",
|
||||
# "response", additional_headers=headers)
|
||||
# return message
|
||||
|
||||
class PendingMessage(MessageEncoder):
|
||||
def __init__(self, codec=default_codec):
|
||||
super().__init__(codec)
|
||||
self._sent = asyncio.Future()
|
||||
|
||||
@property
|
||||
def sent(self):
|
||||
return self._sent
|
||||
|
||||
async def send(self, connection):
|
||||
if self._sent:
|
||||
raise RuntimeError("This message was already sent, create another one")
|
||||
return connection.send(self)
|
||||
|
||||
class Response(PendingMessage):
|
||||
def __init__(self, chain_id, result, type, codec=default_codec):
|
||||
super().__init__(codec)
|
||||
|
||||
self._chain_id = chain_id
|
||||
self._result = result
|
||||
|
||||
def encode(self):
|
||||
contents = {"value": self._result}
|
||||
return self.encode_message(contents, MessageTypes.RESPONSE, chain_id=self._chain_id)
|
||||
|
||||
|
||||
class Request(PendingMessage):
|
||||
def __init__(self, name, args=(), kwargs=None, callback=None, codec=default_codec):
|
||||
super().__init__(codec)
|
||||
|
||||
self._name = name
|
||||
self._args = args
|
||||
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
self._kwargs = kwargs
|
||||
|
||||
self.callback = callback
|
||||
self._response = asyncio.Future()
|
||||
|
||||
@property
|
||||
def response(self):
|
||||
return self._response
|
||||
|
||||
def encode(self):
|
||||
contents = {"name": self._name,
|
||||
"args": self._args,
|
||||
"kwargs": self._kwargs,
|
||||
}
|
||||
return self.encode_message(contents, MessageTypes.REQUEST)
|
||||
|
||||
def __copy__(self):
|
||||
return self.__class__(self._name, self._args, self._kwargs, self.callback, self.codec)
|
||||
|
||||
class RequestBatch:
|
||||
def __init__(self):
|
||||
self._request_dict = dict()
|
||||
|
||||
def from_dict(self, d):
|
||||
self._request_dict = d.copy()
|
||||
|
||||
def from_prototype(self, connections, request):
|
||||
self._request_dict = {connection: copy.copy(request) for connection in connections}
|
||||
|
||||
def set_request(self, connection, request):
|
||||
self._request_dict[connection] = request
|
||||
|
||||
async def send(self):
|
||||
for connection, request in self._request_dict.items():
|
||||
connection.send(request)
|
||||
|
||||
@property
|
||||
def request_dict(self):
|
||||
return self._request_dict.copy()
|
||||
|
||||
@property
|
||||
def response_dict(self):
|
||||
return {connection: request.response for connection, request in self._request_dict.items()}
|
||||
|
||||
@property
|
||||
def all_responses(self):
|
||||
return asyncio.gather(*self.request_dict.values(), return_exceptions=True)
|
||||
|
||||
class ReceivedRequest:
|
||||
def __init__(self, connection, message):
|
||||
super().__init__()
|
||||
self.connection = connection
|
||||
self.message: MessageDecoder = message
|
||||
|
||||
async def reply(self, data):
|
||||
reply = Response(self.message.chain_id)
|
||||
|
||||
async def reply_processing(self, progress: float=0):
|
||||
progress = max(0, min(1, progress))
|
||||
|
||||
async def reply_error(self, err: Exception):
|
||||
pass
|
||||
732
lib/messaging.py
732
lib/messaging.py
@@ -1,732 +0,0 @@
|
||||
"""
|
||||
`messaging` is an universal for server and clients module (running both on Python 2.7 and 3.6+). This module contains utility functions and classes implementing high level protocol for TCP socket communication.
|
||||
"""
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
import socket
|
||||
import struct
|
||||
import random
|
||||
import asyncio
|
||||
import logging
|
||||
import platform
|
||||
import traceback
|
||||
import collections
|
||||
|
||||
|
||||
class Namespace:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.__dict__[key] = value
|
||||
|
||||
|
||||
class PendingRequest(Namespace): pass
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def str_peername(peername):
|
||||
return f"{peername[0]}:{peername[1]}"
|
||||
|
||||
|
||||
def get_ip_address(): # dodo async
|
||||
"""
|
||||
Returns the IP address of current computer or `localhost` if no network connection present.
|
||||
|
||||
Returns:
|
||||
string: IP address of current computer or `localhost` if no network connection present
|
||||
"""
|
||||
ip_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # TODO IPv6
|
||||
try:
|
||||
ip_socket.connect(("8.8.8.8", 80))
|
||||
ip = ip_socket.getsockname()[0]
|
||||
except OSError as e:
|
||||
logging.warning(f"No network connection detected, using localhost: {e}")
|
||||
ip = "localhost"
|
||||
finally:
|
||||
ip_socket.close()
|
||||
return ip
|
||||
|
||||
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
"""
|
||||
Gets and returns time from specified host and port of NTP server.
|
||||
|
||||
Args:
|
||||
ntp_host (string): hostname or address of the NTP server.
|
||||
ntp_port (int): port of the NTP server.
|
||||
|
||||
Returns:
|
||||
int: Current time recieved from the NTP server
|
||||
"""
|
||||
NTP_DELTA = 2208988800 # 1970-01-01 00:00:00
|
||||
NTP_QUERY = b'\x1b' + bytes(47)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket:
|
||||
ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port))
|
||||
msg, _ = ntp_socket.recvfrom(1024)
|
||||
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
|
||||
|
||||
|
||||
def set_keepalive(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
|
||||
"""
|
||||
Sets `keepalive` parameters of given socket.
|
||||
|
||||
Args:
|
||||
sock (socket): Socket which parameters will be changed.
|
||||
after_idle_sec (int, optional): Start sending keepalive packets after this amount of seconds. Defaults to 1.
|
||||
interval_sec (int, optional): Interval of keepalive packets in seconds. Defaults to 3.
|
||||
max_fails (int, optional): Count of fails leading to socket disconnect. Defaults to 5.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: for unknown platform.
|
||||
"""
|
||||
current_platform = platform.system() # could be empty
|
||||
|
||||
if current_platform == "Linux":
|
||||
return _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails)
|
||||
if current_platform == "Windows":
|
||||
return _set_keepalive_windows(sock, after_idle_sec, interval_sec)
|
||||
if current_platform == "Darwin":
|
||||
return _set_keepalive_osx(sock, interval_sec)
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
|
||||
|
||||
|
||||
def _set_keepalive_windows(sock, after_idle_sec, interval_sec):
|
||||
sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec * 1000, interval_sec * 1000))
|
||||
|
||||
|
||||
def _set_keepalive_osx(sock, interval_sec):
|
||||
TCP_KEEPALIVE = 0x10
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)
|
||||
|
||||
|
||||
class BroadcastProtocol:
|
||||
def __init__(self, on_broadcast):
|
||||
self._on_broadcast = on_broadcast
|
||||
|
||||
self._closed = asyncio.Event()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._closed.wait()
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
logging.info("Broadcast connection established")
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logging.info(f"Broadcast connection lost: {'closed' if exc is None else exc}")
|
||||
self._closed.set()
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
message = MessageManager(data)
|
||||
message.process_message()
|
||||
content = message.content
|
||||
|
||||
is_ip_broadcast = (content is not None and message.jsonheader["action"] == "server_ip")
|
||||
|
||||
if is_ip_broadcast:
|
||||
logging.debug(f"Got broadcast message from {addr}: {content}")
|
||||
asyncio.get_event_loop().call_soon(self._on_broadcast, message)
|
||||
self.transport.close()
|
||||
else:
|
||||
logging.warning(f"Got wrong broadcast message from {addr}")
|
||||
|
||||
def error_received(self, exc):
|
||||
logging.warning(f"Error on broadcast connection received: {exc}")
|
||||
|
||||
|
||||
class BroadcastSendProtocol:
|
||||
pass
|
||||
|
||||
|
||||
class MessageManager:
|
||||
"""
|
||||
MessageManager class represents single incoming by TCP stream message and contains methods to decode and extract data from incoming data. It also contains static class methods for encoding various types of messages.
|
||||
|
||||
Messages in protocol implemented by this class consists of 3 parts:
|
||||
|
||||
* Fixed-length (2 bytes) protoheader - contains length of json header
|
||||
* json header - contains information about message contents: length, encoding, type of message and contents, etc.
|
||||
* content - contains actual contents of message (json information, bytes, etc.)
|
||||
|
||||
|
||||
Attributes:
|
||||
_income_raw (bytes string): Holds incoming data bytes. Append incoming data to this attribute. It may not be empty after processing.
|
||||
jsonheader (dict): Headers dictionary with information about message encoding and purpose. Would be populated when receiving and processing of the json header will be completed.
|
||||
content (object): Would be populated when receiving and processing of the message will be completed. Defaults to None.
|
||||
"""
|
||||
|
||||
def __init__(self, data):
|
||||
"""
|
||||
```python
|
||||
message = MessageManager()
|
||||
```
|
||||
"""
|
||||
self._income_raw = None
|
||||
|
||||
self._jsonheader_len = None
|
||||
self.jsonheader = None
|
||||
self.content = None
|
||||
|
||||
# self._processed = False
|
||||
|
||||
self.set_buffer(data)
|
||||
|
||||
@staticmethod
|
||||
def _json_encode(obj, encoding="utf-8"):
|
||||
return json.dumps(obj, ensure_ascii=False).encode(encoding)
|
||||
|
||||
@staticmethod
|
||||
def _json_decode(json_bytes, encoding="utf-8"):
|
||||
with io.TextIOWrapper(io.BytesIO(json_bytes), encoding=encoding, newline="") as tiow:
|
||||
obj = json.load(tiow, object_pairs_hook=collections.OrderedDict)
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
def create_message(cls, content_bytes, content_type, message_type, content_encoding="utf-8",
|
||||
additional_headers=None):
|
||||
"""Returns encoded message in bytes. It is recommended use other encoding functions for general purposes.
|
||||
Args:
|
||||
content_bytes (byte string): Content of the message.
|
||||
content_type (str): Type of the message content (json, bytes, etc.).
|
||||
message_type (str): Type of the message (command, request, etc.).
|
||||
content_encoding (str, optional): Encoding of the message content. Defaults to "utf-8".
|
||||
additional_headers (dict, optional): Optional dict argument, additional json headers of the message. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
|
||||
jsonheader = {
|
||||
"content-length": len(content_bytes),
|
||||
"content-type": content_type,
|
||||
"content-encoding": content_encoding,
|
||||
"message-type": message_type,
|
||||
# "message-uuid":
|
||||
}
|
||||
if additional_headers:
|
||||
jsonheader.update(additional_headers)
|
||||
|
||||
jsonheader_bytes = cls._json_encode(jsonheader, "utf-8")
|
||||
message_hdr = struct.pack(">H", len(jsonheader_bytes))
|
||||
message = message_hdr + jsonheader_bytes + content_bytes
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_json_message(cls, contents, additional_headers=None):
|
||||
"""Returns encoded message with json-encoded content in bytes.
|
||||
|
||||
Args:
|
||||
contents (object): Any object convertible to json, content of the message.
|
||||
additional_headers (dict, optional): Optional dict argument, additional json headers of the message. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "message",
|
||||
additional_headers=additional_headers)
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_action_message(cls, action, args=(), kwargs=None):
|
||||
"""
|
||||
Returns encoded command with arguments as json-encoded message in bytes.
|
||||
|
||||
Args:
|
||||
action (str): action(command) to perform upon receiving. Should correspond with `action_string` of function registered in `message_callback()` on the peer.
|
||||
args (tuple, optional): Arguments for the command. Defaults to ().
|
||||
kwargs (dict, optional): Keyword arguments for the command. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
message = cls.create_json_message({"args": args, "kwargs": kwargs}, {"action": action, })
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_request(cls, requested_value, request_id, args=(), kwargs=None):
|
||||
"""Returns encoded request with arguments as json-encoded message in bytes.
|
||||
|
||||
Args:
|
||||
requested_value (str): name of requested value. Should correspond with `string_command` of function registered in `request_callback()` on the peer.
|
||||
request_id (int): unique ID of the request.
|
||||
args (tuple, optional): Arguments for the request.. Defaults to ().
|
||||
kwargs (dict, optional): Keyword arguments for the request. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
contents = {"requested_value": requested_value,
|
||||
"request_id": request_id,
|
||||
"args": args,
|
||||
"kwargs": kwargs,
|
||||
}
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "request")
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_response(cls, requested_value, request_id, value, filetransfer=False):
|
||||
""" Returns encoded response to request in bytes.
|
||||
|
||||
Args:
|
||||
requested_value (str): name of requested value. Should correspond with received one.
|
||||
request_id (int): unique ID of the request. Should correspond with received one.
|
||||
value: returned value or bytes to send back.
|
||||
filetransfer (bool, optional): Whether `value` of response contains file bytes or actual value.. Defaults to False.
|
||||
|
||||
Returns:
|
||||
bytes: encoded message
|
||||
"""
|
||||
headers = {"requested_value": requested_value,
|
||||
"request_id": request_id, # TODO status
|
||||
}
|
||||
if filetransfer:
|
||||
contents = value
|
||||
else:
|
||||
contents = cls._json_encode({"value": value, })
|
||||
message = cls.create_message(contents, "binary" if filetransfer else "json",
|
||||
"response", additional_headers=headers)
|
||||
return message
|
||||
|
||||
def _process_protoheader(self):
|
||||
header_len = 2
|
||||
if len(self._income_raw) >= header_len:
|
||||
self._jsonheader_len = struct.unpack(">H", self._income_raw[:header_len])[0]
|
||||
self._income_raw = self._income_raw[header_len:]
|
||||
|
||||
def _process_jsonheader(self):
|
||||
header_len = self._jsonheader_len
|
||||
if not len(self._income_raw) >= header_len:
|
||||
return
|
||||
self.jsonheader = self._json_decode(self._income_raw[:header_len], "utf-8")
|
||||
self._income_raw = self._income_raw[header_len:]
|
||||
for reqhdr in (
|
||||
"content-length",
|
||||
"content-type",
|
||||
"content-encoding",
|
||||
"message-type",
|
||||
):
|
||||
if reqhdr not in self.jsonheader:
|
||||
raise ValueError('Missing required header {}'.format(reqhdr))
|
||||
|
||||
def _process_content(self):
|
||||
content_len = self.jsonheader["content-length"]
|
||||
if not len(self._income_raw) >= content_len:
|
||||
return
|
||||
data = self._income_raw[:content_len]
|
||||
self._income_raw = self._income_raw[content_len:]
|
||||
if self.jsonheader["content-type"] == "json":
|
||||
encoding = self.jsonheader["content-encoding"]
|
||||
self.content = self._json_decode(data, encoding)
|
||||
else:
|
||||
self.content = data
|
||||
|
||||
def set_buffer(self, data):
|
||||
self._income_raw = memoryview(data)
|
||||
|
||||
def get_buffer(self):
|
||||
return self._income_raw
|
||||
|
||||
def reset_buffer(self):
|
||||
self._income_raw = None
|
||||
|
||||
def process_message(self):
|
||||
"""
|
||||
Attempts processing the message. Chunks of `income_raw` would be consumed as different parts of the message will be processed. The result of processing (body of the message) will be available at `content` and `jsonheader`.
|
||||
"""
|
||||
if self._jsonheader_len is None:
|
||||
self._process_protoheader()
|
||||
|
||||
if self._jsonheader_len is not None:
|
||||
if self.jsonheader is None:
|
||||
self._process_jsonheader()
|
||||
|
||||
if self.jsonheader:
|
||||
if not self.processed:
|
||||
self._process_content()
|
||||
|
||||
@property
|
||||
def processed(self):
|
||||
return self.content is not None
|
||||
|
||||
|
||||
class CallbackManager:
|
||||
def __init__(self):
|
||||
self.action_callbacks = dict()
|
||||
self.request_callbacks = dict()
|
||||
|
||||
self.connected_callback = None
|
||||
|
||||
@staticmethod
|
||||
def _register_function(d, key):
|
||||
def inner(f):
|
||||
if not callable(f):
|
||||
raise TypeError("f should be callable")
|
||||
d[key] = f
|
||||
logger.debug("Registered callback function {} for {}".format(f, key))
|
||||
return f
|
||||
|
||||
return inner
|
||||
|
||||
def action_callback(self, key):
|
||||
return self._register_function(self.action_callbacks, key)
|
||||
|
||||
def request_callback(self, key):
|
||||
return self._register_function(self.request_callbacks, key)
|
||||
|
||||
|
||||
class PeerProtocol(asyncio.Protocol):
|
||||
def __init__(self, parent, callbacks):
|
||||
self._parent = parent
|
||||
self._callbacks = callbacks
|
||||
|
||||
self.transport = None
|
||||
|
||||
self._recv_buffer = bytearray()
|
||||
self._current_msg = None
|
||||
self._msg_queue = asyncio.Queue()
|
||||
|
||||
self._connected = asyncio.Event()
|
||||
self._closed = asyncio.Event()
|
||||
self._closed.set()
|
||||
|
||||
@property
|
||||
def peername(self):
|
||||
return self.transport.get_extra_info('peername')
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
return self._connected.is_set()
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self._connected.wait()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._closed.wait()
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
logging.info(f"Connected to {str_peername(self.peername)}")
|
||||
|
||||
self._connected.set()
|
||||
self._closed.clear()
|
||||
|
||||
# self._resend_requests()
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logger.info(f"Lost connection to {str_peername(self.peername)}: {'closed' if exc is None else exc}")
|
||||
self._connected.clear()
|
||||
self._closed.set()
|
||||
|
||||
def error_received(self, exc):
|
||||
logging.warning(f"Error on broadcast connection received: {exc}")
|
||||
|
||||
def data_received(self, data):
|
||||
self._recv_buffer += data
|
||||
logger.debug("Received {} bytes from {}".format(len(data), self.peername))
|
||||
logger.debug(f'{data}, {self._recv_buffer}')
|
||||
|
||||
asyncio.create_task(self._proceess_received())
|
||||
|
||||
async def _proceess_received(self):
|
||||
while self._recv_buffer:
|
||||
if self._current_msg is None:
|
||||
self._current_msg = MessageManager(self._recv_buffer)
|
||||
else:
|
||||
self._current_msg.set_buffer(self._recv_buffer)
|
||||
|
||||
self._current_msg.process_message()
|
||||
self._recv_buffer = bytearray(self._current_msg.get_buffer())
|
||||
# self._current_msg.reset_buffer()
|
||||
|
||||
if self._current_msg.processed:
|
||||
logging.info(self._current_msg.content)
|
||||
await self._msg_queue.put(self._current_msg)
|
||||
self._current_msg = None
|
||||
else:
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# Sending api functions
|
||||
|
||||
def _send(self, msg):
|
||||
if not self.is_connected:
|
||||
logger.error("Peer is disconnected, can't send")
|
||||
return
|
||||
|
||||
self.transport.write(msg[:10])
|
||||
time.sleep(0.5)
|
||||
self.transport.write(msg[10:])
|
||||
|
||||
def send_message(self, action, args=(), kwargs=None):
|
||||
"""
|
||||
Sends to peer message with specified action, arguments and keyword arguments.
|
||||
|
||||
Args:
|
||||
action (str): action(command) to perform upon receiving. Should correspond with `action_string` of function registered in `message_callback()` on the peer.
|
||||
args (tuple, optional): Arguments for the command. Defaults to ().
|
||||
kwargs (dict, optional): Keyword arguments for the command. Defaults to None.
|
||||
"""
|
||||
self._send(MessageManager.create_action_message(action, args, kwargs))
|
||||
|
||||
|
||||
class ConnectionManager:
|
||||
"""
|
||||
This class represents high-level protocol of TCP connection.
|
||||
|
||||
Attributes:
|
||||
selector (selector): Related selector object.
|
||||
socket (socket): Socket object of the connection.
|
||||
addr (str): Address of the peer.
|
||||
buffer_size (int): Size of the sending/receiving buffer.
|
||||
resume_queue (bool): Whether to resume sending queue upon peer reconnection.
|
||||
resend_requests (bool): Whether to resend unanswered requests in queue to reconnected client.
|
||||
"""
|
||||
|
||||
def __init__(self, callbacks, whoami="computer"):
|
||||
"""
|
||||
Args:
|
||||
whoami (str, optional): What type of system the ConnectionManager is running on (`computer` or `pi`). Defaults to "computer".
|
||||
|
||||
Example:
|
||||
|
||||
```python
|
||||
connection = ConnectionManager(whoami)
|
||||
connection.connect(client_selector, client_socket, client_addr)
|
||||
```
|
||||
"""
|
||||
self.callbacks = callbacks
|
||||
|
||||
self.whoami = whoami
|
||||
|
||||
def _clear(self):
|
||||
if not self.resume_queue: # maybe needs locks
|
||||
self._recv_buffer = b''
|
||||
self._send_buffer = b''
|
||||
self._received_queue.clear()
|
||||
self._send_queue.clear()
|
||||
|
||||
def process_received(self, message):
|
||||
message_type = message.jsonheader["message-type"]
|
||||
content = message.content if message.jsonheader["content-type"] != "binary" \
|
||||
else message.content[:256]
|
||||
logger.debug(
|
||||
"Received message! Header: {}, content: {}".format(message.jsonheader, content))
|
||||
|
||||
if message_type == "message":
|
||||
self._process_message(message)
|
||||
elif message_type == "response":
|
||||
self._process_response(message)
|
||||
elif message_type == "request":
|
||||
self._process_request(message)
|
||||
|
||||
def _process_message(self, message):
|
||||
if message.jsonheader["action"] == "filetransfer":
|
||||
self._process_filetransfer(message.content, message.jsonheader["filepath"])
|
||||
else:
|
||||
self._process_action(message)
|
||||
|
||||
def _process_action(self, message):
|
||||
action = message.jsonheader["action"]
|
||||
args = message.content["args"]
|
||||
kwargs = message.content["kwargs"]
|
||||
callback = self.callbacks.action_callbacks.get(action, None)
|
||||
if callback is None:
|
||||
logger.warning("Action {} does not exist!".format(action))
|
||||
return
|
||||
try:
|
||||
callback(self, *args, **kwargs)
|
||||
except Exception as error:
|
||||
logger.error("Error during action {} execution: {}".format(action, error))
|
||||
traceback.print_exc()
|
||||
|
||||
def _process_request(self, message):
|
||||
requested_value = message.content["requested_value"]
|
||||
request_id = message.content["request_id"]
|
||||
args = message.content["args"]
|
||||
kwargs = message.content["kwargs"]
|
||||
|
||||
filetransfer = requested_value == "filetransfer"
|
||||
try:
|
||||
if filetransfer:
|
||||
value = self._read_file(kwargs["filepath"])
|
||||
else:
|
||||
callback = self.callbacks.request_callbacks.get(requested_value, None)
|
||||
if callback is None:
|
||||
logger.warning("Request {} does not exist!".format(requested_value))
|
||||
return
|
||||
|
||||
value = callback(self, *args, **kwargs)
|
||||
except Exception as error: # TODO send response error\cancel
|
||||
logger.error("Error during request {} processing: {}".format(requested_value, error))
|
||||
else:
|
||||
self._send_response(requested_value, request_id, value, filetransfer)
|
||||
|
||||
def _process_response(self, message):
|
||||
request_id, requested_value = message.jsonheader["request_id"], message.jsonheader["requested_value"]
|
||||
|
||||
with self._request_lock:
|
||||
request = self._request_queue.pop(request_id, None)
|
||||
if (request is None) or (request.requested_value != requested_value):
|
||||
logger.warning("Unexpected response!")
|
||||
return
|
||||
|
||||
if requested_value == "filetransfer":
|
||||
value = True
|
||||
self._process_filetransfer(message.content, request.callback_kwargs["filepath"])
|
||||
logger.debug(
|
||||
"Request {} successfully closed with file bytes {}...".format(request, message.content[:256])
|
||||
)
|
||||
else:
|
||||
value = message.content["value"]
|
||||
logger.debug(
|
||||
"Request {} successfully closed with value {}".format(request, message.content["value"])
|
||||
)
|
||||
if request.callback is not None:
|
||||
try:
|
||||
request.callback(self, value, *request.callback_args, **request.callback_kwargs)
|
||||
except Exception as error:
|
||||
logger.error("Error during response {} processing: {}".format(request, error))
|
||||
else:
|
||||
logger.info("No callback were registered for response: {}".format(request))
|
||||
|
||||
@staticmethod
|
||||
def _read_file(filepath):
|
||||
with open(filepath, mode='rb') as f:
|
||||
return f.read()
|
||||
|
||||
def _process_filetransfer(self, content, filepath):
|
||||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(content)
|
||||
except OSError as error:
|
||||
logger.error("File {} can not be written due error: {}".format(filepath, error))
|
||||
else:
|
||||
logger.info("File {} successfully received ".format(filepath))
|
||||
if self.whoami == "pi":
|
||||
logger.info("Return rights to pi:pi after file transfer")
|
||||
os.system("chown pi:pi {}".format(filepath))
|
||||
|
||||
self._set_selector_events_mask('r') # we're done writing
|
||||
|
||||
def get_response(self, requested_value, callback, # timeout=30,
|
||||
request_args=(), request_kwargs=None,
|
||||
callback_args=(), callback_kwargs=None, ):
|
||||
"""
|
||||
Sends request to the client and adds it to the request queue. The callback will be called upon receiving the response (see example below).
|
||||
|
||||
Args:
|
||||
requested_value (string): Name of requested value.
|
||||
callback (function): Callable object (function, binded method, etc.) that would be called upon receiving response to this request or None.
|
||||
request_args (tuple, optional): Arguments for the request. Defaults to ().
|
||||
request_kwargs (dict, optional): Keyword arguments for the request. Defaults to None.
|
||||
callback_args (tuple, optional): Arguments for the callback. Defaults to ().
|
||||
callback_kwargs (dict, optional): Keyword arguments for the callback. Defaults to None.
|
||||
|
||||
Example of a callback:
|
||||
|
||||
```python
|
||||
def callback(client, value, *args, **kwargs):
|
||||
print(value, args, kwargs)
|
||||
```
|
||||
|
||||
First argument passed to callback function is an instance of `ConnectionManager`, representing connection by which the message was received.
|
||||
Second arguments is received value. Arguments and keyword arguments from response will also be passed.
|
||||
"""
|
||||
if request_kwargs is None:
|
||||
request_kwargs = {}
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
request_id = str(random.randint(0, 9999)).zfill(4) # maybe hash
|
||||
with self._request_lock:
|
||||
self._request_queue[request_id] = PendingRequest(
|
||||
requested_value=requested_value,
|
||||
value=None,
|
||||
# expires_on=Server.time_now()+timeout, #TODO
|
||||
callback=callback,
|
||||
callback_args=callback_args,
|
||||
callback_kwargs=callback_kwargs,
|
||||
request_args=request_args,
|
||||
request_kwargs=request_kwargs,
|
||||
resend=True,
|
||||
)
|
||||
self._send(MessageManager.create_request(requested_value, request_id, request_args, request_kwargs))
|
||||
|
||||
def get_file(self, client_filepath, filepath=None, callback=None,
|
||||
callback_args=(), callback_kwargs=None, ):
|
||||
"""
|
||||
Requests file from peer located at `client_filepath`. Received file will be written to the `filepath` if specified.
|
||||
|
||||
Args:
|
||||
client_filepath (str): Path to file to retrieve from peer.
|
||||
filepath (str, optional): Path to write file upon receiving. If `None` - file will not be written. Defaults to None.
|
||||
callback (function): Callable object (function, binded method, etc.) that would be called upon receiving response to this request or None.
|
||||
callback_args (tuple, optional): Arguments for the callback. Defaults to ().
|
||||
callback_kwargs (dict, optional): Keyword arguments for the callback. Defaults to None.
|
||||
|
||||
"""
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
if filepath is None:
|
||||
filepath = os.path.split(client_filepath)[1]
|
||||
|
||||
request_kwargs = {"filepath": client_filepath}
|
||||
callback_kwargs.update({"filepath": filepath})
|
||||
|
||||
self.get_response("filetransfer", callback, request_kwargs=request_kwargs,
|
||||
callback_args=callback_args, callback_kwargs=callback_kwargs)
|
||||
|
||||
def _resend_requests(self):
|
||||
for request_id, request in self._request_queue.items(): # TODO filter
|
||||
if request.resend:
|
||||
self._send(MessageManager.create_request(
|
||||
request.requested_value, request_id, request.request_kwargs.update(resend=request.resend))
|
||||
)
|
||||
request.resend = False
|
||||
|
||||
def _send_response(self, requested_value, request_id, value, filetransfer=False):
|
||||
self._send(MessageManager.create_response(requested_value, request_id, value, filetransfer))
|
||||
|
||||
def send_file(self, filepath, dest_filepath): # clever_restart=False
|
||||
"""
|
||||
Sends to peer a file from `filepath` to write it on `dest_filepath`.
|
||||
|
||||
Args:
|
||||
filepath (str): Path of the file to send.
|
||||
dest_filepath (str): Path on peer where recieved file will be written to.
|
||||
"""
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
except OSError as error:
|
||||
logger.warning("File can not be opened due error: ".format(error))
|
||||
else:
|
||||
logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
|
||||
self._send(MessageManager.create_message(data, "binary", "message",
|
||||
additional_headers={"action": "filetransfer",
|
||||
"filepath": dest_filepath}))
|
||||
88
lib/network.py
Normal file
88
lib/network.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import socket
|
||||
import logging
|
||||
import platform
|
||||
|
||||
def str_peername(peername):
|
||||
if peername is None:
|
||||
return "None"
|
||||
return f"{peername[0]}:{peername[1]}"
|
||||
|
||||
|
||||
def get_ip_address(): # dodo async
|
||||
"""
|
||||
Returns the IP address of current computer or `localhost` if no network connection present.
|
||||
|
||||
Returns:
|
||||
string: IP address of current computer or `localhost` if no network connection present
|
||||
"""
|
||||
ip_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # TODO IPv6
|
||||
try:
|
||||
ip_socket.connect(("8.8.8.8", 80))
|
||||
ip = ip_socket.getsockname()[0]
|
||||
except OSError as e:
|
||||
logging.warning(f"No network connection detected, using localhost: {e}")
|
||||
ip = "localhost"
|
||||
finally:
|
||||
ip_socket.close()
|
||||
return ip
|
||||
|
||||
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
"""
|
||||
Gets and returns time from specified host and port of NTP server.
|
||||
|
||||
Args:
|
||||
ntp_host (string): hostname or address of the NTP server.
|
||||
ntp_port (int): port of the NTP server.
|
||||
|
||||
Returns:
|
||||
int: Current time recieved from the NTP server
|
||||
"""
|
||||
NTP_DELTA = 2208988800 # 1970-01-01 00:00:00
|
||||
NTP_QUERY = b'\x1b' + bytes(47)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket:
|
||||
ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port))
|
||||
msg, _ = ntp_socket.recvfrom(1024)
|
||||
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
|
||||
|
||||
|
||||
def set_keepalive(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
|
||||
"""
|
||||
Sets `keepalive` parameters of given socket.
|
||||
|
||||
Args:
|
||||
sock (socket): Socket which parameters will be changed.
|
||||
after_idle_sec (int, optional): Start sending keepalive packets after this amount of seconds. Defaults to 1.
|
||||
interval_sec (int, optional): Interval of keepalive packets in seconds. Defaults to 3.
|
||||
max_fails (int, optional): Count of fails leading to socket disconnect. Defaults to 5.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: for unknown platform.
|
||||
"""
|
||||
current_platform = platform.system() # could be empty
|
||||
|
||||
if current_platform == "Linux":
|
||||
return _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails)
|
||||
if current_platform == "Windows":
|
||||
return _set_keepalive_windows(sock, after_idle_sec, interval_sec)
|
||||
if current_platform == "Darwin":
|
||||
return _set_keepalive_osx(sock, interval_sec)
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails):
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
|
||||
|
||||
|
||||
def _set_keepalive_windows(sock, after_idle_sec, interval_sec):
|
||||
sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec * 1000, interval_sec * 1000))
|
||||
|
||||
|
||||
def _set_keepalive_osx(sock, interval_sec):
|
||||
TCP_KEEPALIVE = 0x10
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)
|
||||
197
lib/protocols.py
Normal file
197
lib/protocols.py
Normal file
@@ -0,0 +1,197 @@
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
import messages
|
||||
import exceptions
|
||||
|
||||
from network import str_peername
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BroadcastProtocol:
|
||||
def __init__(self, on_broadcast):
|
||||
self.transport = None
|
||||
self._on_broadcast = on_broadcast
|
||||
|
||||
self._closed = asyncio.Event()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._closed.wait()
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
logger.info("Broadcast connection established")
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logger.info(f"Broadcast connection lost: {'closed' if exc is None else exc}")
|
||||
self._closed.set()
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
message = message.MessageDecoder(data)
|
||||
message.process_message()
|
||||
content = message.content
|
||||
|
||||
is_ip_broadcast = (content is not None and message.header["action"] == "server_ip")
|
||||
|
||||
if is_ip_broadcast:
|
||||
logger.debug(f"Got broadcast message from {addr}: {content}")
|
||||
asyncio.get_event_loop().call_soon(self._on_broadcast, message)
|
||||
self.transport.close()
|
||||
else:
|
||||
logger.warning(f"Got wrong broadcast message from {addr}")
|
||||
|
||||
def error_received(self, exc):
|
||||
logger.warning(f"Error on broadcast connection received: {exc}")
|
||||
|
||||
class PeerProtocol(asyncio.Protocol):
|
||||
def __init__(self, connected_callback, callbacks):
|
||||
self._callbacks = callbacks
|
||||
|
||||
self.transport: asyncio.Transport = None
|
||||
|
||||
self._connected = asyncio.Event()
|
||||
self._closed = asyncio.Event()
|
||||
self._closed.set()
|
||||
|
||||
self._recv_buffer = bytearray()
|
||||
self._recv_queue = asyncio.Queue()
|
||||
self._current_msg = None
|
||||
self._recv_process_task = None
|
||||
|
||||
self._send_queue = asyncio.Queue() # holds messages to send
|
||||
self._send_task = None
|
||||
|
||||
self._can_write = asyncio.Event()
|
||||
|
||||
self._recv_waiter = None
|
||||
|
||||
@property
|
||||
def peername(self):
|
||||
return self.transport.get_extra_info('peername')
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
return self._connected.is_set()
|
||||
|
||||
@property
|
||||
def connected(self):
|
||||
return self._connected.wait()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._closed.wait()
|
||||
|
||||
# Drain control
|
||||
|
||||
def pause_writing(self) -> None:
|
||||
self._can_write.clear()
|
||||
|
||||
def resume_writing(self) -> None:
|
||||
self._can_write.set()
|
||||
|
||||
async def drain(self) -> None:
|
||||
await self._can_write.wait()
|
||||
|
||||
async def send(self, msg: messages.PendingMessage):
|
||||
if not self.is_connected: # and not send_disconnected
|
||||
msg.sent.cancel("Peer is disconnected, can't send")
|
||||
raise RuntimeError("Peer is disconnected, can't send")
|
||||
|
||||
await self._send_queue.put(msg)
|
||||
if self._send_task is None:
|
||||
self._send_task = asyncio.create_task(self._send())
|
||||
|
||||
logger.debug(f"Queued sending of message {msg} to {str_peername(self.peername)}")
|
||||
await msg.sent
|
||||
|
||||
async def _send(self):
|
||||
while self.is_connected:
|
||||
msg = None
|
||||
try:
|
||||
msg: messages.PendingMessage = await self._send_queue.get()
|
||||
self.transport.write(msg.encode())
|
||||
await self.drain()
|
||||
except asyncio.CancelledError:
|
||||
if msg is not None:
|
||||
msg.sent.set_exception(exceptions.ConnectionClosedError)
|
||||
raise
|
||||
else:
|
||||
msg.sent.set_result(None)
|
||||
logger.debug(f"Sent message {msg} to {str_peername(self.peername)}")
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
logger.info(f"Connected to {str_peername(self.peername)}")
|
||||
|
||||
self._connected.set()
|
||||
self._closed.clear()
|
||||
|
||||
self._can_write.set()
|
||||
|
||||
def connection_lost(self, exc):
|
||||
logger.info(f"Lost connection to {str_peername(self.peername)}: {'closed' if exc is None else exc}")
|
||||
self._connected.clear()
|
||||
self._closed.set()
|
||||
|
||||
if self._recv_waiter is not None:
|
||||
self._recv_waiter.cancel()
|
||||
|
||||
if self._recv_process_task is not None:
|
||||
self._recv_process_task.cancel()
|
||||
|
||||
if self._send_task is not None:
|
||||
self._send_task.cancel()
|
||||
|
||||
self._can_write.set()
|
||||
|
||||
self._recv_buffer = bytearray()
|
||||
self._current_msg = None
|
||||
|
||||
def error_received(self, exc):
|
||||
logger.warning(f"Error on {str_peername(self.peername)} connection received: {exc}")
|
||||
|
||||
def data_received(self, data):
|
||||
self._recv_buffer += data
|
||||
logger.debug(f"Received {len(data)} bytes from {str_peername(self.peername)}")
|
||||
|
||||
if self._recv_process_task is None:
|
||||
self._recv_process_task = asyncio.create_task(self._proceess_received())
|
||||
|
||||
async def _proceess_received(self):
|
||||
try:
|
||||
while self._recv_buffer:
|
||||
if self._current_msg is None:
|
||||
self._current_msg = messages.MessageDecoder(self._recv_buffer)
|
||||
else:
|
||||
self._current_msg.set_buffer(self._recv_buffer)
|
||||
|
||||
self._current_msg.process_message()
|
||||
self._recv_buffer = self._current_msg.get_buffer()
|
||||
# self._current_msg.reset_buffer()
|
||||
if self._current_msg.processed:
|
||||
logger.debug(f"Recieved message {self._current_msg.content} from {str_peername(self.peername)}")
|
||||
await self._recv_queue.put(self._current_msg)
|
||||
self._current_msg = None
|
||||
else:
|
||||
await asyncio.sleep(0)
|
||||
except Exception as e:
|
||||
logger.error(f"Error during message processing: {e}")
|
||||
else:
|
||||
logger.debug("All data processed")
|
||||
finally:
|
||||
self._recv_process_task = None
|
||||
|
||||
async def receive_message(self):
|
||||
if self._recv_waiter is not None:
|
||||
raise RuntimeError("receive_message() is already being awaited")
|
||||
|
||||
self._recv_waiter = asyncio.create_task(self._recv_queue.get())
|
||||
|
||||
try:
|
||||
return await self._recv_waiter
|
||||
except asyncio.CancelledError:
|
||||
raise exceptions.ConnectionClosedError
|
||||
finally:
|
||||
self._recv_waiter = None
|
||||
Reference in New Issue
Block a user