How to build a REST API with Django and JWT Auth

django==1.11
mysqlclient
djangorestframework
djangorestframework-jwt
requests
#settings.py

INSTALLED_APPS = [
# ...
'apiapp',
'rest_framework',
# ...
]
#models/user.py
from django.db import models
from django.contrib.auth.models import AbstractUser


class User(AbstractUser):
pass
#settings.py

# Custom User model
AUTH_USER_MODEL = 'apiapp.User'
#serializers/user.py
from rest_framework import serializers
from appapi.models import User

class UserSerializer(serializers.Serializer):
id = serializers.IntegerField(read_only=True)
username = serializers.CharField(required=True, allow_blank=False, max_length=100)
password = serializers.CharField(required=True, write_only=True)
email = serializers.CharField(required=True, allow_blank=True, max_length=100)
is_staff = serializers.BooleanField(required=False, default=False)
is_superuser = serializers.BooleanField(required=False, default=False)

def create(self, data):
"""
Create and return a new `Snippet` instance, given the validated data.
"""
instance = User.objects.create(
username=data.get('username'),
email=data.get('email'),
is_staff=data.get('is_staff'),
is_superuser=data.get('is_superuser'),
)
instance.set_password(data.get('password'))
instance.save()
return instance

def update(self, instance, data):
"""
Update and return an existing `Snippet` instance, given the validated data.
"""
instance.username = data.get('username', instance.username)
instance.email = data.get('email', instance.email)
instance.is_staff = data.get('is_staff', instance.is_staff)
instance.is_superuser = data.get('is_superuser', instance.is_staff)
instance.set_password(data.get('password'))
instance.save()
return instance
#urls.py
urlpatterns = [
url(r'^user/$', user.api_admin_user_index, name='api_admin_user_index'),
]
#views/user.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from apiapp.models import User
from apiapp.serializers.user import UserSerializer

@api_view(['GET'])
def api_admin_user_index(request):
"""
get:
List all users.
"""
serializer = UserSerializer(User.objects.all(), many=True)
return Response(serializer.data)
#views/user.py
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from apiapp.models import User
from apiapp.serializers.user import UserSerializer
from apiapp.utils.jsonreader import JsonReader


@api_view(['GET', 'POST'])
def api_admin_user_index(request):
"""
get:
List all users.
post:
Create new user.
"""
if request.method == 'GET':
serializer = UserSerializer(User.objects.all(), many=True)
return Response(serializer.data)

elif request.method == 'POST':
data = JsonReader.read_body(request)
serializer = UserSerializer(data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
#utils/jsonreader.py
import json


class JsonReader:

@staticmethod
def read_body(request):
'''returns a dict from the body of request'''

data = dict()

try:
body = request.body
data = JsonReader.bytes_to_dict(body)
except Exception as exc:
if isinstance(request.data, dict):
data = request.data
elif isinstance(request.data, str):
data = JsonReader.str_to_dict(request.data)

return data

@staticmethod
def str_to_dict(s: str):
dict = json.loads(s)
return dict

@staticmethod
def dict_to_str(d: dict):
str = json.dumps(d)
return str

@staticmethod
def bytes_to_dict(b: bytes):
str = b.decode("utf-8")
dict = JsonReader.str_to_dict(str)
return dict
#urls.py
urlpatterns = [
url(r'^user/$', user.api_admin_user_index, name='api_admin_user_index'),
url(r'^user/(?P[0-9]+)/$', user.api_admin_user_detail, name='api_admin_user_detail'),
]
#views/user.py
@api_view(['GET', 'PUT', 'DELETE'])
def api_admin_user_detail(request, pk):
"""
get:
Detail one user.
put:
Update one user.
delete:
Delete one user.
"""

try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
return Response({'error': "User " + pk + " does not exist"}, status=status.HTTP_404_NOT_FOUND)

if request.method == 'GET':
serializer = UserSerializer(user)
return Response(serializer.data)

elif request.method == 'PUT':
data = JsonReader.read_body(request)
serializer = UserSerializer(user, data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

elif request.method == 'DELETE':
user.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
#settings.py

# Configure the authentication in Django Rest Framework to be JWT
# http://www.django-rest-framework.org/
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
),
}

# Configure the JWTs to expire after 1 hour, and allow users to refresh near-expiration tokens
# https://getblimp.github.io/django-rest-framework-jwt/
JWT_AUTH = {
# If the secret is wrong, it will raise a jwt.DecodeError telling you as such. You can still get at the payload by setting the JWT_VERIFY to False.
'JWT_VERIFY': True,

# You can turn off expiration time verification by setting JWT_VERIFY_EXPIRATION to False.
# If set to False, JWTs will last forever meaning a leaked token could be used by an attacker indefinitely.
'JWT_VERIFY_EXPIRATION': True,

# This is an instance of Python's datetime.timedelta. This will be added to datetime.utcnow() to set the expiration time.
# Default is datetime.timedelta(seconds=300)(5 minutes).
'JWT_EXPIRATION_DELTA': datetime.timedelta(hours=1),

'JWT_ALLOW_REFRESH': True,
'JWT_AUTH_HEADER_PREFIX': 'JWT',
}
#urls.py
urlpatterns = [
# ...
url(r'^jwt/refresh-token/', refresh_jwt_token, name='refresh_jwt_token'),
url(r'^jwt/api-token-verify/', verify_jwt_token, name='verify_jwt_token'),
url(r'^jwt/api-token-auth/', obtain_jwt_token, name='obtain_jwt_token'),
]
#urls.py
urlpatterns = [
# ...
url(r'^login/', registration.api_login, name='api_login'),
]
#views/registration.py
@api_view(['POST'])
def api_login(request):
"""
post:
This view is called through API POST with a json body like so:

{
"username": "admin",
"password": "admin"
}

:param request:
:return:
"""
data = JsonReader.read_body(request)

response_login = requests.post(
request.build_absolute_uri(reverse('obtain_jwt_token')),
data=data
)
response_login_dict = json.loads(response_login.content)
return Response(response_login_dict, response_login.status_code)
[
{
"model": "apiapp.user",
"pk": 1,
"fields": {
"password": "pbkdf2_sha256$36000$SoF7ueJzOE1I$244qaRI1dReT4DxZKXJi2sRmuKdqPcjeOiUaPdH2UV0=",
"last_login": null,
"is_superuser": true,
"username": "admin",
"first_name": "",
"last_name": "",
"email": "admin@slowcode.io",
"is_staff": true,
"is_active": true,
"date_joined": "2018-08-29T10:09:27.191Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "apiapp.user",
"pk": 2,
"fields": {
"password": "pbkdf2_sha256$36000$bkA3NXYqXZ4S$zuH97poSj3trZoNRFeROw3PgutbGOHlZunliI8/1jbg=",
"last_login": null,
"is_superuser": false,
"username": "user1",
"first_name": "",
"last_name": "",
"email": "user1@slowcode.io",
"is_staff": true,
"is_active": true,
"date_joined": "2018-08-29T10:10:56.873Z",
"groups": [],
"user_permissions": []
}
},
{
"model": "apiapp.user",
"pk": 3,
"fields": {
"password": "pbkdf2_sha256$36000$KKujn28LvdGX$OxtfRmIUWNPwbBPsz2iwKwff8klJ5PiPXj3P9N70Hto=",
"last_login": null,
"is_superuser": false,
"username": "user2",
"first_name": "",
"last_name": "",
"email": "user2@slowcode.io",
"is_staff": false,
"is_active": true,
"date_joined": "2018-08-29T10:11:19.576Z",
"groups": [],
"user_permissions": []
}
}
]
python manage.py loaddata users
#security/voters.py
from apiapp.models import User


class AbstractVoter:

request = None

def __init__(self, request):
self.request = request

def is_logged_in(self):
if isinstance(self.request.user, User):
return True

return False

def is_superuser(self):
if self.is_logged_in():
return self.request.user.is_superuser

return False


class UserVoter(AbstractVoter):

def user_can_manage_me(self, user_inst: User):
if self.is_logged_in():
if self.is_superuser():
return True
if self.request.user == user_inst:
return True

return False
#views/user.py
@api_view(['GET', 'POST'])
def api_admin_user_index(request):
"""
get:
List all users.
post:
Create new user.
"""
voter = UserVoter(request)
if not voter.is_superuser():
return Response({'error': "User API is not allowed by non admin user"}, status=status.HTTP_403_FORBIDDEN)

#...


@api_view(['GET', 'PUT', 'DELETE'])
def api_admin_user_detail(request, pk):
"""
get:
Detail one user.
put:
Update one user.
delete:
Delete one user.
"""
voter = UserVoter(request)
if not voter.is_superuser():
return Response({'error': "User API is not allowed by non admin user"}, status=status.HTTP_403_FORBIDDEN)

#...
#urls.py
urlpatterns = [
#...
url(r'^(?P[0-9]+)/$', user.api_user_detail, name='api_user_detail'),
]
#views/user.py
@api_view(['GET', 'PUT'])
def api_user_detail(request, pk):
"""
get:
Detail one user.
put:
Update one user.
"""
try:
user_inst = User.objects.get(pk=pk)
except User.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)

voter = UserVoter(request)
if not voter.user_can_manage_me(user_inst):
return Response({'error': "User API is not allowed"}, status=status.HTTP_403_FORBIDDEN)

if request.method == 'GET':
serializer = UserSerializer(user_inst)
return Response(serializer.data)

elif request.method == 'PUT':
data = JsonReader.read_body(request)
if 'is_staff' in data:
if not voter.is_superuser():
return Response({'error': "Non admin cannot update admin attributes"}, status=status.HTTP_403_FORBIDDEN)
serializer = UserSerializer(user_inst, data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
git clone https://github.com/joeymasip/django-apiskeleton.git

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store