Files
flask-2fa-auth/app/utils/location.py
Hamit Şimşek 22c747f14a Implement location tracking and suspicious login detection
- Added `track_login_location` function to monitor user login locations.
- Introduced `LoginLocation` model to store login details including IP and geolocation.
- Created `LocationApprovalToken` model for managing location approval tokens.
- Enhanced OTP verification to include location tracking and alerts for suspicious logins.
- Implemented email notifications for suspicious login attempts and location approvals.
- Added `login_history` route to display user's login activity.
- Updated templates for login history and email notifications.
- Configured mail settings and added dependencies for email functionality.
- Introduced utility classes for mail and location services.
2025-05-30 00:34:17 +03:00

339 lines
11 KiB
Python

"""
Location tracking and geolocation service for Flask 2FA application.
This module provides location tracking functionality including:
- IP geolocation lookup
- Distance calculation between locations
- Suspicious login detection
- Location approval management
Security features:
- Rate limiting for API calls
- Error handling and fallbacks
- Privacy-aware location tracking
"""
import requests
import logging
import math
from typing import Optional, Dict, Any, Tuple
from flask import request, current_app
from datetime import datetime, timedelta
class LocationService:
"""
Service for handling location tracking and geolocation functionality.
Provides methods for IP geolocation, distance calculation,
and suspicious login detection.
"""
def __init__(self):
"""Initialize location service."""
self.logger = logging.getLogger(__name__)
self.api_timeout = 5 # seconds
self.cache = {} # Simple in-memory cache
self.cache_duration = timedelta(hours=1)
def get_client_ip(self) -> str:
"""
Get client IP address from request.
Handles various proxy headers for accurate IP detection.
Returns:
str: Client IP address
"""
# Check for forwarded headers (proxy/load balancer)
forwarded_for = request.headers.get('X-Forwarded-For')
if forwarded_for:
# Take first IP if multiple are present
return forwarded_for.split(',')[0].strip()
real_ip = request.headers.get('X-Real-IP')
if real_ip:
return real_ip
# Fallback to direct connection
return request.remote_addr or '127.0.0.1'
def get_user_agent(self) -> str:
"""
Get user agent string from request.
Returns:
str: User agent string
"""
return request.headers.get('User-Agent', 'Unknown')
def get_location_from_ip(self, ip_address: str) -> Dict[str, Any]:
"""
Get location data from IP address using geolocation API.
Uses multiple fallback services for reliability.
Args:
ip_address: IP address to lookup
Returns:
Dict containing location data
"""
# Skip private/local IPs
if self._is_private_ip(ip_address):
return self._get_default_location("Private Network")
# Check cache first
cache_key = f"location_{ip_address}"
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if datetime.utcnow() - timestamp < self.cache_duration:
return cached_data
location_data = self._fetch_location_data(ip_address)
# Cache the result
self.cache[cache_key] = (location_data, datetime.utcnow())
return location_data
def _fetch_location_data(self, ip_address: str) -> Dict[str, Any]:
"""
Fetch location data from external APIs with fallbacks.
Args:
ip_address: IP address to lookup
Returns:
Dict containing location data
"""
# Try multiple geolocation services
services = [
self._fetch_from_ipapi,
self._fetch_from_ipinfo,
self._fetch_from_freeipapi
]
for service in services:
try:
data = service(ip_address)
if data and data.get('country'):
self.logger.info(f"Successfully fetched location for {ip_address}")
return data
except Exception as e:
self.logger.warning(f"Failed to fetch from service: {str(e)}")
continue
# All services failed
self.logger.error(f"All geolocation services failed for {ip_address}")
return self._get_default_location("Unknown")
def _fetch_from_ipapi(self, ip_address: str) -> Dict[str, Any]:
"""Fetch location from ip-api.com (free service)."""
url = f"http://ip-api.com/json/{ip_address}"
response = requests.get(url, timeout=self.api_timeout)
response.raise_for_status()
data = response.json()
if data.get('status') == 'success':
return {
'country': data.get('country'),
'region': data.get('regionName'),
'city': data.get('city'),
'latitude': data.get('lat'),
'longitude': data.get('lon'),
'timezone': data.get('timezone'),
'isp': data.get('isp')
}
raise Exception(f"API returned error: {data.get('message')}")
def _fetch_from_ipinfo(self, ip_address: str) -> Dict[str, Any]:
"""Fetch location from ipinfo.io (free tier)."""
url = f"https://ipinfo.io/{ip_address}/json"
response = requests.get(url, timeout=self.api_timeout)
response.raise_for_status()
data = response.json()
if 'country' in data:
loc = data.get('loc', '').split(',')
return {
'country': data.get('country'),
'region': data.get('region'),
'city': data.get('city'),
'latitude': float(loc[0]) if len(loc) > 0 and loc[0] else None,
'longitude': float(loc[1]) if len(loc) > 1 and loc[1] else None,
'timezone': data.get('timezone'),
'isp': data.get('org')
}
raise Exception("No location data in response")
def _fetch_from_freeipapi(self, ip_address: str) -> Dict[str, Any]:
"""Fetch location from freeipapi.com."""
url = f"https://freeipapi.com/api/json/{ip_address}"
response = requests.get(url, timeout=self.api_timeout)
response.raise_for_status()
data = response.json()
if data.get('countryName'):
return {
'country': data.get('countryName'),
'region': data.get('regionName'),
'city': data.get('cityName'),
'latitude': data.get('latitude'),
'longitude': data.get('longitude'),
'timezone': data.get('timeZone'),
'isp': None
}
raise Exception("No location data in response")
def _is_private_ip(self, ip_address: str) -> bool:
"""
Check if IP address is private/local.
Args:
ip_address: IP address to check
Returns:
bool: True if IP is private
"""
private_ranges = [
'127.', # Localhost
'10.', # Private network
'172.16.', # Private network
'172.17.', # Private network
'172.18.', # Private network
'172.19.', # Private network
'172.20.', # Private network
'172.21.', # Private network
'172.22.', # Private network
'172.23.', # Private network
'172.24.', # Private network
'172.25.', # Private network
'172.26.', # Private network
'172.27.', # Private network
'172.28.', # Private network
'172.29.', # Private network
'172.30.', # Private network
'172.31.', # Private network
'192.168.', # Private network
'::1', # IPv6 localhost
]
return any(ip_address.startswith(prefix) for prefix in private_ranges)
def _get_default_location(self, reason: str) -> Dict[str, Any]:
"""
Get default location data when lookup fails.
Args:
reason: Reason for using default location
Returns:
Dict containing default location data
"""
return {
'country': reason,
'region': None,
'city': None,
'latitude': None,
'longitude': None,
'timezone': None,
'isp': None
}
def calculate_distance(self,
lat1: float, lon1: float,
lat2: float, lon2: float) -> float:
"""
Calculate distance between two geographic points using Haversine formula.
Args:
lat1, lon1: First point coordinates
lat2, lon2: Second point coordinates
Returns:
float: Distance in kilometers
"""
if None in [lat1, lon1, lat2, lon2]:
return 0.0
# Convert to radians
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = (math.sin(dlat/2)**2 +
math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2)
c = 2 * math.asin(math.sqrt(a))
# Radius of Earth in kilometers
r = 6371
return c * r
def is_suspicious_location(self,
user_id: int,
new_location: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
"""
Check if login location is suspicious based on user's history.
Args:
user_id: User ID
new_location: New location data
Returns:
Tuple of (is_suspicious: bool, last_location: Dict or None)
"""
from app.models import LoginLocation
# Get user's last approved login location
last_login = LoginLocation.query.filter_by(
user_id=user_id,
is_approved=True
).order_by(LoginLocation.login_time.desc()).first()
if not last_login:
# First login is always approved
return False, None
# Skip distance check if coordinates are missing
if (not all([new_location.get('latitude'), new_location.get('longitude')]) or
not all([last_login.latitude, last_login.longitude])):
return False, {
'country': last_login.country,
'region': last_login.region,
'city': last_login.city,
'login_time': last_login.login_time
}
# Calculate distance
distance = self.calculate_distance(
last_login.latitude, last_login.longitude,
new_location['latitude'], new_location['longitude']
)
# Check if distance exceeds threshold
threshold = current_app.config.get('SUSPICIOUS_LOGIN_THRESHOLD_KM', 100)
is_suspicious = distance > threshold
last_location_data = {
'country': last_login.country,
'region': last_login.region,
'city': last_login.city,
'login_time': last_login.login_time,
'distance_km': round(distance, 2)
}
if is_suspicious:
self.logger.warning(
f"Suspicious login detected for user {user_id}: "
f"{distance:.2f}km from last location"
)
return is_suspicious, last_location_data
# Global location service instance
location_service = LocationService()