Parsing nginx access- and error-logs

.py
ETL

This is a collection of several implementations of parsing nginx access and error logs, with the aim of someday comparing their performance.

and this one, just for comparison

OOP vs. non-OOP code

the gist itself

Non-OOP and dataclasses-based solutions

import dataclasses
import re
from datetime import datetime
from typing import ClassVar, Optional


@dataclasses.dataclass(frozen=True)
class NgxAccessLine:
    remote_addr: str
    remote_user: str
    ts: str | datetime
    request: str
    status: str | int
    body_bytes_sent: str | int
    http_referer: str
    http_user_agent: str

    DO_CAST: ClassVar[bool] = False

    # RegEx for default `combined` nginx access log format
    #   $remote_addr - $remote_user [$time_local]
    #   "$request" $status $body_bytes_sent
    #   "$http_referer" "$http_user_agent"
    access_line_pattern: ClassVar[re.Pattern] = re.compile(
        r'^'
        r'([0-9.]+) - (-|\S+) \[([^]]+)\] '
        r'"(.*)" (\d+) (\d+) '
        r'"(.*)" "(.*)"'
        r'$'
    )

    @classmethod
    def from_string(cls, s):
        if m := cls.access_line_pattern.match(s):
            kwargs = dict(zip(
                ('remote_addr', 'remote_user', 'ts',
                 'request', 'status', 'body_bytes_sent',
                 'http_referer', 'http_user_agent'),
                m.groups()
            ))

            if cls.DO_CAST:
                kwargs.update({
                    'ts': datetime.strptime(kwargs['ts'], '%d/%b/%Y:%H:%M:%S %z'),
                    'status': int(kwargs['status']),
                    'body_bytes_sent': int(kwargs['body_bytes_sent']),
                })

            return cls(**kwargs)

    @classmethod
    def fields(cls):
        return tuple(f.name for f in dataclasses.fields(cls))

    def as_tuple(self):
        return dataclasses.astuple(self)

    def as_dict(self):
        return dataclasses.asdict(self)


@dataclasses.dataclass(frozen=True)
class NgxErrLine:
    ts: datetime
    level: str
    pid: int
    tid: int
    cid: int | None
    msg: str
    # Additional attributes revealed from `msg`
    remote_addr: Optional[str] = None

    DO_CAST: ClassVar[bool] = False

    # Nginx error log format
    #   YYYY/MM/DD HH:MM:SS [level] pid#tid: *conn_number message
    # ref: nginx/src/core/ngx_log.c, nginx/src/core/ngx_connection.h
    error_line_pattern: ClassVar[re.Pattern] = re.compile(
        r'^'
        r'([\d/]+ [\d:]+) \[(\w+)\] (\d+)#(\d+):(?: \*(\d+))? (.*)'
        r'$'
    )

    msg_client_pattern = re.compile(r'\bclient: (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')

    @classmethod
    def from_string(cls, s):
        if m := cls.error_line_pattern.match(s):
            kwargs = dict(zip(
                ('ts', 'level', 'pid', 'tid', 'cid', 'msg'),
                m.groups()
            ))

            # Additional attributes revealed from `msg`
            if remote_addr := cls.msg_client_pattern.search(kwargs['msg']):
                kwargs.update({'remote_addr': remote_addr.group(1)})

            if cls.DO_CAST:
                kwargs.update({
                    'ts': datetime.strptime(kwargs['ts'], '%Y/%m/%d %H:%M:%S'),
                    'pid': int(kwargs['pid']),
                    'tid': int(kwargs['tid']),
                    'cid': int(kwargs['cid']) if kwargs['cid'] else None,
                })

            return cls(**kwargs)

    @classmethod
    def fields(cls):
        return tuple(f.name for f in dataclasses.fields(cls))

    def as_tuple(self):
        return dataclasses.astuple(self)

    def as_dict(self):
        return dataclasses.asdict(self)

Now we can compare the simple non-object-oriented solution with the dataclasses-based one (note: with or without the DO_CAST option).

import bz2
import csv
import gzip
from datetime import datetime
from pathlib import Path

# from ngxlogz_namdtup import NgxAccessLine, NgxErrLine # obsoleted; for comparison
from ngxlogz import NgxAccessLine, NgxErrLine


# Directory that is scanned for nginx log files (passed to the
# *_linebyline generators below).
IN_LOGDIR = Path('~/tmp/ngx').expanduser()
# Destination passed to dump_csv_bz2 for the parsed access-log rows.
OUT_FILE_A = Path('~/tmp/ngx/az.csv.bz2').expanduser()
# Destination passed to dump_csv_bz2 for the parsed error-log rows.
OUT_FILE_E = Path('~/tmp/ngx/ez.csv.bz2').expanduser()
# Script-wide switch: default for parse_access_logline's `do_cast` and the
# value copied onto NgxAccessLine.DO_CAST / NgxErrLine.DO_CAST further down.
DO_CAST = False


def ngx_access_linebyline(in_logdir=Path('/var/log/nginx'), glob_pattern='access.log-*'):
    """Generate log lines from all the nginx access.log files.

    Files in `in_logdir` matching `glob_pattern` are visited in order of
    ascending modification time. A file whose suffix is `.gz` is read
    through gzip in text mode; any other file is opened as plain text.
    Each yielded line has its trailing whitespace removed.
    """
    log_paths = sorted(Path(in_logdir).glob(glob_pattern),
                       key=lambda fp: fp.stat().st_mtime)
    for fpath in log_paths:
        opener = gzip.open if fpath.suffix == '.gz' else open
        with opener(fpath, 'rt') as fo:
            # Iterate the handle rather than calling readlines(), so a large
            # log is streamed line by line instead of being loaded whole.
            yield from map(str.rstrip, fo)

def ngx_err_linebyline(in_logdir=Path('/var/log/nginx'), glob_pattern='error.log-*'):
    """Generate log lines from all the nginx error.log files.

    Files in `in_logdir` matching `glob_pattern` are visited in order of
    ascending modification time. A file whose suffix is `.gz` is read
    through gzip in text mode; any other file is opened as plain text.
    Each yielded line has its trailing whitespace removed.
    """
    log_paths = sorted(Path(in_logdir).glob(glob_pattern),
                       key=lambda fp: fp.stat().st_mtime)
    for fpath in log_paths:
        opener = gzip.open if fpath.suffix == '.gz' else open
        with opener(fpath, 'rt') as fo:
            # Iterate the handle rather than calling readlines(), so a large
            # log is streamed line by line instead of being loaded whole.
            yield from map(str.rstrip, fo)

def dump_csv_bz2(fpath, header, rows):
    """Write `header` followed by `rows` as CSV into a bz2-compressed file.

    `rows` may be a lazy iterable (e.g. a generator); it is consumed while
    the file is being written, so the elapsed time printed at the end also
    covers whatever work producing the rows involves.
    """
    started = datetime.now()
    with bz2.open(fpath, 'wt', newline='') as sink:
        writer = csv.writer(sink)
        writer.writerow(header)
        for row in rows:
            writer.writerow(row)
    # Report the wall-clock duration of the whole dump.
    print(datetime.now() - started)


## ---------------------------------------------------------------------
## -- Simple non-obj-oriented solution, for performance comparison

print('Processing access-logs fast')

# Borrow the compiled access-log regex from NgxAccessLine so the plain-function
# parser below matches lines with exactly the same pattern.
p = NgxAccessLine.access_line_pattern

def parse_access_logline(s, do_cast=DO_CAST):
    """Split one access-log line into a list of its captured fields.

    The list follows the capture-group order of the module-level pattern
    `p`. With `do_cast` enabled, item 2 (timestamp) is converted to a
    `datetime` and items 4 and 5 (status, bytes sent) to `int`; otherwise
    every item stays a string.
    """
    fields = [*p.match(s).groups()]
    if not do_cast:
        return fields
    fields[2] = datetime.strptime(fields[2], '%d/%b/%Y:%H:%M:%S %z')
    for idx in (4, 5):
        fields[idx] = int(fields[idx])
    return fields

# Plain-function run: parse every access-log line lazily while the CSV is
# being written, so the printed duration includes the parsing work.
# NOTE(review): Path.with_suffix replaces only the final suffix, so this
# target is presumably named `az.csv.tmp.csv.bz2` -- confirm that is intended.
dump_csv_bz2(
    OUT_FILE_A.with_suffix('.tmp.csv.bz2'),
    NgxAccessLine.fields(),
    (parse_access_logline(line) for line in ngx_access_linebyline(IN_LOGDIR)),
)

## ---------------------------------------------------------------------

# Apply the script-level casting switch to both line classes before parsing.
NgxAccessLine.DO_CAST = NgxErrLine.DO_CAST = DO_CAST

print('Processing access-logs with our module utilizing dataclasses')

# Rows are produced lazily, so the timing printed by dump_csv_bz2 covers
# object creation and the conversion to tuples.
dump_csv_bz2(
    OUT_FILE_A,
    NgxAccessLine.fields(),
    map(lambda s: NgxAccessLine.from_string(s).as_tuple(),
        ngx_access_linebyline(IN_LOGDIR)),
)

print('Processing error-logs with our module utilizing dataclasses')

dump_csv_bz2(
    OUT_FILE_E,
    NgxErrLine.fields(),
    map(lambda s: NgxErrLine.from_string(s).as_tuple(),
        ngx_err_linebyline(IN_LOGDIR)),
)

An addition

NamedTuple-based solution

import re
from datetime import datetime
from typing import ClassVar, NamedTuple, Optional
class NgxAccessLine(NamedTuple):
    """A single nginx access-log record in the default `combined` format.

    Build instances with `from_string`; the timestamp, status and byte
    count are converted to `datetime` / `int` while parsing.
    """
    remote_addr: str
    remote_user: str
    time_local: datetime
    request: str
    status: int
    body_bytes_sent: int
    http_referer: str
    http_user_agent: str

    # Default `combined` nginx access log format
    #   '$remote_addr - $remote_user [$time_local] '
    #   '"$request" $status $body_bytes_sent '
    #   '"$http_referer" "$http_user_agent"'
    #
    # Deliberately left unannotated: typing.NamedTuple treats every annotated
    # name in the class body as a tuple field (a `ClassVar[...]` annotation is
    # not accepted there), whereas a plain assignment stays a class attribute.
    # The UTC offset accepts either sign, so zones west of UTC parse too.
    access_line_pattern = re.compile(
        r'^'
        r'([0-9.]{7,15}) - (.*) \[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\] '
        r'"(.*)" (\d+) (\d+) '
        r'"(.*)" "(.*)"'
        r'$'
    )

    @classmethod
    def from_string(cls, s):
        """Parse one access-log line.

        Returns a new instance, or None when `s` does not match
        `access_line_pattern`.
        """
        # The pattern lives on the class, so it must be reached through `cls`;
        # a bare name would not resolve from inside the method.
        m = cls.access_line_pattern.match(s)
        if m is None:
            return None

        (
            remote_addr, remote_user, time_local,
            request, status, body_bytes_sent,
            http_referer, http_user_agent
        ) = m.groups()
        return cls(
            remote_addr, remote_user,
            datetime.strptime(time_local, '%d/%b/%Y:%H:%M:%S %z'),
            request, int(status), int(body_bytes_sent),
            http_referer, http_user_agent,
        )
class NgxErrLine(NamedTuple):
    """A single nginx error-log record.

    Build instances with `from_string`; the timestamp and the numeric ids
    are converted to `datetime` / `int` while parsing.
    """
    ts: datetime
    level: str
    pid: int
    tid: int
    conn_number: int  # Optional uint, we use -1 if it is absent
    msg: str
    # Additional err-related attributes revealed from `msg`
    # (the captured address is kept as text, hence `str`).
    rel_ip: Optional[str] = None
    # rel_wtf: Optional[str] = None

    # Maintained nginx error log format
    #   YYYY/MM/DD HH:MM:SS [{level}] {pid}#{tid}: *{conn_number} {msg}
    # ref: nginx/src/core/ngx_log.c, nginx/src/core/ngx_connection.h
    # sample: 2021/04/19 17:48:31 [crit] 5797#5797: *1902 SSL_do_handshake() failed (SSL: error:14201044:SSL routines:tls_choose_sigalg:internal error) while SSL handshaking, client: 0.0.0.0, server: 0.0.0.0:443
    #
    # Both patterns are deliberately left unannotated: typing.NamedTuple
    # treats every annotated name in the class body as a tuple field (a
    # `ClassVar[...]` annotation is not accepted there), whereas a plain
    # assignment stays a class attribute.
    error_line_pattern = re.compile(
        r'^'
        r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (\d+)#(\d+):(?: \*(\d+))? (.*)'
        r'$'
    )

    # Compiled once here instead of going through re.findall on every call.
    msg_client_pattern = re.compile(r'\bclient: (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')

    @classmethod
    def from_string(cls, s):
        """Parse one error-log line.

        Returns a new instance, or None when `s` does not match
        `error_line_pattern`. `rel_ip` is set only when the message contains
        exactly one `client: <ipv4>` fragment; `conn_number` is -1 when the
        line carries no `*conn_number`.
        """
        # The pattern lives on the class, so it must be reached through `cls`;
        # a bare name would not resolve from inside the method.
        m = cls.error_line_pattern.match(s)
        if m is None:
            return None

        ts, level, pid, tid, conn_number, msg = m.groups()

        client_ips = cls.msg_client_pattern.findall(msg)
        # Zero or several client addresses are treated as "unknown".
        rel_ip = client_ips[0] if len(client_ips) == 1 else None

        return cls(
            datetime.strptime(ts, '%Y/%m/%d %H:%M:%S'), level, int(pid), int(tid),
            int(conn_number) if conn_number else -1, msg, rel_ip,
        )

Performance measurement

Not complete

As mentioned above, this section is unfinished and left for someday.

Define functions that read logs and return them line by line:

def genaccesslines():
    """Yield right-stripped lines from the gzipped access logs.

    Reads every `access.log-*.gz` file under `tmp/nginx-logz-0718` in the
    current working directory.
    """
    logdir = Path(Path.cwd(), 'tmp/nginx-logz-0718')
    # Sort the paths: glob() order is not guaranteed, and a stable order makes
    # repeated runs produce the same output.
    for fn in sorted(logdir.glob('access.log-*.gz')):
        with gzip.open(fn, 'rt') as fo:
            # Iterate the handle rather than calling readlines(), so a large
            # log is streamed line by line instead of being loaded whole.
            yield from map(str.rstrip, fo)

def generrorlines():
    """Yield right-stripped lines from the gzipped error logs.

    Reads every `error.log-*.gz` file under `tmp/nginx-logz-0718` in the
    current working directory.
    """
    logdir = Path(Path.cwd(), 'tmp/nginx-logz-0718')
    # Sort the paths: glob() order is not guaranteed, and a stable order makes
    # repeated runs produce the same output.
    for fn in sorted(logdir.glob('error.log-*.gz')):
        with gzip.open(fn, 'rt') as fo:
            # Iterate the handle rather than calling readlines(), so a large
            # log is streamed line by line instead of being loaded whole.
            yield from map(str.rstrip, fo)

Then, depending on the decoding method, run one of the following:

t0 = datetime.now()  # [TIMING]

# Dataclasses variant: the header comes from the dataclass field names and
# each parsed record is flattened with dataclasses.astuple while writing.
with open('tmp/az.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow([field.name for field in dataclasses.fields(NgxAccessLine)])
    w.writerows(
        dataclasses.astuple(NgxAccessLine.from_string(line))
        for line in genaccesslines()
    )

with open('tmp/ez.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow([field.name for field in dataclasses.fields(NgxErrLine)])
    w.writerows(
        dataclasses.astuple(NgxErrLine.from_string(line))
        for line in generrorlines()
    )

print(datetime.now() - t0)  # [TIMING]
t0 = datetime.now()  # [TIMING]

# NamedTuple variant: the header comes from `_fields` and the parsed records
# are handed to the CSV writer as they are.
with open('tmp/az.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(NgxAccessLine._fields)
    w.writerows(NgxAccessLine.from_string(line) for line in genaccesslines())

with open('tmp/ez.csv', 'w', newline='') as fo:
    w = csv.writer(fo)
    w.writerow(NgxErrLine._fields)
    w.writerows(NgxErrLine.from_string(line) for line in generrorlines())

print(datetime.now() - t0)  # [TIMING]