|
|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- Fake FTP Server
- ~~~~~~~~~~~~~~~
- This is a simple fake FTP daemon. It stores the login data
- (username and password) it is given and then terminates the
- connection.
- It was made to easily recover forgotten passwords which are
- still stored (but scrambled) by an FTP client without the need
- for using a sniffer tool.
- Beware: Although the script reacts on `Ctrl-C`, it won't exit
- until another connect has happened because `socket.accept()`
- is blocking. Also, due to the use of threads, it may take some
- time until the port will be free again.
- Some useful resources for implementing this were the IETF's
- `RFC 959`_ and the `FTP reference`_ by D. J. Bernstein.
- .. _RFC 959: [url]http://tools.ietf.org/rfc/rfc959.txt[/url]
- .. _FTP reference: [url]http://cr.yp.to/ftp.html[/url]
- """
- from datetime import datetime
- from optparse import OptionParser
- from SocketServer import BaseRequestHandler, ThreadingTCPServer
- from sys import stdout
- class FTPLoginHandler(BaseRequestHandler):
- """Handler for FTP authentication."""
- def debug(self, message):
- """Show log message."""
- if self.server.debug:
- print '***', message
- def respond(self, code, explanation):
- """Send a response to the client."""
- self.request.send('%d %s\r\n' % (code, explanation))
- def process_request(self):
- """Parse input into a command and an argument."""
- data = self.request.recv(64)
- parts = data.strip().split(' ')
- return parts.pop(0), parts
- def log_auth(self, user, password):
- """Write username and password to logfile."""
- now = datetime.now().isoformat(' ')[:19]
- client = '%s:%d' % self.client_address
- line = ' '.join((now, client, user, password))
- self.server.logfile.write(line + '\n')
- self.server.logfile.flush()
- def handle(self):
- """Handle incoming data."""
- self.debug('Connection from %s:%d.'
- % self.client_address)
- self.respond(220, self.server.banner)
- user = None
- while True:
- cmd, args = self.process_request()
- if cmd == 'USER':
- if user is not None:
- self.respond(503, 'Incorrect sequence of'
- ' commands: PASS required after USER.')
- continue
- user = (args and args[0] or '*missing*')
- self.debug('User "%s" has identified.' % user)
- self.respond(331, 'Please specify the password.')
- continue
- elif cmd == 'PASS':
- if user is None:
- self.respond(503, 'Incorrect sequence of'
- ' commands: USER required before PASS.')
- continue
- password = (args and args[0] or '*missing*')
- self.debug('User "%s" supplied password "%s",'
- ' storing.' % (user, password))
- self.log_auth(user, password)
- self.respond(530, 'Login incorrect.')
- break
- else:
- self.debug('Rejecting request "%s".'
- % ' '.join(args))
- self.respond(530,
- 'Please login with USER and PASS.')
- break
- self.request.close()
- self.debug('Connection with %s:%d closed.'
- % self.client_address)
- class FTPLoginServer(ThreadingTCPServer):
- def __init__(self, host='', port=21, banner='', debug=False,
- logfile=None, append=False):
- ThreadingTCPServer.__init__(self, (host, port),
- FTPLoginHandler)
- self.banner = banner
- self.debug = debug
- mode = (append and 'a' or 'w')
- self.logfile = (logfile and open(logfile, mode) or stdout)
- def server_close(self):
- ThreadingTCPServer.server_close(self)
- self.logfile.close()
- if __name__ == '__main__':
- parser = OptionParser(usage='%prog [options] <port>')
- parser.add_option('-a', '--append', dest='append',
- action='store_true',
- help='append to LOGFILE ')
- parser.add_option('-b', '--banner', dest='banner',
- help='custom banner string')
- parser.add_option('-d', '--debug', dest='debug',
- action='store_true',
- help='show debugging messages')
- parser.add_option('-l', '--logfile', dest='logfile',
- help='write collected user/password data to LOGFILE')
- opts, args = parser.parse_args()
- # Parse arguments.
- if len(args) != 1:
- parser.print_help()
- parser.exit()
- try:
- port = int(args[0])
- except ValueError:
- parser.print_help()
- parser.exit()
- # Serve.
- server = FTPLoginServer(port=port, **opts.__dict__)
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- print 'Ctrl-C pressed, exiting...'
- server.server_close()
复制代码 |
|