EagleGet for Linux
def refresh(self):
    """Emit ``layoutChanged`` so that anything connected to it can react.

    NOTE(review): this is a method of a class whose header is not visible
    in this part of the file (it takes ``self`` and emits a signal).
    """
    self.layoutChanged.emit()


class MainWindow(QMainWindow):
    """Top-level window holding the download manager and a periodic timer."""

    def __init__(self):
        """Create the manager, call ``init_ui`` and start the refresh timer."""
        super().__init__()
        self.manager = DownloadManager()
        self.init_ui()
        # Fire ``update_progress`` every 1000 ms.
        self.timer = QTimer()
        self.timer.timeout.connect(self.update_progress)
        self.timer.start(1000)
def log_message(self, format, *args):
    """Do nothing, so that log output for this handler is suppressed.

    The parameter names (including ``format``) are kept as-is because
    callers may pass them by keyword.
    """
    pass  # Suppress logging


class BrowserIntegrationServer:
    """Holds the configuration and state for the browser-integration server."""

    def __init__(self, port=8787):
        """Remember *port*; the server and callback start out unset.

        Args:
            port: Port number to store on the instance (default 8787).
        """
        self.port = port
        self.server = None
        self.callback = None
def complete_download(self):
    """Finalize the download for ``self.task``.

    Renames the ``.eagleget`` temp file to the final file name when the
    temp file exists, marks the task as completed with a completion
    timestamp, and calls ``verify_md5`` when the task has an ``md5`` value.
    """
    filepath = os.path.join(self.task.save_path, self.task.filename)
    temp_filepath = filepath + '.eagleget'

    # Rename temp file to final file
    if os.path.exists(temp_filepath):
        os.rename(temp_filepath, filepath)

    # NOTE(review): the original text had lost its indentation; the status
    # update is assumed to run whether or not the temp file existed.
    self.task.status = DownloadStatus.COMPLETED
    # NOTE(review): this stores a float epoch timestamp; confirm that the
    # consumers of ``completed_at`` expect that representation.
    self.task.completed_at = time.time()

    # Verify MD5 if available
    if self.task.md5:
        self.verify_md5(filepath)
def delete_task(self, task_id: str):
    """Delete the row whose ``id`` equals *task_id* from ``downloads``.

    Opens a connection to ``self.db_path``, runs a parameterized DELETE,
    commits, and always closes the connection, even when the statement
    fails. Deleting an id that is not present is a no-op.

    Args:
        task_id: Value matched against the ``id`` column.
    """
    conn = sqlite3.connect(self.db_path)
    try:
        conn.execute('DELETE FROM downloads WHERE id = ?', (task_id,))
        conn.commit()
    finally:
        # Close on every path so a failing statement cannot leak the handle.
        conn.close()


# eagleget for linux
def resume_from_temp(self, temp_filepath: str):
    """Estimate per-chunk progress from the size of an existing temp file.

    For every chunk in ``self.chunks`` whose ``start`` lies before the end
    of the file, ``downloaded`` is set to the number of bytes between the
    chunk start and the end of the file, capped at the chunk length.
    Chunks starting at or beyond the end of the file are left untouched.

    NOTE(review): this treats the file as filled contiguously from offset
    0; confirm that holds when chunks are written out of order.

    Args:
        temp_filepath: Path of the partially downloaded file.
    """
    with open(temp_filepath, 'rb') as f:
        f.seek(0, 2)  # whence=2: seek relative to the end of the file
        file_size = f.tell()

    # Update chunk downloaded sizes
    for chunk in self.chunks:
        if file_size > chunk.start:
            chunk_length = chunk.end - chunk.start + 1
            chunk.downloaded = min(chunk_length, file_size - chunk.start)
def columnCount(self, parent=QModelIndex()):
    """Return the number of columns: one per entry in ``self.headers``.

    *parent* is accepted but not used.
    """
    header_total = len(self.headers)
    return header_total
def format_size(self, size):
    """Format *size* as a string with one decimal place and a unit.

    The value is divided by 1024 until it drops below 1024, walking
    through B, KB, MB and GB; anything larger is reported in TB.

    Args:
        size: Numeric size to format.

    Returns:
        A string such as ``"512.0 B"`` or ``"1.5 KB"``.
    """
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"


class DownloadDialog(QDialog):
    """Dialog whose widgets are created by ``init_ui``."""

    def __init__(self, parent=None):
        """Initialize the base dialog with *parent* and call ``init_ui``."""
        super().__init__(parent)
        self.init_ui()


# NOTE(review): the source line ended with a truncated fragment, kept here
# verbatim for reference:  def refresh(self): self
def update_progress(self):
    """Refresh the model and show the manager's statistics in the status label.

    Reads the ``total``, ``downloading``, ``completed``, ``downloaded_size``
    and ``total_size`` entries from ``self.manager.get_statistics()`` and
    formats the two sizes with ``self.format_size``.
    """
    self.model.refresh()
    stats = self.manager.get_statistics()
    downloaded = self.format_size(stats['downloaded_size'])
    total = self.format_size(stats['total_size'])
    self.status_label.setText(
        f"Total: {stats['total']} | "
        f"Downloading: {stats['downloading']} | "
        f"Completed: {stats['completed']} | "
        f"Size: {downloaded} / {total}"
    )
def download_chunk(self, chunk: DownloadChunk):
    """Download one byte range of the task's URL into the temp file.

    Requests ``chunk.start + chunk.downloaded`` through ``chunk.end`` with
    an HTTP ``Range`` header and writes the received data at the matching
    offset of the ``.eagleget`` temp file, adding to ``chunk.downloaded``
    under ``self.lock``. The loop stops early when ``self.paused`` or
    ``self.stopped`` is set. Any failure is caught and printed, not raised.

    Args:
        chunk: The range to fetch; uses ``start``, ``end``, ``downloaded``
            and ``thread_id``.
    """
    filepath = os.path.join(self.task.save_path, self.task.filename)
    temp_filepath = filepath + '.eagleget'

    # Check existing data
    # NOTE(review): ``tell()`` straight after ``seek(chunk.start)`` returns
    # ``chunk.start``, so this always resets ``chunk.downloaded`` to 0 when
    # the temp file exists. Kept as in the original; confirm the intent.
    if os.path.exists(temp_filepath):
        with open(temp_filepath, 'rb+') as f:
            f.seek(chunk.start)
            chunk.downloaded = f.tell() - chunk.start

    range_start = chunk.start + chunk.downloaded
    headers = {'Range': f'bytes={range_start}-{chunk.end}'}

    try:
        # A timeout keeps a stalled server from blocking this worker
        # forever; the ``with`` releases the connection on every path.
        with requests.get(self.task.url, headers=headers, stream=True,
                          timeout=30) as response:
            # Never write an HTTP error body into the download.
            response.raise_for_status()
            # A non-206 reply means the Range header was ignored and the
            # body starts at byte 0; writing it at a later offset would
            # corrupt the file.
            if range_start > 0 and response.status_code != 206:
                raise RuntimeError(
                    f"server ignored Range request "
                    f"(HTTP {response.status_code})"
                )
            with open(temp_filepath, 'r+b') as f:
                f.seek(range_start)
                for data in response.iter_content(chunk_size=8192):
                    if self.paused or self.stopped:
                        break
                    if data:
                        f.write(data)
                        with self.lock:
                            chunk.downloaded += len(data)
    except Exception as e:
        # Worker boundary: report and return rather than propagate.
        print(f"Chunk {chunk.thread_id} failed: {e}")
def remove_download(self, task_id: str, delete_file: bool = False):
    """Stop and forget a download, optionally deleting its files.

    Stops and drops the active download for *task_id* if there is one,
    removes the task from ``self.tasks`` and calls ``self.delete_task``.
    With ``delete_file=True`` both the finished file and the ``.eagleget``
    partial file are removed from disk when present.

    Args:
        task_id: Key of the task in ``self.tasks``.
        delete_file: Also delete the task's files from disk.

    Raises:
        KeyError: If *task_id* is not in ``self.tasks``.
    """
    if task_id in self.active_downloads:
        self.active_downloads[task_id].stop()
        del self.active_downloads[task_id]

    if delete_file:
        task = self.tasks[task_id]
        filepath = os.path.join(task.save_path, task.filename)
        # An unfinished download only exists as the ``.eagleget`` temp
        # file, so remove that one as well as the final file.
        for candidate in (filepath, filepath + '.eagleget'):
            if os.path.exists(candidate):
                os.remove(candidate)

    del self.tasks[task_id]
    self.delete_task(task_id)


# NOTE(review): the source line ended with unrelated truncated fragments,
# kept here verbatim for reference:
#   )) conn.commit() conn.close() def resume_from_temp(self
def resume_download(self, task_id: str):
    """Start the download for *task_id* again if the task is known.

    Unknown ids are ignored.

    Args:
        task_id: Key of the task in ``self.tasks``.
    """
    if task_id in self.tasks:
        self.start_download(task_id)
def resume(self):
    """Clear the ``paused`` flag on this object."""
    self.paused = False
def get_data(self):
    """Return the dialog's current input as ``(url, path, threads)``.

    The values come from ``url_input.text()``, ``path_input.text()`` and
    ``threads_spin.value()`` respectively.
    """
    return (
        self.url_input.text(),
        self.path_input.text(),
        self.threads_spin.value(),
    )


# src/browser_integration.py
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

Desktop Integration

eagleget.desktop
def add_download(self, url: str, save_path: str, threads: int = 4) -> str:
    """Register a new pending download and return its generated id.

    Builds a ``DownloadTask`` with a fresh UUID, stores it in
    ``self.tasks`` and passes it to ``self.save_task``.

    The file name is taken from the last segment of the URL *path*, so a
    query string or fragment never ends up in the name. Percent-escapes
    are decoded, and ``"download"`` is used when the URL yields no usable
    name (for example when it ends with ``/``).

    Args:
        url: Address to download from.
        save_path: Directory the file will be saved into.
        threads: Thread count stored on the task.

    Returns:
        The new task's id as a string.
    """
    import uuid
    from urllib.parse import unquote, urlsplit

    task_id = str(uuid.uuid4())

    try:
        url_path = urlsplit(url).path
    except ValueError:
        # Malformed URL: fall back to treating the whole string as a path.
        url_path = url
    # Decode before splitting so an encoded "/" cannot smuggle directory
    # components into the file name.
    filename = unquote(url_path).rsplit('/', 1)[-1]
    if filename in ('', '.', '..'):
        filename = 'download'

    task = DownloadTask(
        id=task_id,
        url=url,
        filename=filename,
        save_path=save_path,
        total_size=0,
        downloaded_size=0,
        status=DownloadStatus.PENDING,
        threads=threads,
        speed=0.0,
        created_at=datetime.now(),
        completed_at=None,
        md5=None,
    )
    self.tasks[task_id] = task
    self.save_task(task)
    return task_id