一、Interrupts in ide.c


这个作业问为什么 xv6 在 acquire() 一开始取消当前处理器的中断,而在 release() 最后设置中断,并以 ide.c 中两个函数为例。

先按照作业要求在 iderw() 中添加 sti()cli():

void
iderw(struct buf *b)
{
  struct buf **pp;
  if(!holdingsleep(&b->lock))
    panic("iderw: buf not locked");
  if((b->flags & (B_VALID|B_DIRTY)) == B_VALID)
    panic("iderw: nothing to do");
  if(b->dev != 0 && !havedisk1)
    panic("iderw: ide disk 1 not present");
  acquire(&idelock);
  sti();
  b->qnext = 0;
  for(pp=&idequeue; *pp; pp=&(*pp)->qnext)
    ;
  *pp = b;
  if(idequeue == b)
    idestart(b);
  while((b->flags & (B_VALID|B_DIRTY)) != B_VALID){
    sleep(b, &idelock);
  }
  cli();
  release(&idelock);
}

假设在执行 b->qnext = 0 的时候,处理器接受到了一个磁盘中断。于是处理器暂停 iderw(),通过层层调用,最终到达 Interrupt Handler:

void
ideintr(void)
{
  struct buf *b;
  acquire(&idelock);
  if((b = idequeue) == 0){
    release(&idelock);
    return;
  }
  idequeue = b->qnext;
  if(!(b->flags & B_DIRTY) && idewait(1) >= 0)
    insl(0x1f0, b->data, BSIZE/4);
  b->flags |= B_VALID;
  b->flags &= ~B_DIRTY;
  wakeup(b);
  if(idequeue != 0)
    idestart(idequeue);
  release(&idelock);
}

这时问题出现了,当处理器进入 acquire(&idelock):

void
acquire(struct spinlock *lk)
{
  pushcli();
  if(holding(lk))
    panic("acquire");
  while(xchg(&lk->locked, 1) != 0)
    ;
  __sync_synchronize();
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

执行 holding(lk),结果为真,因为这个处理器从 iderw() 过来,一直持有 idelock 这把锁,这时内核就 panic 了。

 

二、Interrupts in file.c


与上小节一样,在 filealloc() 相应为止添加 sti()cli():

struct file*
filealloc(void)
{
  struct file *f;
  acquire(&ftable.lock);
  sti();
  for(f = ftable.file; f < ftable.file + NFILE; f++){
    if(f->ref == 0){
      f->ref = 1;
      cli();
      release(&ftable.lock);
      return f;
    }
  }
  cli();
  release(&ftable.lock);
  return 0;
}

添加头文件 #include "x86.h"。这种情况我们启动 QEMU 不会 panic。这是因为在 xv6 中没有中断处理函数会争 ftable.lock 保护的资源。自然也就不会 acquire(&ftable.lock)

 

三、xv6 lock implementation


Why does release() clear lk->pcs[0] and lk->cpu before clearing lk->locked? Why not wait until after?

void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");
  lk->pcs[0] = 0;
  lk->cpu = 0;
  __sync_synchronize();
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );
  popcli();
}

先看一下这两个成员代表了什么:

// Mutual exclusion lock.
struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};

lk->cpu 显而易见。关于 lk->pcs:

void
getcallerpcs(void *v, uint pcs[])
{
  uint *ebp;
  int i;
  ebp = (uint*)v - 2;
  for(i = 0; i < 10; i++){
    if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff)
      break;
    pcs[i] = ebp[1];
    ebp = (uint*)ebp[0];
  }
  for(; i < 10; i++)
    pcs[i] = 0;
}
void
panic(char *s)
{
  int i;
  uint pcs[10];
  cli();
  cons.locking = 0;
  cprintf("lapicid %d: panic: ", lapicid());
  cprintf(s);
  cprintf("\n");
  getcallerpcs(&s, pcs);
  for(i=0; i<10; i++)
    cprintf(" %p", pcs[i]);
  panicked = 1;
  for(;;)
    ;
}

根据 X86 calling conventions 剖析一下上面两个函数的调用栈:

                              +-------------+
                              |             |
                       +----+ |     ...     |
                       |      |             |
                       |      +-------------+
                       |      |    pcs      | <---+ arg2 for getcallerpsc
       stackframe for panic   +-------------+
                       |      |     &s      | <---+ arg1 for getcallerpsc
                       |      +-------------+ <---+ (uint*)v
                       |      | ret address |
                       +----+ +-------------+ <---+ ebp[1]
                       |      |  old %ebp   |
                       |      +-------------+ <---+ (uint*)v - 2
                       |      |     v       |
                       |      +-------------+ 
stackframe for getcallerpcs   |     ...     |
                       |      |             |
                       +----+ +-------------+ <---+ esp

所以这个函数能纪录最多 10 个调用栈的信息。调用栈中这 10 个函数的暂停点 eip 纪录在数组中。

接着回答开始的问题。如果我们将 realse() 函数改为如下:

void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");
  __sync_synchronize();
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );
  lk->pcs[0] = 0;
  lk->cpu = 0;
  popcli();
}

结合 acquire() 函数:

void
acquire(struct spinlock *lk)
{
  pushcli();
  if(holding(lk))
    panic("acquire");
  while(xchg(&lk->locked, 1) != 0)
    ;
  __sync_synchronize();
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

一旦 realse() 执行 asm 语句,可能立即有进程获取锁,掉出 while 循环,接着执行 getcallerpcs(&lk, lk->pcs)。这时 lk->pcs 还未清空,导致 lk->pcs 错误的存入了另一个进程的调用栈。如果此时内核 panic,就会打印出错误的调用栈信息。