#include "fgc/ImagePipeline.h" #include "fgc/JpegXlEncoder.h" #include "fgc/Logger.h" #include #include namespace fs = std::filesystem; namespace fgc { ImagePipeline::ImagePipeline(IControlChannel& channel, std::function heading, Params params) : channel_(channel), heading_(std::move(heading)), params_(std::move(params)) {} ImagePipeline::~ImagePipeline() { stop(); } void ImagePipeline::start() { if (running_.exchange(true)) return; worker_ = std::thread(&ImagePipeline::run, this); } void ImagePipeline::stop() { if (!running_.exchange(false)) return; cv_.notify_all(); if (worker_.joinable()) worker_.join(); } void ImagePipeline::submit(const Frame& frame) { { std::lock_guard lock(mutex_); queue_.push(frame); } cv_.notify_one(); } void ImagePipeline::run() { while (running_) { Frame frame; { std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return !queue_.empty() || !running_; }); if (!running_ && queue_.empty()) break; frame = std::move(queue_.front()); queue_.pop(); } process(frame); } } std::string ImagePipeline::labelFor(int cam_id) const { if (cam_id >= 0 && cam_id < static_cast(params_.labels.size())) return params_.labels[cam_id]; return "cam" + std::to_string(cam_id); } void ImagePipeline::process(const Frame& frame) { if (frame.channels != 1 && frame.channels != 3) { LOG_WARN << "Dropping frame with unsupported channel count " << frame.channels; return; } const int type = frame.channels == 1 ? CV_8UC1 : CV_8UC3; cv::Mat src(frame.height, frame.width, type, const_cast(frame.data.data())); cv::Mat img; cv::rotate(src, img, cv::ROTATE_90_COUNTERCLOCKWISE); if (params_.display) { cv::namedWindow("Display Image", cv::WINDOW_NORMAL); cv::resizeWindow("Display Image", img.cols / 4, img.rows / 4); cv::imshow("Display Image", img); cv::waitKey(10); } const std::string label = labelFor(frame.cam_id); const fs::path dir = fs::path(params_.output_dir) / label; std::error_code ec; fs::create_directories(dir, ec); const fs::path file = dir / (std::to_string(frame.timestamp_ms) + ".jxl"); if (params_.demo) { fs::copy_file(params_.demo_image, file, fs::copy_options::overwrite_existing, ec); if (ec) LOG_ERROR << "Demo copy failed: " << ec.message(); } else { bool ok = JpegXlEncoder::encodeToFile(file.string(), img.data, img.cols, img.rows, frame.channels, params_.jxl_distance, params_.jxl_effort); if (!ok) { LOG_ERROR << "Failed to encode " << file.string(); return; } } CamEvent ev; ev.tower = params_.tower; ev.camera = label; ev.heading_decideg = static_cast((heading_ ? heading_() : 0.f) * 10); ev.timestamp_ms = frame.timestamp_ms; channel_.publishCamEvent(ev); LOG_DEBUG << "Saved " << file.string() << " (" << img.cols << "x" << img.rows << ")"; } } // namespace fgc