Parsing nginx access- and error-logs
.py
ETL
This is a collection of several implementations of parsing nginx access and error logs, with a plan to compare their performance someday:
- straightforward, regex-only – expected to be the most robust and the fastest,
- OOP-styled, with dataclasses – expected to be nice,
and this one, just for comparison:
- OOP-styled, with NamedTuple – just for comparison.
OOP vs. non-OOP code
the gist itself
Non-OOP and dataclasses-based solutions
import dataclasses
import re
from datetime import datetime
from typing import ClassVar, Optional
@dataclasses.dataclass(frozen=True)
class NgxAccessLine:
remote_addr: str
remote_user: str
ts: str | datetime
request: str
status: str | int
body_bytes_sent: str | int
http_referer: str
http_user_agent: str
DO_CAST: ClassVar[bool] = False
# RegEx for default `combined` nginx access log format
# $remote_addr - $remote_user [$time_local]
# "$request" $status $body_bytes_sent
# "$http_referer" "$http_user_agent"
access_line_pattern: ClassVar[re.Pattern] = re.compile(
r'^'
r'([0-9.]+) - (-|\S+) \[([^]]+)\] '
r'"(.*)" (\d+) (\d+) '
r'"(.*)" "(.*)"'
r'$'
)
@classmethod
def from_string(cls, s):
if m := cls.access_line_pattern.match(s):
kwargs = dict(zip(
('remote_addr', 'remote_user', 'ts',
'request', 'status', 'body_bytes_sent',
'http_referer', 'http_user_agent'),
m.groups()
))
if cls.DO_CAST:
kwargs.update({
'ts': datetime.strptime(kwargs['ts'], '%d/%b/%Y:%H:%M:%S %z'),
'status': int(kwargs['status']),
'body_bytes_sent': int(kwargs['body_bytes_sent']),
})
return cls(**kwargs)
@classmethod
def fields(cls):
return tuple(f.name for f in dataclasses.fields(cls))
def as_tuple(self):
return dataclasses.astuple(self)
def as_dict(self):
return dataclasses.asdict(self)
@dataclasses.dataclass(frozen=True)
class NgxErrLine:
ts: datetime
level: str
pid: int
tid: int
cid: int | None
msg: str
# Additional attributes revealed from `msg`
remote_addr: Optional[str] = None
DO_CAST: ClassVar[bool] = False
# Nginx error log format
# YYYY/MM/DD HH:MM:SS [level] pid#tid: *conn_number message
# ref: nginx/src/core/ngx_log.c, nginx/src/core/ngx_connection.h
error_line_pattern: ClassVar[re.Pattern] = re.compile(
r'^'
r'([\d/]+ [\d:]+) \[(\w+)\] (\d+)#(\d+):(?: \*(\d+))? (.*)'
r'$'
)
msg_client_pattern = re.compile(r'\bclient: (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')
@classmethod
def from_string(cls, s):
if m := cls.error_line_pattern.match(s):
kwargs = dict(zip(
('ts', 'level', 'pid', 'tid', 'cid', 'msg'),
m.groups()
))
# Additional attributes revealed from `msg`
if remote_addr := cls.msg_client_pattern.search(kwargs['msg']):
kwargs.update({'remote_addr': remote_addr.group(1)})
if cls.DO_CAST:
kwargs.update({
'ts': datetime.strptime(kwargs['ts'], '%Y/%m/%d %H:%M:%S'),
'pid': int(kwargs['pid']),
'tid': int(kwargs['tid']),
'cid': int(kwargs['cid']) if kwargs['cid'] else None,
})
return cls(**kwargs)
@classmethod
def fields(cls):
return tuple(f.name for f in dataclasses.fields(cls))
def as_tuple(self):
return dataclasses.astuple(self)
def as_dict(self):
return dataclasses.asdict(self)
Now we can compare the simple non-object-oriented solution with the dataclasses-based one (note: with or without the DO_CAST option).
import bz2
import csv
import gzip
from datetime import datetime
from pathlib import Path
# from ngxlogz_namdtup import NgxAccessLine, NgxErrLine # obsoleted; for comparison
from ngxlogz import NgxAccessLine, NgxErrLine
# Directory that is scanned for the rotated nginx log files.
IN_LOGDIR = Path('~', 'tmp', 'ngx').expanduser()
# Destination files for the parsed access-log / error-log records.
OUT_FILE_A = Path('~', 'tmp', 'ngx', 'az.csv.bz2').expanduser()
OUT_FILE_E = Path('~', 'tmp', 'ngx', 'ez.csv.bz2').expanduser()
# Whether parsed fields are converted from text to datetime/int.
DO_CAST = False
def ngx_access_linebyline(in_logdir=Path('/var/log/nginx'), glob_pattern='access.log-*'):
    """Generate log lines from all the nginx access.log files.

    Files in ``in_logdir`` matching ``glob_pattern`` are visited in order of
    ascending modification time.  Files with a ``.gz`` suffix are read through
    ``gzip``; all others are opened as plain text.  Every line is yielded with
    its trailing whitespace stripped.
    """
    log_paths = sorted(Path(in_logdir).glob(glob_pattern),
                       key=lambda fp: fp.stat().st_mtime)
    for fpath in log_paths:
        opener = gzip.open if fpath.suffix == '.gz' else open
        with opener(fpath, 'rt') as fo:
            # Iterate the file object rather than calling readlines(), so a
            # large log is streamed instead of being loaded in one piece.
            yield from map(str.rstrip, fo)
def ngx_err_linebyline(in_logdir=Path('/var/log/nginx'), glob_pattern='error.log-*'):
    """Generate log lines from all the nginx error.log files.

    Files in ``in_logdir`` matching ``glob_pattern`` are visited in order of
    ascending modification time.  Files with a ``.gz`` suffix are read through
    ``gzip``; all others are opened as plain text.  Every line is yielded with
    its trailing whitespace stripped.
    """
    log_paths = sorted(Path(in_logdir).glob(glob_pattern),
                       key=lambda fp: fp.stat().st_mtime)
    for fpath in log_paths:
        opener = gzip.open if fpath.suffix == '.gz' else open
        with opener(fpath, 'rt') as fo:
            # Iterate the file object rather than calling readlines(), so a
            # large log is streamed instead of being loaded in one piece.
            yield from map(str.rstrip, fo)
def dump_csv_bz2(fpath, header, rows):
    """Write `header` and `rows` as CSV into a bz2-compressed text file.

    The time spent on the whole write is printed afterwards.  Because `rows`
    is consumed inside the timed section, passing a generator makes the
    printed duration include object creation and transformations.
    """
    started_at = datetime.now()
    with bz2.open(fpath, mode='wt', newline='') as sink:
        writer = csv.writer(sink)
        writer.writerow(header)
        writer.writerows(rows)
    elapsed = datetime.now() - started_at
    print(elapsed)
## ---------------------------------------------------------------------
## -- Simple non-obj-oriented solution, for performance comparison
print('Processing access-logs fast')
# Reuse the compiled access-log pattern of the dataclass, so both approaches
# are compared on the same regular expression.
p = NgxAccessLine.access_line_pattern


def parse_access_logline(s, do_cast=DO_CAST):
    """Split one access-log line into a list of its captured fields.

    Args:
        s: a single log line without the trailing newline.
        do_cast: when true, convert element 2 (timestamp) to ``datetime`` and
            elements 4 and 5 (status, body bytes sent) to ``int``.

    Returns:
        The list of groups captured by ``p``.

    Raises:
        ValueError: if ``s`` does not match the pattern.
    """
    m = p.match(s)
    if m is None:
        # Without this check the failure would surface as an opaque
        # AttributeError on None that does not show the offending line.
        raise ValueError(f'not a valid nginx access-log line: {s!r}')
    g = list(m.groups())
    if do_cast:
        g[2] = datetime.strptime(g[2], '%d/%b/%Y:%H:%M:%S %z')
        g[4] = int(g[4])
        g[5] = int(g[5])
    return g


# NOTE: with_suffix() replaces only the last suffix ('.bz2'), so for
# 'az.csv.bz2' this writes to 'az.csv.tmp.csv.bz2'.
dump_csv_bz2(
    OUT_FILE_A.with_suffix('.tmp.csv.bz2'),
    NgxAccessLine.fields(),
    map(parse_access_logline, ngx_access_linebyline(IN_LOGDIR))
)
## ---------------------------------------------------------------------
# Propagate the script-level casting switch to both record classes.
NgxAccessLine.DO_CAST = DO_CAST
NgxErrLine.DO_CAST = DO_CAST

# NOTE: from_string() returns None for a line that does not match the
# pattern, in which case .as_tuple() below raises AttributeError.
print('Processing access-logs with our module utilizing dataclasses')
dump_csv_bz2(
    OUT_FILE_A,
    NgxAccessLine.fields(),
    (NgxAccessLine.from_string(s).as_tuple() for s in ngx_access_linebyline(IN_LOGDIR))
)

print('Processing error-logs with our module utilizing dataclasses')
dump_csv_bz2(
    OUT_FILE_E,
    NgxErrLine.fields(),
    (NgxErrLine.from_string(s).as_tuple() for s in ngx_err_linebyline(IN_LOGDIR))
)
# An addition
NamedTuple-based solution
import re
from datetime import datetime
from typing import ClassVar, NamedTuple, Optional


class NgxAccessLine(NamedTuple):
    """One parsed record of an nginx access log (default `combined` format).

    Build instances with :meth:`from_string`, which converts the timestamp to
    ``datetime`` and the status / byte count to ``int``.
    """

    remote_addr: str
    remote_user: str
    time_local: datetime
    request: str
    status: int
    body_bytes_sent: int
    http_referer: str
    http_user_agent: str

    # Default `combined` nginx access log format
    #   '$remote_addr - $remote_user [$time_local] '
    #   '"$request" $status $body_bytes_sent '
    #   '"$http_referer" "$http_user_agent"'
    # Deliberately left without an annotation: typing.NamedTuple treats every
    # annotated name as a tuple field and rejects ClassVar[...] annotations.
    # The address group accepts IPv4, IPv6 and the literal `unix:`; the
    # timestamp group accepts both `+hhmm` and `-hhmm` UTC offsets.
    access_line_pattern = re.compile(
        r'^'
        r'([0-9A-Fa-f:.]+|unix:) - (.*) \[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
        r'"(.*)" (\d+) (\d+) '
        r'"(.*)" "(.*)"'
        r'$'
    )

    @classmethod
    def from_string(cls, s):
        """Parse one access-log line.

        Returns a new instance, or ``None`` when ``s`` does not match
        ``access_line_pattern``.
        """
        m = cls.access_line_pattern.match(s)
        if m is None:
            return None
        (
            remote_addr, remote_user, time_local,
            request, status, body_bytes_sent,
            http_referer, http_user_agent
        ) = m.groups()
        return cls(remote_addr, remote_user, datetime.strptime(time_local, '%d/%b/%Y:%H:%M:%S %z'),
                   request, int(status), int(body_bytes_sent),
                   http_referer, http_user_agent)


class NgxErrLine(NamedTuple):
    """One parsed record of an nginx error log.

    Build instances with :meth:`from_string`, which converts the timestamp to
    ``datetime`` and the numeric fields to ``int``.
    """

    ts: datetime
    level: str
    pid: int
    tid: int
    conn_number: int  # Optional uint, we use -1 if it is absent
    msg: str
    # Additional err-related attributes revealed from `msg`
    rel_ip: Optional[str] = None
    # rel_wtf: Optional[str] = None

    # Maintained nginx error log format
    #   YYYY/MM/DD HH:MM:SS [{level}] {pid}#{tid}: *{conn_number} {msg}
    # ref: nginx/src/core/ngx_log.c, nginx/src/core/ngx_connection.h
    # sample: 2021/04/19 17:48:31 [crit] 5797#5797: *1902 SSL_do_handshake() failed (SSL: error:14201044:SSL routines:tls_choose_sigalg:internal error) while SSL handshaking, client: 0.0.0.0, server: 0.0.0.0:443
    # Both patterns are left unannotated for the same reason as in
    # NgxAccessLine: an annotation would turn them into tuple fields.
    error_line_pattern = re.compile(
        r'^'
        r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (\d+)#(\d+):(?: \*(\d+))? (.*)'
        r'$'
    )
    # Compiled once here instead of being passed to re.findall() on each call.
    msg_client_pattern = re.compile(r'\bclient: (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')

    @classmethod
    def from_string(cls, s):
        """Parse one error-log line.

        ``rel_ip`` is filled only when the message contains exactly one
        ``client: a.b.c.d`` fragment.

        Returns a new instance, or ``None`` when ``s`` does not match
        ``error_line_pattern``.
        """
        m = cls.error_line_pattern.match(s)
        if m is None:
            return None
        ts, level, pid, tid, conn_number, msg = m.groups()
        kwargs = {}
        rel_ip = cls.msg_client_pattern.findall(msg)
        if len(rel_ip) == 1:
            kwargs['rel_ip'] = rel_ip[0]
        return cls(
            datetime.strptime(ts, '%Y/%m/%d %H:%M:%S'), level, int(pid), int(tid),
            int(conn_number) if conn_number else -1, msg, **kwargs
        )
# Performance measurement
Not complete
As mentioned above, this section is left for someday.
Define functions that read logs and return them line by line:
def genaccesslines():
    """Yield the lines of every gzipped access log under ./tmp/nginx-logz-0718.

    The directory is resolved against the current working directory; files
    matching ``access.log-*.gz`` are read as text and each line is yielded
    with its trailing whitespace stripped.
    """
    log_dir = Path(Path.cwd(), 'tmp/nginx-logz-0718')
    for fn in log_dir.glob('access.log-*.gz'):
        with gzip.open(fn, 'rt') as fo:
            # Stream from the file object; readlines() would load the whole
            # decompressed log into memory first.
            yield from map(str.rstrip, fo)
def generrorlines():
    """Yield the lines of every gzipped error log under ./tmp/nginx-logz-0718.

    The directory is resolved against the current working directory; files
    matching ``error.log-*.gz`` are read as text and each line is yielded
    with its trailing whitespace stripped.
    """
    log_dir = Path(Path.cwd(), 'tmp/nginx-logz-0718')
    for fn in log_dir.glob('error.log-*.gz'):
        with gzip.open(fn, 'rt') as fo:
            # Stream from the file object; readlines() would load the whole
            # decompressed log into memory first.
            yield from map(str.rstrip, fo)
# then, depending on decoding method
# -- Variant for the dataclasses-based classes: field names come from
# -- dataclasses.fields() and every record is converted with astuple().
t0 = datetime.now()  # [TIMING]
with open('tmp/az.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(_.name for _ in dataclasses.fields(NgxAccessLine))
    w.writerows(map(dataclasses.astuple, map(NgxAccessLine.from_string, genaccesslines())))
with open('tmp/ez.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(_.name for _ in dataclasses.fields(NgxErrLine))
    w.writerows(map(dataclasses.astuple, map(NgxErrLine.from_string, generrorlines())))
print(datetime.now() - t0)  # [TIMING]

# -- Variant for the NamedTuple-based classes: records are tuples already,
# -- so they are handed to the csv writer as they are.
# The timer is restarted on its own line; fused onto the previous comment it
# never ran, and this variant's time would have included the first one's.
t0 = datetime.now()  # [TIMING]
with open('tmp/az.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(NgxAccessLine._fields)
    w.writerows(map(NgxAccessLine.from_string, genaccesslines()))
with open('tmp/ez.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(NgxErrLine._fields)
    w.writerows(map(NgxErrLine.from_string, generrorlines()))
print(datetime.now() - t0)  # [TIMING]