# -*- mode: python; tab-width: 4 -*-
# $Id: smb.py,v 1.15 2003/02/22 08:03:19 miketeo Exp $
#
# Copyright (C) 2001 Michael Teo
# smb.py - SMB/CIFS library
#
# This software is provided 'as-is', without any express or implied warranty.
# In no event will the author be held liable for any damages arising from the
# use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
#
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
#
# 3. This notice cannot be removed or altered from any source distribution.
#

import os
import sys
import socket
import string
import re
import select
import errno
import nmb
from random import randint
from struct import *

# Optional DES support, used only for password encryption.
# First choice is amkCrypto's layout (Crypto.Cipher); the fallback is
# mxCrypto's layout (Crypto.Ciphers). amk_crypto records which one was found:
# a true value makes SMB.__deshash build ciphers with DES.new(key), a false
# value makes it call DES(key) directly.
# When neither package can be imported DES is None and amk_crypto stays unset;
# callers test DES before encrypting anything.
try:
    from Crypto.Cipher import DES
    amk_crypto = 1
except ImportError:
    try:
        from Crypto.Ciphers import DES
        amk_crypto = 0
    except ImportError:
        DES = None

# Use the C implementation of StringIO when it is available.
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

CVS_REVISION = '$Revision: 1.15 $'

# Shared Device Type
SHARED_DISK = 0x00
SHARED_PRINT_QUEUE = 0x01
SHARED_DEVICE = 0x02
SHARED_IPC = 0x03

# Extended attributes mask
ATTR_ARCHIVE = 0x020
ATTR_COMPRESSED = 0x800
ATTR_NORMAL = 0x080
ATTR_HIDDEN = 0x002
ATTR_READONLY = 0x001
ATTR_TEMPORARY = 0x100
ATTR_DIRECTORY = 0x010
ATTR_SYSTEM = 0x004

# Service Type
SERVICE_DISK = 'A:'
SERVICE_PRINTER = 'LPT1:'
SERVICE_IPC = 'IPC'
SERVICE_COMM = 'COMM'
SERVICE_ANY = '?????'
# Server Type (Can be used to mask with SMBMachine.get_type() or SMBDomain.get_type())
SV_TYPE_WORKSTATION = 0x00000001
SV_TYPE_SERVER = 0x00000002
SV_TYPE_SQLSERVER = 0x00000004
SV_TYPE_DOMAIN_CTRL = 0x00000008
SV_TYPE_DOMAIN_BAKCTRL = 0x00000010
SV_TYPE_TIME_SOURCE = 0x00000020
SV_TYPE_AFP = 0x00000040
SV_TYPE_NOVELL = 0x00000080
SV_TYPE_DOMAIN_MEMBER = 0x00000100
SV_TYPE_PRINTQ_SERVER = 0x00000200
SV_TYPE_DIALIN_SERVER = 0x00000400
SV_TYPE_XENIX_SERVER = 0x00000800
SV_TYPE_NT = 0x00001000
SV_TYPE_WFW = 0x00002000
SV_TYPE_SERVER_NT = 0x00004000
SV_TYPE_POTENTIAL_BROWSER = 0x00010000
SV_TYPE_BACKUP_BROWSER = 0x00020000
SV_TYPE_MASTER_BROWSER = 0x00040000
SV_TYPE_DOMAIN_MASTER = 0x00080000
SV_TYPE_LOCAL_LIST_ONLY = 0x40000000
SV_TYPE_DOMAIN_ENUM = 0x80000000

# Options values for SMB.stor_file and SMB.retr_file
SMB_O_CREAT = 0x10   # Create the file if the file does not exist. Otherwise, operation fails.
SMB_O_EXCL = 0x00    # When used with SMB_O_CREAT, operation fails if file exists. Cannot be used with SMB_O_OPEN.
SMB_O_OPEN = 0x01    # Open the file if the file exists
SMB_O_TRUNC = 0x02   # Truncate the file if the file exists

# Share Access Mode
SMB_SHARE_COMPAT = 0x00
SMB_SHARE_DENY_EXCL = 0x10
SMB_SHARE_DENY_WRITE = 0x20
SMB_SHARE_DENY_READEXEC = 0x30
SMB_SHARE_DENY_NONE = 0x40
SMB_ACCESS_READ = 0x00
SMB_ACCESS_WRITE = 0x01
SMB_ACCESS_READWRITE = 0x02
SMB_ACCESS_EXEC = 0x03


def strerror(errclass, errcode):
    """Describe an SMB error as a (class description, error description) pair.

    errclass selects the lookup table (ERRDOS, ERRSRV, ERRHRD or ERRBROWSE,
    module-level tables that are referenced here but not defined in this part
    of the file); errcode is looked up in it with 'Unknown error' as the
    default. Unrecognised classes yield ('Unknown error', 'Unknown error').
    """
    if errclass == 0x01:
        return 'OS error', ERRDOS.get(errcode, 'Unknown error')
    elif errclass == 0x02:
        return 'Server error', ERRSRV.get(errcode, 'Unknown error')
    elif errclass == 0x03:
        return 'Hardware error', ERRHRD.get(errcode, 'Unknown error')
    # This is not a standard error class for SMB
    # (This branch used to test the undefined name err_class, so every class
    # other than 1-3 ended in a NameError instead of a description.)
    elif errclass == 0x80:
        return 'Browse error', ERRBROWSE.get(errcode, 'Unknown error')
    elif errclass == 0xff:
        return 'Bad command', 'Bad command. Please file bug report'
    else:
        return 'Unknown error', 'Unknown error'


# Raised when an error has occurred during a session
class SessionError(Exception):
    pass


# Raised when a feature is present/required in the protocol but is not
# currently supported by pysmb
class UnsupportedFeature(Exception):
    pass


# Contains information about a SMB shared device/service
class SharedDevice:
    """Name, type and comment of one shared device/service."""

    def __init__(self, name, type, comment):
        self.__name = name
        self.__type = type
        self.__comment = comment

    def get_name(self):
        return self.__name

    def get_type(self):
        return self.__type

    def get_comment(self):
        return self.__comment

    def __repr__(self):
        return '<SharedDevice instance: name=%s, type=%s, comment="%s">' % (
            self.__name, self.__type, self.__comment)


# Contains information about the shared file/directory
class SharedFile:
    """Timestamps, sizes, attribute bits and names of one shared file/directory.

    shortname and longname are cut at their first NUL character, if any.
    The *_epoch() getters convert the stored time values with
    __convert_smbtime(); the plain getters return them as given.
    """

    def __init__(self, ctime, atime, mtime, filesize, allocsize, attribs, shortname, longname):
        self.__ctime = ctime
        self.__atime = atime
        self.__mtime = mtime
        self.__filesize = filesize
        self.__allocsize = allocsize
        self.__attribs = attribs
        # Keep only the text before the first NUL; names without a NUL are
        # stored unchanged.
        self.__shortname = shortname.split('\0', 1)[0]
        self.__longname = longname.split('\0', 1)[0]

    def get_ctime(self):
        return self.__ctime

    def get_ctime_epoch(self):
        return self.__convert_smbtime(self.__ctime)

    def get_mtime(self):
        return self.__mtime

    def get_mtime_epoch(self):
        return self.__convert_smbtime(self.__mtime)

    def get_atime(self):
        return self.__atime

    def get_atime_epoch(self):
        return self.__convert_smbtime(self.__atime)

    def get_filesize(self):
        return self.__filesize

    def get_allocsize(self):
        return self.__allocsize

    def get_attributes(self):
        return self.__attribs

    # The is_*() tests return the masked attribute bits (zero when the
    # attribute is not set), not a strict boolean.
    def is_archive(self):
        return self.__attribs & ATTR_ARCHIVE

    def is_compressed(self):
        return self.__attribs & ATTR_COMPRESSED

    def is_normal(self):
        return self.__attribs & ATTR_NORMAL

    def is_hidden(self):
        return self.__attribs & ATTR_HIDDEN

    def is_readonly(self):
        return self.__attribs & ATTR_READONLY

    def is_temporary(self):
        return self.__attribs & ATTR_TEMPORARY

    def is_directory(self):
        return self.__attribs & ATTR_DIRECTORY

    def is_system(self):
        return self.__attribs & ATTR_SYSTEM

    def get_shortname(self):
        return self.__shortname

    def get_longname(self):
        return self.__longname

    def __repr__(self):
        return '<SharedFile instance: shortname="%s", longname="%s", filesize=%s>' % (
            self.__shortname, self.__longname, self.__filesize)

    def __convert_smbtime(self, t):
        """Convert a 64-bit SMB time value to float seconds.

        The value is scaled by 1.0e-7 and shifted by geo_cal_offset seconds;
        presumably t is a Windows FILETIME and the result is Unix epoch
        seconds -- confirm against the server-side definition.
        The low 20 bits of t are masked off before scaling, so the result is
        rounded down by up to about 0.1 second.
        """
        x = t >> 32
        y = t & 0xffffffff
        geo_cal_offset = 11644473600.0  # = 369.0 * 365.25 * 24 * 60 * 60 - (3.0 * 24 * 60 * 60 + 6.0 * 60 * 60)
        return ((x * 4.0 * (1 << 30) + (y & 0xfff00000)) * 1.0e-7 - geo_cal_offset)


# Contain information about a SMB machine
class SMBMachine:
    """NetBIOS name, server-type bitmask and comment of one machine."""

    def __init__(self, nbname, type, comment):
        self.__nbname = nbname
        self.__type = type
        self.__comment = comment

    def get_nbname(self):
        return self.__nbname

    def get_type(self):
        # Bitmask to be tested against the SV_TYPE_* constants.
        return self.__type

    def get_comment(self):
        return self.__comment

    def __repr__(self):
        return '<SMBMachine instance: nbname="%s", type=%s, comment="%s">' % (
            self.__nbname, hex(self.__type), self.__comment)


class SMBDomain:
    """NetBIOS group name, server-type bitmask and master browser of one domain."""

    def __init__(self, nbgroup, type, master_browser):
        self.__nbgroup = nbgroup
        self.__type = type
        self.__master_browser = master_browser

    def get_nbgroup(self):
        return self.__nbgroup

    def get_type(self):
        # Bitmask to be tested against the SV_TYPE_* constants.
        return self.__type

    def get_master_browser(self):
        return self.__master_browser

    def __repr__(self):
        return '<SMBDomain instance: nbgroup="%s", type=%s, master browser="%s">' % (
            self.__nbgroup, hex(self.__type), self.__master_browser)


# Represents a SMB session
class SMB:

    # SMB Command Codes
    SMB_COM_CREATE_DIR = 0x00
    SMB_COM_DELETE_DIR = 0x01
    SMB_COM_CLOSE = 0x04
    SMB_COM_DELETE = 0x06
    SMB_COM_RENAME = 0x07
    SMB_COM_CHECK_DIR = 0x10
    SMB_COM_READ_RAW = 0x1a
    SMB_COM_WRITE_RAW = 0x1d
    SMB_COM_TRANSACTION = 0x25
    SMB_COM_TRANSACTION2 = 0x32
    SMB_COM_OPEN_ANDX = 0x2d
    SMB_COM_READ_ANDX = 0x2e
    SMB_COM_WRITE_ANDX = 0x2f
    SMB_COM_TREE_DISCONNECT = 0x71
    SMB_COM_NEGOTIATE = 0x72
    SMB_COM_SESSION_SETUP_ANDX = 0x73
    SMB_COM_LOGOFF = 0x74
    SMB_COM_TREE_CONNECT_ANDX = 0x75

    # Security Share Mode (Used internally by SMB class)
    SECURITY_SHARE_MASK = 0x01
    SECURITY_SHARE_SHARE = 0x00
    SECURITY_SHARE_USER = 0x01

    # Security Auth Mode (Used internally by SMB class)
    SECURITY_AUTH_MASK = 0x02
    SECURITY_AUTH_ENCRYPTED = 0x02
    SECURITY_AUTH_PLAINTEXT = 0x00

    # Raw Mode Mask (Used internally by SMB class. Good for dialect up to and including LANMAN2.1)
    RAW_READ_MASK = 0x01
    RAW_WRITE_MASK = 0x02

    # Capabilities Mask (Used internally by SMB class.
Good for dialect NT LM 0.12) CAP_RAW_MODE = 0x0001 CAP_MPX_MODE = 0x0002 CAP_UNICODE = 0x0004 CAP_LARGE_FILES = 0x0008 CAP_EXTENDED_SECURITY = 0x80000000 # Flags1 Mask FLAGS1_PATHCASELESS = 0x08 # Flags2 Mask FLAGS2_LONG_FILENAME = 0x0001 FLAGS2_UNICODE = 0x8000 def __init__(self, remote_name, remote_host, my_name = None, host_type = nmb.TYPE_SERVER, sess_port = nmb.NETBIOS_SESSION_PORT): # The uid attribute will be set when the client calls the login() method self.__uid = 0 self.__server_os = '' self.__server_lanman = '' self.__server_domain = '' self.__remote_name = string.upper(remote_name) if not my_name: my_name = socket.gethostname() i = string.find(my_name, '.') if i > -1: my_name = my_name[:i] self.__sess = nmb.NetBIOSSession(my_name, remote_name, remote_host, host_type, sess_port) # __neg_session will initialize the following attributes -- __can_read_raw, __can_write_raw, # __share_mode, __max_transmit_size, __max_raw_size, __enc_key, __session_key, __auth_mode, # __is_pathcaseless self.__neg_session() # If the following assertion fails, then mean that the encryption key is not sent when # encrypted authentication is required by the server. assert (self.__auth_mode == SMB.SECURITY_AUTH_PLAINTEXT) or (self.__auth_mode == SMB.SECURITY_AUTH_ENCRYPTED and self.__enc_key and len(self.__enc_key) >= 8) # Call login() without any authentication information to setup a session if the remote server # is in share mode. 
if self.__share_mode == SMB.SECURITY_SHARE_SHARE: self.login('', '') def __del__(self): if self.__uid > 0: try: self.__logoff() except: pass self.__uid = 0 try: self.__sess.close() except: pass def __decode_smb(self, data): _, cmd, err_class, _, err_code, flags1, flags2, _, tid, pid, uid, mid, wcount = unpack('<4sBBBHBH12sHHHHB', data[:33]) param_end = 33 + wcount * 2 return cmd, err_class, err_code, flags1, flags2, tid, uid, mid, data[33:param_end], data[param_end + 2:] def __decode_trans(self, params, data): totparamcnt, totdatacnt, _, paramcnt, paramoffset, paramds, datacnt, dataoffset, datads, setupcnt = unpack(' 0 and len(d) >= keylength: self.__enc_key = d[:keylength] else: self.__enc_key = '' return 1 else: raise SessionError, ( "Cannot neg dialect. (ErrClass: %d and ErrCode: %d)" % ( err_class, err_code ), err_class, err_code ) def __logoff(self): self.__send_smb_packet(SMB.SMB_COM_LOGOFF, 0, 0, 0, 0, 0, '\xff\x00\x00\x00', '') def __connect_tree(self, path, service, password, timeout = None): if password: # Password is only encrypted if the server passed us an "encryption" during protocol dialect # negotiation and mxCrypto's DES module is loaded. 
if self.__enc_key and DES: password = self.__deshash(password) self.__send_smb_packet(SMB.SMB_COM_TREE_CONNECT_ANDX, 0, 0x08, 0, 0, 0, pack('> 1) & 0x7f) << 1) s = s + chr(((ord(key[0]) & 0x01) << 6 | ((ord(key[1]) >> 2) & 0x3f)) << 1) s = s + chr(((ord(key[1]) & 0x03) << 5 | ((ord(key[2]) >> 3) & 0x1f)) << 1) s = s + chr(((ord(key[2]) & 0x07) << 4 | ((ord(key[3]) >> 4) & 0x0f)) << 1) s = s + chr(((ord(key[3]) & 0x0f) << 3 | ((ord(key[4]) >> 5) & 0x07)) << 1) s = s + chr(((ord(key[4]) & 0x1f) << 2 | ((ord(key[5]) >> 6) & 0x03)) << 1) s = s + chr(((ord(key[5]) & 0x3f) << 1 | ((ord(key[6]) >> 7) & 0x01)) << 1) s = s + chr((ord(key[6]) & 0x7f) << 1) return s def __deshash(self, password): # This is done according to Samba's encryption specification (docs/html/ENCRYPTION.html) if len(password) > 14: p14 = string.upper(password[:14]) else: p14 = string.upper(password) + '\0' * (14 - len(password)) if amk_crypto: p21 = DES.new(self.__expand_des_key(p14[:7])).encrypt('\x4b\x47\x53\x21\x40\x23\x24\x25') + DES.new(self.__expand_des_key(p14[7:])).encrypt('\x4b\x47\x53\x21\x40\x23\x24\x25') + '\0' * 5 return DES.new(self.__expand_des_key(p21[:7])).encrypt(self.__enc_key) + DES.new(self.__expand_des_key(p21[7:14])).encrypt(self.__enc_key) + DES.new(self.__expand_des_key(p21[14:])).encrypt(self.__enc_key) else: p21 = DES(self.__expand_des_key(p14[:7])).encrypt('\x4b\x47\x53\x21\x40\x23\x24\x25') + DES(self.__expand_des_key(p14[7:])).encrypt('\x4b\x47\x53\x21\x40\x23\x24\x25') + '\0' * 5 return DES(self.__expand_des_key(p21[:7])).encrypt(self.__enc_key) + DES(self.__expand_des_key(p21[7:14])).encrypt(self.__enc_key) + DES(self.__expand_des_key(p21[14:])).encrypt(self.__enc_key) def get_server_domain(self): return self.__server_domain def get_server_os(self): return self.__server_os def get_server_lanman(self): return self.__server_lanman def is_login_required(self): # Login is required if share mode is user. Otherwise only public services or services in share mode # are allowed. 
return self.__share_mode == SMB.SECURITY_SHARE_USER def login(self, name, password, domain = '', timeout = None): # Password is only encrypted if the server passed us an "encryption" during protocol dialect # negotiation and mxCrypto's DES module is loaded. if self.__enc_key and DES: password = self.__deshash(password) self.__send_smb_packet(SMB.SMB_COM_SESSION_SETUP_ANDX, 0, 0, 0, 0, 0, pack('= 0: self.__close_file(tid, fid) self.__disconnect_tree(tid) def stor_file(self, service, filename, callback, mode = SMB_O_CREAT | SMB_O_TRUNC, offset = 0, password = None, timeout = None): filename = string.replace(filename, '/', '\\') fid = -1 tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout) try: fid, attrib, lastwritetime, datasize, grantedaccess, filetype, devicestate, action, serverfid = self.__open_file(tid, filename, mode, SMB_ACCESS_WRITE | SMB_SHARE_DENY_WRITE) # If the max_transmit buffer size is more than 16KB, upload process using non-raw mode is actually # faster than using raw-mode. 
if self.__max_transmit_size < 16384 and self.__can_write_raw: # Once the __raw_stor_file returns, fid is already closed self.__raw_stor_file(tid, fid, offset, datasize, callback, timeout) fid = -1 else: self.__nonraw_stor_file(tid, fid, offset, datasize, callback, timeout) finally: if fid >= 0: self.__close_file(tid, fid) self.__disconnect_tree(tid) def copy(self, src_service, src_path, dest_service, dest_path, callback = None, write_mode = SMB_O_CREAT | SMB_O_TRUNC, src_password = None, dest_password = None, timeout = None): dest_path = string.replace(dest_path, '/', '\\') src_path = string.replace(src_path, '/', '\\') src_tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + src_service, SERVICE_ANY, src_password, timeout) dest_tid = -1 try: if src_service == dest_service: dest_tid = src_tid else: dest_tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + dest_service, SERVICE_ANY, dest_password, timeout) dest_fid = self.__open_file(dest_tid, dest_path, write_mode, SMB_ACCESS_WRITE | SMB_SHARE_DENY_WRITE)[0] src_fid, _, _, src_datasize, _, _, _, _, _ = self.__open_file(src_tid, src_path, SMB_O_OPEN, SMB_ACCESS_READ | SMB_SHARE_DENY_WRITE) if callback: callback(0, src_datasize) max_buf_size = (self.__max_transmit_size >> 10) << 10 read_offset = 0 write_offset = 0 while read_offset < src_datasize: self.__send_smb_packet(SMB.SMB_COM_READ_ANDX, 0, 0, 0, src_tid, 0, pack(' -1 and src_service != dest_service: self.__disconnect_tree(dest_tid) def check_dir(self, service, path, password = None, timeout = None): tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout) try: self.__send_smb_packet(SMB.SMB_COM_CHECK_DIR, 0, 0x08, 0, tid, 0, '', '\x04' + path + '\x00') while 1: data = self.__sess.recv_packet(timeout) if data: cmd, err_class, err_code, flags1, flags2, _, _, mid, params, d = self.__decode_smb(data) if cmd == SMB.SMB_COM_CHECK_DIR: if err_class == 0x00 and err_code == 0x00: return else: raise 
SessionError, ( 'Check directory failed. (ErrClass: %d and ErrCode: %d)' % ( err_class, err_code ), err_class, err_code ) finally: self.__disconnect_tree(tid) def remove(self, service, path, password = None, timeout = None): # Perform a list to ensure the path exists self.list_path(service, path, password, timeout) tid = self.__connect_tree('\\\\' + self.__remote_name + '\\' + service, SERVICE_ANY, password, timeout) try: self.__send_smb_packet(SMB.SMB_COM_DELETE, 0, 0x08, 0, tid, 0, pack('