The example shows a small application which should run multi-threaded to speed up
the data processing.
There is an input-thread, creating work and putting it into a queue created with Thread::Queue
A bunch of worker threads is getting started, in this example you provide the count of workers as an argument to the script.
0 means no threads at all, fastest execution
1 starts just one worker
2 starts two workers, and so on.
Why does this construct happen to slow down instead of spreading the workload?
use strict;
use warnings;
use Time::HiRes qw(sleep time usleep);
use threads 1.39;
use threads::shared;
use Thread::Queue;
my $MAX_THREADS = 1;
if(@ARGV == 1) {
$MAX_THREADS = shift @ARGV;
}
my $WORK_COUNT=100;
my $IN_COUNT :shared = 0;
my $OUT_COUNT :shared = 0;
if($MAX_THREADS == 0) {
my $cnt = 0;
while($cnt < $WORK_COUNT) {
my %work=(
l => 10000,
a => 100,
b => 200,
);
my $result = compute(\%work);
print "OUTPUT $cnt '$result'\n";
$cnt++;
}
exit 0;
}
MAIN:
{
my $work_iq = Thread::Queue->new();
my $work_oq = Thread::Queue->new();
for (1..$MAX_THREADS) {
my $thr = threads->create('worker', $work_iq,$work_oq);
}
my $ithr = threads->create('input', $work_iq);
my $working_threads=$MAX_THREADS;
while($working_threads) {
my $queue_entry = $work_oq->dequeue();
if ($$queue_entry{'cnt'} >= 0) {
print "OUTPUT $$queue_entry{'cnt'} '$$queue_entry{'result'}'\n";
} else {
$working_threads--;
}
}
$ithr->join();
foreach my $thr (threads->list()) {
$thr->join();
}
}
print("Done\n");
exit(0);
sub compute {
my $work = shift;
my $s = 0;
while($$work{'l'} > 0) {
my $c = $$work{'a'} * $$work{'b'};
$s += $c;
$$work{'a'} += 1;
$$work{'b'} -= 1;
$$work{'l'}--;
}
return $s;
}
sub worker
{
my ($work_iq,$work_oq) = @_;
my $work=0;
my $tid = threads->tid();
my $again=1;
do {
my $queue_entry = $work_iq->dequeue();
if ($$queue_entry{'cnt'} >= 0) {
my $result = compute($$queue_entry{'work'});
my %result_entry=( 'cnt' => $$queue_entry{'cnt'}, 'result' => $result,);
$work_oq->enqueue(\%result_entry);
} else {
$again=0;
my %queue_entry=( 'cnt' => -1, 'result' => -1,);
$work_oq->enqueue(\%queue_entry);
}
} while ($again);
printf("Finished -> %2d\n", $tid);
}
sub input {
my ($work_iq) = @_;
while ($IN_COUNT < $WORK_COUNT) {
my %work=(
l => 10000,
a => 100,
b => 200,
);
my %queue_entry=( 'cnt' => $IN_COUNT, 'work' => \%work,);
$work_iq->enqueue(\%queue_entry);
{
$IN_COUNT++;
}
}
my %queue_entry=( 'cnt' => -1, 'work' => -1,);
for (1..$MAX_THREADS) {
$work_iq->enqueue(\%queue_entry)
}
print "input thread finished\n";
}
What I have tried:
at least it is reproducable in this small example code.
I already removed the use of shared variable, but still the non-threaded version is multiple times faster than the threaded version with one thread.
With every additional thread it gets slower but consumes more CPU at the same time.