作业地址

ph.c 进行编译

1
gcc -g -O2 ph.c -pthread

在单线程和两个线程运行的情况下,两个线程发生 keys missing(这里直接复制作业地址上的例子了)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
./a.out 2
1: put time = 0.003338
0: put time = 0.003389
0: get time = 7.684335
0: 17480 keys missing
1: get time = 7.684335
1: 17480 keys missing
completion time = 7.687856

./a.out 1
0: put time = 0.004073
0: get time = 6.929189
0: 0 keys missing
completion time = 6.933433

通过代码知道,首先创建 NBUCKETentry ,随后通过 put() 方法把随机生成的 keys 按照平均分配的方式插入至对应的 entry 内,而最后通过 get() 方法比较有无缺失 keys

因为在 put() 的时候,可能会导致两个进程同时将新的 keys 插入到同一个 entry 内,而 entry 在插入的时候只能记录其中一个 keys,从而导致另外一个 keys 丢失

解决这个问题只要在 put() 的时候加入互斥锁就行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
pthread_mutex_t locks[NBUCKET];
// ...
static
void put(int key, int value)
{
  int i = key % NBUCKET;
  pthread_mutex_lock(&put_locks[i]);
  insert(key, value, &table[i], table[i]);
  pthread_mutex_unlock(&put_locks[i]);
}

运行结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
./a.out 1
0: put time = 0.007325
0: get time = 8.372105
0: 0 keys missing
completion time = 8.379631

./a.out 2
0: put time = 0.010737
1: put time = 0.012351
1: get time = 8.138578
1: 0 keys missing
0: get time = 8.196163
0: 0 keys missing
completion time = 8.209938

现在没有丢失 keys 了,但是单线程跟两个线程的时间差不多,通过上面的结果可以看到主要是 get() 耗时长,而这需要改变 get() 方法让其也支持多线程运行

所以最后的代码如下

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

#define SOL
#define NBUCKET 5
#define NKEYS 100000

struct entry {
  int key;
  int value;
  struct entry *next;
};
struct entry *table[NBUCKET];
int keys[NKEYS];
int nthread = 1;
volatile int done;

// 互斥锁
pthread_mutex_t put_locks[NBUCKET];

double
now()
{
 struct timeval tv;
 gettimeofday(&tv, 0);
 return tv.tv_sec + tv.tv_usec / 1000000.0;
}

static void
print(void)
{
  int i;
  struct entry *e;
  for (i = 0; i < NBUCKET; i++) {
    printf("%d: ", i);
    for (e = table[i]; e != 0; e = e->next) {
      printf("%d ", e->key);
    }
    printf("\n");
  }
}

static void
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof(struct entry));
  e->key = key;
  e->value = value;
  e->next = n;
  *p = e;
}

static
void put(int key, int value)
{
  int i = key % NBUCKET;
  pthread_mutex_lock(&put_locks[i]);
  insert(key, value, &table[i], table[i]);
  pthread_mutex_unlock(&put_locks[i]);
}

static struct entry*
get(int key)
{
  struct entry *e = 0;
  for (e = table[key % NBUCKET]; e != 0; e = e->next) {
    if (e->key == key) break;
  }
  return e;
}

/* 原方法,已拆分为 put_thread() 和 get_thread() */
/* static void * */
/* thread(void *xa) { */
/*   long n = (long) xa; */
/*   int i; */
/*   int b = NKEYS/nthread; */
/*   int k = 0; */
/*   double t1, t0; */

/*   //  printf("b = %d\n", b); */
/*   t0 = now(); */
/*   for (i = 0; i < b; i++) { */
/*     // printf("%d: put %d\n", n, b*n+i); */
/*     put(keys[b*n + i], n); */
/*   } */
/*   t1 = now(); */
/*   printf("%ld: put time = %f\n", n, t1-t0); */

/*   // Should use pthread_barrier, but MacOS doesn't support it ... */
/*   __sync_fetch_and_add(&done, 1); */
/*   while (done < nthread) ; */

/*   t0 = now(); */
/*   for (i = 0; i < NKEYS; i++) { */
/*     struct entry *e = get(keys[i]); */
/*     if (e == 0) k++; */
/*   } */
/*   t1 = now(); */
/*   printf("%ld: get time = %f\n", n, t1-t0); */
/*   printf("%ld: %d keys missing\n", n, k); */

/*   return NULL; */
/* } */

static void *
put_thread(void *xa)
{
  long n = (long) xa;
  int i;
  int b = NKEYS/nthread;
  int k = 0;
  double t1, t0;

  //  printf("b = %d\n", b);
  t0 = now();
  for (i = 0; i < b; i++) {
    // printf("%d: put %d\n", n, b*n+i);
    put(keys[b*n + i], n);
  }
  t1 = now();
  printf("%ld: put time = %f\n", n, t1-t0);

  // Should use pthread_barrier, but MacOS doesn't support it ...
  __sync_fetch_and_add(&done, 1);
  while (done < nthread) ;

  return NULL;
}

static void *
get_thread(void *xa) {
  long n = (long) xa;
  int i;
  int b = NKEYS/nthread;
  int k = 0;
  double t1, t0;

  t0 = now();
  for (i = 0; i < b; i++) {
    struct entry *e = get(keys[b*n + i]);
    if (e == 0) k++;
  }
  t1 = now();
  printf("%ld: get time = %f\n", n, t1-t0);
  printf("%ld: %d keys missing\n", n, k);

  // Should use pthread_barrier, but MacOS doesn't support it ...
  __sync_fetch_and_add(&done, 1);
  while (done < nthread) ;

  return NULL;
}

int
main(int argc, char *argv[])
{
  pthread_t *tha;
  void *value;
  long i;
  double t1, t0;

  if (argc < 2) {
    fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]);
    exit(-1);
  }
  nthread = atoi(argv[1]);

  // 初始化锁
  for (i = 0; i < NBUCKET; ++i) {
    assert(pthread_mutex_init(&put_locks[i], NULL) == 0);
  }

  // pthread_join() 空间分配
  tha = malloc(sizeof(pthread_t) * nthread);

  // keys 生成
  srandom(0);
  assert(NKEYS % nthread == 0);
  for (i = 0; i < NKEYS; i++) {
    keys[i] = random();
  }

  t0 = now();

  // put_thread() 创建
  for(i = 0; i < nthread; i++) {
    assert(pthread_create(&tha[i], NULL, put_thread, (void *) i) == 0);
  }
  // 等待所有 thread 完成
  for(i = 0; i < nthread; i++) {
    assert(pthread_join(tha[i], &value) == 0);
  }

  // get_thread() 创建
  for (i = 0; i < nthread; ++i) {
    assert(pthread_create(&tha[i], NULL, get_thread, (void *) i) == 0);
  }
  // 等待所有 thread 完成
  for(i = 0; i < nthread; i++) {
    assert(pthread_join(tha[i], &value) == 0);
  }

  t1 = now();
  printf("completion time = %f\n", t1-t0);
}

结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
./a.out 1
0: put time = 0.009105
0: get time = 8.598449
0: 0 keys missing
completion time = 8.607930

./a.out 2
0: put time = 0.015992
1: put time = 0.017194
1: get time = 3.659808
1: 0 keys missing
0: get time = 4.596863
0: 0 keys missing
completion time = 4.614256

修改后有约 50%的提高