Commit 61b085ef authored by Milosz Malczak's avatar Milosz Malczak

Moving publisher to TCP

parent 48ab52b7
Internal trigger on channel 3
Sampled signal: 100kHz sine wave Channel 3(0-3)
number of presamples = 0
number of acquisitions = 5
Number of channels: 1
Postsamples: 2 ,Best: 0.000572896096855, medium: 0.000710890814662, variance: 0.000000053278288,
Postsamples: 10 ,Best: 0.000580189982429, medium: 0.000643312186003, variance: 0.000000003923883,
Postsamples: 100 ,Best: 0.000551857985556, medium: 0.000615892698988, variance: 0.000000003396801,
Postsamples: 1000 ,Best: 0.000655396142974, medium: 0.000729066692293, variance: 0.000000002557101,
Postsamples: 10000 ,Best: 0.001086103962734, medium: 0.001358926994726, variance: 0.000000067446906,
Postsamples: 100000 ,Best: 0.008953895885497, medium: 0.009634020552039, variance: 0.000000255370224,
Number of channels: 2
Postsamples: 2 ,Best: 0.000617414945737, medium: 0.000738564785570, variance: 0.000000014298909,
Postsamples: 10 ,Best: 0.000639991136268, medium: 0.000797065068036, variance: 0.000000011309527,
Postsamples: 100 ,Best: 0.000710595864803, medium: 0.000769018614665, variance: 0.000000001959616,
Postsamples: 1000 ,Best: 0.000670948065817, medium: 0.000770822586492, variance: 0.000000006407673,
Postsamples: 10000 ,Best: 0.001578854862601, medium: 0.001658579753712, variance: 0.000000004683342,
Postsamples: 100000 ,Best: 0.012607470154762, medium: 0.013600414572284, variance: 0.000000457720085,
Number of channels: 3
Postsamples: 2 ,Best: 0.000832587946206, medium: 0.000984404236078, variance: 0.000000044952812,
Postsamples: 10 ,Best: 0.000571095151827, medium: 0.000809086393565, variance: 0.000000023020299,
Postsamples: 100 ,Best: 0.000716756097972, medium: 0.000777146639302, variance: 0.000000002526106,
Postsamples: 1000 ,Best: 0.000840920954943, medium: 0.000953066814691, variance: 0.000000018053205,
Postsamples: 10000 ,Best: 0.002001366810873, medium: 0.002100262511522, variance: 0.000000013551104,
Postsamples: 100000 ,Best: 0.018622281029820, medium: 0.019830726739019, variance: 0.000000562585940,
Number of channels: 4
Postsamples: 2 ,Best: 0.000956265022978, medium: 0.001271071936935, variance: 0.000000313571437,
Postsamples: 10 ,Best: 0.000882796011865, medium: 0.001002867007628, variance: 0.000000010051286,
Postsamples: 100 ,Best: 0.000782405957580, medium: 0.000852786377072, variance: 0.000000001988760,
Postsamples: 1000 ,Best: 0.000914072850719, medium: 0.001046138396487, variance: 0.000000010762032,
Postsamples: 10000 ,Best: 0.002384664025158, medium: 0.002693657251075, variance: 0.000000076608980,
Postsamples: 100000 ,Best: 0.025531234918162, medium: 0.026989804208279, variance: 0.000001874679344,
import zmq
import socket
from general import serialization
import logging
logger = logging.getLogger(__name__)
class Publisher():
class Publisher:
message_length_size = 10
def __init__(self, ip, port):
"""TODO should I add some logging here or some error handling?"""
context = zmq.Context()
self.socket = context.socket(zmq.DEALER)
addr = str(ip) + ':' + str(port)
self.socket.connect("tcp://" + addr)
self.ip = ip
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("socket created")
self.socket.connect((self.ip, self.port))
print("socket connected")
def send_message(self, data):
msg = serialization.serialize(data)
length = str(len(msg)).encode('utf-8')
length = length.rjust(self.message_length_size, b'0')
self.socket.send(length)
self.socket.send(msg)
def __del__(self):
self.socket.close()
class PublisherIPC():
......@@ -29,3 +39,5 @@ class PublisherIPC():
def send_message(self, data):
msg = pickle.dumps(data)
self.socket.send(msg)
import socket
import zmq
class TCPServer:
mess_length_size = 10
READ_ONLY = zmq.POLLIN | zmq.POLLERR
def __init__(self, ip, port, poller):
self.ip = ip
self.port = port
self.poller = poller
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.ip, self.port))
self.socket.listen(1)
self.fd_conn = {}
def receive_packet(self, fd):
conn = self.fd_conn[fd]
mess_size = self.receive_message(conn, fd, self.mess_length_size)
mess_size = int(mess_size)
data = self.receive_message(conn, fd, mess_size)
return data
def receive_message(self, conn, fd, size):
buff = b''
total_bytes = 0
while(total_bytes < size):
data = self.receive_trunc(conn, fd, size - total_bytes)
total_bytes += len(data)
buff += data
return buff
def receive_trunc(self, conn, fd, size):
trunc = conn.recv(size)
if not trunc:
self.unregister_connection(fd)
return None
else:
return trunc
def register_connection(self):
conn, addr = self.socket.accept()
self.fd_conn[conn.fileno()] = conn
self.poller.register(conn, self.READ_ONLY)
print('Connection address: ' + str(addr))
def unregister_connection(self, fd):
conn = self.fd_conn[fd]
self.poller.unregister(conn)
conn.close()
del self.fd_conn[fd]
print("No data, finish")
......@@ -4,6 +4,8 @@ import zmq
import sys
sys.path.append('../')
from general.ipaddr import get_ip
from general.tcp_server import TCPServer
class ServerExposeZMQ(QtCore.QObject):
......@@ -17,18 +19,23 @@ class ServerExposeZMQ(QtCore.QObject):
def monitorSlot(self):
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
ip = get_ip()
socket.bind("tcp://" + ip + ":" + str(self.port_GUI))
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN | zmq.POLLERR)
tcp_server = TCPServer(ip, self.port_GUI, poller)
poller.register(tcp_server.socket)
while True:
socks = dict(poller.poll())
if socket in socks:
[identity, message] = socket.recv_multipart()
self.signal.emit(message)
if tcp_server.socket.fileno() in socks:
tcp_server.register_connection()
for sock, event in socks.items():
if sock in tcp_server.fd_conn:
message = tcp_server.receive_packet(sock)
if message:
self.signal.emit(message)
class ThreadServerExposeZMQ(QtGui.QWidget):
......
......@@ -10,6 +10,7 @@ from general.ipaddr import get_ip
from general.addresses import server_expose_to_user_port
from general.addresses import server_expose_to_device_port
from general import serialization
from general.tcp_server import TCPServer
class Expose():
......@@ -102,22 +103,26 @@ class Expose():
socket = context.socket(zmq.ROUTER)
monitor = socket.get_monitor_socket()
socket_ADC_listener = context.socket(zmq.ROUTER)
#socket_ADC_listener = context.socket(zmq.ROUTER)
socket_zeroconf_listener = context.socket(zmq.ROUTER)
server_ip = get_ip()
socket.bind("tcp://" + server_ip + ":" +
str(server_expose_to_user_port))
socket_ADC_listener.bind("tcp://" + server_ip + ":" +
str(server_expose_to_device_port))
#socket_ADC_listener.bind("tcp://" + server_ip + ":" +
# str(server_expose_to_device_port))
socket_zeroconf_listener.bind("ipc:///tmp/zeroconf")
poller = zmq.Poller()
poller.register(monitor, zmq.POLLIN | zmq.POLLERR)
poller.register(socket, zmq.POLLIN | zmq.POLLERR)
poller.register(socket_ADC_listener, zmq.POLLIN | zmq.POLLERR)
#poller.register(socket_ADC_listener, zmq.POLLIN | zmq.POLLERR)
poller.register(socket_zeroconf_listener, zmq.POLLIN | zmq.POLLERR)
tcp_server = TCPServer(server_ip, server_expose_to_device_port, poller)
poller.register(tcp_server.socket)
while True:
socks = dict(poller.poll(100))
if socket in socks:
......@@ -134,13 +139,17 @@ class Expose():
evt = recv_monitor_message(monitor)
evt.update({'description': EVENT_MAP[evt['event']]})
logger.info("Event: {}".format(evt))
if socket_ADC_listener in socks:
[identity, message] = socket_ADC_listener.recv_multipart()
data = serialization.deserialize(message)
try:
getattr(self, data['function_name'])(*data['args'])
except AttributeError as e:
logger.error("Attribute error: {}".format(e))
if tcp_server.socket.fileno() in socks:
tcp_server.register_connection()
for sock, event in socks.items():
if sock in tcp_server.fd_conn:
message = tcp_server.receive_packet(sock)
if message:
data = serialization.deserialize(message)
try:
getattr(self, data['function_name'])(*data['args'])
except AttributeError as e:
logger.error("Attribute error: {}".format(e))
if socket_zeroconf_listener in socks:
[identity, message] = socket_zeroconf_listener.recv_multipart()
data = pickle.loads(message)
......
......@@ -10,6 +10,9 @@ from general.ipaddr import get_ip
import zmq
import pickle
from general import serialization
from general.tcp_server import TCPServer
class ServerExposeTest():
......@@ -41,20 +44,27 @@ class ServerExposeTest():
def monitorSlot(self, return_queue):
self.return_queue = return_queue
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
ip = get_ip()
socket.bind("tcp://" + ip + ":" + str(self.port_GUI))
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN | zmq.POLLERR)
while True:
socks = dict(poller.poll())
if socket in socks:
[identity, message] = socket.recv_multipart()
data = serialization.deserialize(message)
getattr(self, data['function_name'])(*data['args'])
tcp_server = TCPServer(ip, self.port_GUI, poller)
poller.register(tcp_server.socket)
while True:
socks = dict(poller.poll())
if tcp_server.socket.fileno() in socks:
tcp_server.register_connection()
for sock, event in socks.items():
if sock in tcp_server.fd_conn:
message = tcp_server.receive_packet(sock)
if message:
data = serialization.deserialize(message)
try:
getattr(self, data['function_name'])(*data['args'])
except AttributeError as e:
logger.error("Attribute error: {}".format(e))
class ThreadServerExposeTest():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment