use a new thread and to implement the follow python code. make it as fast and effective as possible.
class TelegramHandler(logging.Handler): def init(self, token, chat_id, flush_interval=5): super().init() self.token = token self.chat_id = chat_id self.flush_interval = flush_interval self.lock = threading.Lock()
To implement the TelegramHandler
class in a way that is fast and effective, we can utilize threading to send log messages to Telegram asynchronously. This will prevent blocking the main thread while waiting for network operations to complete. Below is an implementation that includes a separate thread for sending messages and a queue to manage log messages efficiently.
Here's the complete implementation:
import logging
import threading
import queue
import requests
import time
class TelegramHandler(logging.Handler):
def __init__(self, token, chat_id, flush_interval=5):
super().__init__()
self.token = token
self.chat_id = chat_id
self.flush_interval = flush_interval
self.lock = threading.Lock()
self.log_queue = queue.Queue()
self.stop_event = threading.Event()
self.worker_thread = threading.Thread(target=self._process_queue)
self.worker_thread.start()
def emit(self, record):
try:
msg = self.format(record)
self.log_queue.put(msg)
except Exception as e:
self.handleError(record)
def _process_queue(self):
while not self.stop_event.is_set():
try:
# Get message from the queue with a timeout
msg = self.log_queue.get(timeout=self.flush_interval)
self._send_message(msg)
self.log_queue.task_done()
except queue.Empty:
continue
def _send_message(self, message):
url = f"https://api.telegram.org/bot{self.token}/sendMessage"
payload = {
'chat_id': self.chat_id,
'text': message,
'parse_mode': 'HTML' # Optional: use HTML formatting
}
try:
with self.lock:
requests.post(url, json=payload)
except Exception as e:
# Handle exceptions (e.g., log to console or another handler)
print(f"Failed to send message: {e}")
def close(self):
self.stop_event.set()
self.worker_thread.join()
super().close()
# Example usage
if __name__ == "__main__":
logger = logging.getLogger("TelegramLogger")
logger.setLevel(logging.DEBUG)
# Replace with your actual Telegram bot token and chat ID
token = "YOUR_TELEGRAM_BOT_TOKEN"
chat_id = "YOUR_CHAT_ID"
telegram_handler = TelegramHandler(token, chat_id)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
telegram_handler.setFormatter(formatter)
logger.addHandler(telegram_handler)
# Log some messages
logger.info("This is an info message.")
logger.error("This is an error message.")
# Close the handler properly
telegram_handler.close()
Threading: A separate thread (worker_thread
) is created to process log messages from a queue (log_queue
). This allows the main thread to continue executing without waiting for network operations.
Queue: The queue.Queue
is used to store log messages. This ensures that messages are processed in the order they are received.
Locking: A lock (self.lock
) is used when sending messages to ensure that only one thread can send a message at a time, preventing race conditions.
Flush Interval: The flush_interval
parameter determines how long the worker thread will wait for new messages before checking the queue again. This can be adjusted based on your needs.
Error Handling: Basic error handling is included to catch exceptions when sending messages.
Graceful Shutdown: The close
method ensures that the worker thread is properly stopped when the handler is no longer needed.
This implementation is efficient and should work well for logging messages to Telegram without blocking the main