問合せなどの共通メールを数人に転送処理しているのですが、時々しか転送されてこなくなりました。調べてみるとどうやら携帯キャリア側のなんらかのフィルタに引っかかって届かないようです。
携帯の迷惑メールフィルタの設定をいろいろ試すより何か良い方法は無いか考えていたのですが、メールを受信してLINEグループに通知できたら便利かもと思い検索。
LINE通知が簡単に出来る方法が見つかりました。
どうせならPythonでメールを受信してLINEに通知してしまえばPythonの勉強にもなったりしてスマートよね。と思いいろいろ調査してみました。
LINE通知の準備
1.パソコンから LINE にログインして、アクセストークンを取得する
※アカウント情報はLINEアプリの「設定>アカウント」から確認できます。それからログイン許可もチェックします。
「トークンを発行する」をクリックし、トークン名とトークルームを選択して「発行する」をクリックします。
次に表示される画面で発行されたトークンをメモ帳などで大事に保存しておきます。
2.LINEグループに、LINE Notify を招待する
LINEアプリでトークン発行時に選択したグループに、LINE Notify を招待します。
これでLINE通知の準備はOKです。
3.試しにメッセージを送ってみる
コマンドプロンプトを開き下記のコマンドを入力します。
1 |
curl -X POST -H "Authorization: Bearer [access_token]" -F "message=foobar" https://notify-api.line.me/api/notify |
※[access_token] 部分に発行したトークンを入力してください。
※foobar がメッセージとしてLINEに表示されます。
<参考>「コマンドラインから LINE にメッセージを送れる LINE Notify」
Python で新着メールを受信してデータベース保存とLINE通知する
1.動作環境
- Windows10(64ビット)
- VScode 1.25.1 ※デバッグに便利です
- Python 3.6.6
- chardet ※
python -m pip install chardet
でコマンドプロンプトからインストール
2.ソースコード
検索していたらほぼそのまま利用できる記事を見つけましたので動作確認しながら改良しました。
<参考>Python の標準ライブラリで新着メールだけを受信してDBに保存する
改良点
- LINE 通知処理の追加
- メール本文が取得できない(multipartの階層や文字コードの問題)対応
- 文字コードの取得処理
- データの削除処理の追加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# -*- coding: utf_8 -*- import sqlite3 import poplib import email import chardet import urllib.request, urllib.parse from email.header import decode_header from email.utils import parsedate_to_datetime # msg から文字コードを取得 def get_jp_encoding_name(msg, char_code = 'iso-2022-jp'): try: enc = chardet.detect(msg) return enc['encoding'] except: return char_code # msg から name ヘッダを取得 def get_header(msg, name): header = '' if msg[name]: for tup in decode_header(str(msg[name])): if type(tup[0]) is bytes: charset = tup[1] if charset: header += tup[0].decode(tup[1]) else: header += tup[0].decode() elif type(tup[0]) is str: header += tup[0] return header # msg から本文を取得 def get_content(msg): try: if msg.is_multipart(): for pl in msg.get_payload(): if pl.get_content_type() == "multipart/alternative": for pla in pl.get_payload(): if pla.get_content_type() == "text/plain": break else: pla = pl if pla.get_content_type() == "text/plain": pl2 = pla.get_payload(decode=True) charset = pla.get_content_charset() if not charset: charset = get_jp_encoding_name(pl2) else: charset = get_jp_encoding_name(pl2, charset) try: pl2 = pl2.decode(charset, 'ignore') except: pl2 = pl2.decode() return pl2 else: if msg.get_content_type() == "text/plain": pl2 = msg.get_payload(decode=True) charset = msg.get_content_charset() if not charset: charset = get_jp_encoding_name(pl2) else: charset = get_jp_encoding_name(pl2, charset) try: pl2 = pl2.decode(charset, 'ignore') except: pl2 = pl2.decode() return pl2 except: import traceback traceback.print_exc() return "" # 指定した番号のメッセージを受信する def fetchmail(cli, msg_no): content = cli.retr(msg_no)[1] uidl = cli.uidl(msg_no).decode().split(' ')[-1] msg = email.message_from_bytes(b'\r\n'.join(content)) from_ = get_header(msg, 'From') date_hdr = get_header(msg, 'Date') if date_hdr: date = parsedate_to_datetime(date_hdr) else: date = None subject = get_header(msg, 'Subject') content = get_content(msg) return (uidl, subject, content, from_, date) # 新着メールのUIDL番号のリストを返す def find_newmail(cli, db): #uidl = cli.uidl()[1] remote_uidl = list(map(lambda elm: elm.decode().split(' '), cli.uidl()[1])) c = db.cursor() res = c.execute('SELECT uidl FROM mail').fetchall() local_uidl = map(lambda tup: tup[0], res) # サーバ上のUIDL番号のセットと、受信済みメールのUIDL番号のセットとの差集合をとる new_uidl = set(map(lambda elm: elm[-1], remote_uidl)) - set(local_uidl) return list(filter(lambda elm: elm[1] in new_uidl, remote_uidl)) # LINEにメッセージを送る def send_LINE(msg): edtMsg = ('日時: %s\n送信者: %s\n表題: %s\n本文: %s' % (msg[4], msg[3], msg[1], msg[2])) url = "https://notify-api.line.me/api/notify" method = "POST" obj = { "message" : edtMsg } data = urllib.parse.urlencode(obj).encode("utf-8") headers = { "Authorization" : "Bearer ここにトークン入れる", } req = urllib.request.Request(url, data=data, headers=headers, method=method) with urllib.request.urlopen(req) as res: res_body = res.read().decode("utf-8") return res_body # メールを受信してDBに保存する def receive_all(cli, db): newmail = find_newmail(cli, db) #count = len(newmail) c = db.cursor() for mail in newmail: msg = fetchmail(cli, mail[0]) c.execute("""INSERT INTO mail (uidl, subject, content, sender, sent_at) VALUES (?, ?, ?, ?, ?)""", msg) print('Date: %s, From: %s, Subject: %s' % (msg[4], msg[3], msg[1])) #print('\n%s\n' % (msg[2])) db.commit() # LINE Notify line = send_LINE(msg) print(line) c.close() # データベース作成 def setup(db): c = db.cursor() try: c.execute('CREATE TABLE mail (uidl text, subject text, content text, sender text, sent_at timestamp, created_at default current_timestamp, PRIMARY KEY(uidl))') db.commit() except: print('') c.close() # データの削除(30日前のもの) def db_delete(db): c = db.cursor() try: c.execute("DELETE FROM mail WHERE sent_at < date('now', '-30 days')") if c.rowcount > 0: print('%d 件削除されました。' % c.rowcount) db.commit() except sqlite3.Error as e: print('sqlite3.Error occurred:', e.args[0]) c.close() db = sqlite3.connect('MailToLINE.db') setup(db) cli = poplib.POP3_SSL('メールサーバ') cli.user('メール@アドレス') cli.pass_('メールパスワード') receive_all(cli, db) cli.quit() db_delete(db) db.close() |
LINE 通知処理の追加
「ここにトークンを入れる」に発行してコマンドラインで確認できたトークンを置き換えます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# LINEにメッセージを送る def send_LINE(msg): edtMsg = ('日時: %s\n送信者: %s\n表題: %s\n本文: %s' % (msg[4], msg[3], msg[1], msg[2])) url = "https://notify-api.line.me/api/notify" method = "POST" obj = { "message" : edtMsg } data = urllib.parse.urlencode(obj).encode("utf-8") headers = { "Authorization" : "Bearer ここにトークン入れる", } req = urllib.request.Request(url, data=data, headers=headers, method=method) with urllib.request.urlopen(req) as res: res_body = res.read().decode("utf-8") return res_body |
メール本文が取得できない(multipartの階層や文字コードの問題)対応
mulutipartの階層になっている場合に文字コードや本文が取得できない事があったため参考にさせていただいたコードを改良しました。
さらに階層が増えることもあるようですが、私の環境では10数通上手く処理できておりますのでしばらくは様子見です。
文字コードが取得出来ない状況を考えて get_jp_encoding_name
を呼び出す様にしてます。
また、本文に機種依存文字(株)とか○付き数字とかがあるとdecodeに失敗するので pl2.decode(charset, 'ignore')
「'ignore'」を加えました。これで機種依存が無視されてdecodeされます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# msg から本文を取得 def get_content(msg): try: if msg.is_multipart(): for pl in msg.get_payload(): if pl.get_content_type() == "multipart/alternative": for pla in pl.get_payload(): if pla.get_content_type() == "text/plain": break else: pla = pl if pla.get_content_type() == "text/plain": pl2 = pla.get_payload(decode=True) charset = pla.get_content_charset() if not charset: charset = get_jp_encoding_name(pl2) else: charset = get_jp_encoding_name(pl2, charset) try: pl2 = pl2.decode(charset, 'ignore') except: pl2 = pl2.decode() return pl2 else: if msg.get_content_type() == "text/plain": pl2 = msg.get_payload(decode=True) charset = msg.get_content_charset() if not charset: charset = get_jp_encoding_name(pl2) else: charset = get_jp_encoding_name(pl2, charset) try: pl2 = pl2.decode(charset, 'ignore') except: pl2 = pl2.decode() return pl2 except: import traceback traceback.print_exc() return "" |
文字コードの取得処理
メール本文から文字コードを返してくれるので chardet はテキストファイルの読み込み時に利用されることが多い様です。
1 2 3 4 5 6 7 |
# msg から文字コードを取得 def get_jp_encoding_name(msg, char_code = 'iso-2022-jp'): try: enc = chardet.detect(msg) return enc['encoding'] except: return char_code |
データの削除処理の追加
30日過ぎたらデータを削除するものですが、別ファイルにして1日1回の実行でスケジュールを組んだ方が良いですね
1 2 3 4 5 6 7 8 9 10 11 12 |
# データの削除(30日前のもの) def db_delete(db): c = db.cursor() try: c.execute("DELETE FROM mail WHERE sent_at < date('now', '-30 days')") if c.rowcount > 0: print('%d 件削除されました。' % c.rowcount) db.commit() except sqlite3.Error as e: print('sqlite3.Error occurred:', e.args[0]) c.close() |
スケジュール実行の設定
Windowsのタスクスケジューラに登録するのですが、コード中の print コマンドをログファイルに保存したいので、バッチファイルを作成します。
MailToLINE.py の保存してあるフォルダにカレントディレクトリを移動してから実行します。
ログは>>を2つ付ける事で追加書き込みしてくれます。>1つだと常に上書きされてしまうので注意です。※存在しない場合は自動で新規作成してくれます。
1 2 3 4 |
@echo off d: cd \PyDev\MailToLINE python MailToLINE.py >> MailToLINE.log |
1.タスクスケジューラの起動
Windowsキー>Windows管理ツール>タスクスケジューラの順に起動します。
右側の操作から基本タスクの作成をクリックします。
名前を入力して「次へ」ボタンをクリックします。
1回限りにチェックし「次へ」ボタンをクリックします。
開始日と時間を設定し「次へ」ボタンをクリックします。
プログラムの開始にチェックして「次へ」ボタンをクリックします。
参照ボタンをクリックして、MailToLINE.bat を選択して「次へ」ボタンをクリックします。
完了をクリックしたときに・・・にチェックを付けて「完了」ボタンをクリックします。
ユーザがログオンしているかどうかにかかわらず実行するをチェックします。
トリガーで編集ボタンをクリックします。
繰り返し間隔にチェックを付けて1時間にします。また、継続時間を無制限にして「OK」ボタンをクリックします。
MailToLINEのプロパティ画面で「OK」ボタンをクリックします。
タスクスケジューラの左、タスクスケジューラライブラリをクリックして、中央に登録したタスクが登録されているか確認します。
これで1時間毎にメールを受信して新規メールがあった場合にLINEグループにメッセージが送られます。
15分間隔でも良いかもしれません。
最後に
文字コードの問題、削除処理の分割やLINEへのメッセージは1000文字までの制限がありますので超えた場合にどう処理するかなど、まだまだ改良の余地があります。
メールのタイトルに合わせて通知グループを振り分けるとか、別の処理を起動するとかいろいろ応用もできそうです。
まだ10数通のメールでしか確認がとれていないため何か問題が発生するかもしれません。
参考にして頂く場合は自己責任でよろしくお願いします。
基本的にメールソフトを利用して受信する様にしており、メールの転送も行った上でのあくまでも補助的な利用が目的で利用しておりますのであしからず。
先日、深夜2時1分に送られてきたメールが深夜3時にLINE通知されました。1時間間隔だとこういうことあるよねぇ。。。