本作业要求在多处理器的机器上完成。

下载ph.c,编译执行:

wu@wu-insparition:~/Desktop$ gcc -g -O2 ph.c -pthread
wu@wu-insparition:~/Desktop$ ./a.out 2
1: put time = 0.008965
0: put time = 0.009019
1: get time = 13.099731
1: 16330 keys missing
0: get time = 13.108517
0: 16330 keys missing
completion time = 13.117796
wu@wu-insparition:~/Desktop$ ./a.out 1
0: put time = 0.013479
0: get time = 10.370999
0: 0 keys missing
completion time = 10.384810

可以看到,在多核处理器上,增加线程数后完成时间虽有略微增加,但是整个工作量是以前的两倍,原因见文末。

Why are there missing keys with 2 or more threads, but not with 1 thread? Identify a sequence of events that can lead to keys missing for 2 threads.

这个程序,在 main 函数里通过 keys[i] = random() 随机的为数组生成 100000 key。

然后通过模运算,将 entry 插入到 5 个链表中的一个:

static void
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof(struct entry));
  e->key = key;
  e->value = value;
  e->next = n;
  *p = e;
}

static void put(int key, int value)
{
  int i = key % NBUCKET; /*随机放进一个桶*/
  insert(key, value, &table[i], table[i]);
}

注意这一句:

long n = (long)xa;
int b = NKEYS / nthread;
for (i = 0; i < b; i++)
  {
    put(keys[b * n + i], n);
  }

将 100000 个插入任务平均分配到每个线程。

如果在 main() 函数中只创建一个线程(argv[1]=1),那么只有一个线程去执行 thread() 函数,这时 100000 个 entry 全部分配到 5 个桶中。如果启动两个(argv[1]=2),这时就会出现 Race Condition。xv6 book 中的举例与本作业类似。

图中 B 和 D 形成 Race Condition:

     +               +
     |               |
     |  e->next = n; |
     |               |
     |               |
     |               |  e->next = n;
     |               |
     |  *p = e;      |
     |               |
time |               |  *p = e;
     |               |
     |               +
     |
     |                   win!
     v

由于 table 是一个全局的资源,多线程情况下,不同线程可能会同时读写它。所以在多线程情况下,我们选择加锁来保证插入的时候不会丢失 entry,达到 0 key missing。

由于造成 Race Condition 的是每个桶,桶之间没有 Race Condition,为了提高代码的并行度,选择对每个桶进行加锁(降低锁粒度,虽然也能对整个table加一把大锁)。这样一来 put() 函数也能并行操作了。

下面对比一下粒度不同的锁的效率:

代码如下:

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

#define SOL
#define NBUCKET 5
#define NKEYS 100000

struct entry
{
  int key;
  int value;
  struct entry *next;
};
struct entry *table[NBUCKET];
int keys[NKEYS];
int nthread = 1;
volatile int done;
pthread_mutex_t locks[NBUCKET];

double
now()
{
  struct timeval tv;
  gettimeofday(&tv, 0);
  return tv.tv_sec + tv.tv_usec / 1000000.0;
}

static void
print(void)
{
  int i;
  struct entry *e;
  for (i = 0; i < NBUCKET; i++)
  {
    printf("%d: ", i);
    for (e = table[i]; e != 0; e = e->next)
    {
      printf("%d ", e->key);
    }
    printf("\n");
  }
}

static void
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof(struct entry));
  e->key = key;
  e->value = value;
  e->next = n;
  *p = e;
}

static void put(int key, int value)
{
  int i = key % NBUCKET; /*随机放进一个桶*/
  pthread_mutex_lock(&locks[i]);
  insert(key, value, &table[i], table[i]);
  pthread_mutex_unlock(&locks[i]);
}

static struct entry *
get(int key)
{
  struct entry *e = 0;
  /*根据key定位到桶*/
  for (e = table[key % NBUCKET]; e != 0; e = e->next)
  {
    if (e->key == key)
      break;
  }
  return e;
}

static void *
thread(void *xa)
{
  long n = (long)xa; /*线程编号*/
  int i;
  int b = NKEYS / nthread;
  int k = 0;
  double t1, t0;
  //  printf("b = %d\n", b);
  t0 = now();
  for (i = 0; i < b; i++)
  {
    // printf("%d: put %d\n", n, b*n+i);
    /*key是已经随机好的值,value n是线程编号*/
    put(keys[b * n + i], n);
  }
  t1 = now();
  printf("%ld: put time = %f\n", n, t1 - t0);
  // Should use pthread_barrier, but MacOS doesn't support it ...
  __sync_fetch_and_add(&done, 1);
  while (done < nthread)
    ;
  t0 = now();
  for (i = 0; i < NKEYS; i++)
  {
    struct entry *e = get(keys[i]);
    if (e == 0)
      k++;
  }
  t1 = now();
  printf("%ld: get time = %f\n", n, t1 - t0);
  printf("%ld: %d keys missing\n", n, k);
  return NULL;
}

int main(int argc, char *argv[])
{
  pthread_t *tha;
  void *value;
  long i;
  double t1, t0;
  for (int i = 0; i < NBUCKET; i++)
  {
    pthread_mutex_init(&locks[i], NULL);
  }
  if (argc < 2)
  {
    //fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]);
    //exit(-1);
  }
  //nthread = atoi(argv[1]);
  nthread = 2;
  tha = malloc(sizeof(pthread_t) * nthread);
  srandom(0);
  assert(NKEYS % nthread == 0); /*只允许创建偶数个线程*/
  for (i = 0; i < NKEYS; i++)
  {
    keys[i] = random(); /*生成10000个随机的key*/
  }
  t0 = now();
  for (i = 0; i < nthread; i++)
  {
    assert(pthread_create(&tha[i], NULL, thread, (void *)i) == 0);
  }
  for (i = 0; i < nthread; i++)
  {
    assert(pthread_join(tha[i], &value) == 0);
  }
  t1 = now();
  printf("completion time = %f\n", t1 - t0);
}
0: put time = 0.008046
0: get time = 5.302227
0: 0 keys missing
completion time = 5.310402

1: put time = 0.009085
0: put time = 0.011063
1: get time = 5.437663
1: 0 keys missing
0: get time = 5.446754
0: 0 keys missing
completion time = 5.457905

虽然表面看来,增加一个线程,完成时间并没有减少反而增加,但是第二种情况,由于多了一个线程,get() 函数多执行了 NKEYS 次。所以并行度还是有所提高。