BLACK CAT PROGRAMMER

python Comparison

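Python offers several convenient ways to compare things. The two most commonly used are probably == and is, and this post explains the difference between them.

is compares identity: whether two variables refer to the same object (in CPython, the same memory address).

== compares the values of the two variables.

A simple example: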

>>> p1 = { "name": "tom", "age": 11}
>>> p2 = { "name": "tom", "age": 11}
>>> p1 == p2
True
>>> p1 is p2
False
>>> hex(id(p1))
'0x104fdc800'
>>> hex(id(p2))
'0x104fdc840'

This example is straightforward. p1 and p2 are two different objects in memory that happen to hold the same value, so comparing them with == gives True while comparing them with is gives False.
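To make the identity side concrete, here is a small sketch that extends the example. The name p3 is my own addition for illustration: assigning an existing dict to another name creates no new object, so is returns True and a change made through one name is visible through the other.

```python
p1 = {"name": "tom", "age": 11}
p2 = {"name": "tom", "age": 11}
p3 = p1  # p3 is just another name for the same object as p1

print(p1 == p2, p1 is p2)  # equal values, different objects
print(p1 == p3, p1 is p3)  # equal values, same object

p3["age"] = 12    # mutate through p3 ...
print(p1["age"])  # ... and p1 sees the change
print(p1 == p2)   # p2 is a separate object, so it no longer matches
```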

Now consider the following example:

>>> a = 256
>>> b = 256
>>> c = 257
>>> d = 257

>>> a == b
True
>>> c == d
True

>>> a is b
True
>>> c is d
False

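Why do a and b match under both == and is, while c and d only match under ==?

It turns out that CPython preallocates some commonly used values when it starts: True, False, and the small integers from -5 to 256. Every use of one of these values refers to the same object, so they share a memory address. For integers outside that range, such as 257, Python allocates a new object on the spot. This is a CPython implementation detail, so numbers should always be compared with ==, not is.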

>>> hex(id(a))
'0x105b100d8'
>>> hex(id(b))
'0x105b100d8'

>>> hex(id(c))
'0x104fdffb0'
>>> hex(id(d))
'0x105047030'

So a and b have the same address, while c and d have different addresses.
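One caveat if you try this in a script instead of the interactive prompt: literals such as 257 that sit in the same file may be merged by the compiler, so c is d can come out True there. The sketch below avoids that by building the integers from strings at runtime. The helper name same_object is hypothetical, and the identity result depends on CPython's small-integer cache, so treat it as an illustration of an implementation detail and not as guaranteed behaviour.

```python
def same_object(text):
    """Build the same integer twice at runtime; return (equal, identical)."""
    x = int(text)
    y = int(text)
    return x == y, x is y

for text in ("256", "257"):
    equal, identical = same_object(text)
    print(text, "equal:", equal, "identical:", identical)
```

Whatever the identity result is, the equality result is always True, which is why == is the right operator for numbers.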

True and False are objects in Python too, and each has its own address. Note that Python spells them with a capital T and F.

>>> hex(id(True))
'0x105ad8b48'

>>> hex(id(False))
'0x105ad8b68'


python virtual environment

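I keep forgetting how to use virtual environments, so I am writing it down and won't have to read the docs every time.

Every project needs different versions of its libraries. A virtual environment gives each project its own environment, so each one can have exactly the libraries it needs.

The actual steps are simple.

First, create a new environment. I like to name it .venv and keep it in the project's root folder. Remember to add .venv to the .gitignore file.

python -m venv .venv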

Activate the newly created virtual environment:

# On Windows, run:

.venv\Scripts\activate.bat

# On Unix or MacOS, run:

source .venv/bin/activate

After that, python xxx.py runs inside the new environment. To install a new library, just use pip as usual.
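If you are not sure whether the activation worked, a quick check from Python itself can help. This is a minimal sketch, and the function name in_virtualenv is my own: inside an environment created by venv, sys.prefix points at the environment while sys.base_prefix still points at the base installation.

```python
import sys

def in_virtualenv():
    """Return True when the interpreter is running from a venv-style environment."""
    # Inside a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix

print("interpreter:", sys.executable)
print("inside a virtual environment:", in_virtualenv())
```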

# list current install packages
pip list

# install packages with requirements.txt
pip install -r requirements.txt

Leave the virtual environment:

# On Windows, run:

.venv\Scripts\deactivate.bat

# On Unix or MacOS, run (deactivate is a shell function defined by the activate script):

deactivate


python thread – 2

Another python threading implementation

import threading
import random
import signal
import queue
import time

work_queue = queue.Queue()
is_running = True

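def consumer():
  while is_running:
    try:
      # use a timeout so the loop can notice is_running changing
      task = work_queue.get(timeout=1)
    except queue.Empty:
      continue
    print("doing: {}".format(task))
    
def producer():
  while is_running:
    task = random.random()
    work_queue.put(task)
    time.sleep(10*random.random())

def shutdown(signum, frame):
  # signal handlers receive the signal number and the current stack frame
  global is_running  # without this, the assignment only creates a local variable
  print("terminating...")
  is_running = False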
  
def main():
  print("starting two threads")
  t1 = threading.Thread(target=consumer, name="consumer")
  t2 = threading.Thread(target=producer, name="producer")
  
  signal.signal(signal.SIGINT, shutdown)
  
  t1.start()
  t2.start()
  
  t1.join()
  t2.join()
  
  print("bye")

  
if __name__ == "__main__":
  main()
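A variation worth noting: in place of a global flag, a threading.Event can carry the stop signal. The sketch below is my own illustration, not the code above. The function name run_pipeline and the fixed number of tasks are assumptions made so that it finishes by itself without waiting for Ctrl+C.

```python
import queue
import threading

def run_pipeline(n_tasks=5):
    """Produce n_tasks items, consume them, then stop both threads via an Event."""
    work_queue = queue.Queue()
    stop_event = threading.Event()
    done = []

    def consumer():
        # keep going until asked to stop AND the queue has been drained
        while not (stop_event.is_set() and work_queue.empty()):
            try:
                task = work_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            done.append(task)

    def producer():
        for i in range(n_tasks):
            work_queue.put(i)
        stop_event.set()  # signal that no more work is coming

    t1 = threading.Thread(target=consumer, name="consumer")
    t2 = threading.Thread(target=producer, name="producer")
    t1.start()
    t2.start()
    t2.join()
    t1.join()
    return done

if __name__ == "__main__":
    print(run_pipeline())
```

In a long-running program, the SIGINT handler would simply call stop_event.set().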

Previous implementation: http://hong.ddns.net/wp-admin/post.php?post=164&action=edit


pdf tools

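Sometimes I need to merge several PDFs into one, or delete a few pages. That takes the paid version of Acrobat, yet the job is really simple, so I wrote a small Python script with a free library instead.

It has three functions:
The first merges several PDFs
The second splits a PDF into single pages
The third extracts a range of pages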

# these class names are from PyPDF2 releases before 3.0
from PyPDF2 import PdfFileWriter, PdfFileReader, PdfFileMerger
import sys
import os
           
def merge(pdf_list):
    input_pdf = pdf_list[:-1]
    output_pdf = pdf_list[-1]
    # check file exists
    for pdf in input_pdf:
        if not os.path.exists(pdf):
            print("error: {} not exists".format(pdf))
            return False
    if os.path.exists(output_pdf):
        print("error output file {} already exists".format(output_pdf))
        return False  # do not overwrite an existing file

    merger = PdfFileMerger()
    for pdf in input_pdf:
        merger.append(open(pdf, 'rb'))

    with open(output_pdf, 'wb') as fout:
        merger.write(fout)

def split(input_pdf):
    if not os.path.exists(input_pdf):
        print("error. file {} not exists".format(input_pdf))
        return False

    inputpdf = PdfFileReader(open(input_pdf, "rb"))

    for i in range(inputpdf.numPages):
        output = PdfFileWriter()
        output.addPage(inputpdf.getPage(i))
        with open("{}-page{}.pdf".format(input_pdf, i), "wb") as outputStream:
            output.write(outputStream)

def extract(input_pdf, start, end):
    inputpdf = PdfFileReader(open(input_pdf, "rb"))
    input_name = os.path.splitext(input_pdf)[0]
    output = PdfFileWriter()
    output_name = "{}_{}-{}.pdf".format(input_name, start, end)

    # start and end are 1-based and inclusive; clamp end to the last page
    end = min(inputpdf.numPages, end)

    print("extract {} from page {} to page {} to file {}".format(input_pdf, start, end, output_name))
    for i in range(start, end + 1):
        output.addPage(inputpdf.getPage(i-1))  # getPage is 0-based
    with open(output_name, "wb") as outputStream:
        output.write(outputStream)

def help():
    print("{} ACTION <arguments>".format(sys.argv[0]))
    print("ACTION")
    print("  merge <input pdfs separated by space> <output pdf>")
    print("  split input.pdf")
    print("  extract input.pdf <start page>[:end page]")
    
    
if __name__ == "__main__":
    if len(sys.argv) < 2:
        help()
        exit()
    if sys.argv[1] == "merge":
        if len(sys.argv) < 4:
            print("at least two pdf files to be merged")
            help()
            exit()
        else:
            merge(sys.argv[2:])

    elif sys.argv[1] == "split":
        if len(sys.argv) < 3:
            print("an input pdf file is required")
            help()
            exit()
        else:
            split(sys.argv[2])

    elif sys.argv[1] == "extract":
        if len(sys.argv) < 4:
            help()
            exit()
        else:
            input_pdf  = sys.argv[2]
            page_range = sys.argv[3].split(":")
            start = int(page_range[0]) 
            end = int(page_range[1]) if len(page_range) > 1 else start

            extract(input_pdf, start, end)
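The page-range parsing in the extract branch can also be pulled out into a small helper, which makes it easy to try without any PDF at hand. This is just a sketch: the name parse_page_range and the extra validation are my own additions, not part of the script above.

```python
def parse_page_range(text):
    """Parse "start" or "start:end" (1-based, inclusive) into a (start, end) tuple."""
    parts = text.split(":")
    start = int(parts[0])
    end = int(parts[1]) if len(parts) > 1 else start
    if start < 1 or end < start:
        raise ValueError("invalid page range: {}".format(text))
    return start, end

print(parse_page_range("3:5"))
print(parse_page_range("4"))
```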

python threading

I keep forgetting how to use Python threading, so this time I wrote a simple starter template for future reference.

import os
import threading
import time
import queue
import random


class Consumer(threading.Thread):
  def __init__(self, queue, name="consumer"):
    threading.Thread.__init__(self)
    self.name = name
    self.running = True
    self.queue = queue
    self.doneTaskCount = 0
    self.hp = 10

  def run(self):
    while self.running:
      task = self.queue.get()
      time.sleep(2)
      self.doneTaskCount += 1
      print("{} done task (#{}): {}".format(self.name, self.doneTaskCount, task))
      self.hp -= 1
      if self.hp <= 0:
        self.running = False

class Producer(threading.Thread):
  def __init__(self, queue, name="producer"):
    threading.Thread.__init__(self)
    self.name = name
    self.running = True
    self.queue = queue
    self.hp = 10
    self.doneTaskCount = 0

  def run(self):
    while self.running:
      time.sleep(1)
      task = random.random() * 100
      self.queue.put(task)
      self.doneTaskCount += 1
      print("{} created task (#{}): {}".format(self.name, self.doneTaskCount, task))
      
      self.hp -= 1
      if self.hp <= 0:
        self.running = False
          
# main
def main():
  print("start main")
  Q = queue.Queue()
  producer = Producer(Q, "Producer 1")
  consumer = Consumer(Q, "Consumer 1")
  
  producer.start()
  consumer.start()
  
  print(threading.current_thread())
  
  producer.join()
  consumer.join()
  print("performing cleaning, supposed after all thread finished")
  

if __name__ == "__main__":
  main()
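The template above stops both threads by counting down hp. Another common way to end a consumer, shown here only as a sketch, is to put a sentinel value on the queue when the producer is done. SENTINEL, the results list and the demo function are hypothetical names I made up for this example.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more tasks"

class Consumer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.q = q
        self.results = []

    def run(self):
        while True:
            task = self.q.get()  # blocks until something arrives
            if task is SENTINEL:
                break            # the producer said it is finished
            self.results.append(task * 2)

def demo():
    q = queue.Queue()
    consumer = Consumer(q)
    consumer.start()
    for i in range(3):
        q.put(i)
    q.put(SENTINEL)
    consumer.join()
    return consumer.results

if __name__ == "__main__":
    print(demo())
```

With several consumers, one sentinel per consumer is needed so that each of them wakes up and exits.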

Port scanning

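There is plenty of port scanning software out there, but for the sake of practice I wrote my own script to check which ports are open on my machine.
To make it a bit faster, I set the timeout to one second: if the connection fails, it moves on to the next port.
It is still slow, though. Making it faster would probably take threads or something similar, which I will add when I have time.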

import socket

host = "192.168.1.1"

def pscan(ip, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)  # short timeout to speed up the scan
    try:
        s.connect((ip, port))
        return True
    except OSError:
        # refused, timed out or unreachable: treat the port as closed
        return False
    finally:
        s.close()  # always release the socket, even on failure


for i in range(1, 65536):  # ports 1 to 65535 inclusive
    if pscan(host, i):
        print("{}:{} is open".format(host, i))
    if i%100 == 0:
        print("scanned {} ports".format(i)) # print the progress

Second version: this one uses threads to scan for open ports, and it feels much better.

import os
import sys
import threading
import time
import queue
import random
import socket
import ipaddress

DEBUG=False

class Task:
    def __init__(self, ip, port, sock_type=socket.SOCK_STREAM):
        self.ip = ip
        self.port = port
        self.sock_type = sock_type

class Consumer(threading.Thread):
    def __init__(self, queue, name="consumer"):
        threading.Thread.__init__(self)
        self.name = name
        self.running = True
        self.queue = queue
        self.doneTaskCount = 0
    

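    def run(self):
        while True:
            try:
                # get_nowait avoids blocking forever when another thread
                # drains the queue between an empty() check and get()
                task = self.queue.get_nowait()
            except queue.Empty:
                break
            s = socket.socket(socket.AF_INET, task.sock_type)
            s.settimeout(1)
            try:
                s.connect((task.ip, task.port))
                print("{}:{} is open".format(task.ip, task.port))
            except OSError:
                pass  # closed, filtered or timed out
            finally:
                s.close()
        #print("{} done".format(self.name))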

class Monitor(threading.Thread):
    def __init__(self, queue, report_int=5, name="monitor"):
        threading.Thread.__init__(self)
        self.queue = queue
        self.report_int = report_int

    def run(self):
        while self.queue.empty() == False:
            print("remaining port: {}".format(self.queue.qsize()))
            time.sleep(self.report_int)
        # print("{} done".format(self.name))
      
          
# main
def main(ip_list, start_port=1, end_port=65535, thread_count=1000):
    print("starting main")
    consumer_count = thread_count
    
    Q = queue.Queue()
    
    # queue all target ip and port
    for ip in ip_list:
        if DEBUG:
            print("putting {} {} {}".format(ip, start_port, end_port))
        for i in range(start_port, end_port+1):
            task = Task(str(ip), i)
            Q.put(task)


    print("starting Monitor")
    monitor = Monitor(Q)
    monitor.start()

    print("start scanning with {} threads".format(consumer_count))

    consumer_arr = []
    for i in range(consumer_count):
        consumer_arr.append(Consumer(Q, "Consumer {}".format(i)))
        consumer_arr[i].start()
    
    
    # wait for finish
    for i in range(consumer_count):
        consumer_arr[i].join()
    monitor.join()    
    print("done. cleaning up.")
  
def help():
    print("{} <ip or subnet of the target> [range of the port]".format(sys.argv[0]))
    print("eg:")
    print("scan all ports on every host of the subnet 192.168.1.1 - 192.168.1.254")
    print("  {} 192.168.1.0/24".format(sys.argv[0]))
    print("")
    print("scan ports 1 to 1024 of 192.168.1.1")
    print("  {} 192.168.1.1 1:1024".format(sys.argv[0]))

if __name__ == "__main__":
    ip_list = None
    start_port = 1
    end_port = 65535
    
    if len(sys.argv) < 2:
        help()
        exit()
    
    # parse ip
    try:
        net4 = ipaddress.ip_network(sys.argv[1])
        ips = list(net4.hosts())
        if len(ips) > 0 :
            ip_list = ips
        else:
            ip = ipaddress.ip_address(sys.argv[1])
            ip_list = [str(ip)]
    except Exception as e:
        print("invalid ip format")
        help()
        exit()

    # parse port
    if len(sys.argv) == 3:
        input_ip = sys.argv[1]
        port = sys.argv[2].split(":")
        if len(port) == 1:
            start_port = int(port[0])
            end_port = int(port[0])
        else:
            start_port = int(port[0])
            end_port = int(port[1])

    

    main(ip_list, start_port, end_port)
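For comparison, the same idea can be written more compactly with concurrent.futures from the standard library. This is a sketch of an alternative, not a drop-in replacement for the script above: the function names is_open and scan_ports and the default of 100 workers are my own choices.

```python
import socket
from concurrent.futures import ThreadPoolExecutor

def is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

def scan_ports(host, ports, workers=100):
    """Check the given ports concurrently and return the open ones, sorted."""
    ports = list(ports)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map keeps the results in the same order as the input ports
        results = pool.map(lambda p: is_open(host, p), ports)
        return sorted(p for p, ok in zip(ports, results) if ok)

if __name__ == "__main__":
    print(scan_ports("127.0.0.1", range(1, 1025)))
```

The executor takes care of the queue, starting the threads and joining them, so the Consumer and Monitor classes are not needed in this form.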


python request

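A new Kindaichi story is out. Reading it online is nice because there is no storage to worry about, but the site's design is really bad: you have to scroll up and down to read a whole page, and it is full of ads. So I decided to just download it.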


import urllib.request

# http://cache.someou.com/Uploads/files/2020/66134/001.jpg

PAGES = [25, 23, 22, 22, 22, 24, 21, 23, 26, 23, 23, 21, 21, 22, 25, 22, 23, 21, 23, 24, 23, 23, 22, 23, 26, 23, 22, 24, 24, 22]
BASE_URL = 'http://cache.someou.com/Uploads/files/2020/{:d}/{:03d}.jpg'


for chp in range(30):
    for page in range(PAGES[chp]):
        link = BASE_URL.format(66134+chp, page+1)
        print("downloading {} chapter {} page {}".format(link, chp, page))

        req = urllib.request.Request(link)
        req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36')
        req.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3')
        req.add_header('Cookie', '__cfduid=d28d5f69ff39f960a0571508905d2e3b21556458631')
        response = urllib.request.urlopen(req)

        if response.status == 200:
            # save the image as <chapter>_<page>.jpg; with closes the file for us
            with open('{:03d}_{:03d}.jpg'.format(chp+1,page+1), 'wb') as f:
                f.write(response.read())
        else:
            print("Failed to download chp {} page {}".format(chp, page))
            print("status {} reason {}".format(response.status, response.reason))

print("done")
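One small refactoring that may help here is to separate building the URLs and file names from the actual downloading, so the naming can be checked without touching the network. The sketch below reuses BASE_URL and the first chapter id from the script. The function name build_jobs and the shortened page list [2, 1] are assumptions for illustration only.

```python
def build_jobs(pages_per_chapter, base_url, first_id):
    """Yield (url, filename) for every page of every chapter."""
    for chp, page_count in enumerate(pages_per_chapter):
        for page in range(1, page_count + 1):
            url = base_url.format(first_id + chp, page)
            filename = "{:03d}_{:03d}.jpg".format(chp + 1, page)
            yield url, filename

BASE_URL = 'http://cache.someou.com/Uploads/files/2020/{:d}/{:03d}.jpg'

# a shortened page list, just to show the output format
jobs = list(build_jobs([2, 1], BASE_URL, 66134))
for url, filename in jobs:
    print(filename, url)
```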

Just a small toy, recorded here for myself.
