Click here to Skip to main content
15,889,438 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
```
#include<chrono>
#include<cstddef>
#include<fstream>
#include<iostream>
#include<map>
#include<random>
#include<string>
#include<thread>
#include<vector>
#include <omp.h>
#include <tbb/concurrent_hash_map.h>
#include <tbb/tbb.h>

template <typename KeyType, typename ValueType>
class ConcurrentHashMap {
  private:
    typedef tbb::concurrent_hash_map<KeyType, ValueType> HashMap;
    typedef typename HashMap::const_accessor HashMapConstAccessor;
    typedef typename HashMap::accessor HashMapAccessor;
    typedef typename HashMap::iterator HashMapIterator;
    typedef typename HashMap::value_type HashMapValuePair;

  public:
    ConcurrentHashMap (ValueType default_value) : DEFAULT_VALUE(default_value) {
    }

    size_t size() {
      return hashMap.size();
    }

    bool insert(const KeyType& key, const ValueType& value) {
      HashMapAccessor accessor;
      bool inserted = hashMap.insert(accessor, key);
      if (inserted) {
        accessor->second = value;
      }
      return inserted;
    }

    bool erase(const KeyType& key) {
      return hashMap.erase(key);
    }

    bool find(const KeyType& key, ValueType& value) {
      HashMapConstAccessor accessor;
      if (hashMap.find(accessor, key)) {
        value = accessor->second;
        return true;
      }
      value = DEFAULT_VALUE;
      return false;
    }

    void clear() {
      hashMap.clear();
    }

    const ValueType& operator[](const KeyType& key) const {
      HashMapConstAccessor accessor;
      hashMap.find(accessor, key);
      if (hashMap.find(accessor, key)) {
        return accessor->second;
      }
      // accessor->second = DEFAULT_VALUE;
      return DEFAULT_VALUE;
    }

    HashMapIterator begin() {
      return hashMap.begin();
    }

    HashMapIterator end() {
      return hashMap.end();
    }

  private:
    HashMap hashMap;
    ValueType DEFAULT_VALUE;
};

class A;

using HashMap = ConcurrentHashMap<int, A*>;
// using HashMap = ConcurrentUnorderedMap<int, A*>;

// Small payload type holding two public integers.
class A {
public:
  // Copies the two arguments into the members `a` and `b`.
  A(int _a, int _b)
      : a(_a),
        b(_b) {}

  // Empty member function; it does nothing.
  void sub () {}

  int a = 1;  // in-class default, replaced by the constructor argument
  int b = 0;  // in-class default, replaced by the constructor argument
};

void test(int N, HashMap& hashMap) {
  int thread_num = 16;

  std::thread writer(
    [&] () {
      auto writeStartTime = std::chrono::high_resolution_clock::now();
      #pragma omp parallel for num_threads(thread_num)
      for (int i = 0; i < N; i++) {
        hashMap.insert(i, new A(1, i));
      }
      auto writeEndTime = std::chrono::high_resolution_clock::now();
      double writeTime = std::chrono::duration<double>(writeEndTime 
                                                      - writeStartTime).count();
      std::cout << "writeTime=" << writeTime << std::endl;
    }
  );

  writer.join();
}

// Driver for the experiment: fills the map, then traverses it on the main
// thread while a reader thread issues random find() calls from an OpenMP
// team, and reports whether the traversal count matches size().
int main () {
  // cmd: g++ test_con_hashmap.cpp -fopenmp -ltbb  && ./a.out
  int N = 3174014;
  std::nullptr_t NULLPOINTER = nullptr;
  HashMap hashMap(NULLPOINTER);

  test(N, hashMap);

  std::cout << "------------finish write---------------" << std::endl;

  size_t hashmap_size = hashMap.size();
  std::cout << "\n------ hashMap size = " << hashmap_size << std::endl;

  {
    int thread_num = 32;
    std::thread reader(
      [&] () {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        auto readStartTime = std::chrono::high_resolution_clock::now();

        #pragma omp parallel num_threads(thread_num)
        {
          // One generator and distribution per OpenMP thread: both are
          // mutated by every draw, so sharing a single instance across the
          // team is a data race.
          std::random_device rd;
          std::mt19937 gen(rd());
          std::uniform_int_distribution<int> dis(0, N - 1);

          for (int round = 0; round < 10; round++) {
            // Worksharing loop inside the existing team; the implicit barrier
            // at its end keeps the rounds in lock-step, as the ten separate
            // parallel loops did before.
            #pragma omp for
            for (int i = 0; i < N; i++) {
              A* value = nullptr;
              int randomKey = dis(gen);
              hashMap.find(randomKey, value);
            }
          }
        }

        auto readEndTime = std::chrono::high_resolution_clock::now();
        double readTime = std::chrono::duration<double>(readEndTime
                                                 - readStartTime).count();
        std::cout << "-----readTime=" << readTime << std::endl;
      }
    );

    // Deliberately runs while the reader thread is still calling find().
    // NOTE(review): the TBB reference says find() invalidates iterators
    // unless no insertion or erasure has happened since the most recent
    // rehash(). The map was just filled by insert() calls, so this traversal
    // racing with find() is the likely cause of cnt != size — confirm against
    // the TBB version in use.
    size_t cnt = 0;
    for (auto iterator1 = hashMap.begin();
         iterator1 != hashMap.end(); ++iterator1) {
      cnt++;
    }
    std::cout << " cnt=" << cnt << std::endl;

    if (cnt != hashmap_size) {
      std::cout << " error" << std::endl;
    }

    reader.join();
  }

  // The map stores raw A* values and clear() only drops the pointers, so the
  // objects are released here, after every other thread has been joined.
  for (auto it = hashMap.begin(); it != hashMap.end(); ++it) {
    delete it->second;
  }
  hashMap.clear();

  return 0;
}
```

What I have tried:

I added a test example, including two pieces of code. One is where I encapsulated tbb::concurrent_hash_map, and the other is a test example. The results are as follows:

writeTime=0.370875
------------finish write---------------
------ hashMap size = 3174014
 cnt size=3210251
 error
-----readTime=2.22228
Posted

1 solution

The problem is caused by where you have the counter code. I tried this myself and got matching counts when I moved the counter outside the reader thread's scope. Something like this:
C++
// Variant of the driver in which the traversal runs only after the reader
// thread has been joined, so no find() call overlaps with the iteration.
int main ()
{
    // cmd: g++ test_con_hashmap.cpp -fopenmp -ltbb  && ./a.out
    int N = 3174014;
    std::nullptr_t NULLPOINTER = nullptr;
    HashMap hashMap(NULLPOINTER);

    test( N, hashMap );

    std::cout << "------------finish write---------------" << std::endl;

    size_t hashmap_size = hashMap.size();
    std::cout << "\n------ hashMap size = " << hashmap_size << std::endl;

    {
      int thread_num = 32;
      std::thread reader(
        [&] () {
          std::this_thread::sleep_for(std::chrono::milliseconds(1));
          auto readStartTime = std::chrono::high_resolution_clock::now();

          #pragma omp parallel num_threads(thread_num)
          {
            // One generator and distribution per OpenMP thread: both are
            // mutated by every draw, so sharing a single instance across the
            // team is a data race.
            std::random_device rd;
            std::mt19937 gen(rd());
            std::uniform_int_distribution<int> dis(0, N - 1);

            for (int round = 0; round < 10; round++) {
              // Worksharing loop inside the existing team; its implicit
              // barrier separates the rounds.
              #pragma omp for
              for (int i = 0; i < N; i++) {
                A* value = nullptr;
                int randomKey = dis(gen);
                hashMap.find(randomKey, value);
              }
            }
          }

          auto readEndTime = std::chrono::high_resolution_clock::now();
          double readTime = std::chrono::duration<double>(readEndTime
                                                   - readStartTime).count();
          std::cout << "-----readTime=" << readTime << std::endl;
        }
      );

      reader.join();
    } // end of reader thread scope

    // Single-threaded traversal: the reader has already been joined.
    size_t cnt = 0;
    for( auto it1 = hashMap.begin(); it1 != hashMap.end(); ++it1 )
    {
       cnt++;
    }
    std::cout << " cnt=" << cnt << std::endl;

    if( cnt == hashmap_size )
        std::cout << "counts match" << std::endl;
    else
        std::cout << "error" << std::endl;

    // The map stores raw A* values and clear() only drops the pointers, so
    // the objects are released here first.
    for( auto it = hashMap.begin(); it != hashMap.end(); ++it )
    {
        delete it->second;
    }
    hashMap.clear();
    return 0;
}
 
Share this answer
 
Comments
ystraw y 17-Dec-23 0:34am    
I am simulating a scenario where find and traversal occur at the same time, so the traversal (the counting loop) has to run before the find thread has completed. That is why the position of the count should not be changed.
Rick York 17-Dec-23 12:19pm    
In my opinion, this is not an effective simulation of that. I think the errant counts occur because more than one thread is iterating. I think a better simulation would be to have some threads searching and another thread that periodically removes one entry and then adds another (at a quick interval) and run them concurrently. That will give the access control of the map a good test.
ystraw y 17-Dec-23 12:29pm    
Thank you very much for your reply. In fact, I am simulating a real requirement of mine: I need to support concurrent find() calls together with a single thread that traverses the map, and traversal and find() may occur at the same time. As I understand it, the example I wrote has multi-threaded find() calls while only the main thread traverses, and these are all read operations. I would like to figure out why the problem occurs.

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900