Creating a Chat application using Redis and Flask

Byte Blog
2 min readApr 27, 2023

--

Flask is a popular Python web framework that allows developers to build web applications quickly and easily. Redis is a high-performance in-memory key-value store that is often used for caching and real-time applications. In this tutorial, we’ll show you how to create a Flask API chat application with a Redis Streams backend.

Prerequisites:

  • Python 3.6 or later
  • Flask
  • redis-py

Step 1: Set up Redis

To create a Redis stream, you need to have access to a Redis server. You can use a local Redis server or a cloud-based Redis server like Redis Labs or Amazon ElastiCache. Follow the Redis documentation to set up a Redis server.

Step 2: Install required packages

Install Flask and redis-py packages using pip:

pip install Flask redis

Step 3: Create a Flask app

Create a Flask app and import the required packages:

from flask import Flask, request
from redis import Redis, RedisError

app = Flask(__name__)
redis = Redis(host='localhost', port=6379)

The host and port arguments specify the Redis server address.

Step 4: Create a chat endpoint

Create a Flask endpoint that accepts a message and stores it in a Redis stream:

@app.route('/chat', methods=['POST'])
def chat():
try:
message = request.json
redis.xadd('chat_stream', message)
return 'Message sent successfully'
except RedisError as e:
return f'Error: {str(e)}'

The endpoint receives a JSON payload in the request body and stores the message in a Redis stream called chat_stream.

Step 5: Create a stream reader endpoint

Create a Flask endpoint that reads messages from the Redis stream and returns them to the client:

@app.route('/stream')
def stream():
try:
last_id = request.args.get('last_id', 0)
messages = redis.xrange('chat_stream', min=last_id, max='+inf')
response = {'messages': []}
for message in messages:
response['messages'].append({'id': message[0], 'text': message[1]['text']})
return response
except RedisError as e:
return f'Error: {str(e)}'

The endpoint reads messages from the chat_stream Redis stream, starting from the last ID received by the client. It returns the messages to the client as a JSON response.

Step 6: Run the Flask app

Run the Flask app using the following command:

if __name__ == '__main__':
app.run(debug=True)

This command starts the Flask app on a local server.

Step 7: Test the API

To test the Flask API chat application, send a POST request to the /chat endpoint with a JSON payload:

import requests

message = {'text': 'Hello, World!'}
response = requests.post('http://localhost:5000/chat', json=message)
print(response.content)

This code sends a message to the Flask API, which then stores the message in the chat_stream Redis stream.

To read messages from the Redis stream, send a GET request to the /stream endpoint with the last received ID:

import requests

last_id = 0
while True:
response = requests.get(f'http://localhost:5000/stream?last_id={last_id}')
for message in response.json()['messages']:
print(f'{message["id"]}: {message["text"]}')

--

--

Byte Blog
Byte Blog

Written by Byte Blog

Technology enthusiast with a passion for transforming complex concepts into bite sized chunks

No responses yet