Compleated Code

This commit is contained in:
elvis
2023-08-24 19:46:27 +02:00
parent 1c487e1a23
commit 1f2827a100
22 changed files with 4506 additions and 1 deletions

29
.gitignore vendored
View File

@ -1,4 +1,31 @@
# ---> C++
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
# ---> c++
# Prerequisites
*.d

37
CMakeLists.txt Normal file
View File

@ -0,0 +1,37 @@
cmake_minimum_required(VERSION 3.10)
project(main)
# exports compile_commands.json for the IDE
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
include_directories(BEFORE SYSTEM ${PROJECT_SOURCE_DIR}/../fastflow/)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
if(UNIX AND NOT APPLE)
set(CMAKE_CXX_COMPILER "/usr/bin/g++")
set(CMAKE_CXX_FLAGS "-Wall")
else()
set(CMAKE_CXX_COMPILER "/opt/homebrew/bin/g++-13")
set(CMAKE_CXX_FLAGS "-Wall -DNO_DEFAULT_MAPPING")
endif()
# set(CMAKE_CXX_FLAGS "-Wall -DNO_DEFAULT_MAPPING -DBLOCKING_MODE")
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -finline-functions")
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
add_library(timer timer.cpp timer.hpp)
add_library(threadpool threadPool.cpp threadPool.hpp)
target_link_libraries(threadpool Threads::Threads)
add_executable(main main.cpp reader.hpp writer.hpp task.hpp stencil.hpp)
include_directories(main PUBLIC ${PROJECT_SOURCE_DIR}/)
target_link_libraries(main Threads::Threads timer threadpool)

1
compile_commands.json Symbolic link
View File

@ -0,0 +1 @@
build/compile_commands.json

297
main.cpp Normal file
View File

@ -0,0 +1,297 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>
#include <ff/ff.hpp>
// important ff.hpp before other ff includes
#include <ff/pipeline.hpp>
#include "reader.hpp"
#include "stencil.hpp"
#include "timer.hpp"
#include "writer.hpp"
using namespace std;
using namespace ff;
char *getOption(char **begin, char **end, const std::string &option) {
char **itr = std::find(begin, end, option);
if (itr != end && ++itr != end)
return *itr;
return nullptr;
}
long long int fastflow(vector<string> images,
vector<pair<int, int>> neighborhood, int iterations,
std::function<char(std::vector<char>)> f,
int maxworkers) {
Reader<char> reader(images);
Stencil<char> stencil(f, neighborhood, iterations, maxworkers);
Writer<char> writer;
ff::ff_Pipe<> pipe(reader, stencil, writer);
UTimer timer;
timer.start();
if (pipe.run_and_wait_end() < 0) {
error("running pipeline\n");
return 1;
}
timer.stop();
long long int ret = timer.print("\tElapsed time: ");
return ret;
}
long long int stdthreads(vector<string> images,
vector<pair<int, int>> neighborhood, int iterations,
std::function<char(std::vector<char>)> f,
int maxworkers) {
long num_images = images.size();
Reader<char> reader(images);
Stencil<char> *stencil = // pointer so that we can delete only once and not
// when out of scope of the thread
new Stencil<char>(f, neighborhood, iterations, maxworkers);
Writer<char> writer;
std::vector<std::promise<Task<char> *> *> *tv =
new std::vector<std::promise<Task<char> *> *>();
std::vector<std::shared_future<Task<char> *>> *in =
new std::vector<std::shared_future<Task<char> *>>();
std::vector<std::promise<Task<char> *> *> *out =
new std::vector<std::promise<Task<char> *> *>();
std::vector<std::shared_future<Task<char> *>> *tvw =
new std::vector<std::shared_future<Task<char> *>>();
for (int i = 0; i < num_images; ++i) {
std::promise<Task<char> *> *p = new std::promise<Task<char> *>();
std::future<Task<char> *> f = p->get_future();
std::promise<Task<char> *> *o = new std::promise<Task<char> *>();
std::future<Task<char> *> l = o->get_future();
tv->push_back(p);
in->push_back(f.share());
out->push_back(o);
tvw->push_back(l.share());
}
UTimer timer;
timer.start();
std::thread readerT(reader, images, tv);
std::thread stencilT(&Stencil<char>::stdthread, stencil, in, out);
std::thread writerT(writer, tvw);
readerT.join();
stencilT.join();
writerT.join();
timer.stop();
long long int ret = timer.print("\tElapsed time: ");
for (int i = 0; i < num_images; ++i) {
delete ((*tv)[i]);
delete ((*out)[i]);
}
delete tv;
delete in;
delete out;
delete tvw;
delete stencil;
return ret;
}
long long int sequential(vector<string> images,
vector<pair<int, int>> neighborhood, int iterations,
std::function<char(std::vector<char>)> f) {
long num_images = images.size();
Reader<char> reader(images);
Stencil<char> *stencil = // pointer so that we can delete only once and not
// when out of scope of the thread
new Stencil<char>(f, neighborhood, iterations);
Writer<char> writer;
std::vector<std::promise<Task<char> *> *> *tv =
new std::vector<std::promise<Task<char> *> *>();
std::vector<std::shared_future<Task<char> *>> *in =
new std::vector<std::shared_future<Task<char> *>>();
std::vector<std::promise<Task<char> *> *> *out =
new std::vector<std::promise<Task<char> *> *>();
std::vector<std::shared_future<Task<char> *>> *tvw =
new std::vector<std::shared_future<Task<char> *>>();
for (int i = 0; i < num_images; ++i) {
std::promise<Task<char> *> *p = new std::promise<Task<char> *>();
std::future<Task<char> *> f = p->get_future();
std::promise<Task<char> *> *o = new std::promise<Task<char> *>();
std::future<Task<char> *> l = o->get_future();
tv->push_back(p);
in->push_back(f.share());
out->push_back(o);
tvw->push_back(l.share());
}
UTimer timer;
timer.start();
std::thread readerT(reader, images, tv);
std::thread stencilT(&Stencil<char>::sequential, stencil, in, out);
std::thread writerT(writer, tvw);
readerT.join();
stencilT.join();
writerT.join();
timer.stop();
long long int ret = timer.print("\tElapsed time: ");
for (int i = 0; i < num_images; ++i) {
delete ((*tv)[i]);
delete ((*out)[i]);
}
delete tv;
delete in;
delete out;
delete tvw;
delete stencil;
return ret;
}
char gameOfLife(vector<char> in) {
auto v = std::accumulate(in.begin() + 1, in.end(), 0);
if (in[0] && v < 2) {
return 0;
} else if (in[0] && v > 3) {
return 0;
} else if (!in[0] && v != 3) {
return 0;
}
return 1;
}
// -----------------------------------------------------------------------------
int main(int argc, char *argv[]) {
int average_max = 5;
int iter_for_num_workers = 128;
vector<string> images = {
"../tests/empty2x2",
"../tests/increasing4x6",
"../tests/increasing300x200",
"../tests/random400x2500",
// "../tests/equation",
// "../tests/equation2"
};
vector<pair<int, int>> neig = {make_pair(-1, 1), make_pair(-1, 0),
make_pair(-1, -1), make_pair(0, 1),
make_pair(0, -1), make_pair(1, 1),
make_pair(1, 0), make_pair(1, -1)};
ofstream csvfile;
csvfile.open("performance.csv");
for (std::string image : images) {
cout << endl
<< "\033[1;31mProcessing: \t" << image << "\033[0m" << endl;
csvfile << ",Name,Size(B)\n";
csvfile << "Image," << image << ",";
csvfile << std::filesystem::file_size(image) << "\n";
csvfile << "\n";
csvfile << "Number of iterations,1,2,4,8,16\n";
csvfile << "fastflow:,";
cout << "\033[1;31mFastflow\033[0m" << endl;
for (int iterations : {1, 2, 4, 8, 16}) {
vector<long long int> results;
for (int i = 0; i < average_max; ++i) {
results.push_back(
fastflow({image}, neig, iterations, &gameOfLife, 0));
}
csvfile << std::accumulate(results.begin(), results.end(), 0) /
average_max
<< ",";
}
csvfile << "\n";
csvfile << "stdthread:,";
cout << "\033[1;31mStdthread\033[0m" << endl;
for (int iterations : {1, 2, 4, 8, 16}) {
vector<long long int> results;
for (int i = 0; i < average_max; ++i) {
results.push_back(
stdthreads({image}, neig, iterations, &gameOfLife, 0));
}
csvfile << std::accumulate(results.begin(), results.end(), 0) /
average_max
<< ",";
}
csvfile << "\n";
// ------------------------------------------------------------------ //
csvfile << "Number of Workers,sequential,fastflow,stdthreads,";
csvfile << "Iterations:," << iter_for_num_workers << "\n";
cout << "\033[1;31mDifferent number of workers\033[0m" << endl;
int hardware_concurrency = std::thread::hardware_concurrency();
vector<int> list_max_workers = vector<int>();
for (int i = 1; i < hardware_concurrency; i*=2) {
list_max_workers.push_back(i);
}
list_max_workers.push_back(hardware_concurrency);
for (int max_workers : list_max_workers) {
csvfile << max_workers << ",";
if(max_workers == 1) {
cout << "\033[1;31mSequential\033[0m" << endl;
vector<long long int> results;
for (int i = 0; i < average_max; ++i) {
results.push_back(sequential(
{image}, neig, iter_for_num_workers, &gameOfLife));
}
csvfile << std::accumulate(results.begin(), results.end(), 0) /
average_max;
}
csvfile << ",";
cout << "\033[1;31mFastflow with " << max_workers
<< " workers\033[0m" << endl;
{
vector<long long int> results;
for (int i = 0; i < average_max; ++i) {
results.push_back(fastflow({image}, neig,
iter_for_num_workers,
&gameOfLife, max_workers));
}
csvfile << std::accumulate(results.begin(), results.end(), 0) /
average_max;
}
csvfile << ",";
cout << "\033[1;31mStdthread with " << max_workers
<< " workers\033[0m" << endl;
{
vector<long long int> results;
for (int i = 0; i < average_max; ++i) {
results.push_back(stdthreads({image}, neig,
iter_for_num_workers,
&gameOfLife, max_workers));
}
csvfile << std::accumulate(results.begin(), results.end(), 0) /
average_max;
}
csvfile << "\n";
}
}
csvfile.close();
return 0;
}

96
reader.hpp Normal file
View File

@ -0,0 +1,96 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef READER_HPP
#define READER_HPP
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <future>
#include <iostream>
#include <vector>
#include <ff/ff.hpp>
#include "task.hpp"
using namespace ff;
template <typename T> class Reader : public ff_node_t<Task<T>> {
std::vector<std::string> Images;
public:
Reader(std::vector<std::string> images);
Task<T> *svc(Task<T> *);
void operator()(std::vector<std::string> images,
std::vector<std::promise<Task<T> *> *> *OutputVector);
};
template <typename T> Task<T> *read_one_image(std::string imagename);
template <typename T>
Reader<T>::Reader(std::vector<std::string> images) : Images(images) {}
// svc function for fastflow library, function as an emitter: generates all
// tasks from the list of paths
template <typename T> Task<T> *Reader<T>::svc(Task<T> *) {
for (std::string s : Images) {
Task<T> *t = read_one_image<T>(s);
ff_node::ff_send_out(t);
}
return this->EOS;
}
// operator for std thread
template <typename T>
void Reader<T>::operator()(
std::vector<std::string> images,
std::vector<std::promise<Task<T> *> *> *OutputVector) {
assert(images.size() >= (*OutputVector).size() &&
"Error: wrong length for promise vector [Reader::operator()]");
int count = 0;
for (std::promise<Task<T> *> *output : *OutputVector) {
output->set_value(read_one_image<T>(images[count]));
++count;
}
}
template <typename T>
Task<T> *read_one_image(std::string image_name) {
T *image;
int32_t rows;
int32_t cols;
const std::string &filepath(image_name);
std::fstream file{filepath, file.binary | file.in};
if (!file.is_open()) {
std::cerr << "Error: Failed to open " << filepath << std::endl;
return NULL;
} else {
file.read(reinterpret_cast<char *>(&rows), sizeof(rows));
file.read(reinterpret_cast<char *>(&cols), sizeof(cols));
image = new T[rows * cols];
file.read(reinterpret_cast<char *>(image), (rows * cols) * sizeof(T));
file.close();
}
Task<T> *task;
std::vector<std::vector<T>> *matrix =
new std::vector<std::vector<T>>(rows, std::vector<T>(cols));
for (int x = 0; x < rows; ++x) {
for (int y = 0; y < cols; ++y) {
(*matrix)[x][y] = image[x * cols + y];
}
}
task = new Task<T>(matrix, rows, cols, filepath);
delete[] image;
return task;
}
#endif /* READER_HPP */

0
report/document.bib Normal file
View File

BIN
report/document.pdf Normal file

Binary file not shown.

209
report/document.tex Normal file
View File

@ -0,0 +1,209 @@
\documentclass[12pt, oneside]{article}
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
%% Load Packages %%
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
\usepackage[
top=2cm,
bottom=2cm,
left=2cm,
right=2cm,
headheight=20pt,
centering
]{geometry}
\geometry{a4paper}
\usepackage[utf8]{inputenc} %% use UTF-8, maybe not needed since 2018
\usepackage[italian,main=english]{babel} %% language
\pagestyle{headings}
\usepackage{scrlayer-scrpage}
\usepackage{csquotes} %% correct language also for citations
\ifoot[]{}
\cfoot[]{}
\ofoot[\pagemark]{\pagemark}
\pagestyle{scrplain}
\usepackage[
backend=biber,
style=numeric,
sorting=ynt
]{biblatex} %% for citations
\addbibresource{document.bib}
\usepackage{import} %% specify path for import
%% math packages
\usepackage{graphicx} %% for pictures
\usepackage{float}
\usepackage{amssymb} %% math symbols
\usepackage{amsmath} %% math matrix etc
\usepackage{minted} %% code block
\usepackage{tabularray} %% better tables
\usepackage{booktabs} %% rules for tables
\usepackage{mathrsfs}
\usepackage{mathtools}
\usepackage{algorithm} %% for algorithms
\usepackage{algpseudocode} %% loads algorithmicx
\usepackage{amsthm}
\usepackage{thmtools} %% theorems
%% plot packages
\usepackage{pgfplots} %% plots used with \begin{tikzpicture}
\usepackage{tikz} %% for pictures
\usetikzlibrary{trees}
\pgfplotsset{width=10cm,compat=newest}
%% design packages
\usepackage{enumitem} %% for lists and enumerating
\usepackage{color}
\usepackage{xcolor,colortbl} % xcolor for defining colors, colortbl for table colors
\usepackage{makecell} %% for multiple lines in cell of table
\usepackage{cancel}
\usepackage{pgfornament} %% ornaments
%% load last
\usepackage[hidelinks]{hyperref} %% links for table of contents, load last
\usepackage{bookmark} %% for better table of contents
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
%% Configuration of the packages %%
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
\linespread{1}
\raggedbottom %% spaces if page is empty % chktex 1
%% set max table of contents recursion to subsection (3->subsubsecition)
\setcounter{tocdepth}{3}
\setcounter{secnumdepth}{3}
%% use bar instead of arrow for vectors
\renewcommand{\vec}[1]{\bar{#1}}
%% easy norm
\newcommand{\norm}[1]{\left\lvert#1\right\rvert}
% argmin and argmax
\DeclareMathOperator*{\argmax}{argmax}
\DeclareMathOperator*{\argmin}{argmin}
%% itemize use less vertical space (use olditemize for default behaviour)
\let\olditemize=\itemize%% old itemize
\let\endolditemize=\enditemize%% old end itemize
\renewenvironment{itemize}{\olditemize\itemsep-0.2em}{\endolditemize}
%% items in itemize emph+box
%% usage: \ieb{Class:} for simple item
%% \ieb[4cm]{Class:} for specific size of box
\newcommand{\ieb}[2][2cm]{
\makebox[#1][l]{\emph{#2}}
} %% TODO: replace with description environment (? maybe)
% less vertical space around align & align*
\newcommand{\zerodisplayskips}{
\setlength{\abovedisplayskip}{0pt}
\setlength{\belowdisplayskip}{0pt}
\setlength{\abovedisplayshortskip}{0pt}
\setlength{\belowdisplayshortskip}{0pt}
}
% make dotfill use all the space available
\renewcommand{\dotfill}{
\leavevmode\cleaders\hbox to 1.00em{\hss .\hss }\hfill\kern0pt } % chktex 1 chktex 26
\setlength{\fboxsep}{-\fboxrule} % for debugging
%% PACKAGE algorithm
\floatname{algorithm}{Algorithm}
%% PACKAGE tabularray
\UseTblrLibrary{amsmath}
%% PACKAGE color
\definecolor{red}{rgb}{1, 0.1, 0.1}
\definecolor{lightgreen}{rgb}{0.55, 0.87, 0.47}
\definecolor{gray}{rgb}{0.3, 0.3, 0.3}
\newcommand{\lgt}{\cellcolor{lightgreen}} %% light green in tables
\newcommand{\gry}{\textcolor{gray}} %% gray text
\newcommand{\rd}{\textcolor{red}} %% red text
%% PACKAGE minipage
\newcommand{\thend}[1]{\begin{center}
\begin{minipage}[c][1em][c]{#1}
\dotfill{}
\end{minipage}
\end{center}}
%% PACKAGE thmtools
\declaretheoremstyle[
headfont=\normalfont\bfseries,
notefont=\mdseries,
bodyfont=\normalfont,
qed=\qedsymbol % chktex 1
]{steo}
\declaretheorem[numbered=no, style=steo]{theorem}
\declaretheoremstyle[
headfont=\normalfont\bfseries,
notefont=\mdseries,
bodyfont=\normalfont,
]{sdef}
\declaretheorem[numbered=no, style=sdef]{definition}
\declaretheoremstyle[
spaceabove=-6pt,
spacebelow=6pt,
headfont=\normalfont\bfseries,
bodyfont=\normalfont,
postheadspace=1em,
qed=$\blacksquare$,
headpunct={:}
]{sprf}
\declaretheorem[name={Proof}, style=sprf, numbered=no]{prof}
%% ......................................................................... %%
%% local changes
% \setcounter{secnumdepth}{0}
\graphicspath{ {./import/} }
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
\title{Document}
\author{
Elvis Rossi
}
\date{\today}
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
\begin{document}
\section{Implementation Design}
\subsection{Design Choices}
The class \texttt{Stencil} holds both the parallel implementation using the FastFlow library and using the native C++ threads. The one using C++ threads can be called with the method \texttt{stdthread}. The operator \texttt{()} instead will use the FastFlow library. The class can also be used as a node; an example is given in the file ``main.cpp'', where using the function \texttt{fastflow} creates a pipe between the reader, the stencil and the writer.
\begin{figure}[H]
\centering
\includegraphics[width=0.4\textwidth]{pipeline.eps}
\caption{}
\end{figure}
The class \texttt{Reader} reads a binary file composed of 4 bytes representing the number of rows, 4 bytes representing the number of columns and then the raw matrix data. The result is stored in the class \texttt{Task} which will be passed to the next node in the FastFlow implementation. If instead the operator \texttt{()} is called, the resulting task will be returned via the promise given as input.
\end{document}
%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - %%
%%% Local Variables:
%%% TeX-command-extra-options: "-shell-escape"
%%% End:

3259
report/import/pipeline.eps Normal file

File diff suppressed because it is too large Load Diff

278
stencil.hpp Normal file
View File

@ -0,0 +1,278 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef STENCIL_HPP
#define STENCIL_HPP
#include <algorithm>
#include <cstdio>
#include <functional>
#include <future>
#include <iostream>
#include <utility>
#include <vector>
#include <ff/ff.hpp>
// importand order in includes
#include <ff/map.hpp>
#include <ff/parallel_for.hpp>
#include "task.hpp"
#include "threadPool.hpp"
template <typename T> class Stencil : public ff::ff_Map<Task<T>> {
public:
Stencil(std::function<T(std::vector<T>)> convolution,
std::vector<std::pair<int, int>> neighborhood, int iterations);
Stencil(std::function<T(std::vector<T>)> convolution,
std::vector<std::pair<int, int>> neighborhood, int iterations,
int maxworkers);
Task<T> *svc(Task<T> *);
std::vector<std::vector<T>> *operator()(std::vector<std::vector<T>> *matrix,
int iterations);
void stdthread(std::vector<std::shared_future<Task<T> *>> *ResultsVector,
std::vector<std::promise<Task<T> *> *> *OutputVector);
void sequential(std::vector<std::shared_future<Task<T> *>> *ResultsVector,
std::vector<std::promise<Task<T> *> *> *OutputVector);
private:
Task<T> *svc_helper(Task<T> *t);
void constructor_helper(std::vector<std::pair<int, int>> neighborhood);
std::function<T(std::vector<T>)> Convolution;
std::vector<std::pair<int, int>> Neighborhood;
int Iterations = 0;
std::vector<int> Borders{0, 0, 0, 0}; // left, top, right, bottom
int MaxWorkers = 0;
};
template <typename T>
Stencil<T>::Stencil(std::function<T(std::vector<T>)> convolution,
std::vector<std::pair<int, int>> neighborhood,
int iterations)
: ff::ff_Map<Task<T>>(), Convolution(convolution), Neighborhood({{0, 0}}),
Iterations(iterations) {
constructor_helper(neighborhood);
}
template <typename T>
Stencil<T>::Stencil(std::function<T(std::vector<T>)> convolution,
std::vector<std::pair<int, int>> neighborhood,
int iterations, int maxworkers)
: ff::ff_Map<Task<T>>(maxworkers), Convolution(convolution),
Neighborhood({{0, 0}}), Iterations(iterations), MaxWorkers(maxworkers) {
constructor_helper(neighborhood);
}
template <typename T>
void Stencil<T>::constructor_helper(
std::vector<std::pair<int, int>> neighborhood) {
// copies neighborhood and adds the default element (0,0)
Neighborhood.insert(Neighborhood.end(), neighborhood.begin(),
neighborhood.end());
// finds the boundaries of the neighborhood
auto frst = [](std::pair<int, int> a, std::pair<int, int> b) {
return a.first < b.first;
};
auto scnd = [](std::pair<int, int> a, std::pair<int, int> b) {
return a.second < b.second;
};
if (neighborhood.size() > 0) {
Borders[0] =
std::abs(std::min(0, (*std::min_element(Neighborhood.begin(),
Neighborhood.end(), frst))
.first));
Borders[1] =
std::abs(std::min(0, (*std::min_element(Neighborhood.begin(),
Neighborhood.end(), scnd))
.second));
Borders[2] =
std::abs(std::max(0, (*std::max_element(Neighborhood.begin(),
Neighborhood.end(), frst))
.first));
Borders[3] =
std::abs(std::max(0, (*std::max_element(Neighborhood.begin(),
Neighborhood.end(), scnd))
.second));
}
}
// svc function for fastflow library
template <typename T> Task<T> *Stencil<T>::svc(Task<T> *task) {
task = svc_helper(task);
ff::ff_node::ff_send_out(task);
return this->GO_ON;
}
// operator to apply to vector<vector<T>>
template <typename T>
std::vector<std::vector<T>> *
Stencil<T>::operator()(std::vector<std::vector<T>> *matrix, int iterations) {
if ((*matrix).size() == 0 || (*matrix)[0].size() == 0) {
return matrix;
}
Task<T> *task = new Task<T>(matrix, (*matrix).size(), (*matrix)[0].size());
task = svc_helper(task);
return task->VectorData;
}
// function for std thread
// ResultsVector: vector of futures where another thread will put the tasks
// OutputVector: where to put the processed tasks
template <typename T>
void Stencil<T>::stdthread(
std::vector<std::shared_future<Task<T> *>> *ResultsVector,
std::vector<std::promise<Task<T> *> *> *OutputVector) {
if (!ResultsVector || !OutputVector) {
std::cerr << "Error: input is NULL [Stencil<T>::stdthread]"
<< std::endl;
return;
}
ThreadPool thread_pool(MaxWorkers);
int MaxThreads = thread_pool.numberOfThreads();
int count = 0;
// for each task, create a new arena where to store the new computed values
// then send jobs to the threadpool
for (auto result : *ResultsVector) {
Task<T> *task = result.get();
int niter = Iterations;
int delta = (task->Rows * task->Cols) / MaxThreads;
std::vector<std::vector<T>> *arena = new std::vector<std::vector<T>>(0);
*arena = *(task->VectorData); // copy all from VectorData
while (niter > 0) {
for (int l = 0; l < MaxThreads - 1; ++l) {
thread_pool.addJob([&, l, delta] {
for (int k = l * delta; k < (l + 1) * delta; ++k) {
int x = k / task->Cols;
int y = k % task->Cols;
if (x < Borders[1] || x >= task->Rows - Borders[3] ||
y < Borders[0] || y >= task->Cols - Borders[2]) {
continue;
}
std::vector<T> n;
n.resize(Neighborhood.size());
std::transform(
Neighborhood.begin(), Neighborhood.end(), n.begin(),
[&task, x, y](std::pair<int, int> e) {
return (*task->VectorData)[x + e.second]
[y + e.first];
});
(*arena)[x][y] = Convolution(n);
}
});
}
thread_pool.addJob([&, delta, MaxThreads] {
for (int k = (MaxThreads - 1) * delta;
k < task->Cols * task->Rows; ++k) {
int x = k / task->Cols;
int y = k % task->Cols;
if (x < Borders[1] || x >= task->Rows - Borders[3] ||
y < Borders[0] || y >= task->Cols - Borders[2]) {
continue;
}
std::vector<T> n;
n.resize(Neighborhood.size());
std::transform(
Neighborhood.begin(), Neighborhood.end(), n.begin(),
[&task, x, y](std::pair<int, int> e) {
return (
*task->VectorData)[x + e.second][y + e.first];
});
(*arena)[x][y] = Convolution(n);
}
});
thread_pool.waitEnd();
std::swap(task->VectorData, arena);
--niter;
}
delete (arena);
// set the value of the promise
(*OutputVector)[count]->set_value(task);
++count;
}
}
template <typename T>
void Stencil<T>::sequential(
std::vector<std::shared_future<Task<T> *>> *ResultsVector,
std::vector<std::promise<Task<T> *> *> *OutputVector) {
if (!ResultsVector || !OutputVector) {
std::cerr << "Error: input is NULL [Stencil<T>::stdthread]"
<< std::endl;
return;
}
int count = 0;
for (auto result : *ResultsVector) {
Task<T> *task = result.get();
int niter = Iterations;
std::vector<std::vector<T>> *arena = new std::vector<std::vector<T>>(0);
*arena = *(task->VectorData); // copy all from VectorData
while (niter > 0) {
for (int x = 0; x < task->Rows; ++x) {
for (int y = 0; y < task->Cols; ++y) {
if (x < Borders[1] || x >= task->Rows - Borders[3] ||
y < Borders[0] || y >= task->Cols - Borders[2]) {
continue;
}
std::vector<T> n;
n.resize(Neighborhood.size());
std::transform(
Neighborhood.begin(), Neighborhood.end(), n.begin(),
[&task, x, y](std::pair<int, int> e) {
return (
*task->VectorData)[x + e.second][y + e.first];
});
(*arena)[x][y] = Convolution(n);
}
}
std::swap(task->VectorData, arena);
--niter;
}
delete (arena);
(*OutputVector)[count]->set_value(task);
++count;
}
}
template <typename T> Task<T> *Stencil<T>::svc_helper(Task<T> *task) {
int niter = Iterations;
std::vector<std::vector<T>> *arena = new std::vector<std::vector<T>>(0);
*arena = *(task->VectorData);
while (niter > 0) {
parallel_for(
0, task->Rows * task->Cols,
[&](const int k) {
int x = k / task->Cols;
int y = k % task->Cols;
if (x < Borders[1] || x >= task->Rows - Borders[3] ||
y < Borders[0] || y >= task->Cols - Borders[2]) {
return;
}
std::vector<T> n;
n.resize(Neighborhood.size());
std::transform(
Neighborhood.begin(), Neighborhood.end(), n.begin(),
[&task, x, y](std::pair<int, int> e) {
return (*task->VectorData)[x + e.second][y + e.first];
});
(*arena)[x][y] = Convolution(n);
},
MaxWorkers);
std::swap(task->VectorData, arena);
--niter;
}
delete (arena);
return task;
}
#endif /* STENCIL_HPP */

39
task.hpp Normal file
View File

@ -0,0 +1,39 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef TASK_HPP
#define TASK_HPP
#include <cstdint>
#include <filesystem>
#include <string>
#include <vector>
template <typename T> class Task {
public:
Task(std::vector<std::vector<T>> *data, int32_t rows, int32_t cols,
std::string name);
Task(std::vector<std::vector<T>> *data, int32_t rows, int32_t cols);
~Task();
public:
std::vector<std::vector<T>> *VectorData;
int32_t Rows, Cols;
std::filesystem::path PathName;
};
template <typename T>
Task<T>::Task(std::vector<std::vector<T>> *data, int32_t rows, int32_t cols,
std::string name)
: VectorData(data), Rows(rows), Cols(cols), PathName(name) {
}
template <typename T>
Task<T>::Task(std::vector<std::vector<T>> *data, int32_t rows, int32_t cols)
: VectorData(data), Rows(rows), Cols(cols) {
PathName = "";
}
template <typename T> Task<T>::~Task() {
delete (VectorData);
}
#endif /* READER_HPP */

BIN
tests/empty2x2 Normal file

Binary file not shown.

BIN
tests/equation Normal file

Binary file not shown.

BIN
tests/equation2 Normal file

Binary file not shown.

BIN
tests/increasing300x200 Normal file

Binary file not shown.

BIN
tests/increasing4x6 Normal file

Binary file not shown.

BIN
tests/random400x2500 Normal file

Binary file not shown.

102
threadPool.cpp Normal file
View File

@ -0,0 +1,102 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include <threadPool.hpp>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
ThreadPool::ThreadPool() {
const uint32_t ThreadNum = std::thread::hardware_concurrency();
// create all threads waiting
for (uint32_t i = 0; i < ThreadNum; ++i) {
Threads.emplace_back(std::thread(&ThreadPool::spin, this));
}
}
// creates at maximum maxThreads threads, if 0 then use the maximum concurrency
// possible
ThreadPool::ThreadPool(uint32_t MaxThreads) {
const uint32_t ThreadNum =
(MaxThreads > 0)
? std::min(std::thread::hardware_concurrency(), MaxThreads)
: std::thread::hardware_concurrency();
// create all threads waiting
for (uint32_t i = 0; i < ThreadNum; ++i) {
Threads.emplace_back(std::thread(&ThreadPool::spin, this));
}
}
int ThreadPool::numberOfThreads() { return Threads.size(); }
void ThreadPool::spin() {
// wait for a new job on the mutex, then run the job
// comunicate that a job was finished after job is executed
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(QueueMutex);
MutexCondition.wait(lock, [&] { return !Jobs.empty() || Terminate; });
if (Terminate) {
break;
}
++WorkingThreads;
job = Jobs.front();
Jobs.pop();
}
job();
{
std::unique_lock<std::mutex> lock(QueueMutex);
--WorkingThreads;
if (WorkingThreads == 0) {
JobCondition.notify_all();
}
}
}
}
void ThreadPool::addJob(const std::function<void()> &job) {
if (Terminated) {
return;
}
{ // add one job, sync with mutex
std::unique_lock<std::mutex> lock(QueueMutex);
Jobs.push(job);
}
// notify that there is one job
MutexCondition.notify_one();
}
bool ThreadPool::waitEnd() {
// waits on the condition until there are no more jobs and all threads are
// idle, should return false
if (Terminated) {
return false;
}
{
std::unique_lock<std::mutex> lock(QueueMutex);
JobCondition.wait(lock,
[&] { return Jobs.empty() && WorkingThreads == 0; });
}
return !Jobs.empty();
}
void ThreadPool::Stop() {
// joins all threads (lets them finish the current job)
{
std::unique_lock<std::mutex> lock(QueueMutex);
Terminate = true;
}
MutexCondition.notify_all();
for (std::thread &t : Threads) {
t.join();
}
Threads.clear();
Terminated = true;
}
ThreadPool::~ThreadPool() {
if (!Terminated) {
this->Stop();
}
}

35
threadPool.hpp Normal file
View File

@ -0,0 +1,35 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include <atomic>
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>
class ThreadPool {
public:
ThreadPool();
explicit ThreadPool(uint32_t MaxThreads);
void addJob(const std::function<void()> &Job);
int numberOfThreads();
void Stop();
bool waitEnd();
~ThreadPool();
private:
void spin();
bool Terminate = false;
bool Terminated = false;
std::atomic_int WorkingThreads = 0;
std::mutex QueueMutex;
std::condition_variable MutexCondition;
std::condition_variable JobCondition;
std::vector<std::thread> Threads;
std::queue<std::function<void()>> Jobs;
};
#endif /* THREADPOOL_HPP */

27
timer.cpp Normal file
View File

@ -0,0 +1,27 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include <chrono>
#include <iostream>
#include <string>
#include "timer.hpp"
UTimer::UTimer() : TimeStart(std::chrono::high_resolution_clock::now()) {
TimeElapsed = std::chrono::high_resolution_clock::now() - TimeStart;
Timeusec =
std::chrono::duration_cast<std::chrono::microseconds>(TimeElapsed)
.count();
}
void UTimer::start() { TimeStart = std::chrono::high_resolution_clock::now(); }
void UTimer::stop() {
TimeElapsed = std::chrono::high_resolution_clock::now() - TimeStart;
Timeusec =
std::chrono::duration_cast<std::chrono::microseconds>(TimeElapsed)
.count();
}
long long int UTimer::print(const std::string &s) {
std::cout << s << "\t" << Timeusec << " usecs" << std::endl;
return Timeusec;
}

22
timer.hpp Normal file
View File

@ -0,0 +1,22 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef TIMER_HPP
#define TIMER_HPP
#include <chrono>
#include <string>
class UTimer {
public:
UTimer();
void start();
void stop();
long long int print(const std::string &s);
private:
std::chrono::time_point<std::chrono::high_resolution_clock> TimeStart;
std::chrono::duration<double> TimeElapsed;
long long int Timeusec;
};
#endif /* TIMER_HPP */

76
writer.hpp Normal file
View File

@ -0,0 +1,76 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#pragma once
#ifndef WRITER_HPP
#define WRITER_HPP
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <future>
#include <iostream>
#include <string>
#include <ff/ff.hpp>
#include "task.hpp"
using namespace ff;
template <typename T> class Writer : public ff::ff_node_t<Task<T>, void> {
public:
void *svc(Task<T> *);
void operator()(std::vector<std::shared_future<Task<T> *>> *);
};
// svc function for fastflow library
template <typename T> void *Writer<T>::svc(Task<T> *in) {
write_one_image(in);
return this->GO_ON;
}
// operator for std thread
template <typename T>
void Writer<T>::operator()(
std::vector<std::shared_future<Task<T> *>> *ResultsVector) {
if (!ResultsVector) {
std::cerr << "Error: invalid pointer to vector of futures "
"[Writer::operator()]"
<< std::endl;
return;
}
for (auto Result : *ResultsVector) {
Task<T> *t = Result.get();
write_one_image(t);
delete (t);
}
}
// writes out just one image, prepend the output image with "out_"
template <typename T> void write_one_image(Task<T> *t) {
std::string Pre = "out_";
std::filesystem::path Path = t->PathName;
std::string NameNew = Pre + Path.filename().string();
Path.replace_filename(NameNew);
std::fstream File{Path, File.out | File.binary};
if (!File.is_open()) {
std::cerr << "Error: Failed to open " << t->PathName.string() << std::endl;
return;
} else {
File.write(reinterpret_cast<char *>(&t->Rows), sizeof(t->Rows));
File.write(reinterpret_cast<char *>(&t->Cols), sizeof(t->Cols));
// use temporary buffer to not call the write function too frequently
T *buffer = new T[t->Cols];
for (int x = 0; x < t->Rows; ++x) {
for (int y = 0; y < t->Cols; ++y) {
buffer[y] = (*t->VectorData)[x][y];
}
File.write(reinterpret_cast<char *>(buffer), t->Cols * sizeof(T));
}
delete[] buffer;
File.close();
}
}
#endif /* WRITER_HPP */