Today, I want to share how I solved an interesting problem I recently encountered. I was new to this area and ran into several concepts I had never worked with before. Here is the problem statement:
Create an API where you take your friend's location, destination, and travel date. Compare the temperature of those two locations at 2 PM on that day and return a response deciding whether they should travel there. Hint: You might need to periodically fetch data and store it somewhere.
For fetching data periodically, we are going to use Celery, and for storing the data, we are going to use Redis.
Here is the diagram of the whole task flow:
The breakdown of the task is:
Build a background system that periodically fetches the weather data (Steps 2 - 7)
Create an API for the recommendation task (Step 8)
Extract the places from the API request and match them against the stored data, while the data keeps updating in parallel (Steps 8, 9)
Celery helps you with asynchronous task management. Celery has 'workers' that execute your asynchronous tasks in the background and 'beats' that schedule particular tasks at the interval you set, like a cron job. You can learn more from the documentation - https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
Let's get started.
1. Install Dependencies
Celery works best with Python 3.7+, so set up a virtual environment with one of those versions. Redis, on the other hand, needs to be installed at the OS level. If you are on Linux or WSL, run:
sudo apt update && sudo apt upgrade
sudo apt install redis-server
In your terminal, run redis-cli and you will get the Redis shell. To check whether Redis is connected, type ping in the Redis shell. If it replies PONG, the installation and connection are successful.
Now, in your Python virtual environment, install the packages used throughout the rest of this post:
pip install django djangorestframework celery django-celery-results django-celery-beat django-redis redis requests python-dotenv
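With the packages installed, you can also verify the Redis connection from Python (an optional check, assuming Redis is running on the default 127.0.0.1:6379):
```{Python}
import redis

# Connect to the local Redis server on the default port
client = redis.Redis(host="127.0.0.1", port=6379)

# Returns True if the server answers PING with PONG
print(client.ping())
```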
2. Edit settings.py File
First, create a Django project and an app within it.
django-admin startproject travelmanagement
cd travelmanagement
python manage.py startapp apilist
In order for your app to work with both Celery and Redis, you need to make some changes in your Django settings.py file. Add the following snippet to settings.py:
```{Python}
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_IMPORTS = ('apilist.tasks',)  # we will create this module in the next step; generally it is app_name.tasks
CELERY_TIMEZONE = 'Asia/Dhaka'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```
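The django-db result backend and the database beat scheduler also require their apps to be registered in INSTALLED_APPS, along with the app we just created (only the additions are shown here; keep your existing entries):
```{Python}
INSTALLED_APPS = [
    # ... Django's default apps ...
    'django_celery_results',  # stores task results in the database
    'django_celery_beat',     # stores the periodic task schedule in the database
    'apilist',                # the app created above
]
```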
For best practice, keep all these values in a .env file and load them with the python-dotenv library (a small sketch follows the cache settings below). Now let's set up the Redis cache:
```{Python}
import os  # needed for os.getenv below

CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.getenv('REDIS_LOCATION'),
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {'max_connections': 100},
            'IGNORE_EXCEPTIONS': True,  # ignore cache-related exceptions
        },
        'KEY_PREFIX': 'my_cache_prefix',  # add a prefix to your cache keys
        'TIMEOUT': 300,  # cache timeout in seconds; keep it longer than your fetch interval so entries do not expire between runs
        'VERSION': 1,  # cache version (change this if you change your cache key structure)
    }
}
```
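As mentioned above, values such as the Redis location can be kept in a .env file. Here is a minimal sketch of loading them in settings.py with python-dotenv, assuming a .env file next to manage.py that contains a line like REDIS_LOCATION=redis://127.0.0.1:6379:
```{Python}
import os

from dotenv import load_dotenv

# Read key=value pairs from the .env file into environment variables
load_dotenv()

# Reuse the same value for both the broker URL and the cache LOCATION above
CELERY_BROKER_URL = os.getenv('REDIS_LOCATION', 'redis://127.0.0.1:6379')
```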
3. Write the tasks.py File
Create a file named tasks.py inside your app folder. Here, you will write the function for the asynchronous task. For this project, we need to periodically fetch district-wise 2 PM temperature data and store it. I had two URLs: one is a JSON of all the districts, and the other is an Open-Meteo API link. First, we need to figure out the structure of the JSON object, traverse it, and collect the values in a list. Then we save the list in the Redis cache under a key. Just like cookies, the Redis cache has two methods - set and get:
cache.set("your key", your_data)
The whole code will look like the following; print statements are for debugging purposes:
```{Python}
from django.core.cache import cache
from celery import shared_task
import requests


@shared_task
def fetch_and_store_temperature():
    try:
        response = requests.get('https://raw.githubusercontent.com/strativ-dev/technical-screening-test/main/bd-districts.json')
        response.raise_for_status()
        districts_data = response.json().get('districts', [])

        all_temperatures = []

        for district in districts_data:
            latitude = district.get('lat')
            longitude = district.get('long')

            if latitude is not None and longitude is not None:
                api_url = f'https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&hourly=temperature_2m&timezone=GMT&forecast_days=7'
                weather_response = requests.get(api_url)
                weather_response.raise_for_status()
                weather_data = weather_response.json()

                hourly_data = weather_data.get('hourly', {})
                # Note: 'temperature_2m' is the full hourly series for the next 7 days;
                # the view in step 8 averages these values
                temperature_at_2pm = hourly_data.get('temperature_2m', [])

                # Cache key for each district without specifying a travel date
                cache_key = f'temperature_at_2pm_{district["name"]}'
                cache.set(cache_key, temperature_at_2pm)
                # print("checking", cache.get(cache_key))

                all_temperatures.extend(temperature_at_2pm)

        # Store all temperatures in a single cache key
        cache.set('temperature_data', all_temperatures)

    except requests.exceptions.RequestException as e:
        # Handle API request exceptions
        print(f"Error fetching weather data: {e}")
    except Exception as e:
        # Handle other exceptions
        print(f"Error: {e}")
```
You must use the @shared_task decorator on any function that you want Celery to discover and run asynchronously.
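To sanity-check the task without waiting for the schedule, you can call it from the Django shell, either synchronously or by queueing it (the queued call assumes the worker from step 7 is running):
```{Python}
# python manage.py shell
from apilist.tasks import fetch_and_store_temperature

# Run it synchronously in the current process (no worker needed)
fetch_and_store_temperature()

# Or queue it for a Celery worker to pick up
fetch_and_store_temperature.delay()
```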
4. Write the celery.py File
Go to your project directory and create a file named celery.py. This configuration is needed for scheduling your tasks. Before that, open the project's __init__.py file and add the following lines:
```{Python}
from .celery import app as celery_app

__all__ = ("celery_app",)
```
We will now set the necessary configurations in the celery.py file. Let's say, we want to execute our function every 15 minutes. Here is the code for that:
```{Python}
from __future__ import absolute_import, unicode_literals
import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'travelmanagement.settings')  # project_name.settings

app = Celery('travelmanagement')  # provide your project name
app.conf.enable_utc = False
app.config_from_object(settings, namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")


app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

app.conf.beat_schedule = {
    # create an entry for scheduling your task
    'fetch-and-store-temp-data-crontab': {
        'task': 'apilist.tasks.fetch_and_store_temperature',  # app_name.tasks.function_name
        'schedule': crontab(minute='*/15'),  # run every 15 minutes; a bare crontab() means run every minute
        # 'args': (..., ...)  # in case the function takes parameters, add them here
    }
}
```
crontab schedules the task for you. Additionally, if your asynchronous function takes arguments, uncomment the 'args' line and fill in the values. A list of crontab options can be found in the official documentation - https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
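crontab accepts the usual cron-style fields; for example, a daily run shortly before 2 PM would look like the entry below (illustrative only; the project in this post keeps the 15-minute schedule, and the commented 'args' line is just a placeholder):
```{Python}
from celery.schedules import crontab

app.conf.beat_schedule = {
    'fetch-temp-once-a-day': {
        'task': 'apilist.tasks.fetch_and_store_temperature',
        # Run daily at 13:50 in CELERY_TIMEZONE, shortly before 2 PM
        'schedule': crontab(hour=13, minute=50),
        # 'args': ('some_value',),  # only needed if the task takes parameters
    },
}
```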
5. Clear pycache
Sometimes, if the worker code changes too frequently, Celery may not pick up the changes. In that case, delete your sqlite3 database and run the following command, which deletes all the pycache files. Do not forget to add pycache files to your .gitignore to avoid pushing them.
py3clean .
6. Run Migrations
If your task is still running based on old code, clear the migration files using the following commands (for Linux):
find . -path "*/migrations/*.py" -not -name "__init__.py" -delete
find . -path "*/migrations/*.pyc" -delete
Now run the migrations. Because you are using Celery with the database beat scheduler, you need three commands instead of the usual two:
python manage.py makemigrations
python manage.py migrate
python manage.py migrate django_celery_beat
7. Run Celery workers and beats
Everything is configured. Now open three terminals, two for Celery and one for Django. Run the following in the first terminal:
celery -A your_project_name worker -l info
Run the following command in the second terminal to start the Celery beat scheduler:
celery -A your_project_name beat -l info
And finally, run the server in the last terminal.
python manage.py runserver
8. Create a view for the API
This part is about creating your API for travel suggestions. Here is what you would do:
- Extract the districts - where you are and where you want to go
- Extract the temperature data from the cache using the get method
- As it is a lot of data, I took the average temperature of each district and compared those. If my location has a higher average temperature than the destination, I recommend travelling there; otherwise I do not.
Here is how you can do it:
```{Python}
from django.core.cache import cache
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView


class DecisionMakingAPIView(APIView):
    def post(self, request):
        try:
            data = request.data
            source_district_name = data.get('source_district_name')
            destination_district_name = data.get('destination_district_name')

            # Retrieve temperatures for the source district
            source_cache_key = f'temperature_at_2pm_{source_district_name}'
            source_temperature_at_2pm = cache.get(source_cache_key)

            # Retrieve temperatures for the destination district
            destination_cache_key = f'temperature_at_2pm_{destination_district_name}'
            destination_temperature_at_2pm = cache.get(destination_cache_key)

            if source_temperature_at_2pm is not None and destination_temperature_at_2pm is not None:
                # Calculate the average temperature for the source district
                source_average_temperature = sum(source_temperature_at_2pm) / len(source_temperature_at_2pm)

                # Calculate the average temperature for the destination district
                destination_average_temperature = sum(destination_temperature_at_2pm) / len(destination_temperature_at_2pm)

                # Compare average temperatures and make a travel recommendation:
                # recommend the trip only if the destination is cooler than the source
                if source_average_temperature > destination_average_temperature:
                    recommendation = f"Traveling from {source_district_name} to {destination_district_name} is Recommended as the destination is cooler."
                else:
                    recommendation = f"Traveling from {source_district_name} to {destination_district_name} is NOT Recommended as the destination is hotter."

                return Response({'recommendation': recommendation}, status=status.HTTP_200_OK)
            else:
                return Response({'error': 'Temperature data not available for the specified source or destination district.'}, status=status.HTTP_404_NOT_FOUND)

        except Exception as e:
            # Handle unexpected errors
            return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
```
9. Add the URL for the API
Lastly, create a urls.py file inside your app folder and add the following:
```{Python}
from django.urls import path

from . import views

urlpatterns = [
    path('recommendation/', views.DecisionMakingAPIView.as_view(), name='travel-recommendation'),
]
```
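For the route to resolve, the app's urls.py also needs to be included from the project-level travelmanagement/urls.py; the 'api/' prefix below is just an example, so adjust it to whatever you prefer:
```{Python}
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('apilist.urls')),  # exposes /api/recommendation/
]
```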
10. Check the API response
In Postman, send a POST request with the form fields source_district_name and destination_district_name, and check the response. Done!
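If you prefer a quick script to Postman, the same request can be sent from Python (the district names and the 'api/' prefix are example values; use districts that exist in the source JSON and the prefix you chose in step 9):
```{Python}
import requests

payload = {
    'source_district_name': 'Dhaka',
    'destination_district_name': 'Bandarban',
}

# POST form data to the recommendation endpoint of the local dev server
response = requests.post('http://127.0.0.1:8000/api/recommendation/', data=payload)
print(response.status_code, response.json())
```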
I have written this blog so that I can use it as a future reference. You can find the project on GitHub in case you want the code - https://github.com/Afroza2/Strativ-AB-Travel-Management.git