mostly finish backend, still need to do reject callback & take out some TODOs

master
Paul Walko 2018-04-25 00:57:56 +00:00
parent 2f2464f342
commit bc6930c8db
4 changed files with 138 additions and 39 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
**__pycache__

View File

@ -1,25 +1,39 @@
#!/usr/bin/env python #!/usr/bin/env python3
import argparse import argparse
import socket
from socket import SOL_SOCKET, SO_REUSEADDR
import pickle
import pika import pika
from params import rmq_params, socket_params from params import rmq_params, socket_params
def checkpoint(message): def checkpoint(message):
"""Prints [CHeckpoint] <message> """Prints [Checkpoint] <message>
""" """
print("[Checkpoint] {}".format(message)) print("[Checkpoint] {}".format(message))
# Get pi's IP
# From https://stackoverflow.com/questions/166506/
# finding-local-ip-addresses-using-pythons-stdlib?page=1&tab=votes#tab-top
def my_ip():
return str((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2]
if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)),
s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET,
socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])
def process_response(response): def process_response(response):
"""Process response from serve & update GUI """Process response from serve & update GUI
""" """
# Update GUI with color # Update GUI with color
# TODO # TODO
color = 'red'
checkpoint("Update GUI with color \'{}\'".format(color)) checkpoint("Update GUI with color \'{}\'".format(color))
# Wait a few seconds for a possible reject # Wait a few seconds for a possible reject
# TODO # TODO
reject = False
checkpoint("Drink rejected? \'{}\'".format(reject)) checkpoint("Drink rejected? \'{}\'".format(reject))
return return
@ -28,23 +42,26 @@ def main():
"""Processes rfid scans """Processes rfid scans
""" """
parser = argparse.ArgumentParser(description='Processses arguments') parser = argparse.ArgumentParser(description='Processses arguments')
parse.add_argument('-s', help='Set server IP or hostname', required=True) parser.add_argument('-s', help='Set RMQ server', required=True)
parse.add_argument('-p', help='Set server Port to receive messages on', parser.add_argument('-p', help='Set port for socket to listen on',
default=socket_params['port']) default=socket_params['port'])
parse.add_argument('-z', help='Set size for socket to recive messages', parser.add_argument('-z', help='Set size for socket to recive messages',
default=socket_params['size']) default=socket_params['size'])
parser.add_argument('-b', help='Set socket backlog size',
default=socket_params['backlog'])
# Process args # Process args
args = parser.parse_args() args = parser.parse_args()
server_host = args.s rmq_host = args.s
server_port = args.p socket_port = int(args.p)
server_size = args.z socket_size = int(args.z)
socket_backlog = int(args.b)
# Setup RabbitMQ # Setup RabbitMQ
credentials = pika.PlainCredentials(rmq_params['username'], credentials = pika.PlainCredentials(rmq_params['username'],
rmq_params['password']) rmq_params['password'])
parameters = pika.ConnectionParameters(host=rmq_host, parameters = pika.ConnectionParameters(host=rmq_host,
virtual_host=rmq_paras['vhost'], virtual_host=rmq_params['vhost'],
credentials=credentials) credentials=credentials)
# Connect to RabbitMQ # Connect to RabbitMQ
@ -60,26 +77,45 @@ def main():
# TODO # TODO
print('Continuously listen for RFID ids') print('Continuously listen for RFID ids')
# TODO # TODO
rfid_id = '123456' rfid_id = '123456'
socket_host = my_ip()
order_data = {'id': rfid_id, 'ip': socket_host, 'port': socket_port,
'size': socket_size}
checkpoint("Received id \'{}\'".format(rfid_id)) checkpoint("Received id \'{}\'".format(rfid_id))
# Submit new drink order to queue # Submit new drink order to queue
channel.basic_publish(exchange=rmq_params['exchange'], channel.basic_publish(exchange=rmq_params['exchange'],
routing_key=rmq_params['order_queue'], routing_key=rmq_params['order_queue'],
body=rfid_id) body=str(order_data))
checkpoint("Getting status for id \'{}\'".format(rfid_id)) checkpoint("Getting status for id \'{}\'".format(rfid_id))
# Wait for response from server for drink order
try:
s = socket.socket(AF_INET, socket.SOCK_STREAM)
s.connect((server_host, server_port))
recv_data = s.recv(server_size)
s.close()
process_response(recv_data)
# Error receiving message ## Wait for response from server for drink order
except Exception as ex: # Setup socket
print(ex) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind((socket_host, socket_port))
s.listen(socket_backlog)
checkpoint("Created socket at {} on port {}"
.format(socket_host, socket_port))
# Listen for reply from server
server, address = s.accept()
svr_addr = server.getpeername()[0]
svr_port = server.getpeername()[1]
checkpoint("Accepted server connection from {} on {}"
.format(svr_addr, svr_port))
# Receive data from server
recv_data = server.recv(socket_size)
recv_data = pickle.loads(recv_data)
checkpoint("Received data: {}".format(recv_data))
s.close()
# Update GUI based on response
process_response(recv_data)
break
main() main()

View File

@ -6,6 +6,8 @@ rmq_params = {'vhost': 'my_vhost',
'username': 'user', 'username': 'user',
'password': 'pass', 'password': 'pass',
'exchange': 'my_exchange', 'exchange': 'my_exchange',
'order_queue': 'my_order_queue'} 'order_queue': 'my_order_queue',
'reject_queue': 'my_reject_queue'}
socket_params = {'size': 1024, socket_params = {'size': 1024,
'port': 8080} 'port': 8080,
'backlog': 4}

View File

@ -1,11 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import ast
from datetime import datetime from datetime import datetime
import math import math
import socket
from time import sleep from time import sleep
import pickle
import pymongo import pymongo
import pika
from params import rmq_params, socket_params
def checkpoint(message): def checkpoint(message):
"""Prints [CHeckpoint] <message> """Prints [CHeckpoint] <message>
@ -99,30 +104,85 @@ def allowed_drinks(id):
return drinks_left return drinks_left
def order_callback(ch, method, properties, body):
"""Process 1 drink being ordered
"""
# Decode body
body = body.decode('utf-8')
client_params = ast.literal_eval(body)
# Extract things from client data
id = int(client_params.get('id'))
host = client_params.get('ip')
port = int(client_params.get('port'))
add_drink(id)
drinks = allowed_drinks(id)
# Add drink for user
# Try to send response to client
try:
# Connect to client socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
checkpoint("Created socket at {} on port {}".format(host, port))
# Send num drinks to client
s.send(pickle.dumps(drinks))
s.close()
except Exception as ex:
print(ex)
def reject_callback(ch, method, properties, body):
"""Remove most recent drink for user
"""
# Decode body
id = int(body.decode('utf-8'))
# Remove most recent drink
remove_drink = get_info(id)['drinks'][-1]
remove_drinks(id, [remove_drink])
checkpoint("Removed most recent drink for \'{}\'".format(id))
def main(): def main():
"""Create & start 'drinks' queue for submitting drinks """Create & start 'drinks' queue for submitting drinks
""" """
# Temp TODO
remove_drinks(123456)
# Connect to RMQ
credentials = pika.PlainCredentials(rmq_params['username'], rmq_params['password'])
parameters = pika.ConnectionParameters(host='localhost',
virtual_host=rmq_params['vhost'],
credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
checkpoint('Connect to RMQ server')
# Create order queue # Create order queue
# TODO checkpoint('Setting up exchanges and queue...')
channel.exchange_declare(exchange=rmq_params['exchange'],
exchange_type='direct',
auto_delete=True)
channel.queue_declare(queue=rmq_params['order_queue'], auto_delete=True)
channel.queue_bind(exchange=rmq_params['exchange'], queue=rmq_params['order_queue'])
channel.queue_declare(queue=rmq_params['reject_queue'], auto_delete=True)
channel.queue_bind(exchange=rmq_params['exchange'], queue=rmq_params['reject_queue'])
# Start consuming drink queue # Start consuming drink queue
# TODO channel.basic_consume(lambda ch, method, properties,
body: order_callback(ch, method, properties, body),
queue=rmq_params['order_queue'], no_ack=True)
channel.basic_consume(lambda ch, method, properties,
body: reject_callback(ch, method, properties, body),
queue=rmq_params['reject_queue'], no_ack=True)
# Arguments checkpoint("Consuming RMQ queues: \'{}\' and \'{}\'"
id = 123456 .format(rmq_params['order_queue'], rmq_params['reject_queue']))
remove_drinks(id) channel.start_consuming()
print()
allowed_drinks(id)
print()
add_drink(id)
allowed_drinks(id)
print()
sleep(2)
add_drink(id)
allowed_drinks(id)
print()
add_drink(id)
allowed_drinks(id)
main() main()