diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..76d8fca --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +**__pycache__ diff --git a/client.py b/client.py index ba9746f..9f27bde 100755 --- a/client.py +++ b/client.py @@ -1,25 +1,39 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import argparse +import socket +from socket import SOL_SOCKET, SO_REUSEADDR +import pickle import pika from params import rmq_params, socket_params def checkpoint(message): - """Prints [CHeckpoint] + """Prints [Checkpoint] """ print("[Checkpoint] {}".format(message)) +# Get pi's IP +# From https://stackoverflow.com/questions/166506/ +# finding-local-ip-addresses-using-pythons-stdlib?page=1&tab=votes#tab-top +def my_ip(): + return str((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] + if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), + s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, + socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]) + def process_response(response): """Process response from serve & update GUI """ # Update GUI with color # TODO + color = 'red' checkpoint("Update GUI with color \'{}\'".format(color)) # Wait a few seconds for a possible reject # TODO + reject = False checkpoint("Drink rejected? \'{}\'".format(reject)) return @@ -28,23 +42,26 @@ def main(): """Processes rfid scans """ parser = argparse.ArgumentParser(description='Processses arguments') - parse.add_argument('-s', help='Set server IP or hostname', required=True) - parse.add_argument('-p', help='Set server Port to receive messages on', + parser.add_argument('-s', help='Set RMQ server', required=True) + parser.add_argument('-p', help='Set port for socket to listen on', default=socket_params['port']) - parse.add_argument('-z', help='Set size for socket to recive messages', + parser.add_argument('-z', help='Set size for socket to recive messages', default=socket_params['size']) + parser.add_argument('-b', help='Set socket backlog size', + default=socket_params['backlog']) # Process args args = parser.parse_args() - server_host = args.s - server_port = args.p - server_size = args.z + rmq_host = args.s + socket_port = int(args.p) + socket_size = int(args.z) + socket_backlog = int(args.b) # Setup RabbitMQ credentials = pika.PlainCredentials(rmq_params['username'], rmq_params['password']) parameters = pika.ConnectionParameters(host=rmq_host, - virtual_host=rmq_paras['vhost'], + virtual_host=rmq_params['vhost'], credentials=credentials) # Connect to RabbitMQ @@ -60,26 +77,45 @@ def main(): # TODO print('Continuously listen for RFID ids') # TODO + rfid_id = '123456' + socket_host = my_ip() + order_data = {'id': rfid_id, 'ip': socket_host, 'port': socket_port, + 'size': socket_size} checkpoint("Received id \'{}\'".format(rfid_id)) # Submit new drink order to queue channel.basic_publish(exchange=rmq_params['exchange'], routing_key=rmq_params['order_queue'], - body=rfid_id) + body=str(order_data)) checkpoint("Getting status for id \'{}\'".format(rfid_id)) - # Wait for response from server for drink order - try: - s = socket.socket(AF_INET, socket.SOCK_STREAM) - s.connect((server_host, server_port)) - recv_data = s.recv(server_size) - s.close() - process_response(recv_data) - # Error receiving message - except Exception as ex: - print(ex) + ## Wait for response from server for drink order + # Setup socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + s.bind((socket_host, socket_port)) + s.listen(socket_backlog) + checkpoint("Created socket at {} on port {}" + .format(socket_host, socket_port)) + + # Listen for reply from server + server, address = s.accept() + svr_addr = server.getpeername()[0] + svr_port = server.getpeername()[1] + checkpoint("Accepted server connection from {} on {}" + .format(svr_addr, svr_port)) + + # Receive data from server + recv_data = server.recv(socket_size) + recv_data = pickle.loads(recv_data) + checkpoint("Received data: {}".format(recv_data)) + s.close() + + # Update GUI based on response + process_response(recv_data) + break main() diff --git a/params.py b/params.py index 8960da9..6b37586 100644 --- a/params.py +++ b/params.py @@ -6,6 +6,8 @@ rmq_params = {'vhost': 'my_vhost', 'username': 'user', 'password': 'pass', 'exchange': 'my_exchange', - 'order_queue': 'my_order_queue'} + 'order_queue': 'my_order_queue', + 'reject_queue': 'my_reject_queue'} socket_params = {'size': 1024, - 'port': 8080} + 'port': 8080, + 'backlog': 4} diff --git a/server.py b/server.py index 5b4b0a1..ec290a8 100755 --- a/server.py +++ b/server.py @@ -1,11 +1,16 @@ #!/usr/bin/env python3 +import ast from datetime import datetime import math +import socket from time import sleep +import pickle import pymongo +import pika +from params import rmq_params, socket_params def checkpoint(message): """Prints [CHeckpoint] @@ -99,30 +104,85 @@ def allowed_drinks(id): return drinks_left +def order_callback(ch, method, properties, body): + """Process 1 drink being ordered + """ + + # Decode body + body = body.decode('utf-8') + client_params = ast.literal_eval(body) + + # Extract things from client data + id = int(client_params.get('id')) + host = client_params.get('ip') + port = int(client_params.get('port')) + + add_drink(id) + drinks = allowed_drinks(id) + + # Add drink for user + + # Try to send response to client + try: + # Connect to client socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((host, port)) + checkpoint("Created socket at {} on port {}".format(host, port)) + + # Send num drinks to client + s.send(pickle.dumps(drinks)) + s.close() + except Exception as ex: + print(ex) + +def reject_callback(ch, method, properties, body): + """Remove most recent drink for user + """ + + # Decode body + id = int(body.decode('utf-8')) + + # Remove most recent drink + remove_drink = get_info(id)['drinks'][-1] + remove_drinks(id, [remove_drink]) + checkpoint("Removed most recent drink for \'{}\'".format(id)) + def main(): """Create & start 'drinks' queue for submitting drinks """ + # Temp TODO + remove_drinks(123456) + + # Connect to RMQ + credentials = pika.PlainCredentials(rmq_params['username'], rmq_params['password']) + parameters = pika.ConnectionParameters(host='localhost', + virtual_host=rmq_params['vhost'], + credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + checkpoint('Connect to RMQ server') # Create order queue - # TODO + checkpoint('Setting up exchanges and queue...') + channel.exchange_declare(exchange=rmq_params['exchange'], + exchange_type='direct', + auto_delete=True) + channel.queue_declare(queue=rmq_params['order_queue'], auto_delete=True) + channel.queue_bind(exchange=rmq_params['exchange'], queue=rmq_params['order_queue']) + channel.queue_declare(queue=rmq_params['reject_queue'], auto_delete=True) + channel.queue_bind(exchange=rmq_params['exchange'], queue=rmq_params['reject_queue']) # Start consuming drink queue - # TODO + channel.basic_consume(lambda ch, method, properties, + body: order_callback(ch, method, properties, body), + queue=rmq_params['order_queue'], no_ack=True) + channel.basic_consume(lambda ch, method, properties, + body: reject_callback(ch, method, properties, body), + queue=rmq_params['reject_queue'], no_ack=True) - # Arguments - id = 123456 - remove_drinks(id) - print() - allowed_drinks(id) - print() - add_drink(id) - allowed_drinks(id) - print() - sleep(2) - add_drink(id) - allowed_drinks(id) - print() - add_drink(id) - allowed_drinks(id) + checkpoint("Consuming RMQ queues: \'{}\' and \'{}\'" + .format(rmq_params['order_queue'], rmq_params['reject_queue'])) + channel.start_consuming() main()