Efficient Telegram Logging Handler with Thread Safety in Python

Answered using ChatGPT 4o by ChatGPT4o

Question:

Use one thread lock to implement the following Python code. Make it as fast and effective as possible.

class TelegramHandler(logging.Handler):
    def __init__(self, token, chat_id, batch_size=10, flush_interval=5):
        super().__init__()
        self.token = token
        self.chat_id = chat_id
        self.batch_size = batch_size
        self.flush_interval = flush_interval

Answer:

To implement a TelegramHandler that uses a single thread lock for logging messages efficiently, we can utilize Python's threading module. The handler will collect log messages in batches and send them to a Telegram chat at specified intervals. Here's a complete implementation:

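import logging
import sys
import threading
import time
from collections import deque

import requests


class TelegramHandler(logging.Handler):
    def __init__(self, token, chat_id, batch_size=10, flush_interval=5):
        super().__init__()
        self.token = token
        self.chat_id = chat_id
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.messages = deque()
        # logging.Handler already keeps its own lock in self.lock and holds it
        # while emit() runs, so the buffer lock needs a different name.
        self.buffer_lock = threading.Lock()
        self.stop_event = threading.Event()
        # Daemon thread: a forgotten close() cannot keep the process alive.
        self.flush_thread = threading.Thread(target=self.flush_messages, daemon=True)
        self.flush_thread.start()

    def emit(self, record):
        try:
            msg = self.format(record)
        except Exception:
            self.handleError(record)
            return
        with self.buffer_lock:
            self.messages.append(msg)
            batch_full = len(self.messages) >= self.batch_size
        # threading.Lock is not reentrant, so flush() must run after the
        # lock has been released; calling it inside the block would deadlock.
        if batch_full:
            self.flush()

    def flush(self):
        # Hold the lock only long enough to take the batch out of the buffer;
        # the slow HTTP request happens outside it.
        with self.buffer_lock:
            if not self.messages:
                return
            batch = list(self.messages)
            self.messages.clear()
        self.send_to_telegram(batch)

    def send_to_telegram(self, messages):
        # Sent as plain text: with parse_mode set, Telegram can reject log
        # lines that contain unbalanced Markdown characters such as '_' or '*'.
        payload = {
            'chat_id': self.chat_id,
            'text': '\n'.join(messages),
        }
        url = f'https://api.telegram.org/bot{self.token}/sendMessage'
        try:
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            # Report on stderr instead of calling logging.error(): logging from
            # inside a handler can route the failure back into this handler.
            print(f"Failed to send messages to Telegram: {e}", file=sys.stderr)

    def flush_messages(self):
        # Event.wait() returns True as soon as close() sets the event, so
        # shutdown does not have to sit out a full flush interval.
        while not self.stop_event.wait(self.flush_interval):
            self.flush()

    def close(self):
        self.stop_event.set()
        self.flush_thread.join()
        self.flush()  # send whatever is still buffered
        super().close()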

# Example usage:
if __name__ == "__main__":
    logger = logging.getLogger("TelegramLogger")
    logger.setLevel(logging.DEBUG)
    
    # Replace with your actual Telegram bot token and chat ID
    token = "YOUR_TELEGRAM_BOT_TOKEN"
    chat_id = "YOUR_CHAT_ID"
    
    telegram_handler = TelegramHandler(token, chat_id)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    telegram_handler.setFormatter(formatter)
    
    logger.addHandler(telegram_handler)

    # Example logging
    for i in range(25):
        logger.info(f"Log message {i + 1}")
        time.sleep(1)  # Simulate some delay between log messages

    # Clean up
    telegram_handler.close()

Explanation:

  1. Thread Safety: A threading.Lock is used to ensure that access to the messages deque is thread-safe. This prevents race conditions when multiple threads try to log messages simultaneously.

  2. Batching: The emit method appends messages to a deque. When the number of messages reaches the batch_size, it calls the flush method to send the messages to Telegram.

  3. Flushing: The flush method sends the collected messages to Telegram and clears the deque. It is also called periodically by a separate thread (flush_messages) that sleeps for flush_interval seconds.

  4. Graceful Shutdown: The close method stops the flushing thread and ensures that any remaining messages are sent before the handler is closed.

  5. Error Handling: If sending messages to Telegram fails, the exception is caught and reported instead of propagating into the code that called the logger.
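
The batching and flushing steps above can be exercised without a bot token or a network connection. The following is a minimal sketch of the same buffering pattern, not part of the answer's handler: BatchBuffer, its add method, and the injected send callable are hypothetical names chosen for illustration. It shows the one lock being held only while the buffer is modified, with the send happening after the lock is released.

```python
import threading
from collections import deque


class BatchBuffer:
    """Hypothetical stand-in for the handler's buffering logic (no logging, no HTTP)."""

    def __init__(self, send, batch_size=10):
        self.send = send              # callable that receives one list of messages
        self.batch_size = batch_size
        self.messages = deque()
        self.lock = threading.Lock()  # the single lock guarding self.messages

    def add(self, msg):
        with self.lock:
            self.messages.append(msg)
            full = len(self.messages) >= self.batch_size
        # threading.Lock is not reentrant, so flush() runs after the lock is released.
        if full:
            self.flush()

    def flush(self):
        with self.lock:
            batch = list(self.messages)
            self.messages.clear()
        if batch:
            self.send(batch)  # slow work happens outside the lock


sent = []
buffer = BatchBuffer(sent.append, batch_size=10)
for i in range(25):
    buffer.add(f"Log message {i + 1}")
buffer.flush()  # push out the final partial batch
print([len(batch) for batch in sent])  # [10, 10, 5]
```

Passing sent.append as the sender makes the batches easy to inspect; in the real handler that role is played by send_to_telegram.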

Batching keeps network traffic low: the handler sends roughly one request per batch_size records or one per flush_interval seconds, whichever comes first, while the lock keeps the shared buffer consistent across threads.
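
One practical limit on batching is that the Telegram Bot API caps the text of a single sendMessage call at 4096 characters, so a batch of long log lines may need to be split before sending. The helper below is a sketch under stated assumptions: split_for_telegram is a hypothetical name, it is not part of the answer's code, and it measures length with Python's len(), which may only approximate how Telegram counts characters.

```python
TELEGRAM_TEXT_LIMIT = 4096  # documented maximum length of sendMessage text


def split_for_telegram(messages, limit=TELEGRAM_TEXT_LIMIT):
    """Group messages into newline-joined chunks of at most `limit` characters."""
    chunks, current, size = [], [], 0
    for msg in messages:
        # Hard-split a message that exceeds the limit on its own.
        # Empty messages produce no pieces and are skipped.
        pieces = [msg[i:i + limit] for i in range(0, len(msg), limit)]
        for piece in pieces:
            extra = len(piece) + (1 if current else 0)  # +1 for the joining newline
            if current and size + extra > limit:
                # The piece does not fit: close the current chunk and start a new one.
                chunks.append('\n'.join(current))
                current, size = [], 0
                extra = len(piece)
            current.append(piece)
            size += extra
    if current:
        chunks.append('\n'.join(current))
    return chunks


# Small limit so the behavior is easy to see.
print(split_for_telegram(["aaaa", "bb", "cccccccccc"], limit=5))
# ['aaaa', 'bb', 'ccccc', 'ccccc']
```

Inside send_to_telegram, the handler could loop over the returned chunks and post each one as its own message instead of joining the whole batch into a single text.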