# background_process.py
#
# Starts the Anon SDK VPN with a fixed routing configuration and keeps the
# process alive until SIGINT or SIGTERM is received, then stops the VPN.
import threading
from anyone_protocol_sdk import *
from anon_starter import Anon
import signal
import sys
import stem
# Routing table handed to the SDK when the VPN starts. Each entry pairs a
# destination host with a list of two-letter codes (presumably the countries
# that traffic to that host should exit through -- confirm against the SDK).
config = VPNConfig(
    routings=[
        VPNRouting(host, countries)
        for host, countries in (
            ("ip-api.com", ["de"]),
            ("ipinfo.io", ["nl"]),
        )
    ]
)
class AnonRunner:
    """Run the Anon SDK VPN until shutdown is requested, then stop it once.

    Shutdown can be requested in two ways: by SIGINT/SIGTERM (handled by
    :meth:`handle_exit` on the main thread) or by setting
    ``shutdown_event`` from any thread (handled by the watcher thread that
    :meth:`run` starts). Either path ends in :meth:`stop`, which calls
    ``stop_vpn()`` at most once.
    """

    # Upper bound, in seconds, on how long the main thread sleeps between
    # checks of the shutdown flag.
    POLL_INTERVAL = 0.5

    def __init__(self):
        """Create the SDK handle and install the SIGINT/SIGTERM handlers.

        ``signal.signal`` may only be called from the main thread, so the
        runner has to be constructed there.
        """
        self.anon_instance = Anon()
        self.shutdown_event = threading.Event()
        # ``shutdown_event`` means "shutdown requested"; ``_stopped`` means
        # "stop_vpn() has been claimed by some caller". Keeping them apart
        # lets an externally set event still trigger the actual stop.
        self._stopped = False
        # Re-entrant so that a signal handler interrupting stop() on the same
        # thread cannot deadlock on the lock.
        self._stop_lock = threading.RLock()
        # Register signal handlers for graceful exit.
        signal.signal(signal.SIGINT, self.handle_exit)
        signal.signal(signal.SIGTERM, self.handle_exit)

    def run(self):
        """Start the VPN and block until shutdown has been requested."""
        print("Starting Anon SDK process...")
        self.anon_instance.start_vpn_with_config(config)
        # The watcher turns "shutdown_event was set" into a stop() call.
        wait_thread = threading.Thread(
            target=self.wait_for_shutdown, daemon=True)
        wait_thread.start()
        # Keep the main thread alive so signal handlers can run on it.
        try:
            self.handle_events_forever()
        except KeyboardInterrupt:
            self.handle_exit()
        # Reached only when the event was set without going through
        # handle_exit (which exits the process). Let the watcher finish
        # stopping the VPN before returning; it is a daemon thread and would
        # otherwise be abandoned at interpreter exit.
        wait_thread.join()

    def wait_for_shutdown(self):
        """Block until ``shutdown_event`` is set, then stop the VPN."""
        self.shutdown_event.wait()
        self.stop()

    def handle_events_forever(self):
        """Block the calling thread until ``shutdown_event`` is set.

        Uses a bounded wait instead of a spin loop, so the thread sleeps
        rather than burning a CPU core, while still waking regularly so
        pending signal handlers get a chance to run.
        """
        while not self.shutdown_event.wait(timeout=self.POLL_INTERVAL):
            # Timed out with no shutdown request yet; go back to sleep.
            continue

    def handle_exit(self, *args):
        """Stop the VPN and exit the process with status 0.

        Accepts and ignores the ``(signum, frame)`` arguments passed to
        signal handlers, so it can also be called with no arguments.
        """
        print("\nGracefully shutting down Anon SDK...")
        self.stop()
        sys.exit(0)  # Exit cleanly

    def stop(self):
        """Stop the VPN; safe to call repeatedly and from several threads.

        Only the first caller performs the stop; later calls return
        immediately. Errors raised by ``stop_vpn()`` are reported on stdout
        and not propagated, so shutdown is best-effort.
        """
        with self._stop_lock:
            if self._stopped:
                return
            self._stopped = True
        # Wake the main loop and the watcher thread. The lock is released
        # first so stop_vpn() never runs while holding it.
        self.shutdown_event.set()
        try:
            self.anon_instance.stop_vpn()
            print("Anon SDK stopped successfully.")
        except stem.SocketClosed:
            print("Tor control connection already closed. Ignoring error.")
        except Exception as e:
            print(f"Unexpected error during shutdown: {e}")
# Script entry point: build the runner (which installs the signal handlers)
# and block in run() until shutdown.
if __name__ == "__main__":
    AnonRunner().run()
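# Endpoints for the two hosts routed in `config` (presumably for manually
# checking which exit location each request appears to come from):
# http://ip-api.com/json
# https://ipinfo.io/json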