重複した写真を一覧にして整理するソフトを書いた
目次
はじめに
旅行に行ったりすると写真がどんどん増えて、同じ写真なのか似た写真なのか分からなくなります。 HDDの容量も逼迫してきたため、整理するソフトを書きました。
画像のハッシュ化
pillowとimagehasというライブラリを使い、写真をハッシュ化します。
pip install Pillow imagehash
os.walkでフォルダ内を走査して、どんどんハッシュ化していきます。 最後、重複するリストを求めています。
import os
from PIL import Image
import imagehash
from tqdm import tqdm
def find_duplicates(directory):
# ファイルパスとそのハッシュの辞書
hashes = {}
# ディレクトリ内のすべてのファイルを走査
all_files = [os.path.join(foldername, filename) for foldername, _, filenames in os.walk(directory) for filename in filenames]
for filepath in tqdm(all_files, desc="Processing images", unit="file"):
try:
# 画像を開き、ハッシュを計算
with Image.open(filepath) as img:
h = imagehash.average_hash(img)
if h in hashes:
hashes[h].append(filepath)
else:
hashes[h] = [filepath]
except Exception as e:
print(f"Error processing {filepath}: {e}")
# 重複するファイルのリストを返す
return [paths for paths in hashes.values() if len(paths) > 1]
if __name__ == "__main__":
directory = input("Please enter the directory path: ")
duplicates = find_duplicates(directory)
if duplicates:
print("Found duplicates:")
for group in duplicates:
print("\n".join(group))
print("------")
else:
print("No duplicates found.")
これで十分なのですが、ローカルのファイル走査ですしもっと早くしたいです。 手軽に思いつくのが並列化ですね。やっていきましょう。
並列化
できあがったのがこちら。 ThreadPoolExecutorを使い、並列化を行いました。
使っているPCの性能と実環境でのパフォーマンスを確認しながら、最適なmax_workersの値を選択するとよいです。
import os
from PIL import Image
import imagehash
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
def compute_hash(filepath):
try:
with Image.open(filepath) as img:
h = imagehash.average_hash(img)
return (h, filepath)
except Exception as e:
print(f"Error processing {filepath}: {e}")
return None
def find_duplicates(directory, max_workers=15):
# ファイルパスとそのハッシュの辞書
hashes = {}
# ディレクトリ内のすべてのファイルを走査
all_files = [os.path.join(foldername, filename) for foldername, _, filenames in os.walk(directory) for filename in filenames]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for result in tqdm(executor.map(compute_hash, all_files), total=len(all_files), desc="Processing images", unit="file"):
if result:
h, filepath = result
if h in hashes:
hashes[h].append(filepath)
else:
hashes[h] = [filepath]
# 重複するファイルのリストを返す
return [paths for paths in hashes.values() if len(paths) > 1]
if __name__ == "__main__":
directory = input("Please enter the directory path: ")
duplicates = find_duplicates(directory)
if duplicates:
print("Found duplicates:")
for group in duplicates:
print("\n".join(group))
print("------")
else:
print("No duplicates found.")
まとめ
とりあえずで作りました。 良ければどうぞ。