日志记录手册 — Python 文档
记录食谱
- 作者
- 维奈沙吉普
此页面包含许多与日志记录相关的方法,这些方法在过去很有用。
在多个模块中使用日志记录
多次调用 logging.getLogger('someLogger')
返回对同一个记录器对象的引用。 这不仅在同一个模块内如此,而且在跨模块时也是如此,只要它在同一个 Python 解释器进程中。 对同一对象的引用也是如此; 此外,应用程序代码可以在一个模块中定义和配置父记录器,并在单独的模块中创建(但不配置)子记录器,并且所有对子记录器的调用都将传递给父记录器。 这是一个主要模块:
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
这是辅助模块:
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
输出如下所示:
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
从多个线程记录
从多个线程进行日志记录不需要特别的努力。 以下示例显示了来自主(初始)线程和另一个线程的日志记录:
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
运行时,脚本应打印如下内容:
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
这显示了日志输出,正如人们所期望的那样。 当然,这种方法适用于比此处显示的更多的线程。
多个处理程序和格式化程序
记录器是普通的 Python 对象。 addHandler() 方法对于您可以添加的处理程序数量没有最小或最大配额。 有时,应用程序将所有严重性的所有消息记录到文本文件中,同时将错误或以上记录到控制台是有益的。 要设置它,只需配置适当的处理程序。 应用程序代码中的日志调用将保持不变。 下面是对之前简单的基于模块的配置示例的轻微修改:
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
请注意,“应用程序”代码不关心多个处理程序。 改变的只是添加和配置了一个名为 fh 的新处理程序。
在编写和测试应用程序时,创建具有更高或更低严重性过滤器的新处理程序的能力非常有用。 与其使用许多 print
语句进行调试,不如使用 logger.debug
:与 print 语句不同,稍后您必须将其删除或注释掉,logger.debug 语句可以在源代码中保持完整并保持休眠状态,直到您再次需要它们。 那时,唯一需要发生的变化是修改记录器和/或处理程序的严重性级别以进行调试。
记录到多个目的地
假设您想在不同的情况下使用不同的消息格式登录到控制台和文件。 假设您要将 DEBUG 及更高级别的消息记录到文件中,并将 INFO 及更高级别的消息记录到控制台。 我们还假设文件应该包含时间戳,但控制台消息不应该。 以下是您如何实现这一目标:
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/temp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
当你运行它时,在控制台上你会看到
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
在文件中你会看到类似的东西
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所见,DEBUG 消息仅显示在文件中。 其他消息被发送到两个目的地。
此示例使用控制台和文件处理程序,但您可以使用您选择的任意数量和组合的处理程序。
配置服务器示例
下面是一个使用日志配置服务器的模块示例:
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
这是一个脚本,它接受一个文件名并将该文件发送到服务器,正确地以二进制编码的长度开头,作为新的日志配置:
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
处理阻塞的处理程序
有时您必须让您的日志处理程序在不阻塞您正在记录的线程的情况下完成它们的工作。 这在 Web 应用程序中很常见,当然它也会出现在其他场景中。
表现出缓慢行为的一个常见罪魁祸首是 SMTPHandler:由于开发人员无法控制的多种原因(例如,性能不佳的邮件或网络基础设施),发送电子邮件可能需要很长时间。 但是几乎所有基于网络的处理程序都可以阻止:即使是 SocketHandler 操作也可能在引擎盖下进行 DNS 查询,这太慢了(并且此查询可能位于 Python 层以下的套接字库代码深处,并且超出您的控制范围)。
一种解决方案是使用两部分方法。 对于第一部分,仅将 QueueHandler 附加到那些从性能关键线程访问的记录器。 他们只是简单地写入他们的队列,队列的大小可以设置为足够大的容量,也可以在没有大小上限的情况下进行初始化。 对队列的写入通常会很快被接受,但您可能需要在代码中捕获 queue.Full 异常作为预防措施。 如果您是在其代码中具有性能关键线程的库开发人员,请务必将其记录下来(连同建议仅将 QueueHandlers
附加到您的记录器),以便其他将使用您代码的开发人员受益.
解决方案的第二部分是 QueueListener,它被设计为与 QueueHandler 对应。 QueueListener 非常简单:它传递一个队列和一些处理程序,并启动一个内部线程,该线程侦听其队列以获取从 QueueHandlers
(或 LogRecords
,就此而言)。 LogRecords
从队列中移除并传递给处理程序进行处理。
拥有一个单独的 QueueListener 类的好处是你可以使用同一个实例来服务多个 QueueHandlers
。 这比现有处理程序类的线程版本更加资源友好,后者会占用每个处理程序一个线程而没有特别的好处。
下面是使用这两个类的示例(省略了导入):
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
运行时,将产生:
MainThread: Look out!
在 3.5 版中更改: 在 Python 3.5 之前,QueueListener 总是将从队列接收的每条消息传递给每个初始化它的处理程序。 (这是因为假设级别过滤全部在队列已填充的另一侧完成。)从 3.5 开始,可以通过将关键字参数 respect_handler_level=True
传递给侦听器的构造函数来更改此行为。 完成后,侦听器将每个消息的级别与处理程序的级别进行比较,并且仅在适当的情况下将消息传递给处理程序。
通过网络发送和接收日志事件
假设您想通过网络发送日志事件,并在接收端处理它们。 一个简单的方法是将 SocketHandler 实例附加到发送端的根记录器:
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,您可以使用 socketserver 模块设置接收器。 这是一个基本的工作示例:
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
首先运行服务器,然后是客户端。 在客户端,控制台上没有打印任何内容; 在服务器端,您应该看到如下内容:
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
请注意,在某些情况下,pickle 存在一些安全问题。 如果这些影响您,您可以通过覆盖 makePickle()
方法并在那里实现您的替代方案,以及调整上述脚本以使用您的替代序列化方案来使用替代序列化方案。
将上下文信息添加到日志输出
有时,除了传递给日志记录调用的参数之外,您还希望日志记录输出包含上下文信息。 例如,在网络应用程序中,可能需要在日志中记录客户端特定的信息(例如 远程客户端的用户名或 IP 地址)。 尽管您可以使用 extra 参数来实现这一点,但以这种方式传递信息并不总是很方便。 虽然在每个连接的基础上创建 Logger
实例可能很诱人,但这不是一个好主意,因为这些实例不会被垃圾收集。 虽然这在实践中不是问题,但当 Logger
实例的数量取决于您要在记录应用程序时使用的粒度级别时,如果 [ X212X] 实例实际上是无界的。
使用 LoggerAdapters 传递上下文信息
传递要输出的上下文信息以及日志事件信息的一种简单方法是使用 LoggerAdapter
类。 这个类被设计成一个Logger
,这样你就可以调用debug()
、info()
、warning()
、error()
、exception()
、critical()
和 log()
。 这些方法与 Logger
中的对应方法具有相同的签名,因此您可以互换使用这两种类型的实例。
当你创建一个 LoggerAdapter
的实例时,你传递给它一个 Logger
实例和一个包含上下文信息的类似 dict 的对象。 当您在 LoggerAdapter
的实例上调用其中一种日志记录方法时,它会将调用委托给传递给其构造函数的 Logger
的底层实例,并安排在委托调用中传递上下文信息. 这是 LoggerAdapter
的代码片段:
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
的 process()
方法是将上下文信息添加到日志输出的地方。 它传递了日志记录调用的消息和关键字参数,并将它们的(可能)修改版本传回以在对底层记录器的调用中使用。 此方法的默认实现不处理消息,而是在关键字参数中插入一个“额外”键,其值是传递给构造函数的类似 dict 的对象。 当然,如果您在对适配器的调用中传递了“额外”关键字参数,它将被静默覆盖。
使用 'extra' 的优点是类似 dict 的对象中的值被合并到 LogRecord
实例的 __dict__ 中,允许您将自定义字符串与知道键的 Formatter
实例一起使用类似 dict 的对象。 如果您需要不同的方法,例如 如果您想在消息字符串中添加或附加上下文信息,您只需要子类化 LoggerAdapter
并覆盖 process()
来执行您需要的操作。 这是一个简单的例子:
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
你可以这样使用:
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然后,您记录到适配器的任何事件都将在日志消息前面加上 some_conn_id
的值。
使用 dicts 以外的对象传递上下文信息
您不需要将实际的 dict 传递给 LoggerAdapter
- 您可以传递一个实现 __getitem__
和 __iter__
的类的实例,使其看起来像一个 dict日志记录。 如果您想动态生成值(而 dict 中的值将是常量),这将很有用。
使用过滤器传递上下文信息
您还可以使用用户定义的 Filter
将上下文信息添加到日志输出。 Filter
实例可以修改传递给它们的 LogRecords
,包括添加额外的属性,然后可以使用合适的格式字符串输出,或者如果需要自定义 Formatter
。
例如,在 Web 应用程序中,正在处理的请求(或至少是其中有趣的部分)可以存储在线程本地 (threading.local) 变量中,然后从 [ X202X] 将来自请求的信息(例如远程 IP 地址和远程用户的用户名)添加到 LogRecord
,使用 [ X382X] 上面的例子。 在这种情况下,可以使用相同的格式字符串来获得与上述类似的输出。 这是一个示例脚本:
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
它在运行时会产生如下结果:
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
从多个进程记录到单个文件
尽管日志记录是线程安全的,并且支持从单个进程中的多个线程记录到单个文件 是 ,但从 多个进程 记录到单个文件是 不是 ] 支持,因为没有标准方法可以在 Python 中跨多个进程序列化对单个文件的访问。 如果您需要从多个进程登录到单个文件,一种方法是让所有进程登录到 SocketHandler
,并有一个单独的进程来实现从套接字读取和记录到文件。 (如果您愿意,可以在现有进程之一中指定一个线程来执行此功能。) 本节 更详细地记录了这种方法,并包括一个可用作起点的工作套接字接收器以便您适应自己的应用程序。
您还可以编写自己的处理程序,该处理程序使用 multiprocessing 模块中的 Lock 类来序列化进程对文件的访问。 现有的 FileHandler
和子类目前没有使用 multiprocessing,尽管它们将来可能会这样做。 请注意,目前,multiprocessing 模块并未在所有平台上提供工作锁功能(参见 https://bugs.python.org/issue3770)。
或者,您可以使用 Queue
和 QueueHandler 将所有日志事件发送到多进程应用程序中的进程之一。 以下示例脚本演示了如何执行此操作; 在示例中,一个单独的侦听器进程侦听其他进程发送的事件,并根据其自己的日志记录配置记录这些事件。 尽管该示例仅演示了一种实现方式(例如,您可能希望使用侦听器线程而不是单独的侦听器进程 - 实现将类似),但它确实允许侦听器和其他进程使用完全不同的日志记录配置在您的应用程序中,并且可以用作满足您自己的特定要求的代码的基础:
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上述脚本的一个变体将日志记录在主进程中,在一个单独的线程中:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
这个变体展示了你如何可以例如 为特定记录器应用配置 - 例如 foo
记录器有一个特殊的处理程序,它将 foo
子系统中的所有事件存储在一个文件 mplog-foo.log
中。 这将被主进程中的日志机器使用(即使日志事件是在工作进程中生成的)来将消息定向到适当的目的地。
使用 concurrent.futures.ProcessPoolExecutor
如果你想使用 concurrent.futures.ProcessPoolExecutor 来启动你的工作进程,你需要稍微不同地创建队列。 代替
queue = multiprocessing.Queue(-1)
你应该使用
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然后您可以从此替换工作线程创建:
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
对此(记得先导入 concurrent.futures):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
使用文件轮换
有时您想让日志文件增长到特定大小,然后打开一个新文件并记录到该文件。 您可能希望保留一定数量的这些文件,并在创建了这么多文件后,旋转这些文件,以便文件数量和文件大小都保持有界。 对于这种使用模式,日志包提供了一个 RotatingFileHandler
:
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
结果应该是 6 个单独的文件,每个文件都包含应用程序的日志历史记录的一部分:
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
最新的文件始终是 logging_rotatingfile_example.out
,每次达到大小限制时,都会使用后缀 .1
重命名。 每个现有备份文件都被重命名以增加后缀(.1
变为 .2
等),并删除 .6
文件。
显然,这个例子将日志长度设置得太小作为一个极端的例子。 您可能希望将 maxBytes 设置为适当的值。
使用替代格式样式
当日志被添加到 Python 标准库时,格式化具有可变内容的消息的唯一方法是使用 %-f 格式化方法。 从那时起,Python 获得了两种新的格式化方法:string.Template(在 Python 2.4 中添加)和 str.format()(在 Python 2.6 中添加)。
日志记录(从 3.2 开始)提供了对这两种附加格式样式的改进支持。 Formatter
类已得到增强,以采用名为 style
的附加可选关键字参数。 默认为 '%'
,但其他可能的值为 '{'
和 '$'
,它们对应于其他两种格式样式。 默认情况下保持向后兼容性(如您所料),但通过显式指定样式参数,您可以指定与 str.format() 或 string.Template 一起使用的格式字符串。 这是一个示例控制台会话来显示可能性:
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
请注意,最终输出到日志的日志消息的格式完全独立于单个日志消息的构造方式。 那仍然可以使用 %-f 格式,如下所示:
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
日志调用(logger.debug()
、logger.info()
等)仅采用实际日志消息本身的位置参数,关键字参数仅用于确定如何处理实际日志调用的选项(例如 exc_info
关键字参数指示应记录回溯信息,或 extra
关键字参数指示要添加到日志的其他上下文信息)。 所以你不能直接使用 str.format() 或 string.Template 语法进行日志调用,因为日志包在内部使用 %-f 格式来合并格式字符串和变量参数. 在保持向后兼容性的同时不会改变这一点,因为现有代码中的所有日志记录调用都将使用 %-f 格式字符串。
但是,有一种方法可以使用 {}- 和 $- 格式来构建您的个人日志消息。 回想一下,对于消息,您可以使用任意对象作为消息格式字符串,并且日志记录包将对该对象调用 str()
以获取实际格式字符串。 考虑以下两个类:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
其中任何一个都可以用来代替格式字符串,以允许使用 {}- 或 $-formatting 来构建实际的“消息”部分,该部分出现在格式化的日志输出中,代替“%(message)s”或“ {message}”或“$message”。 每当您想记录某些内容时都使用类名有点笨拙,但是如果您使用诸如 __(双下划线 - 不要与 _ 混淆,单下划线用作 [ X236X]gettext.gettext() 或其兄弟)。
上述类不包含在 Python 中,尽管它们很容易复制并粘贴到您自己的代码中。 它们可以按如下方式使用(假设它们是在名为 wherever
的模块中声明的):
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
虽然上面的例子使用 print()
来显示格式是如何工作的,你当然会使用 logger.debug()
或类似的方法来实际使用这种方法记录日志。
需要注意的一件事是,使用这种方法不会造成显着的性能损失:实际格式化不是在您进行日志记录调用时发生,而是在(以及如果)记录的消息实际上即将由处理程序输出到日志时发生。 因此,唯一可能让您感到困惑的稍微不寻常的事情是括号围绕格式字符串和参数,而不仅仅是格式字符串。 那是因为 __ 符号只是对 XXXMessage 类之一的构造函数调用的语法糖。
如果您愿意,可以使用 LoggerAdapter
来实现与上述类似的效果,如下例所示:
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super().__init__(logger, extra or {})
def log(self, level, msg, /, *args, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger._log(level, Message(msg, args), (), **kwargs)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
当使用 Python 3.2 或更高版本运行时,上述脚本应记录消息 Hello, world!
。
定制 LogRecord
每个日志事件都由一个 LogRecord 实例表示。 当一个事件被记录并且没有被记录器级别过滤掉时,一个 LogRecord 被创建,填充有关于事件的信息,然后传递给该记录器的处理程序(及其祖先,直到并包括记录器,其中禁止进一步向上传播层次结构)。 在 Python 3.2 之前,只有两个地方完成了这个创建:
- Logger.makeRecord(),在记录事件的正常过程中调用。 这直接调用了 LogRecord 来创建一个实例。
- makeLogRecord(),使用包含要添加到 LogRecord 的属性的字典调用。 这通常在通过网络接收到合适的字典时调用(例如 通过 SocketHandler 以 pickle 形式,或通过 HTTPHandler 以 JSON 形式)。
这通常意味着如果您需要对 LogRecord 执行任何特殊操作,则必须执行以下操作之一。
- 创建您自己的 Logger 子类,该子类覆盖 Logger.makeRecord(),并在实例化您关心的任何记录器之前使用 setLoggerClass() 对其进行设置。
- 将 Filter 添加到记录器或处理程序,它会在调用 filter() 方法时执行您需要的必要特殊操作。
在(比如说)几个不同的库想要做不同的事情的情况下,第一种方法会有点笨拙。 每个人都会尝试设置自己的 Logger 子类,最后一个这样做的人将获胜。
第二种方法在许多情况下工作得相当好,但不允许您例如 使用 LogRecord 的专门子类。 库开发人员可以在他们的记录器上设置一个合适的过滤器,但他们必须记住每次引入新的记录器时都这样做(他们只需添加新的包或模块并执行
logger = logging.getLogger(__name__)
在模块级别)。 这可能是要考虑的太多事情。 开发人员还可以将过滤器添加到附加到其顶级记录器的 NullHandler,但如果应用程序开发人员将处理程序附加到较低级别的库记录器,则不会调用该过滤器 - 因此该处理程序的输出将不反映库开发人员的意图。
在 Python 3.2 及更高版本中,LogRecord 创建是通过工厂完成的,您可以指定该工厂。 工厂只是一个可调用的,您可以使用 setLogRecordFactory() 进行设置,并使用 getLogRecordFactory() 进行查询。 使用与 LogRecord 构造函数相同的签名调用工厂,因为 LogRecord 是工厂的默认设置。
这种方法允许自定义工厂控制 LogRecord 创建的所有方面。 例如,您可以返回一个子类,或者在创建后仅向记录添加一些附加属性,使用类似于以下的模式:
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
这种模式允许不同的库将工厂链接在一起,只要它们不覆盖彼此的属性或无意覆盖作为标准提供的属性,就不会出现意外。 但是,应该记住,链中的每个链接都会为所有日志记录操作增加运行时开销,并且只有在使用 过滤器 不能提供所需结果时才应使用该技术。
子类化 QueueHandler - ZeroMQ 示例
您可以使用 QueueHandler
子类将消息发送到其他类型的队列,例如 ZeroMQ 'publish' 套接字。 在下面的示例中,套接字被单独创建并传递给处理程序(作为它的“队列”):
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
当然,还有其他组织方式,例如传入处理程序创建套接字所需的数据:
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子类化 QueueListener - ZeroMQ 示例
您还可以将 QueueListener
子类化以从其他类型的队列获取消息,例如 ZeroMQ“订阅”套接字。 下面是一个例子:
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
基于字典的配置示例
下面是一个日志配置字典的例子——它取自 Django 项目 上的 文档。 这个字典被传递给 dictConfig() 以使配置生效:
LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
}
},
'handlers': {
'null': {
'level':'DEBUG',
'class':'django.utils.log.NullHandler',
},
'console':{
'level':'DEBUG',
'class':'logging.StreamHandler',
'formatter': 'simple'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers':['null'],
'propagate': True,
'level':'INFO',
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有关此配置的更多信息,您可以查看 Django 文档的 相关部分 。
使用旋转器和命名器自定义日志轮换处理
以下代码段给出了如何定义命名器和旋转器的示例,其中显示了基于 zlib 的日志文件压缩:
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, "rb") as sf:
data = sf.read()
compressed = zlib.compress(data, 9)
with open(dest, "wb") as df:
df.write(compressed)
os.remove(source)
rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer
这些不是“真正的”.gz 文件,因为它们是裸压缩的数据,没有您在实际 gzip 文件中找到的“容器”。 此代码段仅用于说明目的。
一个更精细的多处理示例
以下工作示例显示了如何使用配置文件将日志记录与多处理一起使用。 这些配置相当简单,但用于说明如何在真实的多处理场景中实现更复杂的配置。
在示例中,主进程产生一个侦听器进程和一些工作进程。 每个主进程、侦听器和工作进程都有三个独立的配置(工作进程共享相同的配置)。 我们可以看到主进程中的日志记录、工作线程如何记录到 QueueHandler 以及侦听器如何实现 QueueListener 和更复杂的日志记录配置,并安排将通过队列接收到的事件分派到配置中指定的处理程序。 请注意,这些配置纯粹是说明性的,但您应该能够根据自己的场景调整此示例。
这是脚本 - 文档字符串和注释有望解释它是如何工作的:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
将 BOM 插入发送到 SysLogHandler 的消息中
RFC 5424 要求将 Unicode 消息作为具有以下结构的一组字节发送到系统日志守护程序:一个可选的纯 ASCII 组件,后跟一个 UTF-8 字节订单标记 (BOM),后跟使用 UTF-8 编码的 Unicode。 (参见规范的相关部分。)
在 Python 3.1 中,代码被添加到 SysLogHandler 以将 BOM 插入消息中,但不幸的是,它的实现不正确,BOM 出现在消息的开头,因此不允许任何纯 ASCII 组件出现在它面前。
由于此行为被破坏,错误的 BOM 插入代码将从 Python 3.2.4 及更高版本中删除。 但是,它不会被替换,如果您想生成符合 RFC 5424 的消息,其中包括 BOM、之前的可选纯 ASCII 序列和之后的任意 Unicode,使用 UTF-8 编码,那么您需要执行以下操作:
将 Formatter 实例附加到您的 SysLogHandler 实例,使用格式字符串,例如:
'ASCII section\ufeffUnicode section'
Unicode 代码点 U+FEFF,当使用 UTF-8 编码时,将被编码为 UTF-8 BOM——字节串
b'\xef\xbb\xbf'
。用你喜欢的任何占位符替换 ASCII 部分,但要确保替换后出现在那里的数据始终是 ASCII(这样,它在 UTF-8 编码后将保持不变)。
用你喜欢的任何占位符替换 Unicode 部分; 如果替换后出现的数据包含 ASCII 范围之外的字符,那很好——它将使用 UTF-8 编码。
格式化消息将由SysLogHandler
使用UTF-8编码进行编码。 如果您遵循上述规则,您应该能够生成符合 RFC 5424 的消息。 如果不这样做,日志记录可能不会抱怨,但您的消息将不符合 RFC 5424,并且您的 syslog 守护程序可能会抱怨。
实现结构化日志
尽管大多数日志消息是供人类阅读的,因此不容易机器解析,但在某些情况下,您可能希望以结构化格式输出消息,该格式的 是 能够被程序解析(没有需要复杂的正则表达式来解析日志消息)。 使用 logging 包可以直接实现这一点。 有多种方法可以实现这一点,但以下是一种简单的方法,它使用 JSON 以机器可解析的方式序列化事件:
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
如果运行上面的脚本,它会打印:
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
请注意,项目的顺序可能会根据所使用的 Python 版本而有所不同。
如果您需要更专业的处理,您可以使用自定义 JSON 编码器,如以下完整示例所示:
from __future__ import unicode_literals
import json
import logging
# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
unicode
except NameError:
unicode = str
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, unicode):
return o.encode('unicode_escape').decode('ascii')
return super().default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
当上面的脚本运行时,它会打印:
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
请注意,项目的顺序可能会根据所使用的 Python 版本而有所不同。
使用 dictConfig() 自定义处理程序
有时您想以特定方式自定义日志处理程序,如果您使用 dictConfig() ,您可能无需子类化就可以做到这一点。 例如,考虑您可能想要设置日志文件的所有权。 在 POSIX 上,这可以使用 shutil.chown() 轻松完成,但 stdlib 中的文件处理程序不提供内置支持。 您可以使用普通函数自定义处理程序创建,例如:
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然后,您可以在传递给 dictConfig() 的日志记录配置中指定通过调用此函数来创建日志记录处理程序:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
在此示例中,我使用 pulse
用户和组设置所有权,仅用于说明目的。 把它组合成一个工作脚本,chowntest.py
:
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
要运行它,您可能需要以 root
的身份运行:
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
请注意,此示例使用 Python 3.3,因为这是 shutil.chown() 出现的地方。 这种方法应该适用于任何支持 dictConfig() 的 Python 版本——即 Python 2.7、3.2 或更高版本。 对于 3.3 之前的版本,您需要使用例如实现实际的所有权更改 os.chown()。
实际上,处理程序创建函数可能位于项目中某处的实用程序模块中。 而不是配置中的行:
'()': owned_file_handler,
你可以使用例如:
'()': 'ext://project.util.owned_file_handler',
其中 project.util
可以替换为函数所在的包的实际名称。 在上面的工作脚本中,使用 'ext://__main__.owned_file_handler'
应该可以工作。 在这里,实际的可调用对象由 ext://
规范中的 dictConfig() 解析。
希望这个例子也指出了如何实现其他类型的文件更改的方法 - 例如 设置特定的 POSIX 权限位 - 以相同的方式,使用 os.chmod()。
当然,该方法还可以扩展到除 FileHandler 之外的处理程序类型——例如,旋转文件处理程序之一,或完全不同类型的处理程序。
在整个应用程序中使用特定的格式样式
在 Python 3.2 中,Formatter 获得了一个 style
关键字参数,该参数默认为 %
以实现向后兼容,但允许指定 {
或 $
支持 str.format() 和 string.Template 支持的格式化方法。 请注意,这控制最终输出到日志的日志消息的格式,并且与单个日志消息的构造方式完全正交。
日志调用(debug()、info() 等)仅采用实际日志消息本身的位置参数,关键字参数仅用于确定如何处理日志的选项打电话(例如 exc_info
关键字参数指示应记录回溯信息,或 extra
关键字参数指示要添加到日志的其他上下文信息)。 所以你不能直接使用 str.format() 或 string.Template 语法进行日志调用,因为日志包在内部使用 %-f 格式来合并格式字符串和变量参数. 在保持向后兼容性的同时不会改变这一点,因为现有代码中的所有日志记录调用都将使用 %-f 格式字符串。
There have been suggestions to associate format styles with specific loggers, but that approach also runs into backward compatibility problems because any existing code could be using a given logger name and using %-formatting.
为了使日志记录在任何第三方库和您的代码之间互操作,需要在单个日志记录调用级别做出有关格式的决定。 这开辟了几种适应替代格式样式的方法。
使用 LogRecord 工厂
在 Python 3.2 中,随着上述 Formatter 的变化,日志包获得了允许用户设置他们自己的 LogRecord 子类的能力,使用 setLogRecordFactory()[X211X ] 功能。 你可以使用它来设置你自己的 LogRecord 子类,它通过覆盖 getMessage() 方法来做正确的事情。 这个方法的基类实现是 msg % args
格式发生的地方,你可以在这里替换你的替代格式; 但是,您应该小心支持所有格式样式并允许 %-f 格式作为默认格式,以确保与其他代码的互操作性。 还应该注意调用 str(self.msg)
,就像基本实现一样。
有关详细信息,请参阅有关 setLogRecordFactory() 和 LogRecord 的参考文档。
使用自定义消息对象
还有另一种可能更简单的方法,您可以使用 {}- 和 $- 格式来构建您的个人日志消息。 您可能还记得(从 使用任意对象作为消息 ),在记录时您可以使用任意对象作为消息格式字符串,并且记录包将调用 str()对象以获取实际的格式字符串。 考虑以下两个类:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
其中任何一个都可以用来代替格式字符串,以允许使用 {}- 或 $-formatting 来构建实际的“消息”部分,该部分出现在格式化的日志输出中,代替“%(message)s”或“ {message}”或“$message”。 如果您发现每当您想记录某些东西时都使用类名有点笨拙,您可以使用别名(例如 M
或 _
)作为消息(或也许 __
,如果您使用 _
进行本地化)。
下面给出了这种方法的示例。 首先,使用 str.format() 进行格式化:
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
其次,用 string.Template 格式化:
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
需要注意的一件事是,使用这种方法不会造成显着的性能损失:实际格式化不是在您进行日志记录调用时发生,而是在(以及如果)记录的消息实际上即将由处理程序输出到日志时发生。 因此,唯一可能让您感到困惑的稍微不寻常的事情是括号围绕格式字符串和参数,而不仅仅是格式字符串。 那是因为 __ 符号只是对上面显示的 XXXMessage
类之一的构造函数调用的语法糖。
使用 dictConfig() 配置过滤器
你 可以 使用 dictConfig() 配置过滤器,尽管乍一看如何去做可能并不明显(因此这个秘诀)。 由于 Filter 是标准库中唯一包含的过滤器类,并且不太可能满足许多要求(它仅作为基类存在),您通常需要定义自己的 Filter 子类具有覆盖的 filter() 方法。 为此,请在过滤器的配置字典中指定 ()
键,指定将用于创建过滤器的可调用对象(类是最明显的,但您可以提供任何返回 [ X227X]过滤器实例)。 这是一个完整的例子:
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
此示例展示了如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。 运行时,上面的脚本将打印:
changed: hello
这表明过滤器按配置工作。
需要注意的几点:
- 如果您不能在配置中直接引用可调用对象(例如 如果它位于不同的模块中,并且您无法直接将其导入到配置字典所在的位置),则可以使用
ext://...
形式,如 访问外部对象 中所述。 例如,您可以在上面的示例中使用文本'ext://__main__.MyFilter'
而不是MyFilter
。 - 与过滤器一样,此技术还可用于配置自定义处理程序和格式化程序。 有关日志记录如何支持在其配置中使用用户定义对象的更多信息,请参阅 用户定义对象 ,并参阅上面的其他食谱配方 使用 dictConfig() 自定义处理程序。
自定义异常格式
有时您可能想要进行自定义异常格式设置 - 出于参数的考虑,假设您希望每个记录的事件仅一行,即使存在异常信息也是如此。 您可以使用自定义格式化程序类来执行此操作,如以下示例所示:
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super().formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super().format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
运行时,这会生成一个正好有两行的文件:
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
虽然上述处理很简单,但它指出了如何根据您的喜好格式化异常信息。 traceback 模块可能有助于满足更专业的需求。
说日志消息
在某些情况下,可能需要以可听而不是可见的格式呈现日志消息。 如果您的系统中有可用的文本到语音 (TTS) 功能,即使它没有 Python 绑定,这也很容易做到。 大多数 TTS 系统都有一个可以运行的命令行程序,可以使用 subprocess 从处理程序调用它。 这里假设 TTS 命令行程序不会期望与用户交互或需要很长时间才能完成,并且记录消息的频率不会高到用消息淹没用户,并且可以接受一次说出一条消息而不是并发,下面的示例实现在处理下一条消息之前等待一条消息被说出,这可能会导致其他处理程序保持等待。 这是一个展示该方法的简短示例,它假定 espeak
TTS 包可用:
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
运行时,这个脚本应该用女声说“你好”,然后说“再见”。
当然,上述方法可以适用于其他 TTS 系统,甚至可以通过从命令行运行的外部程序处理消息的其他系统。
缓冲日志消息并有条件地输出它们
在某些情况下,您可能希望将消息记录在临时区域中,并且仅在发生某种情况时才输出它们。 例如,您可能希望开始记录某个函数中的调试事件,并且如果该函数在没有错误的情况下完成,您不希望收集到的调试信息使日志变得混乱,但是如果出现错误,您希望所有的调试信息要输出的信息以及错误。
这是一个示例,它展示了如何使用装饰器为您希望日志记录以这种方式运行的函数执行此操作。 它利用了 logging.handlers.MemoryHandler,它允许缓冲记录的事件直到某些条件发生,此时缓冲的事件是 flushed
- 传递给另一个处理程序([X212X ] 处理程序)进行处理。 默认情况下,MemoryHandler
在其缓冲区被填满或看到级别大于或等于指定阈值的事件时刷新。 如果您想要自定义刷新行为,您可以将此配方与更专业的 MemoryHandler
子类一起使用。
示例脚本有一个简单的函数 foo
,它只是循环遍历所有日志记录级别,写入 sys.stderr
以说明它将在哪个级别登录,然后在该级别实际记录一条消息等级。 您可以将参数传递给 foo
,如果为真,将在 ERROR 和 CRITICAL 级别记录 - 否则,它仅在 DEBUG、INFO 和 WARNING 级别记录。
该脚本只是安排使用装饰器装饰 foo
,该装饰器将执行所需的条件日志记录。 装饰器将记录器作为参数,并在调用被装饰函数的持续时间内附加内存处理程序。 装饰器还可以使用目标处理程序、应发生刷新的级别和缓冲区容量(缓冲的记录数)进行额外参数化。 这些默认为 StreamHandler,分别写入 sys.stderr
、logging.ERROR
和 100
。
这是脚本:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
运行此脚本时,应观察到以下输出:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如您所见,实际日志输出仅在记录严重性为 ERROR 或更高的事件时才会发生,但在这种情况下,还会记录任何先前具有较低严重性的事件。
你当然可以使用传统的装饰方式:
@log_if_errors(logger)
def foo(fail=False):
...
通过配置使用 UTC (GMT) 格式化时间
有时您想使用 UTC 格式化时间,这可以使用诸如 UTCFormatter 之类的类来完成,如下所示:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然后您可以在代码中使用 UTCFormatter
而不是 Formatter。 如果你想通过配置来做到这一点,你可以使用 dictConfig() API 和以下完整示例所示的方法:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
当这个脚本运行时,它应该打印如下内容:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
显示如何将时间格式化为本地时间和 UTC,每个处理程序一个。
使用上下文管理器进行选择性日志记录
有时临时更改日志配置并在执行某些操作后将其恢复会很有用。 为此,上下文管理器是保存和恢复日志上下文的最明显方式。 这是此类上下文管理器的一个简单示例,它允许您有选择地更改日志记录级别并纯粹在上下文管理器的范围内添加日志记录处理程序:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果您指定级别值,则记录器的级别将设置为上下文管理器覆盖的 with 块范围内的该值。 如果您指定处理程序,它会在进入块时添加到记录器中,并在退出块时移除。 您还可以要求经理在块退出时为您关闭处理程序 - 如果您不再需要处理程序,您可以这样做。
为了说明它是如何工作的,我们可以在上面添加以下代码块:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我们最初将记录器的级别设置为 INFO
,因此消息 #1 出现而消息 #2 没有。 然后我们在下面的 with
块中临时将级别更改为 DEBUG
,因此出现消息 #3。 块退出后,记录器的级别恢复为 INFO
,因此不会出现消息 #4。 在下一个 with
块中,我们再次将级别设置为 DEBUG
,但也添加了一个写入 sys.stdout
的处理程序。 因此,消息 #5 在控制台上出现两次(一次通过 stderr
,一次通过 stdout
)。 with
语句完成后,状态与之前一样,因此消息 #6 出现(如消息 #1)而消息 #7 不出现(就像消息 #2)。
如果我们运行生成的脚本,结果如下:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
如果我们再次运行它,但将 stderr
管道传输到 /dev/null
,我们会看到以下内容,这是写入 stdout
的唯一消息:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再一次,将 stdout
传递到 /dev/null
,我们得到:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在这种情况下,打印到 stdout
的消息 #5 没有出现,正如预期的那样。
当然,这里描述的方法可以推广,例如临时附加日志过滤器。 请注意,上述代码适用于 Python 2 和 Python 3。
CLI 应用程序启动模板
这是一个示例,它展示了您如何:
- 使用基于命令行参数的日志记录级别
- 分派到单独文件中的多个子命令,所有子命令都以一致的方式记录在同一级别
- 使用简单、最少的配置
假设我们有一个命令行应用程序,其工作是停止、启动或重新启动某些服务。 出于说明目的,可以将其组织为文件 app.py
,它是应用程序的主要脚本,在 start.py
、stop.py
和 [ 中实现了单独的命令] X191X]。 进一步假设我们想通过命令行参数控制应用程序的详细程度,默认为 logging.INFO
。 这是 app.py
的一种写法:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
而 start
、stop
和 restart
命令可以在单独的模块中实现,就像这样开始:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
因此停止:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
和重启类似:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果我们使用默认日志级别运行此应用程序,我们会得到如下输出:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一个词是日志记录级别,第二个词是记录事件的位置的模块或包名称。
如果我们更改日志级别,那么我们可以更改发送到日志的信息。 例如,如果我们想要更多信息:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
如果我们想要更少:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
在这种情况下,命令不会向控制台打印任何内容,因为它们不会记录 WARNING
级别或更高级别的任何内容。
用于日志记录的 Qt GUI
一个不时出现的问题是关于如何登录到 GUI 应用程序。 Qt 框架是一种流行的跨平台 UI 框架,使用 PySide2 或 PyQt5 库进行 Python 绑定。
以下示例显示了如何登录到 Qt GUI。 这引入了一个简单的 QtHandler
类,它接受一个可调用对象,它应该是执行 GUI 更新的主线程中的一个插槽。 还创建了一个工作线程来展示如何从 UI 本身(通过手动日志记录按钮)以及在后台执行工作的工作线程(在这里,只记录随机级别的随机消息)登录到 GUI之间的短暂延迟)。
工作线程是使用 Qt 的 QThread
类而不是 threading 模块实现的,因为在某些情况下必须使用 QThread
,它提供了与其他 Qt
组件。
该代码应适用于 PySide2
或 PyQt5
的最新版本。 您应该能够将该方法应用于早期版本的 Qt。 有关更多详细信息,请参阅代码片段中的注释。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between PySide2 and PyQt5
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super().__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
f.setStyleHint(f.Monospace)
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
sys.exit(app.exec_())
if __name__=='__main__':
main()