Use one thread lock to implement the following Python code. Make it as fast and effective as possible.
class TelegramHandler(logging.Handler):
    """Logging handler skeleton that stores Telegram delivery settings.

    Args:
        token: Telegram bot token.
        chat_id: Identifier of the chat that should receive the messages.
        batch_size: Number of messages to collect per batch.
        flush_interval: Seconds between periodic flushes.
    """

    def __init__(self, token, chat_id, batch_size=10, flush_interval=5):
        # The dunder names matter: a method called ``init`` is never invoked
        # on construction, and ``logging.Handler`` has no ``init`` to chain to.
        super().__init__()
        self.token = token
        self.chat_id = chat_id
        self.batch_size = batch_size
        self.flush_interval = flush_interval
To implement a `TelegramHandler` that uses a single thread lock for logging messages efficiently, we can utilize Python's `threading` module. The handler will collect log messages in batches and send them to a Telegram chat at specified intervals. Here's a complete implementation:
import logging
import requests
import threading
import time
from collections import deque
class TelegramHandler(logging.Handler):
    """Logging handler that batches records and posts them to a Telegram chat.

    Formatted records are appended to an in-memory buffer. A background
    worker thread sends the buffer when it reaches ``batch_size`` messages or
    when ``flush_interval`` seconds have passed, whichever comes first, so a
    logging call never waits on the network while the worker is running.

    Exactly one lock (``_buffer_lock``) is used by this class, and it is only
    held for the few instructions needed to append to or drain the buffer.
    HTTP requests are always made with the lock released.

    Messages are sent as plain text (no ``parse_mode``): log text is not
    escaped, and Telegram rejects messages whose markup fails to parse.

    Args:
        token: Telegram bot token.
        chat_id: Identifier of the chat that receives the messages.
        batch_size: Buffer length that triggers an immediate flush.
        flush_interval: Maximum number of seconds between flushes.
    """

    # Telegram's sendMessage accepts at most this many characters of text.
    MAX_MESSAGE_LENGTH = 4096
    # Seconds to wait on the Telegram API before abandoning a request.
    REQUEST_TIMEOUT = 10

    def __init__(self, token, chat_id, batch_size=10, flush_interval=5):
        super().__init__()
        self.token = token
        self.chat_id = chat_id
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.messages = deque()
        # Deliberately NOT stored as ``self.lock``: that attribute is the
        # re-entrant lock logging.Handler already holds while calling emit()
        # (and while logging.shutdown() calls flush()/close()). Replacing it
        # with a plain Lock makes the first log call deadlock on itself.
        self._buffer_lock = threading.Lock()
        # Set by emit() when a batch is full and by close() to stop the
        # worker without waiting out the remainder of the interval.
        self._wakeup = threading.Event()
        self.running = True
        # Daemon thread: a forgotten close() must not keep the process alive.
        self.flush_thread = threading.Thread(
            target=self.flush_messages,
            name="TelegramHandlerFlush",
            daemon=True,
        )
        self.flush_thread.start()

    def emit(self, record):
        """Format ``record`` and queue it for delivery.

        While the worker is running this only appends to the buffer and, when
        a batch is full, wakes the worker. After ``close()`` there is no
        worker left, so the message is delivered synchronously instead.
        """
        try:
            msg = self.format(record)
        except Exception:
            self.handleError(record)
            return

        with self._buffer_lock:
            self.messages.append(msg)
            batch_ready = len(self.messages) >= self.batch_size

        if not self.running:
            self.flush()
        elif batch_ready:
            self._wakeup.set()

    def flush(self):
        """Send every buffered message now; do nothing if the buffer is empty."""
        with self._buffer_lock:
            if not self.messages:
                return
            batch = list(self.messages)
            self.messages.clear()
        # The lock is released before the network call so that threads which
        # are logging are never blocked behind an HTTP request.
        self.send_to_telegram(batch)

    def send_to_telegram(self, messages):
        """Post ``messages`` to the chat, newline-joined.

        The text is split into pieces of at most ``MAX_MESSAGE_LENGTH``
        characters, one request per piece. Failures are reported on stderr
        and never raised, so a Telegram outage cannot break the application.
        """
        text = '\n'.join(messages)
        url = f'https://api.telegram.org/bot{self.token}/sendMessage'
        limit = self.MAX_MESSAGE_LENGTH
        for start in range(0, len(text), limit):
            payload = {
                'chat_id': self.chat_id,
                'text': text[start:start + limit],
            }
            try:
                response = requests.post(
                    url, json=payload, timeout=self.REQUEST_TIMEOUT
                )
                if not response.ok:
                    # requests does not raise on 4xx/5xx by itself.
                    self._report_failure(
                        f"HTTP {response.status_code}: {response.text}"
                    )
            except Exception as e:
                self._report_failure(str(e))

    def _report_failure(self, detail):
        """Write a delivery failure to stderr with the bot token redacted."""
        import sys

        # The logging module is not used here: if this handler is attached to
        # the root logger, logging the failure would queue yet another message
        # for Telegram and repeat the failure on every flush.
        token = str(self.token)
        if token:
            # Exception texts from the HTTP client may embed the request URL.
            detail = detail.replace(token, '<token>')
        stream = sys.stderr
        if stream is not None:
            stream.write(f"Failed to send messages to Telegram: {detail}\n")

    def flush_messages(self):
        """Worker loop: flush on every wake-up or interval expiry until closed."""
        while self.running:
            self._wakeup.wait(self.flush_interval)
            # Clear before flushing: a wake-up that arrives during the flush
            # stays set and triggers another pass instead of being lost.
            self._wakeup.clear()
            self.flush()

    def close(self):
        """Stop the worker, deliver whatever is still buffered, then close."""
        self.running = False
        self._wakeup.set()
        try:
            if threading.current_thread() is not self.flush_thread:
                self.flush_thread.join()
            # Catch messages queued after the worker's final pass.
            self.flush()
        finally:
            super().close()
# Example usage:
if __name__ == "__main__":
    logger = logging.getLogger("TelegramLogger")
    logger.setLevel(logging.DEBUG)

    # Replace with your actual Telegram bot token and chat ID
    token = "YOUR_TELEGRAM_BOT_TOKEN"
    chat_id = "YOUR_CHAT_ID"

    telegram_handler = TelegramHandler(token, chat_id)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    telegram_handler.setFormatter(formatter)
    logger.addHandler(telegram_handler)

    try:
        # Example logging
        for i in range(25):
            # %-style arguments let logging skip formatting for records that
            # end up filtered out.
            logger.info("Log message %d", i + 1)
            time.sleep(1)  # Simulate some delay between log messages
    finally:
        # Clean up even when the loop is interrupted (e.g. Ctrl-C), so the
        # handler's background thread is always stopped.
        logger.removeHandler(telegram_handler)
        telegram_handler.close()
Thread Safety: A `threading.Lock` is used to ensure that access to the `messages` deque is thread-safe. This prevents race conditions when multiple threads try to log messages simultaneously.
Batching: The `emit` method appends messages to a deque. When the number of messages reaches the `batch_size`, it calls the `flush` method to send the messages to Telegram.
Flushing: The `flush` method sends the collected messages to Telegram and clears the deque. It is also called periodically by a separate thread (`flush_messages`) that sleeps for `flush_interval` seconds.
Graceful Shutdown: The `close` method stops the flushing thread and ensures that any remaining messages are sent before the handler is closed.
Error Handling: If sending messages to Telegram fails, an error is logged.
This implementation is efficient and should work well for logging messages to Telegram in batches while ensuring thread safety.