BLACK CAT PROGRAMMER

Port scanning

雖然坊間有很多 port scanning 的 software 可以用,但基於練習,自己寫了段 script 去查看自己部機有咩 port 係開左,
為了可以快一點,我把 timeout set 了一秒,連不到就試下一個 port,
但結果還是很慢,如果想再快一點,可能要加 thread 之類,下次有空再加。

import socket

# Target host whose TCP ports are probed by the sequential scan loop below.
host = "192.168.1.1"

def pscan(ip, port):
    """Report whether a TCP connection to ``ip:port`` can be established.

    Args:
        ip: Target IPv4 address or host name.
        port: TCP port number to probe.

    Returns:
        True if the connection succeeded within one second, False otherwise.
    """
    try:
        # The context manager closes the socket even when connect() fails,
        # so refused or timed-out probes do not leak file descriptors.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Short timeout so an unresponsive port does not stall the scan.
            s.settimeout(1)
            s.connect((ip, port))
        return True
    except (OSError, OverflowError):
        # OSError covers refusals, timeouts and resolution failures;
        # OverflowError is raised for a port outside 0-65535.
        # KeyboardInterrupt is deliberately not caught so Ctrl-C still works.
        return False
    

# Probe every valid TCP port (1-65535) on the target host in sequence.
# The stop value is 65536 because range() excludes its upper bound.
for port in range(1, 65536):
    if pscan(host, port):
        print("{}:{} is opened".format(host, port))
    if port % 100 == 0:
        print("scanned {} ports".format(port))  # periodic progress report

第二版本,用了 thread 去 scan 開了的 port,感覺良好。

import os
import sys
import threading
import time
import queue
import random
import socket
import ipaddress

# When True, main() prints each target and port range as it is queued.
DEBUG=False

class Task:
    """One scan target: an address, a port, and the socket type to use."""

    def __init__(self, ip, port, sock_type=socket.SOCK_STREAM):
        # Store the three fields in a single tuple assignment.
        self.ip, self.port, self.sock_type = ip, port, sock_type

class Consumer(threading.Thread):
    """Worker thread that drains scan tasks from a shared queue.

    For every task it attempts a connection to ``task.ip:task.port`` and
    prints a line when the connection succeeds.

    Args:
        queue: Queue of task objects exposing ``ip``, ``port`` and
            ``sock_type`` attributes.
        name: Thread name.
        timeout: Seconds to wait for each connection attempt.
    """

    def __init__(self, queue, name="consumer", timeout=1):
        threading.Thread.__init__(self)
        self.name = name
        self.running = True
        self.queue = queue
        self.timeout = timeout
        self.doneTaskCount = 0  # number of tasks this worker has processed

    def run(self):
        """Process tasks until the queue is empty, then return."""
        while True:
            try:
                # Non-blocking get: checking empty() and then calling a
                # blocking get() lets another worker take the last task in
                # between, which would leave this thread blocked forever.
                task = self.queue.get_nowait()
            except queue.Empty:
                break
            try:
                # The context manager closes the socket on failure as well.
                with socket.socket(socket.AF_INET, task.sock_type) as s:
                    s.settimeout(self.timeout)
                    s.connect((task.ip, task.port))
                    print("{}:{} is opened".format(task.ip, task.port))
            except (OSError, OverflowError):
                # Refused, timed-out, unreachable or out-of-range port:
                # nothing to report for this task.
                pass
            self.doneTaskCount += 1

class Monitor(threading.Thread):
    """Background thread that periodically prints how many tasks remain.

    Args:
        queue: Queue being drained by the workers; only ``empty()`` and
            ``qsize()`` are called on it.
        report_int: Seconds to sleep between two progress reports.
        name: Thread name.
    """

    def __init__(self, queue, report_int=5, name="monitor"):
        # Forward the name to Thread so the ``name`` argument is honoured
        # instead of being replaced by the automatic "Thread-N" default.
        threading.Thread.__init__(self, name=name)
        self.queue = queue
        self.report_int = report_int

    def run(self):
        """Print the queue size every ``report_int`` seconds until it is empty."""
        while not self.queue.empty():
            print("remaining port: {}".format(self.queue.qsize()))
            time.sleep(self.report_int)
      
          
# main
def main(ip_list, start_port=1, end_port=65535, thread_count=1000):
    """Scan a port range on every target address using worker threads.

    Args:
        ip_list: Iterable of target addresses; each one is converted with
            ``str()`` before being queued.
        start_port: First port to scan (inclusive).
        end_port: Last port to scan (inclusive).
        thread_count: Upper bound on the number of worker threads.

    Raises:
        ValueError: If ``thread_count`` is smaller than 1.
    """
    if thread_count < 1:
        # Without workers the queue never drains and the monitor thread
        # would keep reporting forever.
        raise ValueError("thread_count must be at least 1")

    print("starting main")

    Q = queue.Queue()

    # queue all target ip and port
    for ip in ip_list:
        if DEBUG:
            print("putting {} {} {}".format(ip, start_port, end_port))
        target = str(ip)
        for port in range(start_port, end_port + 1):
            Q.put(Task(target, port))

    # Never start more workers than there are tasks to process.
    consumer_count = min(thread_count, Q.qsize())

    print("starting Monitor")
    monitor = Monitor(Q)
    monitor.start()

    print("start scanning with {} threads".format(consumer_count))

    consumers = [
        Consumer(Q, "Consumer {}".format(i)) for i in range(consumer_count)
    ]
    for consumer in consumers:
        consumer.start()

    # wait for finish
    for consumer in consumers:
        consumer.join()
    monitor.join()
    print("done. cleaning up.")
  
def help():
    """Print command-line usage and two example invocations to stdout."""
    prog = sys.argv[0]
    usage_lines = (
        "{} <ip or subnet of the target> [range of the port]".format(prog),
        "eg:",
        "scan all of the opened port of the subnet 192.168.1.1 - 192.168.1.254",
        "  {} 192.168.1.0/24".format(prog),
        "",
        "scan the opened port from 1 to 1024 of 192.168.1.1",
        "  {} 192.168.1.1 1:1024".format(prog),
    )
    for line in usage_lines:
        print(line)

if __name__ == "__main__":
    # Command-line entry point: argv[1] is an IPv4 address or subnet and the
    # optional argv[2] is a single port or a "start:end" port range.
    ip_list = None
    start_port = 1
    end_port = 65535

    if len(sys.argv) < 2:
        help()
        sys.exit(1)

    # parse ip
    try:
        network = ipaddress.ip_network(sys.argv[1])
        if network.version != 4:
            # The scanner only opens AF_INET sockets.
            raise ValueError("only IPv4 targets are supported")
        ips = list(network.hosts())
        if ips:
            ip_list = ips
        else:
            # hosts() may be empty for a single-address network (presumably
            # a /32 on older Python versions); fall back to the bare address.
            ip_list = [str(ipaddress.ip_address(sys.argv[1]))]
    except ValueError:
        print("invalid ip format")
        help()
        sys.exit(1)

    # parse port
    if len(sys.argv) >= 3:
        bounds = sys.argv[2].split(":")
        try:
            if len(bounds) > 2:
                raise ValueError("too many ':' separators")
            # A single value means "scan just this port".
            start_port = int(bounds[0])
            end_port = int(bounds[-1])
            if not 1 <= start_port <= end_port <= 65535:
                raise ValueError("port range out of bounds")
        except ValueError:
            print("invalid port format")
            help()
            sys.exit(1)

    main(ip_list, start_port, end_port)

Posted in notesTagged ,