/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Messages.h"
#include "sm/Storage.h"

/* Next thread ID to allocate.
 * LOCK: sched_mutex (createThread() holds it while reading and
 * incrementing this counter).
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * The three single-word entries above are the "+ 3" in the macro.
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
/* Allocate a fresh TSO via `cap`, initialise its fields, push a stop
 * frame on its stack and link it onto g0->threads.
 *
 *   cap   capability used for the allocation; also recorded as tso->cap
 *   size  requested size of the whole TSO, i.e. TSO structure plus stack
 *         (raised to a minimum and then passed through
 *         round_to_mblocks() below)
 *
 * Returns the new TSO.  The thread is not put on any run queue here.
 */
StgTSO *
createThread(Capability *cap, nat size)
{
    StgTSO *tso;
    nat stack_size;

    /* sched_mutex is *not* required on entry; it is taken below, only
     * around the thread-id allocation and global thread-list update. */

    /* First check whether we should create a thread at all */

    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
	size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
    }

    size = round_to_mblocks(size);
    tso = (StgTSO *)allocate(cap, size);

    /* Whatever is left after the TSO structure itself is the stack. */
    stack_size = size - TSO_STRUCT_SIZEW;
    TICK_ALLOC_TSO(stack_size, 0);

    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;

    /* The thread starts out unblocked, with empty queues and no flags. */
    tso->why_blocked  = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;
    
    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;
    
    tso->stack_size     = stack_size;
    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
	                  - TSO_STRUCT_SIZEW;
    /* sp starts just past the end of the stack area; frames are pushed
     * by decrementing it (see the stop frame below). */
    tso->sp             = (P_)&(tso->stack) + stack_size;

    tso->trec = NO_TREC;
    
#ifdef PROFILING
    tso->prof.CCCS = CCS_MAIN;
#endif
    
  /* put a stop frame on the stack */
    tso->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
    tso->_link = END_TSO_QUEUE;
    
    /* Link the new thread on the global thread list.
     * The new TSO becomes the head of g0->threads.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);
    
    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    /* Both arguments are really TSOs; only their ids take part in the
     * ordering. */
    const StgThreadID lhs = ((StgTSO *)tso1)->id;
    const StgThreadID rhs = ((StgTSO *)tso2)->id;

    /* Three-way result: 0 when equal, -1 when tso1's id is smaller,
     * 1 when it is larger. */
    if (lhs == rhs) {
        return 0;
    }
    return (lhs < rhs) ? -1 : 1;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    /* View the pointer as a TSO and hand back its id field (converted
     * to int by the return). */
    const StgTSO *thread = (StgTSO *)tso;

    return thread->id;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */

rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *cur, *pred;

    /* Walk the singly-linked list, remembering the predecessor so the
     * match can be unlinked. */
    pred = NULL;
    cur  = *queue;
    while (cur != END_TSO_QUEUE) {
        if (cur == tso) {
            if (pred == NULL) {
                /* tso is the first element: the head pointer itself
                 * has to change. */
                *queue = cur->_link;
                return rtsTrue;
            }
            /* tso is further down: only the predecessor's link is
             * rewritten, *queue stays as it was. */
            setTSOLink(cap, pred, cur->_link);
            return rtsFalse;
        }
        pred = cur;
        cur  = cur->_link;
    }

    /* Reached the end without meeting tso. */
    barf("removeThreadFromQueue: not found");
}

/* Unlink `tso` from the double-ended queue described by *head / *tail.
 *
 *   cap    capability passed on to setTSOLink() when a predecessor's
 *          link has to be rewritten
 *   head   in/out: first element of the queue
 *   tail   in/out: last element of the queue
 *   tso    the thread to remove
 *
 * Returns rtsTrue when *head or *tail was changed, rtsFalse when only
 * an interior link was rewritten.  Fails fatally (barf) if tso is not
 * on the queue.
 */
rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap, 
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    rtsBool ends_changed;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t != tso) {
            continue;
        }

        if (prev) {
            // interior or last element: bypass it via the predecessor
            setTSOLink(cap,prev,t->_link);
            ends_changed = rtsFalse;
        } else {
            // first element: the head pointer moves on
            *head = t->_link;
            ends_changed = rtsTrue;
        }

        if (*tail == tso) {
            // removing the last element: the tail falls back to the
            // predecessor, or to the empty marker if there was none
            if (prev) {
                *tail = prev;
            } else {
                *tail = END_TSO_QUEUE;
            }
            ends_changed = rtsTrue;
        }
        return ends_changed;
    }
    barf("removeThreadFromDeQueue: not found");
}

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.

   ------------------------------------------------------------------------- */

void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    /* Pass the TSO through deRefTSO() first, then let
     * tryWakeupThread_() do the actual work on the result. */
    StgTSO *target = deRefTSO(tso);

    tryWakeupThread_(cap, target);
}

/* Worker for tryWakeupThread(): the caller there has already passed
 * `tso` through deRefTSO().
 *
 *   cap   the capability doing the waking
 *   tso   the thread to wake
 *
 * If the thread can be unblocked it is marked NotBlocked and appended
 * to cap's run queue; otherwise the call returns without changing the
 * thread's blocking state.
 */
void
tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#ifdef THREADED_RTS
    // The thread belongs to a different capability: do not touch it
    // here, send that capability a MSG_TRY_WAKEUP message instead.
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (lnat)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    {
        // Only wake the thread if its link field is END_TSO_QUEUE.
        // NOTE(review): presumably that means it has already been
        // taken off the MVar's queue -- confirm against the MVar code.
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;
        
        // Lock and immediately unlock the message closure; this is
        // done just to obtain its info pointer.
        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        // Anything other than MSG_NULL means the throwto is still
        // outstanding, so the thread stays blocked.
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (lnat)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        // (the frame is popped by advancing sp three words)
        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
        tso->sp += 3;
        goto unblock;
    }

    // These states are woken unconditionally.
    case BlockedOnBlackHole:
    case BlockedOnSTM:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap,tso);
}

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

/* Hand `tso` over from capability `from` to capability `to`.
 *
 *   from  the capability performing the migration
 *   tso   the thread being moved
 *   to    the capability that becomes tso->cap
 */
void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    // Re-home the TSO before the wake-up call: tryWakeupThread_()
    // compares tso->cap with the calling capability to decide (under
    // THREADED_RTS) whether to send a message to the owning capability.
    tso->cap = to;
    tryWakeupThread(from, tso);
}

/* ----------------------------------------------------------------------------
   wakeBlockingQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *waiter;

    /* The closure must still be a live blocking queue, clean or dirty. */
    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );

    /* Visit every message hanging off the queue and try to wake the
     * thread it names.  Messages whose header is already IND are
     * skipped. */
    waiter = bq->queue;
    while (waiter != (MessageBlackHole*)END_TSO_QUEUE) {
        const StgInfoTable *info = waiter->header.info;

        if (info != &stg_IND_info) {
            ASSERT(info == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap,waiter->tso);
        }
        waiter = waiter->link;
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
#if defined(DEBUG) && !defined(THREADED_RTS)
    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
    // another thread might be looking at this BLOCKING_QUEUE and
    // checking the owner field at the same time.
    bq->bh    = 0;
    bq->queue = 0;
    bq->owner = 0;
#endif
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *cur, *following;
    StgClosure *hole;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (lnat)tso->id);

    cur = tso->bq;
    while (cur != (StgBlockingQueue*)END_TSO_QUEUE) {
        // Grab the successor first: wakeBlockingQueue() overwrites the
        // queue's header, so the link is read before that can happen.
        following = cur->link;

        // Queues that have already been turned into an IND are passed
        // over.  (ToDo: could short it out right here, to avoid
        // traversing this IND multiple times.)
        if (cur->header.info != &stg_IND_info) {
            hole = cur->bh;

            // Wake the queue when its closure is no longer a BLACKHOLE
            // whose indirectee is this very blocking queue.
            if (hole->header.info != &stg_BLACKHOLE_info ||
                ((StgInd *)hole)->indirectee != (StgClosure*)cur)
            {
                wakeBlockingQueue(cap,cur);
            }
        }

        cur = following;
    }
}

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value.  In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */

/* Update `thunk` with `val` and wake any threads that were blocked on it.
 *
 *   cap    capability performing the update
 *   tso    the thread performing the update
 *   thunk  the closure being updated
 *   val    the value the thunk is updated with
 */
void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    // If the thunk is not any of the black/white-hole variants there
    // is nothing more to look at: plain update.
    i = thunk->header.info;
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }
    
    // Read the indirectee *before* performing the update; it is
    // examined below.  NOTE(review): presumably the update overwrites
    // this field -- confirm against updateWithIndirection().
    v = ((StgInd*)thunk)->indirectee;

    updateWithIndirection(cap, thunk, val);

    // Case 1: the old indirectee was a TSO.  If that TSO is not us,
    // scan our own blocking queues.
    i = v->header.info;
    if (i == &stg_TSO_info) {
        owner = deRefTSO((StgTSO*)v);
        if (owner != tso) {
            checkBlockingQueues(cap, tso);
        }
        return;
    }

    // Case 2: neither a TSO nor a BLOCKING_QUEUE: scan our own
    // blocking queues.
    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    // Case 3: the old indirectee was a BLOCKING_QUEUE.  Wake it
    // directly if we own it, otherwise scan our own queues.
    owner = deRefTSO(((StgBlockingQueue*)v)->owner);

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */
 
HsBool
rtsSupportsBoundThreads(void)
{
    /* The answer is fixed at build time by THREADED_RTS. */
    HsBool supported;

#if defined(THREADED_RTS)
    supported = HS_BOOL_TRUE;
#else
    supported = HS_BOOL_FALSE;
#endif

    return supported;
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */
 
/* Report whether `tso` has a bound field set.
 *
 * With THREADED_RTS the result is whether tso->bound is non-NULL.
 * Without it the answer is always rtsFalse and `tso` is not examined
 * (hence USED_IF_THREADS on the parameter).
 */
StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
  return (tso->bound != NULL);
#else
  // Exactly one return is compiled in, so no unreachable statement is
  // left behind in the threaded build.
  return rtsFalse;
#endif
}

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if DEBUG
/* Debug helper: print (via debugBelch, without a trailing newline) a
 * description of why `tso` is blocked, selected by tso->why_blocked.
 * An unrecognised why_blocked value is fatal (barf).
 *
 * Pointer arguments to %p are cast to void *, and the TSO id is
 * printed the same way printThreadStatus() prints it.
 */
void
printThreadBlockage(StgTSO *tso)
{
  switch (tso->why_blocked) {
  case BlockedOnRead:
    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
    break;
  case BlockedOnWrite:
    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
    break;
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
    debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
    break;
#endif
  case BlockedOnDelay:
    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
    break;
  case BlockedOnMVar:
    debugBelch("is blocked on an MVar @ %p", (void *)tso->block_info.closure);
    break;
  case BlockedOnBlackHole:
    debugBelch("is blocked on a black hole %p",
               (void *)tso->block_info.bh->bh);
    break;
  case BlockedOnMsgThrowTo:
    debugBelch("is blocked on a throwto message");
    break;
  case NotBlocked:
    debugBelch("is not blocked");
    break;
  case ThreadMigrating:
    debugBelch("is runnable, but not on the run queue");
    break;
  case BlockedOnCCall:
    debugBelch("is blocked on an external call");
    break;
  case BlockedOnCCall_NoUnblockExc:
    debugBelch("is blocked on an external call (exceptions were already blocked)");
    break;
  case BlockedOnSTM:
    debugBelch("is blocked on an STM operation");
    break;
  default:
    // The TSO address must go through %p (it was previously passed to
    // %d, which does not match a pointer argument).
    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %lu (%p)",
         (int)tso->why_blocked, (unsigned long)tso->id, (void *)tso);
  }
}


/* Debug helper: print one line describing thread `t`: its id and
 * address, its label if lookupThreadLabel() returns one, and then
 * either its completion state or the reason it is blocked.
 */
void
printThreadStatus(StgTSO *t)
{
    void *label;

    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);

    label = lookupThreadLabel(t->id);
    if (label) {
        debugBelch("[\"%s\"] ",(char *)label);
    }

    /* A relocated TSO gets a short line of its own and nothing else. */
    if (t->what_next == ThreadRelocated) {
        debugBelch("has been relocated...\n");
        return;
    }

    if (t->what_next == ThreadKilled) {
        debugBelch("has been killed");
    } else if (t->what_next == ThreadComplete) {
        debugBelch("has completed");
    } else {
        printThreadBlockage(t);
    }

    /* At most one dirtiness marker is printed; TSO_DIRTY wins. */
    if (t->dirty) {
        debugBelch(" (TSO_DIRTY)");
    } else if (t->flags & TSO_LINK_DIRTY) {
        debugBelch(" (TSO_LINK_DIRTY)");
    }
    debugBelch("\n");
}

/* Debug helper: dump the status of every thread.  First the run queue
 * of each capability, then, for each generation, the threads on its
 * thread list whose why_blocked is not NotBlocked.
 */
void
printAllThreads(void)
{
  StgTSO *tso;
  nat cap_ix, gen_ix;

  debugBelch("all threads:\n");

  for (cap_ix = 0; cap_ix < n_capabilities; cap_ix++) {
      Capability *cap = &capabilities[cap_ix];

      debugBelch("threads on capability %d:\n", cap->no);
      tso = cap->run_queue_hd;
      while (tso != END_TSO_QUEUE) {
          printThreadStatus(tso);
          tso = tso->_link;
      }
  }

  debugBelch("other threads:\n");
  for (gen_ix = 0; gen_ix < RtsFlags.GcFlags.generations; gen_ix++) {
      tso = generations[gen_ix].threads;
      while (tso != END_TSO_QUEUE) {
          if (tso->why_blocked != NotBlocked) {
              printThreadStatus(tso);
          }
          /* A relocated TSO is followed through _link; every other
           * TSO through global_link. */
          if (tso->what_next == ThreadRelocated) {
              tso = tso->_link;
          } else {
              tso = tso->global_link;
          }
      }
  }
}

// useful from gdb
/* Debug helper: print the status of every thread on the _link-chained
 * queue starting at `t`, followed by the number of threads seen.
 */
void 
printThreadQueue(StgTSO *t)
{
    nat i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
	printThreadStatus(t);
	i++;
    }
    // The counter is never negative; print it with an unsigned
    // conversion and an explicit cast so format and argument agree.
    debugBelch("%u threads on queue\n", (unsigned int)i);
}

#endif /* DEBUG */