雖然坊間有很多 port scanning 的 software 可以用,但基於學習,自己寫了段 script 去查看自己部機有咩 port 係開左,
為了可以快一點,我把 timeout set 了一秒,連不到就試下一個 port,
但結果都是很慢,如果想再快一點,可能要加 thread 之類,下次有空再加
import socket
host: str = "192.168.1.1"  # the single machine this first version scans
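def pscan(ip, port):
    """Return True if a TCP connect() to (ip, port) completes within one second.

    Args:
        ip: target address (string) handed straight to socket.connect().
        port: TCP port number.

    Returns:
        True when the connection was established, False on timeout, refused
        or unreachable target, name-resolution failure, or a port outside
        0-65535.  Other exceptions (e.g. wrong argument types) propagate.
    """
    try:
        # The context manager closes the socket whether or not connect() raises,
        # so a failed probe does not leave a descriptor open until GC.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)  # short timeout to fasten the scanning speed
            s.connect((ip, port))
        return True
    except (OSError, OverflowError):
        # OSError covers socket.timeout, ConnectionRefusedError and gaierror;
        # OverflowError is what a port outside 0-65535 raises.
        return False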
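# Probe every TCP port 1..65535 on `host`, printing open ports and progress.
# range() excludes its stop value, so the stop must be 65536 for port 65535
# to be scanned at all.
for i in range(1, 65536):
    if pscan(host, i):
        print("{}:{} is opened".format(host, i))
    if i % 100 == 0:
        print("scanned {} ports".format(i))  # print the progress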
第二版本,用了 thread 去 scan 開了的 port,感覺良好。
import os
import sys
import threading
import time
import queue
import random
import socket
import ipaddress
DEBUG: bool = False  # set True to print each target as it is queued in main()
class Task:
    """One scan target: an address/port pair plus the socket type to open.

    Attributes:
        ip: target address as given by the caller (main() passes a str).
        port: port number to connect to.
        sock_type: value passed as the type argument of socket.socket();
            defaults to socket.SOCK_STREAM.
    """

    def __init__(self, ip, port, sock_type=socket.SOCK_STREAM):
        self.ip, self.port, self.sock_type = ip, port, sock_type
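class Consumer(threading.Thread):
    """Worker thread that drains Task objects from a shared queue.

    For each task it attempts a connect() with a one-second timeout and prints
    the target when the connection succeeds.  The thread returns as soon as
    the queue is empty.

    Attributes:
        running: kept for callers that inspect it; run() does not consult it.
        queue: the shared queue.Queue of Task objects.
        doneTaskCount: number of tasks this worker has taken off the queue.
    """

    def __init__(self, queue, name="consumer"):
        super().__init__(name=name)
        self.running = True
        self.queue = queue
        self.doneTaskCount = 0

    def run(self):
        """Process tasks until the queue is empty, then return."""
        while True:
            # get_nowait() instead of empty() followed by get(): with many
            # workers the queue can drain between those two calls, and a
            # blocking get() would then never return, so join() in the
            # caller would hang.
            try:
                task = self.queue.get_nowait()
            except queue.Empty:
                break
            self._probe(task)
            self.doneTaskCount += 1

    def _probe(self, task):
        """Connect to task.ip:task.port and print the target if it succeeds."""
        try:
            # The context manager closes the socket on success and on failure,
            # so a refused/timed-out probe does not leak a descriptor.
            with socket.socket(socket.AF_INET, task.sock_type) as s:
                s.settimeout(1)
                s.connect((task.ip, task.port))
                print("{}:{} is opened".format(task.ip, task.port))
        except (OSError, OverflowError):
            # Timeout, refused, unreachable, name-resolution failure or an
            # out-of-range port all mean "not open"; nothing to report.
            # NOTE(review): an OSError raised by socket.socket() itself (e.g.
            # the process ran out of file descriptors) is also swallowed here
            # and would read as "closed" — confirm the fd limit covers the
            # number of worker threads.
            pass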
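class Monitor(threading.Thread):
    """Progress reporter: prints the queue size every report_int seconds.

    The thread returns once the queue is empty.  Because it sleeps after each
    report, it may linger up to report_int seconds after the last task is
    taken off the queue.

    Attributes:
        queue: the shared queue.Queue being drained by Consumer threads.
        report_int: seconds to sleep between progress lines.
    """

    def __init__(self, queue, report_int=5, name="monitor"):
        # Pass the name to Thread so self.name is "monitor" rather than the
        # default "Thread-N"; previously the name argument was ignored.
        super().__init__(name=name)
        self.queue = queue
        self.report_int = report_int

    def run(self):
        """Print the remaining queue size until the queue is drained."""
        while not self.queue.empty():
            print("remaining port: {}".format(self.queue.qsize()))
            time.sleep(self.report_int)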
# main
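def main(ip_list, start_port=1, end_port=65535, thread_count=1000):
    """Queue every (ip, port) pair and scan them with a pool of Consumer threads.

    Args:
        ip_list: iterable of target addresses; each is passed through str().
        start_port: first port of the inclusive range scanned per address.
        end_port: last port of the inclusive range scanned per address.
        thread_count: upper bound on the number of Consumer threads.

    A Monitor thread prints progress while the Consumers work; the function
    returns after every thread has finished.  NOTE(review): each running
    Consumer holds one socket, so the process's open-file limit must allow
    thread_count concurrent sockets — confirm for the deployment.
    """
    print("starting main")
    Q = queue.Queue()
    # queue all target ip and port
    for ip in ip_list:
        if DEBUG:
            print("putting {} {} {}".format(ip, start_port, end_port))
        for port in range(start_port, end_port + 1):
            Q.put(Task(str(ip), port))
    print("starting Monitor")
    monitor = Monitor(Q)
    monitor.start()
    # No point starting more workers than there are tasks: every surplus
    # thread would only wake up, find the queue empty and exit.
    consumer_count = min(thread_count, Q.qsize())
    print("start scanning with {} threads".format(consumer_count))
    consumers = [Consumer(Q, "Consumer {}".format(i)) for i in range(consumer_count)]
    for consumer in consumers:
        consumer.start()
    # wait for finish
    for consumer in consumers:
        consumer.join()
    monitor.join()
    print("done. cleaning up.")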
def help():
    """Print command-line usage and two worked examples to stdout."""
    prog = sys.argv[0]
    usage = [
        "{} <ip or subnet of the target> [range of the port]".format(prog),
        "eg:",
        "scan all of the opened port of the subnet 192.168.1.1 - 192.168.1.254",
        " {} 192.168.1.0/24".format(prog),
        "",
        "scan the opened port from 1 to 1024 of 192.168.1.1",
        " {} 192.168.1.1 1:1024".format(prog),
    ]
    # One print of the joined lines emits exactly the same bytes as one
    # print() per line.
    print("\n".join(usage))
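if __name__ == "__main__":
    ip_list = None
    start_port = 1
    end_port = 65535
    if len(sys.argv) < 2:
        help()
        sys.exit()  # sys.exit() rather than the site-module exit() builtin
    # parse ip: a subnet expands to its host addresses; when hosts() yields
    # nothing the argument is treated as a single address instead.
    try:
        net4 = ipaddress.ip_network(sys.argv[1])
        ips = list(net4.hosts())
        if ips:
            ip_list = ips
        else:
            ip_list = [str(ipaddress.ip_address(sys.argv[1]))]
    except ValueError:
        # ip_network() and ip_address() both raise ValueError on bad input.
        print("invalid ip format")
        help()
        sys.exit()
    # parse port: "N" scans one port, "A:B" an inclusive range.
    if len(sys.argv) == 3:
        bounds = sys.argv[2].split(":")
        try:
            start_port = int(bounds[0])
            end_port = int(bounds[1] if len(bounds) > 1 else bounds[0])
        except ValueError:
            # Non-numeric port text used to escape as a traceback.
            print("invalid port format")
            help()
            sys.exit()
        if not 1 <= start_port <= end_port <= 65535:
            # An inverted or out-of-range span would otherwise queue nothing
            # (or fail per-port inside the workers) without any message.
            print("port range must be within 1-65535 with start <= end")
            help()
            sys.exit()
    main(ip_list, start_port, end_port)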