The script is used to download large file by multiple threads and show progress bar in real time.

from __future__ import annotations
import sys
from tqdm import tqdm   #show progress bar
import requests
import multitasking
import signal
from retry import retry

signal.signal(signal.SIGINT, multitasking.killall)

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE'
}

MB = 1024**2

# split file to multiple parts
def split(start: int, end: int, step: int) -> list[tuple[int, int]]:
    parts = [(start, min(start+step, end))
             for start in range(0, end, step)]
    return parts

# get file size value, unit: B
def get_file_size(url: str, raise_error: bool = False) -> int:
    response = requests.head(url)
    file_size = response.headers.get('Content-Length')
    if file_size is None:
        if raise_error is True:
            raise ValueError('The file is not supported to downloaded by multiple threads.')
        return file_size
    return int(file_size)

def download(url: str, file_name: str, retry_times: int = 3, each_size=16*MB) -> None:
    f = open(file_name, 'wb')
    file_size = get_file_size(url)

    # multiple threads decorator, make it easy to use threads
    @retry(tries=retry_times)
    @multitasking.task
    def start_download(start: int, end: int) -> None:
        _headers = headers.copy()
        _headers['Range'] = f'bytes={start}-{end}'
        response = session.get(url, headers=_headers, stream=True)
        chunk_size = 128
        chunks = []
        for chunk in response.iter_content(chunk_size=chunk_size):
            chunks.append(chunk)
            bar.update(chunk_size)
        f.seek(start)
        for chunk in chunks:
            f.write(chunk)
        # free resource
        del chunks

    session = requests.Session()
    each_size = min(each_size, file_size)

    parts = split(0, file_size, each_size)
    print(f'the count of parts:{len(parts)}')
    # create progress bar
    bar = tqdm(total=file_size, desc=f'download file:{file_name}')
    for part in parts:
        start, end = part
        start_download(start, end)

    multitasking.wait_for_tasks()
    f.close()
    bar.close()


# useage: scriptPath fileURL
if "__main__" == __name__:
    url = sys.argv[1]
    file_name = url.split('/')[-1]
    download(url, file_name)

Output:

Categories: Python

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments

Content Summary
: Input your strings, the tool can get a brief summary of the content for you.

X
0
Would love your thoughts, please comment.x
()
x