前回の記事でパイプ接続したプロセスをmapのまま返すとプロセスが終らないまま関数から帰ってきてしまうので、気持ち悪いからlist関数を使ったが、
bipset = list ( map ( lambda x: ipv4adrset(x.decode( 'UTF-8' ).rstrip( '\n' )), proc.stdout)) ↓ bipset = [ipv4adrset(x.decode( 'UTF-8' ).rstrip( '\n' )) for x in proc.stdout] |
の様に、わざわざmap使うより、下のようにリスト内包表記で直接リストを返すような仕組みを用いた方がスマートな気がした。とくに今回のようにmap直後にlistするような使い方の場合特に。
mapやfilterはイテレータなので直後にforなどのループ処理を行なう場合に適しており、リストそのものが欲しいときにはmapを使わずに[リスト内包表記]をするのが良いきがする。リスト内包表記やlist式のリスト生成では各項目の変換式(関数)が一度に使用され結果のリストが返されるが、イテレータでは生成段階では変換式(関数)は実行されず、for文など繰返し処理が行なわれる時に一つずつ実行されることになる。
たとえばファイルを読み込み関数があったとして、オープンしたファイルハンドルに対して読み出して検索や加工をする処理の結果をイテレータとして返してしまうと、関数からリターンした段階でファイルを閉じると、そのイテレータからは読み出せないという結果になる。
ということでその場合は、リストとして返す関数が正解となる。
あと、イテレータからリストに変換するときに一度繰返し処理が行なわれるという考え方からすると、for文に使用する前にリストに変換するのは効率が悪く。動作速度的にも不利になるようなので、そういう観点で使い分けていく目安になるのではないかと思う。
次に、ログファイルの解析部分。これも前回記事で、djangoの認証バックエンドをチョットいじって、ログイン失敗のログを残せるようにしたので、ここではそれを検出してリストとして返すこととする。ログファイルはApacheユーザーに読取り権限があるので、そのまま読見込めばいい。ただし、/var/logに保存している場合など、logrotateの処理が行なわれる場合それも追跡して行ないたい。また古いログはrotate時にgzip圧縮されるのでこれにも対応したい。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | from blocklib import ipv4adrset import os, glob, gzip logger = getLogger(__name__) import re, json # djangoのアクセスログ(json形式)を読み込に認証履歴のレポートを返す # 認証履歴のログはdjango認証バックエンドを継承して認証時のログ出力機能を追加したもの # [引数]djlog : 解析するアクセスログ(json形式)をフルパスで与える(gzipもOK) # Noneの場合は、デフォルト django_log_json を使用(logorotateされたものも含み全てを読み込む) # st : レポート対象の開始日時(logrotateされたファイルはこの日時より新しいものを読み込み対象とする) # ed : レポート対象の終了日時 # address : レポート対象のIPアドレスオブジェクト(Noneの場合は全て)サブネット検索対応 def get_django_loginfails(djlog, st, ed, address = None ): if djlog is not None : djlogs = [djlog] else : djlogs = [x for x in glob.glob(django_log_json + '*' ) if re.match( '.*(log|gz|[0-9])' ,x) and os.path.getmtime(x) > st.timestamp()] djlogs.sort(key = lambda x: os.path.getmtime(x)) retlist = [] for jlog in djlogs: logger.debug(jlog) with open (jlog, 'r' ) if not re.match( '.*\.gz$' , jlog) else gzip. open (jlog, 'rt' ) as jf: #jl = json.load(jf) decoder = json.JSONDecoder() jlines = map ( lambda x: decoder.raw_decode(x), jf) #aa = decoder.raw_decode(line) for jl in map ( lambda x: x[ 0 ], jlines): # fromisoformatで読み込むために末尾のZが邪魔なので削除 # UTCのタイムゾーンを設定してJST時刻に変換する ts = datetime.fromisoformat(jl[ 'timestamp' ].rstrip( 'Z' )).replace(tzinfo = timezone.utc).astimezone(tz_jst) ts = ts.replace(microsecond = 0 ) if ts > st and ts < ed: (dat, event) = (jl[ 'ip' ], jl[ 'event' ]) res0 = re.match( 'user logged in : (.+)$' , event) res1 = re.match( 'login faild.*: (.+)$' , event) event = False if res1 else True if res0 else None if event is None : continue name = res1[ 1 ] if res1 else res0[ 1 ] if res0 else '' if address is not None and address ! = ipv4adrset(dat): continue logger.debug( '%s : %s - %s(%s)' % ( str (ts), dat, name, event)) retlist.append( { 'name' :name, 'datetime' :ts, 'address' :ipv4adrset(dat), 'status' :event, 'source' : 'django' , } ) #print('%s : %s : %s' % (str(ts), dat, event)) return retlist |
ログはdjaongo.structlogでjson形式で出力したものを対象としている
- 17行目globクラスを使用して指定したファイル名で始まるlogrotateファイルのリストを作成する(検索開始日付stよりも新しいタイムスタンプのファイルという条件も含める)
- それぞれに対して、ファイルオープンし(拡張子gzだったらgzipオープンに変更)1行ずつ読み出し、対象パターンを見つけたら辞書形式のデータの配列として返す
ソースプログラムを貼付けてるけど、実際はそのままでは動かない一部の公開なので誰の役にも立たないただの覚え書きなのであった。
json形式だからさっくり読めるだろうとして始めたら何かエラーになってサッパリ読めない。。何でだろうと思って先人様たちの記事をたどって調べたら、こんなもんだと上記のようにデコーダを作って1行ずつ読み込んでいくようだ。
ぇぇぇーーー。同じくpython使ってjson形式のログを掃き出してるのになんで直接読めないん??ってなった。こんど時間があったら調べてみよう