Source code for gatenet.discovery.detectors

import re
from typing import Optional
from abc import ABC, abstractmethod

[docs] class ServiceDetector(ABC): """ Abstract base class for service detection strategies. All service detectors must implement the `detect` method. """
[docs] @abstractmethod def detect(self, port: int, banner: str) -> Optional[str]: """ Detect service from port and banner. Parameters ---------- port : int The port number associated with the service. banner : str The banner string received from the service. Returns ------- Optional[str] The detected service name/version, or None if not detected. """ raise NotImplementedError
[docs] class SSHDetector(ServiceDetector): """Service detector for SSH servers."""
[docs] def detect(self, port: int, banner: str) -> Optional[str]: if port != 22 and 'ssh' not in banner: return None if 'openssh' in banner: version_match = re.search(r'openssh[_\s]+([\d\.]+p?\d*)', banner) version = version_match.group(1) if version_match else 'unknown' return f"OpenSSH {version}" if 'ssh' in banner: return "SSH Server" return None
[docs] class HTTPDetector(ServiceDetector): """Service detector for HTTP servers."""
[docs] def detect(self, port: int, banner: str) -> Optional[str]: if port not in [80, 8080, 8000, 443] and not banner.startswith('http'): return None if 'apache' in banner: return "Apache HTTP Server" if 'nginx' in banner: return "Nginx HTTP Server" if 'iis' in banner: return "Microsoft IIS" if banner.startswith('http'): return "HTTP Server" return None
[docs] class FTPDetector(ServiceDetector): """Service detector for FTP servers."""
[docs] def detect(self, port: int, banner: str) -> Optional[str]: if port != 21 and 'ftp' not in banner: return None if 'vsftpd' in banner: return "vsftpd FTP Server" if 'filezilla' in banner: return "FileZilla FTP Server" if 'ftp' in banner: return "FTP Server" return None
[docs] class SMTPDetector(ServiceDetector): """Detects SMTP services."""
[docs] def detect(self, port: int, banner: str) -> Optional[str]: if port != 25 and 'smtp' not in banner: return None if 'postfix' in banner: return "Postfix SMTP" if 'sendmail' in banner: return "Sendmail SMTP" if 'smtp' in banner: return "SMTP Server" return None
[docs] class PortMappingDetector(ServiceDetector): """Detects services by direct port mapping.""" PORT_MAPPING = { 443: "HTTPS Server", 53: "DNS Server", 23: "Telnet Server", 110: "POP3 Server", 143: "IMAP Server", 993: "IMAPS Server", 995: "POP3S Server", 3389: "Remote Desktop Protocol (RDP)" }
[docs] def detect(self, port: int, banner: str) -> Optional[str]: return self.PORT_MAPPING.get(port)
[docs] class BannerKeywordDetector(ServiceDetector): """Detects services by banner keywords.""" BANNER_KEYWORDS = [ (['telnet'], "Telnet Server"), (['pop3'], "POP3 Server"), (['imap'], "IMAP Server"), ]
[docs] def detect(self, port: int, banner: str) -> Optional[str]: for keywords, name in self.BANNER_KEYWORDS: if any(keyword in banner for keyword in keywords): return name return None
[docs] class GenericServiceDetector(ServiceDetector): """Detects services by generic indicators.""" SERVICE_INDICATORS = { 'mysql': 'MySQL Database', 'postgresql': 'PostgreSQL Database', 'redis': 'Redis Server', 'mongodb': 'MongoDB Database', 'elasticsearch': 'Elasticsearch', 'docker': 'Docker Registry', 'jenkins': 'Jenkins CI/CD', 'gitlab': 'GitLab', 'apache': 'Apache Server', 'nginx': 'Nginx Server' }
[docs] def detect(self, port: int, banner: str) -> Optional[str]: for indicator, service_name in self.SERVICE_INDICATORS.items(): if indicator in banner: return service_name return None
[docs] class FallbackDetector(ServiceDetector): """Fallback detector using default port services.""" DEFAULT_PORT_SERVICES = { 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 993: "IMAPS", 995: "POP3S", 3389: "RDP", 21: "FTP", 8080: "HTTP", 8000: "HTTP" }
[docs] def detect(self, port: int, banner: str) -> Optional[str]: if banner: return f"Unknown Service (Port {port})" return self.DEFAULT_PORT_SERVICES.get(port, f"Unknown Service (Port {port})")