mirror of
https://github.com/lightningcell/flask-2fa-auth.git
synced 2026-05-26 15:13:35 +00:00
- Added `track_login_location` function to monitor user login locations. - Introduced `LoginLocation` model to store login details including IP and geolocation. - Created `LocationApprovalToken` model for managing location approval tokens. - Enhanced OTP verification to include location tracking and alerts for suspicious logins. - Implemented email notifications for suspicious login attempts and location approvals. - Added `login_history` route to display user's login activity. - Updated templates for login history and email notifications. - Configured mail settings and added dependencies for email functionality. - Introduced utility classes for mail and location services.
339 lines
11 KiB
Python
339 lines
11 KiB
Python
"""
|
|
Location tracking and geolocation service for Flask 2FA application.
|
|
|
|
This module provides location tracking functionality including:
|
|
- IP geolocation lookup
|
|
- Distance calculation between locations
|
|
- Suspicious login detection
|
|
- Location approval management
|
|
|
|
Security features:
|
|
- Rate limiting for API calls
|
|
- Error handling and fallbacks
|
|
- Privacy-aware location tracking
|
|
"""
|
|
|
|
import requests
|
|
import logging
|
|
import math
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from flask import request, current_app
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
class LocationService:
|
|
"""
|
|
Service for handling location tracking and geolocation functionality.
|
|
|
|
Provides methods for IP geolocation, distance calculation,
|
|
and suspicious login detection.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize location service."""
|
|
self.logger = logging.getLogger(__name__)
|
|
self.api_timeout = 5 # seconds
|
|
self.cache = {} # Simple in-memory cache
|
|
self.cache_duration = timedelta(hours=1)
|
|
|
|
def get_client_ip(self) -> str:
|
|
"""
|
|
Get client IP address from request.
|
|
|
|
Handles various proxy headers for accurate IP detection.
|
|
|
|
Returns:
|
|
str: Client IP address
|
|
"""
|
|
# Check for forwarded headers (proxy/load balancer)
|
|
forwarded_for = request.headers.get('X-Forwarded-For')
|
|
if forwarded_for:
|
|
# Take first IP if multiple are present
|
|
return forwarded_for.split(',')[0].strip()
|
|
|
|
real_ip = request.headers.get('X-Real-IP')
|
|
if real_ip:
|
|
return real_ip
|
|
|
|
# Fallback to direct connection
|
|
return request.remote_addr or '127.0.0.1'
|
|
|
|
def get_user_agent(self) -> str:
|
|
"""
|
|
Get user agent string from request.
|
|
|
|
Returns:
|
|
str: User agent string
|
|
"""
|
|
return request.headers.get('User-Agent', 'Unknown')
|
|
|
|
def get_location_from_ip(self, ip_address: str) -> Dict[str, Any]:
|
|
"""
|
|
Get location data from IP address using geolocation API.
|
|
|
|
Uses multiple fallback services for reliability.
|
|
|
|
Args:
|
|
ip_address: IP address to lookup
|
|
|
|
Returns:
|
|
Dict containing location data
|
|
"""
|
|
# Skip private/local IPs
|
|
if self._is_private_ip(ip_address):
|
|
return self._get_default_location("Private Network")
|
|
|
|
# Check cache first
|
|
cache_key = f"location_{ip_address}"
|
|
if cache_key in self.cache:
|
|
cached_data, timestamp = self.cache[cache_key]
|
|
if datetime.utcnow() - timestamp < self.cache_duration:
|
|
return cached_data
|
|
|
|
location_data = self._fetch_location_data(ip_address)
|
|
|
|
# Cache the result
|
|
self.cache[cache_key] = (location_data, datetime.utcnow())
|
|
|
|
return location_data
|
|
|
|
def _fetch_location_data(self, ip_address: str) -> Dict[str, Any]:
|
|
"""
|
|
Fetch location data from external APIs with fallbacks.
|
|
|
|
Args:
|
|
ip_address: IP address to lookup
|
|
|
|
Returns:
|
|
Dict containing location data
|
|
"""
|
|
# Try multiple geolocation services
|
|
services = [
|
|
self._fetch_from_ipapi,
|
|
self._fetch_from_ipinfo,
|
|
self._fetch_from_freeipapi
|
|
]
|
|
|
|
for service in services:
|
|
try:
|
|
data = service(ip_address)
|
|
if data and data.get('country'):
|
|
self.logger.info(f"Successfully fetched location for {ip_address}")
|
|
return data
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to fetch from service: {str(e)}")
|
|
continue
|
|
|
|
# All services failed
|
|
self.logger.error(f"All geolocation services failed for {ip_address}")
|
|
return self._get_default_location("Unknown")
|
|
|
|
def _fetch_from_ipapi(self, ip_address: str) -> Dict[str, Any]:
|
|
"""Fetch location from ip-api.com (free service)."""
|
|
url = f"http://ip-api.com/json/{ip_address}"
|
|
response = requests.get(url, timeout=self.api_timeout)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if data.get('status') == 'success':
|
|
return {
|
|
'country': data.get('country'),
|
|
'region': data.get('regionName'),
|
|
'city': data.get('city'),
|
|
'latitude': data.get('lat'),
|
|
'longitude': data.get('lon'),
|
|
'timezone': data.get('timezone'),
|
|
'isp': data.get('isp')
|
|
}
|
|
raise Exception(f"API returned error: {data.get('message')}")
|
|
|
|
def _fetch_from_ipinfo(self, ip_address: str) -> Dict[str, Any]:
|
|
"""Fetch location from ipinfo.io (free tier)."""
|
|
url = f"https://ipinfo.io/{ip_address}/json"
|
|
response = requests.get(url, timeout=self.api_timeout)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if 'country' in data:
|
|
loc = data.get('loc', '').split(',')
|
|
return {
|
|
'country': data.get('country'),
|
|
'region': data.get('region'),
|
|
'city': data.get('city'),
|
|
'latitude': float(loc[0]) if len(loc) > 0 and loc[0] else None,
|
|
'longitude': float(loc[1]) if len(loc) > 1 and loc[1] else None,
|
|
'timezone': data.get('timezone'),
|
|
'isp': data.get('org')
|
|
}
|
|
raise Exception("No location data in response")
|
|
|
|
def _fetch_from_freeipapi(self, ip_address: str) -> Dict[str, Any]:
|
|
"""Fetch location from freeipapi.com."""
|
|
url = f"https://freeipapi.com/api/json/{ip_address}"
|
|
response = requests.get(url, timeout=self.api_timeout)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if data.get('countryName'):
|
|
return {
|
|
'country': data.get('countryName'),
|
|
'region': data.get('regionName'),
|
|
'city': data.get('cityName'),
|
|
'latitude': data.get('latitude'),
|
|
'longitude': data.get('longitude'),
|
|
'timezone': data.get('timeZone'),
|
|
'isp': None
|
|
}
|
|
raise Exception("No location data in response")
|
|
|
|
def _is_private_ip(self, ip_address: str) -> bool:
|
|
"""
|
|
Check if IP address is private/local.
|
|
|
|
Args:
|
|
ip_address: IP address to check
|
|
|
|
Returns:
|
|
bool: True if IP is private
|
|
"""
|
|
private_ranges = [
|
|
'127.', # Localhost
|
|
'10.', # Private network
|
|
'172.16.', # Private network
|
|
'172.17.', # Private network
|
|
'172.18.', # Private network
|
|
'172.19.', # Private network
|
|
'172.20.', # Private network
|
|
'172.21.', # Private network
|
|
'172.22.', # Private network
|
|
'172.23.', # Private network
|
|
'172.24.', # Private network
|
|
'172.25.', # Private network
|
|
'172.26.', # Private network
|
|
'172.27.', # Private network
|
|
'172.28.', # Private network
|
|
'172.29.', # Private network
|
|
'172.30.', # Private network
|
|
'172.31.', # Private network
|
|
'192.168.', # Private network
|
|
'::1', # IPv6 localhost
|
|
]
|
|
|
|
return any(ip_address.startswith(prefix) for prefix in private_ranges)
|
|
|
|
def _get_default_location(self, reason: str) -> Dict[str, Any]:
|
|
"""
|
|
Get default location data when lookup fails.
|
|
|
|
Args:
|
|
reason: Reason for using default location
|
|
|
|
Returns:
|
|
Dict containing default location data
|
|
"""
|
|
return {
|
|
'country': reason,
|
|
'region': None,
|
|
'city': None,
|
|
'latitude': None,
|
|
'longitude': None,
|
|
'timezone': None,
|
|
'isp': None
|
|
}
|
|
|
|
def calculate_distance(self,
|
|
lat1: float, lon1: float,
|
|
lat2: float, lon2: float) -> float:
|
|
"""
|
|
Calculate distance between two geographic points using Haversine formula.
|
|
|
|
Args:
|
|
lat1, lon1: First point coordinates
|
|
lat2, lon2: Second point coordinates
|
|
|
|
Returns:
|
|
float: Distance in kilometers
|
|
"""
|
|
if None in [lat1, lon1, lat2, lon2]:
|
|
return 0.0
|
|
|
|
# Convert to radians
|
|
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
|
|
|
|
# Haversine formula
|
|
dlat = lat2 - lat1
|
|
dlon = lon2 - lon1
|
|
a = (math.sin(dlat/2)**2 +
|
|
math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2)
|
|
c = 2 * math.asin(math.sqrt(a))
|
|
|
|
# Radius of Earth in kilometers
|
|
r = 6371
|
|
|
|
return c * r
|
|
|
|
def is_suspicious_location(self,
|
|
user_id: int,
|
|
new_location: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
|
|
"""
|
|
Check if login location is suspicious based on user's history.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
new_location: New location data
|
|
|
|
Returns:
|
|
Tuple of (is_suspicious: bool, last_location: Dict or None)
|
|
"""
|
|
from app.models import LoginLocation
|
|
|
|
# Get user's last approved login location
|
|
last_login = LoginLocation.query.filter_by(
|
|
user_id=user_id,
|
|
is_approved=True
|
|
).order_by(LoginLocation.login_time.desc()).first()
|
|
|
|
if not last_login:
|
|
# First login is always approved
|
|
return False, None
|
|
|
|
# Skip distance check if coordinates are missing
|
|
if (not all([new_location.get('latitude'), new_location.get('longitude')]) or
|
|
not all([last_login.latitude, last_login.longitude])):
|
|
return False, {
|
|
'country': last_login.country,
|
|
'region': last_login.region,
|
|
'city': last_login.city,
|
|
'login_time': last_login.login_time
|
|
}
|
|
|
|
# Calculate distance
|
|
distance = self.calculate_distance(
|
|
last_login.latitude, last_login.longitude,
|
|
new_location['latitude'], new_location['longitude']
|
|
)
|
|
|
|
# Check if distance exceeds threshold
|
|
threshold = current_app.config.get('SUSPICIOUS_LOGIN_THRESHOLD_KM', 100)
|
|
is_suspicious = distance > threshold
|
|
|
|
last_location_data = {
|
|
'country': last_login.country,
|
|
'region': last_login.region,
|
|
'city': last_login.city,
|
|
'login_time': last_login.login_time,
|
|
'distance_km': round(distance, 2)
|
|
}
|
|
|
|
if is_suspicious:
|
|
self.logger.warning(
|
|
f"Suspicious login detected for user {user_id}: "
|
|
f"{distance:.2f}km from last location"
|
|
)
|
|
|
|
return is_suspicious, last_location_data
|
|
|
|
|
|
# Global location service instance
|
|
location_service = LocationService()
|