FastAPI認証機構の追加

小規模djangoページの置き替えの為にFastAPIアプリケーションの勉強と実装実験を行っているが、次に用意したものは管理ページなどへアクセスするためのユーザー認証機構。前述の通りFastAPIはAPIの提供に特化したシンプルなフレームワークなので実用的な機能などは自前で準備する必要がある。かといって何も情報が無いかと言えばそうでも無く、公式のドキュメントやチュートリアルに必要そうなものが一通りまとめてあるので(例えばセキュリティの項目)、それをチマチマ実装していくという実に勉強になる仕組みが用意されている。

ドキュメント・チュートリアルで今回参考(コピペ)にしたのもは主に次の二点

  • セキュリティ
    • パスワード(およびハッシュ化)によるOAuth2、JWTトークンによるBearer
    • ユーザー認証の基本的な構造サンプル
  • SQL database(データベースアクセス)
    • SQLAlchemyを使用してFastAPIのパス構文に接続する方法

以上を組み合わせてユーザー情報をデータベースに保存し、ユーザー認証を行うような仕組みが実現出来る。

ただし、この段階ではAPIとしての実装なので実際に使用できるようにするには、フロントエンドの実装が必要となる。Djangoではバックエンド寄りの実装でHTMLテンプレートを用いてのUIだったが、今回のUIはFastAPI対応ではせっかくなのでシングルページの実装としてjavascript(Typescript)での実装としている。

こんな感じのログイン画面を準備してみました。ユーザー追加の画面や実際の管理画面は未実装だが、同じようなノリで追々追加していきます。

あと、このままではトークンの期限が切れる度に再認証になるので、セッションIDによるリフレッシュトークンの実装で同じ端末からはなるべくログインし続けられるような機構の導入を検討している(実装の手段はともかく見た目くらいはDjangoと同じくらいにしたいw)。

bootstrapをほぼ素で利用しているだけで何となく見た目だけでも良い感じなのが嬉しいですね。

FastAPI環境構築

前回の投稿にてDjangoの置き替えとしてFastAPIの勉強を始めてかなり時間が経ってしまいましたが、まずはDjangoの時と同じようにシングルページWEBアプリケーションのテンプレートのような形で勉強しようと思い、同様にDockerの環境構築からの覚え書きです。

  • PythonイメージベースのDockerfile
    • nodejsをインストールして、モジュールとしてTypescript、Bootstrap、SASSを追加
    • Pythonライブラリとしてfastapi関連ライブラリをインストール
    • 実行ユーザーappを作成してsudo権限を与える
  • nginxプロキシを追加したdocker-compose.yml
    • /codeをマウントしてここにアプリを配置する
    • startupスクリプトを実行させる(環境変数設定とtypescript、sassコンパイラの常駐設定、APIサーバー:uvicornのデーモン起動)

上記プロジェクトを用いてdocker-compose up -dすると、サーバーが起動してブラウザからhttp://localhost:8080/でアクセス出来るようになります。

WEBアプリケーションの実体はapp/main.pyです。サンプルではある程度サンプルページが表示されるまで作り込んでしまっていますが、覚え書きとして一番単純なアプリケーションから

from fastapi import FastAPI

app = FastAPI()

@app.get('/')
async def index():
    return {'detail': 'Hello FastAPI World.'}

として、保存。ブラウザからhttp://localhost:8080/で読み込むと、

一番単純なFastAPIアプリ

こんな感じで表示されます。

app/main.pyの1行目でライブラリを読み込んで、アプリケーションインタンスの生成(3行目)、5行目はHTTPリクエストメソッドを表す関数デコレータです。例ではルートパスに対するGETリクエスト。その下のdefが関数定義で、名前は分かりやすければ何でも良いです。この関数の戻り値がHTTPのレスポンスとしてブラウザに返されます。

見て分かるように、このアプリケーションで返されるレスポンスはHTMLではないため、結果の見た目が変です。

FastAPIはDjangoと異なりAPIが主体なのでデフォルトではJSONで返すのが基本のようです。Ajaxなどでデータのやりとりをするのに向いている感じ。

HTTP通信を使用したアプリケーションでデータのやりとりを行い、クライアント側で見た目を準備するような用途ではこのままで良さそうですが、普通のWEBアプリのようにブラウザから利用したい場合はひと工夫が必要です。

少し調べた感じでは、二通りの方法があるようです。

  1. Jinja2Templatesテンプレートライブラリを利用する
    • FastAPIのもととなるStarletteやFlaskでも使用されており、Djangoのテンプレートと同様に変数埋込で動的なHTML文書を返すことが出来る。
  2. スタティックマウントしたディレクトリでhtmlを有効にする
    • from astapi.staticfiles import StaticFiles
    • app.mount(“/html”, StaticFiles(directory=”/code/html”, html=True))
    • 上記のように追加することで http://localhost:8080/html/ で/code/htmlに保存されたindex.htmlが表示されるのでそのページに仕込んだjavascriptからFastAPIのAPIを利用することで、シングルページアプリケーションを実現することが出来る。

サンプルでは1の方法を採用して、ルートページを表示させるようにしている。2の方法をにするには@app.get(‘/’)の部分を他のパスに変更するか、レスポンスごと削除し、スタティックマウント先を’/’ルートにすると良い。

サンプルのようにHTMLテンプレートを利用したレスポンスがこちら。

Bootstrapでチョッピリレイアウトしてみた。

最初は2の方法が良いなと思ってみたけど、サンプルにあるようにエラーベージをHTMLでちゃんと表示させるにはテンプレートを使用した方が便利だし、どうせテンプレート使うならルートページもテンプレートで行こう!という結論となりました。

WEBアプリテンプレート(django+javascript)

code
code
nginx
nginx
Dockerfile
Dockerfile
docker-compose.yml
docker-compose.yml
conf.d
conf.d
uwsgi_params
uwsgi_params
project.conf
project.conf
project.conf
project.conf
/
/
[PROJECT]
[PROJECT]
docker
マウントポイント
docker…
サイト構築
ポイント
サイト構築 ポイント
プロジェクト構成
プロジェクト構成
Text is not SVG – cannot display
# 通常起動
$docker-compose up -d
# 再構築
CLEARSITE=1 docker-compose up -d
テンプレート表示状態

WEBアプリの勉強を始めた所、djangoを用いて同じ構成のサイトを何度も構築するのが面倒でテンプレートを作ることにした。しかしコレばっかりに夢中で肝心の勉強の方がおろそかになってきているのは、いつものお約束。

テンプレート構成

  • docker image 仕様(Dockerfile)
    • debianベースのpythonスリムイメージをベースとする
    • pythonライブラリインストールステージ:dbアクセス用ライブラリの為に一時的に開発ツール一式をインストール(後にスリム化処理)
    • nodejsは一旦aptパッケージインストールした後、npmで最新のバージョンをインストール
    • npmでtypescript、sass、bootstrapをインストール
    • その他チョットしたツール:gitやwget、アプリ実行用ユーザー(user)の作成とsudo権限設定
    • 実行コマンドは/code/startupシェルスクリプト:イメージにはダミーのシェルスクリプトを格納しているが、実際はサイト構築テンプレートとwusgi起動を含むシェルスクリプトを/code/startupとして作成しマウントして実行(後述)。
  • 起動:docker-compose構成
    • djangoは/codeをマウントしてサービスを実行
    • 環境変数CLEARSITE=1とすることでサイトを削除してテンプレートで再構築する(危険コマンド)
    • 構築用のstartupスクリプトが長いため、djangoサービスが始める前にnginxからのサービス要求がありロックすることがあったので、ヘルスチェックを行うようにしている(uwsgiのプロセスチェック)。
  • startupスクリプト(テンプレート構築&サービス起動)
    • 環境変数でサイト名アプリ名を定義し、サイト名のディレクトリが存在しない場合、またはCLEARSITEでアプリを削除している場合、テンプレートを用いてアプリケーション雛形を構築する。
    • Django-admin startupprojectでサイトを構築、settings.pyの必要な部分を順次編集。
    • python manage.py startappで最初のアプリケーションを作成(これを用いてシングルページアプリケーションの雛形とする)
    • アプリケーションの設定部分をsettings.pyに追記編集。bootstrapなどのミドルウェアの設定、LOGGING設定、環境変数読み込み設定。
    • ルート用urls.py、フロント側にデフォルト変数を送る機構context_processors.pyの雛形作成
    • アプリケーション定義用urls.py、views.pyの雛形作成、indexページとajax待ち受けの最低限構成、およびテンプレートHTMLのindex.html、base.html雛形作成。
    • python manage.py migrate :sqliteデータベースの初期化、管理者ユーザーの作成
    • スタティック領域の構成
      • bootstrapはnpmのグローバルエリアからローカルコピー(これは最新版に対応するため常に行う)
      • サイト用カスタムscssのテンプレート作成(再構築時)
      • sass常駐コンパイラの起動(上記scssファイルの更新と同時に自動コンパイル)
      • bootstrap関連その他js、およびtinymce設定
      • DOM構築用javascriptモジュールテンプレート(TSソース)設置、およびTypeScript初期化と設定ファイル初期編集(再構築時)
      • tsc常駐コンパイラ起動

テンプレートに凝りすぎてほぼほぼ自由度が下がってしまっている件。前回のDOM構文もTypescriptに移植して使える状態にして用意してます。細かい解説は折りを見て追記します。

HEIF→JPEG変換

ブログへの写真アップロードの件で少し考えたのが、以前のスマートフォンをiPhoneに変えたときに写真の画像ファイルフォーマットがデフォルトではjpegではなくheifになっていたので、互換性を重視するためにということでjpegに変更していた事。コマンドラインで一括変換する仕組みを導入するにあたって、改めてheifフォーマットの利点を調べてみた。ウィキペディアなどで調べてみると、画質はそのままで圧縮率が2倍以上と、保存用途としてもかなり良いため再びheifでの保存に切り替えることとする。

で、必要になってくるものがpythonでheifを扱うためのライブラリ’pyheif’

とりあえずbrewでインストール出来るということで、まずは必要なライブラリから

$ brew install libffi libheif (libde265) # 括弧はWSL2の場合

その後にpipでpythonライブラリのインストール

$ pip install pyheif

と、WSLではうまく行ったのだが、自分の環境のMacOSではpipでインストールエラーとなった。どうやらclangのインクルードパスが/opt/local/includeを要求しているのに対し、それが無いためらしい。内容を調べてみると、/opt/homebrewというディレクトリが/opt/localであれば色々解決しそうなので、試しにシンボリックリンクを張ることとした。

$ sudo ln -s /opt/homebrew /opt/local

これで旨く/opt/local/includeを見つけることが出来、pyheifライブラリがインストール出来た。

次にサイトを参考に先日のアップロード準備用リサイズプログラムwpresizeにheifデコード機能を組み込んでみた。

from PIL import Image
import pyheif

heif_file = pyheif.read("sample.heic")
image = Image.frombytes(
    heif_file.mode, 
    heif_file.size, 
    heif_file.data,
    "raw",
    heif_file.mode,
    heif_file.stride,
    )
image.save("output.jpeg", "JPEG")

ただし上記はあくまでも画像を変換するだけで、Exif情報はコピーされない。そこでpyheifからExif情報を取り出して出力JPEGファイルに付加する方法を調べてみた。

heif_file(pyheifオブジェクト)にはその他にmetadataというプロパティがあり、メタデータを格納する辞書形式(のリスト)であると説明される。リストと言っても内容は一つしかなく、そのゼロ番目の内容を調べると

type: 'Exif'
data: xxxxxx(バイナリ情報)

であったため、このdataの中身が撮影画像のExif情報だということが分かる。これを保存時に付加することでExif付きで保存することが出来る。

.....
exif = heif_file.metadata[0]['data']
.....
image.save("output.jpeg", "JPEG", exif=exif)

GPS情報だけ消したり、Exif情報を編集したいのであれば、前回同様に一度辞書形式に変換してやれば編集可能となる。

exif = heif_file.metadata[0]['data']
exif_dict = piexif.load(exif)
del exif_dict['GPS']
exif = piexif.dump(exif_dict)
image.save("output.jpeg", "JPEG", exif=exif)

結局メタデータ自体が2重に辞書保管されるような形になっていたのが分かりにくかったです。

あともう一点注意すべき点は、Exifの記載されている画像の回転情報。カメラ構え方で画像の縦横を変えて表示したいが、オリジナル画像から縦横変換をしようとすると画素の並び順から変えなければならず、再圧縮からの点からも非効率的(勝手な想像)だと思うので、Exifの情報として画像をどの様に表示するのかの向き(回転)情報を保管している。サムネイルやビューワはこの情報をもとに画像を表示している(GIMPなどの編集ソフトはこの情報をもとにするが、向きに合わせてオリジナルのピクセル情報を並び替えるのかを聞いてくる)。

そういった意味かと思うが、heif_fileで変換されたイメージはExif情報は保存されないが、Exifの回転情報をもとにピクセルを並び替えているようで、オリジナルを右90度回転した縦長画像を変換しても、縦長画像として出力される。これに対して右90度回転したExif情報を付加すると、さらに画像回転情報が加わり横倒しになった画像になる。

この辺りの仕組みを変換時に制御出来るかどうか調べられなかったので、とりあえずは変換時にオリジナルの回転処理がされるものとして、保存するExif情報のうち回転情報のみ1(正転)に戻しておくこととした。

exif = heif_file.metadata[0]['data']
exif_dict = piexif.load(exif)
del exif_dict['GPS']
exif_dict['0th'][274] = 1  # 回転情報を保持するExifアドレス(右回転の場合6になってるので正転1とする)
exif = piexif.dump(exif_dict)
image.save("output.jpeg", "JPEG", exif=exif)

対処療法的ではあるが、とりあえずこれでExifを保管し画像の向きを保持して変換出来るようになった。

前回のwpresizeに組み込んだところがこちら(masterにコミットしてしまったので前回のリンクもheif対応の物になってますw)。

pyheifオブジェクトに関しては詳しい資料が見つからなかったので、自分で調べる形になってしまった。とりあえず結果オーライで暫く運用ということで

2022年明けましての投稿

新年のネタは、3Dプリンタによる鏡餅。丁度白シルクのフィラメントがあったので8cmサイズで製作しました。ミカンは本物です。A4ペーパーを二つ折りにして、裏白はインクジェット用OHPシートに印刷してそのまま乗せてますw

昨日の投稿では旅行物だけあった写真が多く、貼り付ける写真について一枚一枚、GIMPで開いてサイズ変更して上書きセーブしてアップロードという手順を取っていたため非常に面倒で時間のかかる作業だった。前々から気になっていたが何となく惰性で過ごしていた感がある。

それに加えて、こちらの環境ではSafariとWorpressのライブラリの相性が悪く、非常に反応が遅く、下手するとハングアップしてしまうので、このブログ書きの時だけMicrosoftのEdgeを起動していたりします。

その部分は差し当たっては仕方の無いことで、追々と問題を解決しなければと思うところですが、とりあえず画像のサイズ変換アップロード環境だけは早急に何とかしないと、ということで、pythonスクリプトで画像サイズ変換プログラムを自作しました(リサイズだけでたいしたことはしていない為)。

Pillowという画像ライブラリとpiexifというExif編集ライブラリを使用しています。長手方向の幅を2000以下としてファイル名を変えて(-fオプションを付けるとオーバーライト)保存する単純なプログラム。

ついでにExif情報のうちGPS情報を削除します。

まぁ自分の一眼レフカメラはGPS持ってないし、iPhoneの写真をMacからアップするときは写真をフォルダコピーした段階でGPS情報は消されているっぽいので、普通には問題無いのだが、OneDriveのカメラロールに保存された写真とかたまにGPS情報がベッタリ貼られたまま流通しがちなので注意が必要です。

Pillowとpiexifを使用したExif編集の仕組みは以下の通り。

import piexif
from PIL import Image

img = Image.open('sample.jpeg')
exif = img.info['exif']
exif_dict = piexif.load(exif).  # Exif辞書情報の取り出し

# 取り出した辞書情報を編集します
del exif_dict['GPS']     # GPS情報を削除

# 編集後保存可能なバイナリデータに戻します
exif = piexif.dump(exif_dict)

# リサイズなどした新しい画像データの作成
img_new = img.resize([with, height], Image.BICUBIC)

# 編集したExifを適用して保存します
img_new.save('output.jpeg', quality=95, exif=exif)

久々のプログラミング。そういえばMacでの開発環境は作っただけで実践は初めてだった。。。いろいろライブラリとか入れ忘れてたりして意外とドタバタしました。

不正アクセスログ集計ツールその後

以前の記事で不正アクセスの疑いのあるIPアドレスを集計してアクセスブロックするツールを作成し運用していることと、docker導入に当たってブロックの仕組みを再検討して運用している記事を上げていましたが、もう一つポータルサイトのWEBアプリへの不正アクセス(django管理画面やwordpressへの不正ログイン)を集計する仕組みも紹介しました。実はこの仕組みに関してもdocker化で動作しなくなっていました。

データベースに保管された不正ログイン情報(Login_History)は別コンテナのデータベースサーバーにあるので、localhostからコンテナ名に変更するだけで良いのだが、ブロック済みのIPアドレスリストを取得して新規IPアドレスのみを表示する仕組みについては、ブロックリストがホストで管理されているためコンテナ内からはアクセス出来ない。ブロックリストを保管している/etc/ufw/before.rulesをコンテナに強引にマウントしてしまう方法もあるが、あまりスマートではない。

ということでこれまた興味本位の勉強を兼ねてホスト側からdockerブリッジネットワークへのサービスとしてソケット通信でブロックリストを公開する仕組みを作成した。

  • blocklist-server
#!/usr/bin/env python3
#--*-- coding: utf-8 --*--
# 対象ネットワーク: docker bridge network([shared] 192.168.29.0/24)
# ブロックリストホスト: 192.168.29.1
# 起動(su): /usr/local/sbin/blocklistsrv.py | /usr/bin/logger -i -t blocklistsrv 2>&1 &
# 停止: ps aux | grep blocklistsrv  -> kill -9
# ポート確認: lsof -i :8765  -> kill -9

import os, sys, socket
import psutil
import argparse
import re
import numpy as np

import daemon
from daemon import pidfile

pfile = '/var/run/blocklist-server.pid'

ipset_ufw = '/etc/ufw/before.rules'
ipset_start_line = 'blacklist-ipset-S'
ipset_end_line = 'blacklist-ipset-E'

svport = 8765
svhost = '192.168.29.1'

def getArgs():
    parser = argparse.ArgumentParser('blocklist service cmd')
    parser.add_argument('cmd', nargs='?', help='(start|stop|restart)')
    parser.add_argument('-l', '--syslog', action='store_true', help='enable syslog output')
    return parser.parse_args()

args = getArgs()

ーーー省略ーーー
def blserver():
    socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket1.bind((svhost, svport))
    socket1.listen(5)

    logger.info('Start Blocklist server. port : %d', (svport))
    while True:
        try:
            clientsocket, address = socket1.accept()
            blist = []
            logger.info(f"Connection from {address} has been established!")
            with open(ipset_ufw, 'r', encoding='utf-8') as fi:
                for line in fi:
                    if re.search(ipset_start_line, line):
                        break
                for line in fi:
                    if res:= re.match('^-A ufw-before-input -s ([0-9./]+) .*', line):
                        #logger.debug(res[1])
                        blist.append(res[1])
                    if re.search(ipset_end_line, line):
                        break
        #except KeyboardInterrupt:
        #    #clientsocket.close()
        #    exit('exit with keyboard interrupt!')
        except FileNotFoundError:
            logger.error('file not found! : %s' % (ipset_ufw))
        except PermissionError:
            logger.error('can\'t read %s, please run as superuser!' % (ipset_ufw))
        finally:
            clientsocket.send('\n'.join(blist).encode('utf-8'))
            clientsocket.close()

        ## blist 送信 

def main():
    #args = getArgs()
    #logger = getLoggerConf(args.syslog)
    if args.cmd == 'start':
        pass
    elif args.cmd == 'stop' or args.cmd == 'restart':
        # デーモンストップ
        if os.path.exists(pfile):
            try:
                logger.info(pfile + " is exits")
                pid =  open(pfile, 'r').read().rstrip()
                proc = psutil.Process(int(pid))
                proc.terminate()
                os.remove(pfile)
                logger.info('pid : %s removed.' % pid)
            except PermissionError:
                logger.error('PID File, permission error, please run as super user.')
                exit(0)
            #except:
            #    logger.error('some error occured.')
        if args.cmd == 'stop':
            exit(0)
    else:
        logger.error('operation command is one of (start|stop|restart)')
        exit(1)

    # デーモン起動
    with daemon.DaemonContext(pidfile=pidfile.TimeoutPIDLockFile(pfile)) as context:
        logger.info(context.is_open)
        blserver()


    #exit(0)



if __name__ == '__main__':
    main()


start, stop, restart機能を持つデーモンスクリプトを作成し、ubuntuホストへサービス登録

  • /usr/lib/systemd/system/blocklist.service
[Unit]
Description=blocklistsrv: this is service of ufw blocking IP address list.
After=docker.service

[Service]
ExecStart=/usr/local/sbin/blocklist-service -l start
ExecStop=/usr/local/sbin/blocklist-service -l stop
ExecReload=/usr/local/sbin/blocklist-service -l restart
Restart=no
Type=simple

[Install]
WantedBy=multi-user.target

ここでsudo systemctl start blocklistでサービスが起動される。ちなみに恒久的に起動するようにするにはsudo systemctl emable blocklistとする。

クライアントであるdocker内のdjangoのコードも下記のようにソケット対応に変更。

blocklist_socket_server = ('192.168.29.1', 8765) 

def get_blocklist():                                                                                                    
    """                                                                                                                 
    root権限でブロックリスト取得コマンドを走らせて標準入力からブロックアドレスのリストを返す                            
    """                                                                                                                 
    socket1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)                                                         
    socket1.connect(blocklist_socket_server)                                                                            
    bipset = [ ipv4adrset(ips) for ips in socket1.recv(4096).decode('utf-8').split('\n') ]                              
    return bipset                                                                                                       

とりあえず動作するようになったので一安心。

名前付きなUNIXソケットでも良かったけど、またソケットファイルをコンテナにマウント〜となってしまうので、dockerネットワークのゲートウェイから見せるソケットサーバーとしました。こんな感じで良いのだろうか。まぁ特に重要なサービスでもないので結果オーライで運用中です。

不正アクセスログ集計ツール⑥

最近たまに書くというとこればかりの話になっているが、その割には一向に進まない・・・

djangoへの実装だが、今回はデータベースを使用するわけでもないテストページなので新たにアプリケーションを追加することなく、ポータルのトップページにコッソリぶら下げることとする。(とりあえずログインユーザーのみのアクセス制限は掛ける予定として)

まずはurls.pyへのページ追加

app_name = 'top'

urlpatterns = [
    path('', views.IndexView.as_view(), name='index'),
    path('iplog/', views.BadIPView.as_view(), name='badip'),
    path('ipsum/', views.IpSumView.as_view(), name='iplog'),
]

indexは通常のポータルページで、今回 iplogとbadipという名前のページを追加する。iplogが不正アクセスの疑いのある(何回もログイン失敗を繰り返す輩)のIPアドレスとアクセス回数のリスト、badipがそのIPアドレスのアクセス履歴を表示するページ。

どちらのクラスもデータベースを使用せず、サーバー側のデータをリスト化して参照するので、まずはIpSumViewクラスから。

class IpSumView(generic.ListView):
    template_name = 'top/ipsumlist.html'
    context_object_name = 'ipaddress_records'
    queryset = None
    paginate_by_default = 20
    form1 = None
    form_initial = {}

template_nameは使用するテンプレートのパス名、context_object_nameはテンプレートで使用するリスト。querysetは通常であればデータベースの問い合わせとして、モデルオブジェクトとの接続を行なうが、ここではNoneとしてメソッドのget_queryset()をオーバーライドして先のアクセス解析関数からの戻り値のリストとして返す

    def get_queryset(self):
        """URL引数を取り出すサンプルとデータベースの代わりに動的にリストを作成する
        """
        days = int(self.request.GET.get('days', 1))
        max = int(self.request.GET.get('count', 1))
        srt = self.request.GET.get('srt', 'count')
        src = self.request.GET.get('src', 'bt')
        return get_login_report(get_datetime_hours_ago(days * 24), get_datetime_hours_ago(0), max, src, srt)

実際はフォームのためのパラメータセットなどがあるが省略。
リクエストGETでパラメータを設定するが、パラメータがない場合の初期値を取るため上記のような書き方となっている。get_login_reportがアクセス集計関数

def get_login_report(st, ed, max = 1, src = 'bt', srt = 'count'):
    iplist = {}
    #hists = get_login_history(st, ed)
    list1 = get_django_loginfails(None, st, ed) if src != 'wp' else []
    list2 = get_wp_loginfails(st, ed) if src != 'dj' else []
    hists = list1 + list2
    blist = get_blocklist()
    for adr in map(lambda x: x['address'], hists):
        iplist[adr] = iplist[adr] + 1 if adr in iplist.keys() else 1
    retv = [{'address':adr, 'count': cnt} for adr, cnt in iplist.items() if cnt >= max and adr not in blist]
    if srt == 'count':
        retv.sort(key = lambda x: x['count'], reverse=True)
    else:
        retv.sort(key = lambda x: hash(x['address']), reverse=True)
    return retv

wordpressのログイン履歴データベースからのリストと、djangoポータルページへの履歴を合せて、アクセス回数ごとのリストに変換。指定回数以上アクセスした対象を返す関数となっている。

左がとりあえずの完成形となる。BootstrapのNavibar以外は装飾無しの素のHTMLなので飾りっ気一切無し。この辺の見栄え関連は次回以降勉強していくことにする。

めっちゃくちゃ端折った感じで完成してますが、所詮覚え書きなので、ここまでの作り込みに関して気が付いたときに勘所とかを追記していこうと思います。

不正アクセスログ集計ツール⑤

いよいよ集計アプリをdjango WEBアプリに実装。その前に、WordpressのLoginHistoryプラグインのログも拝借するコードを追加

from datetime import datetime, timezone, timedelta, tzinfo
import MySQLdb

tz_jst = timezone(timedelta(hours=9), name='JST')

def get_wp_loginfails(st, ed, address=None):
    """
    wordpressのデータベースから認証履歴のレポートを返す(要:プラグイン User Login History)
    [引数] st : レポート対象の開始日時
        ed : レポート対象の終了日時
        address : レポート対象のIPアドレスオブジェクト(Noneの場合は全て)サブネット検索対応
    """
    user = '<USER>'
    passwd = '<passwork>'
    host = 'localhost'
    db='<dbname>'
    rows = None
    with MySQLdb.connect(user = user, passwd = passwd, host = host, db = db) as conn:
        with conn.cursor() as cur:
            (stat1, ed1) = (x.astimezone(timezone.utc).replace(tzinfo=None) for x in (st, ed))
            sql = 'select username, time_login, ip_address, login_status from wp_fa_user_logins'\
                ' where time_login > %s and time_login < %s' #and login_status = %s'
            cur.execute(sql,(stat1, ed1)) # <> 'login'
            rows = cur.fetchall()
    # rows 集計
    rows = rows if address is None else [row for row in rows if ipv4adrset(row[2]) == address]
    retlist = [
        {
            'name':row[0],
            'datetime':row[1].replace(tzinfo=timezone.utc).astimezone(tz_jst),
            'address':ipv4adrset(row[2]),
            'status':False if row[3]=='fail' else True,
            'source':'wordpress',
        } for row in rows]
    return retlist

なんやらいろいろゴチャゴチャやってるけど、とりあえずmysqlに貯め込んでいるログイン情報を指定期間分取り出してくる。

不正アクセスログ集計ツール④

前回の記事でパイプ接続したプロセスをmapのまま返すとプロセスが終らないまま関数から帰ってきてしまうので、気持ち悪いからlist関数を使ったが、

bipset =  list(map(lambda x: ipv4adrset(x.decode('UTF-8').rstrip('\n')), proc.stdout))
 ↓
bipset =  [ipv4adrset(x.decode('UTF-8').rstrip('\n')) for x in proc.stdout]

の様に、わざわざmap使うより、下のようにリスト内包表記で直接リストを返すような仕組みを用いた方がスマートな気がした。とくに今回のようにmap直後にlistするような使い方の場合特に。

mapやfilterはイテレータなので直後にforなどのループ処理を行なう場合に適しており、リストそのものが欲しいときにはmapを使わずに[リスト内包表記]をするのが良いきがする。リスト内包表記やlist式のリスト生成では各項目の変換式(関数)が一度に使用され結果のリストが返されるが、イテレータでは生成段階では変換式(関数)は実行されず、for文など繰返し処理が行なわれる時に一つずつ実行されることになる。

たとえばファイルを読み込み関数があったとして、オープンしたファイルハンドルに対して読み出して検索や加工をする処理の結果をイテレータとして返してしまうと、関数からリターンした段階でファイルを閉じると、そのイテレータからは読み出せないという結果になる。

ということでその場合は、リストとして返す関数が正解となる。

あと、イテレータからリストに変換するときに一度繰返し処理が行なわれるという考え方からすると、for文に使用する前にリストに変換するのは効率が悪く。動作速度的にも不利になるようなので、そういう観点で使い分けていく目安になるのではないかと思う。

次に、ログファイルの解析部分。これも前回記事で、djangoの認証バックエンドをチョットいじって、ログイン失敗のログを残せるようにしたので、ここではそれを検出してリストとして返すこととする。ログファイルはApacheユーザーに読取り権限があるので、そのまま読見込めばいい。ただし、/var/logに保存している場合など、logrotateの処理が行なわれる場合それも追跡して行ないたい。また古いログはrotate時にgzip圧縮されるのでこれにも対応したい。

from blocklib import ipv4adrset
import os, glob, gzip
logger = getLogger(__name__)
import re, json

# djangoのアクセスログ(json形式)を読み込に認証履歴のレポートを返す
# 認証履歴のログはdjango認証バックエンドを継承して認証時のログ出力機能を追加したもの
# [引数]djlog : 解析するアクセスログ(json形式)をフルパスで与える(gzipもOK)
#              Noneの場合は、デフォルト django_log_json を使用(logorotateされたものも含み全てを読み込む)
#       st : レポート対象の開始日時(logrotateされたファイルはこの日時より新しいものを読み込み対象とする)
#       ed : レポート対象の終了日時
#       address : レポート対象のIPアドレスオブジェクト(Noneの場合は全て)サブネット検索対応
def get_django_loginfails(djlog, st, ed, address=None):
    if djlog is not None:
        djlogs = [djlog]
    else:
        djlogs = [x for x in glob.glob(django_log_json + '*') if re.match('.*(log|gz|[0-9])',x) and os.path.getmtime(x) > st.timestamp()]
        djlogs.sort(key=lambda x: os.path.getmtime(x))
    retlist = []
    for jlog in djlogs:
        logger.debug(jlog)
        with open(jlog, 'r') if not re.match('.*\.gz$', jlog) else gzip.open(jlog, 'rt') as jf:
            #jl = json.load(jf)
            decoder = json.JSONDecoder()
            jlines = map(lambda x: decoder.raw_decode(x), jf)
                #aa = decoder.raw_decode(line)
            for jl in map(lambda x: x[0], jlines):
                # fromisoformatで読み込むために末尾のZが邪魔なので削除
                # UTCのタイムゾーンを設定してJST時刻に変換する
                ts = datetime.fromisoformat(jl['timestamp'].rstrip('Z')).replace(tzinfo=timezone.utc).astimezone(tz_jst)
                ts = ts.replace(microsecond=0)
                if ts > st and ts < ed:
                    (dat, event) = (jl['ip'], jl['event'])
                    res0 = re.match('user logged in : (.+)$', event)
                    res1 = re.match('login faild.*: (.+)$', event)
                    event = False if res1 else True if res0 else None
                    if event is None:
                        continue
                    name = res1[1] if res1 else res0[1] if res0 else ''
                    if address is not None and address != ipv4adrset(dat):
                        continue
                    logger.debug('%s : %s - %s(%s)' % (str(ts), dat, name, event))
                    retlist.append(
                        {
                            'name':name,
                            'datetime':ts,
                            'address':ipv4adrset(dat),
                            'status':event,
                            'source':'django',
                        }
                    )
                    #print('%s : %s : %s' % (str(ts), dat, event))
    return retlist

ログはdjaongo.structlogでjson形式で出力したものを対象としている

  • 17行目globクラスを使用して指定したファイル名で始まるlogrotateファイルのリストを作成する(検索開始日付stよりも新しいタイムスタンプのファイルという条件も含める)
  • それぞれに対して、ファイルオープンし(拡張子gzだったらgzipオープンに変更)1行ずつ読み出し、対象パターンを見つけたら辞書形式のデータの配列として返す

ソースプログラムを貼付けてるけど、実際はそのままでは動かない一部の公開なので誰の役にも立たないただの覚え書きなのであった。
json形式だからさっくり読めるだろうとして始めたら何かエラーになってサッパリ読めない。。何でだろうと思って先人様たちの記事をたどって調べたら、こんなもんだと上記のようにデコーダを作って1行ずつ読み込んでいくようだ。
ぇぇぇーーー。同じくpython使ってjson形式のログを掃き出してるのになんで直接読めないん??ってなった。こんど時間があったら調べてみよう

不正アクセスログ集計ツール③

パート3と思ったけど、ちょっとdjango開発環境にて問題発生。

Ubuntuサーバー(さくらVPS)上のコンテンツを編集するのに、いろいろゴタゴタした顛末を下記に

  • sshログイン出来るユーザーでremote-SSHのVScodeでdjangoプロジェクトのコンテンツ編集を行なう。
  • このままではプロジェクトのファイル所有者やアクセス権がユーザーのものになってしまうので、Apacheユーザーで動作する本番環境を直接編集するわけには行かない。
  • そこで一旦GitHubプライベートリポジトリにpushして、Apacheユーザーで本番環境にpullすることで動作させることとした。
  • しかし、ちょっとした修正さえ編集して動作を確認するためにpush/pullを繰り返すこととなり、Git履歴がメッチャ汚くなる(一旦pushしてしまうと例え一人で使用しているとはいえrebaseとか履歴編集が容易ではない)。
  • いっそApacheユーザーでSSHログイン出来るようにしようかと、血迷ったことを考えたが、すんでの所で思いとどまる。
  • 既にApache2環境にデプロイ済のポータルプロジェクトだが、ユーザー環境ではもう一度python manage.py runserverで動作させてある程度動作検証した後に本番環境へpushするという流れにしようとした。
  • 注意点は、データベースは同じものを使用するので壊さないように注意。
  • ログファイルなど本番用と混ざらないように、別ログになるようにする
# settings.py で動作中のユーザー名を得る(テスト環境であればローカルユーザーになる)
RUN_USER = pwd.getpwuid(os.stat(__file__).st_uid).pw_name

# ユーザー名でApache2で動作中かどうかを判別してログのベースディレクトリを変更
LogBase = '/var/log/django' if RUN_USER == APACHE_USER else os.path.join(BASE_DIR, '.log')

# あとはログの仕様で設定する
  • 一応デプロイ後のものと同じ構成でdjango runserverテスト環境でも動作することを確認できたが、画像であるとかstaticが読み込めなくなっていた。(Apacheではサーバーの設定でstaticのエイリアス設定をしていたので、djangoのURLconfでの動作はあまり見ていなかった)
  • 調べたところ、静的ファイルの設定はsettings.pyでSTATIC_URLとSTATIC_ROOTで設定してあったのだが、ウチの環境では動作せず(素の状態でdjango-admin startproject XXXX して試してみたがstaticディレクトリを読み取ることが出来なかった)。代わりにSTATICFILES_DIRSという配列を設定することで無事読み込めることが分かった。
STATIC_URL = '/static/'    # これはOK

STATIC_ROOT = os.path.join(BASE_DIR, 'static')
#   ↓
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'static'),
]

  • STATIC_ROOTとSTATICFILES_DIRSを同時に設定するのはダメ。
  • djangoの素の状態でこれでstaticが読み込めることが確認できたが、本番環境からpullしてきたポータルプロジェクトでは上記の修正を行なってもstaticが読み込めなかった。数時間掛けて原因究明した結果、単に404 Not Foundの状態をブラウザが覚え続けていただけで、ブラウザキャッシュクリアすることで無事読み込めるようになった。お粗末。

何ともアホらいいオチでしたが、ブラウザキャッシュは意外に見落としがち。以前のサーバー引越の時も新サーバーに以前のドメインを割り当ててても一向に変化が無くて困ったのも、結局はブラウザキャッシュが原因でした。