Skip to content
Snippets Groups Projects
Commit 1abb7e6c authored by Mathieu Beligon's avatar Mathieu Beligon
Browse files

[common] (twitch-dset) Switch to opencv to extract robots views on the fly from the video

parent c7959fea
No related branches found
No related tags found
No related merge requests found
watchdog==0.9.0
argh==0.26.2
cycler==0.10.0
decorator==4.4.1
ffmpeg-python==0.2.0
future==0.18.2
imageio==2.6.1
kiwisolver==1.1.0
matplotlib==3.1.2
networkx==2.4
numpy==1.17.4
pathtools==0.1.2
Pillow==6.2.1
pyparsing==2.4.5
PyPrind==2.11.2
python-dateutil==2.8.1
PyWavelets==1.1.1
PyYAML==5.2
scikit-image==0.16.2
scipy==1.3.1
PyPrind==2.11.2
ffmpeg-python==0.2.0
six==1.13.0
tqdm==4.40.1
import sys
from dataclasses import dataclass
import ffmpeg
from pyprind import ProgBar
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from research.constants import TWITCH_DSET
class ThumbnailsGenerator:
FPS = 2
def __init__(self, video_name: str):
self.output_folder = TWITCH_DSET / 'raw-frames' / video_name
self.output_folder.mkdir(parents=True, exist_ok=True)
self.video_path = TWITCH_DSET / 'videos' / f'{video_name}.mp4'
def run(self):
frames = self._get_frame_number() * self.FPS
self._create_progress_bar(frames)
self._launch_creation(frames)
def _launch_creation(self, frames):
ffmpeg. \
input(str(self.video_path)). \
filter('fps', fps=self.FPS).\
output(f"{self.output_folder}/frame_%d.jpg", frames=frames). \
run(quiet=True)
@dataclass
class ProgressHandler(FileSystemEventHandler):
bar: ProgBar
def on_created(self, event: FileSystemEvent):
self.bar.update()
def __hash__(self):
return hash(self.__class__.__name__)
def _create_progress_bar(self, frames):
bar = ProgBar(frames, title='Creating thumbnails', width=100, stream=sys.stdout, update_interval=True)
obs = Observer()
obs.schedule(self.ProgressHandler(bar), str(self.output_folder), recursive=True)
obs.start()
def _get_frame_number(self):
return int(ffmpeg.probe(str(self.video_path))['format']['duration'].split('.')[0])
from os import remove
from pathlib import Path
from shutil import move
import numpy as np
from scipy.spatial import distance
from skimage import io
from skimage.transform import resize
from research.constants import TWITCH_DSET
RES_DIR: Path = TWITCH_DSET / 'robots-views'
RES_DIR.mkdir(parents=True, exist_ok=True)
ref_image = io.imread(f'{__file__}/../mask.jpg')
......@@ -19,21 +10,7 @@ _REF_IMG_MASKED = ref_image*_MASK[:, :, np.newaxis]
_THRESHOLD = 23
def is_image_from_robot_view(path_to_image: Path) -> bool:
img = io.imread(path_to_image)
img_masked = img * _MASK[:, :, np.newaxis]
def is_image_from_robot_view(image: np.ndarray) -> bool:
img_masked = image * _MASK[:, :, np.newaxis]
return distance.euclidean(img_masked.flatten() / 255, _REF_IMG_MASKED.flatten() / 255) < _THRESHOLD
def process_image(path_to_image: Path):
if is_image_from_robot_view(path_to_image):
res_path = RES_DIR / f'{path_to_image.parent.name}-{path_to_image.name}'
move(str(path_to_image), str(res_path))
print(f'{path_to_image.stem} is a robot view, moving it')
else:
remove(str(path_to_image))
def process_all_images_in_dir(dir_path: Path):
for path_to_image in dir_path.glob('*/*.jpg'):
process_image(path_to_image)
import sys
from pathlib import Path
import cv2
import ffmpeg
from pyprind import ProgBar
import numpy as np
from research.constants import TWITCH_DSET
from research.dataset.twitch.robot_view import is_image_from_robot_view
from research.dataset.twitch.video_frame_generator import VideoFrameGenerator
RES_DIR: Path = TWITCH_DSET / 'robots-views'
RES_DIR.mkdir(parents=True, exist_ok=True)
class RobotsViewExtractor:
FPS = 2
def __init__(self, video_name: str):
self.video_name: str = video_name
self.video_path = TWITCH_DSET / 'videos' / f'{video_name}.mp4'
self.frame_generator: VideoFrameGenerator = VideoFrameGenerator(self.video_path, self.FPS)
def run(self):
self._create_prog_bar()
self._start_extraction()
def _start_extraction(self):
for i, frame in enumerate(self.frame_generator.generate()):
self._process_frame(frame, i)
def _create_prog_bar(self):
self._prog_bar = ProgBar(
self._get_number_of_frames(),
title='Creating thumbnails',
width=100,
stream=sys.stdout,
update_interval=True,
)
def _process_frame(self, frame: np.ndarray, frame_number: int):
if is_image_from_robot_view(frame):
self._save_frame(frame, frame_number)
self._prog_bar.update()
def _save_frame(self, frame: np.ndarray, frame_number: int):
cv2.imwrite(f"{RES_DIR}/{self.video_name}-frame-{frame_number + 1:06}.jpg", frame)
def _get_number_of_frames(self):
return int(ffmpeg.probe(str(self.video_path))['format']['duration'].split('.')[0]) * self.FPS
from pathlib import Path
import cv2
import ffmpeg
class VideoFrameGenerator:
def __init__(self, video_path: Path, desired_fps: int):
self.video_path: Path = video_path
self.desired_fps: int = desired_fps
self.video_fps: int = self._get_video_fps()
def _get_video_fps(self):
return max(
int(stream['avg_frame_rate'].split('/')[0])
for stream in ffmpeg.probe(str(self.video_path))['streams']
)
def generate(self):
video = cv2.VideoCapture(str(self.video_path))
frame_rate = self.video_fps // self.desired_fps
count = 0
while 1:
is_unfinished, frame = video.read()
if not is_unfinished:
video.release()
return
if not count % frame_rate:
yield frame
count += 1
import sys
from research.dataset.twitch.make_thumbnails import ThumbnailsGenerator
from research.dataset.twitch.robots_views_extractor import RobotsViewExtractor
if __name__ == '__main__':
_video_name = sys.argv[1]
print(f'Fragmenting video {_video_name}')
ThumbnailsGenerator(_video_name).run()
RobotsViewExtractor(_video_name).run()
from time import sleep
from research.constants import TWITCH_DSET
from research.dataset.twitch.robot_view import process_all_images_in_dir
if __name__ == '__main__':
while 1:
process_all_images_in_dir(TWITCH_DSET / 'raw-frames')
sleep(.1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment