use a new thread to implement the follow python code. make it as fast and effective as possible.
class ThreadBatchTelegramHandler(logging.Handler): def init(self, token, chat_id, flush_interval=5): super().init() self.token = token self.chat_id = chat_id self.flush_interval = flush_interval self.lock = threading.Lock()
To implement a ThreadBatchTelegramHandler
that sends log messages to a Telegram chat in batches using a new thread, we can utilize Python's threading
module along with a queue to manage the log messages efficiently. The handler will collect log messages and send them in batches at specified intervals.
Here's a complete implementation:
import logging
import threading
import time
import requests
from queue import Queue
class ThreadBatchTelegramHandler(logging.Handler):
def __init__(self, token, chat_id, flush_interval=5, batch_size=10):
super().__init__()
self.token = token
self.chat_id = chat_id
self.flush_interval = flush_interval
self.batch_size = batch_size
self.queue = Queue()
self.lock = threading.Lock()
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self._process_queue)
self.thread.start()
def emit(self, record):
# Format the log record and put it in the queue
log_entry = self.format(record)
self.queue.put(log_entry)
def _process_queue(self):
while not self.stop_event.is_set():
messages = []
while len(messages) < self.batch_size and not self.queue.empty():
messages.append(self.queue.get())
if messages:
self._send_messages(messages)
# Wait for the flush interval or until there are enough messages
for _ in range(self.flush_interval):
if self.stop_event.is_set():
break
time.sleep(1)
def _send_messages(self, messages):
# Join messages into a single string
message_text = "\n".join(messages)
url = f"https://api.telegram.org/bot{self.token}/sendMessage"
payload = {
'chat_id': self.chat_id,
'text': message_text,
'parse_mode': 'Markdown' # Optional: use Markdown formatting
}
try:
response = requests.post(url, json=payload)
response.raise_for_status() # Raise an error for bad responses
except requests.RequestException as e:
logging.error(f"Failed to send messages to Telegram: {e}")
def close(self):
self.stop_event.set()
self.thread.join()
super().close()
# Example usage
if __name__ == "__main__":
logger = logging.getLogger("TelegramLogger")
logger.setLevel(logging.DEBUG)
# Replace with your actual Telegram bot token and chat ID
token = "YOUR_TELEGRAM_BOT_TOKEN"
chat_id = "YOUR_CHAT_ID"
telegram_handler = ThreadBatchTelegramHandler(token, chat_id)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
telegram_handler.setFormatter(formatter)
logger.addHandler(telegram_handler)
# Example logging
for i in range(25):
logger.info(f"This is log message number {i + 1}")
time.sleep(1) # Simulate some delay between log messages
# Clean up
telegram_handler.close()
Queue
to store log messages. This allows for thread-safe operations when adding messages from the main thread._process_queue
) continuously checks the queue for messages to send. It collects messages until either the batch size is reached or the flush interval has passed._send_messages
method sends the collected messages to the Telegram API. It handles any exceptions that may occur during the request.close
method ensures that the thread stops processing when the handler is no longer needed.YOUR_TELEGRAM_BOT_TOKEN
and YOUR_CHAT_ID
with your actual Telegram bot token and chat ID.