aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c211
1 files changed, 180 insertions, 31 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index f31c7e8c19c..9698338adc3 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -64,6 +64,137 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
int slot);
static int ocfs2_commit_thread(void *arg);
+
+/*
+ * The recovery_list is a simple linked list of node numbers to recover.
+ * It is protected by the recovery_lock.
+ */
+
+struct ocfs2_recovery_map {
+ unsigned int rm_used;
+ unsigned int *rm_entries;
+};
+
+int ocfs2_recovery_init(struct ocfs2_super *osb)
+{
+ struct ocfs2_recovery_map *rm;
+
+ mutex_init(&osb->recovery_lock);
+ osb->disable_recovery = 0;
+ osb->recovery_thread_task = NULL;
+ init_waitqueue_head(&osb->recovery_event);
+
+ rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
+ osb->max_slots * sizeof(unsigned int),
+ GFP_KERNEL);
+ if (!rm) {
+ mlog_errno(-ENOMEM);
+ return -ENOMEM;
+ }
+
+ rm->rm_entries = (unsigned int *)((char *)rm +
+ sizeof(struct ocfs2_recovery_map));
+ osb->recovery_map = rm;
+
+ return 0;
+}
+
+/* we can't grab the goofy sem lock from inside wait_event, so we use
+ * memory barriers to make sure that we'll see the null task before
+ * being woken up */
+static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
+{
+ mb();
+ return osb->recovery_thread_task != NULL;
+}
+
+void ocfs2_recovery_exit(struct ocfs2_super *osb)
+{
+ struct ocfs2_recovery_map *rm;
+
+ /* disable any new recovery threads and wait for any currently
+ * running ones to exit. Do this before setting the vol_state. */
+ mutex_lock(&osb->recovery_lock);
+ osb->disable_recovery = 1;
+ mutex_unlock(&osb->recovery_lock);
+ wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
+
+ /* At this point, we know that no more recovery threads can be
+ * launched, so wait for any recovery completion work to
+ * complete. */
+ flush_workqueue(ocfs2_wq);
+
+ /*
+ * Now that recovery is shut down, and the osb is about to be
+ * freed, the osb_lock is not taken here.
+ */
+ rm = osb->recovery_map;
+ /* XXX: Should we bug if there are dirty entries? */
+
+ kfree(rm);
+}
+
+static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+ unsigned int node_num)
+{
+ int i;
+ struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+ assert_spin_locked(&osb->osb_lock);
+
+ for (i = 0; i < rm->rm_used; i++) {
+ if (rm->rm_entries[i] == node_num)
+ return 1;
+ }
+
+ return 0;
+}
+
+/* Behaves like test-and-set. Returns the previous value */
+static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+ unsigned int node_num)
+{
+ struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+ spin_lock(&osb->osb_lock);
+ if (__ocfs2_recovery_map_test(osb, node_num)) {
+ spin_unlock(&osb->osb_lock);
+ return 1;
+ }
+
+ /* XXX: Can this be exploited? Not from o2dlm... */
+ BUG_ON(rm->rm_used >= osb->max_slots);
+
+ rm->rm_entries[rm->rm_used] = node_num;
+ rm->rm_used++;
+ spin_unlock(&osb->osb_lock);
+
+ return 0;
+}
+
+static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
+ unsigned int node_num)
+{
+ int i;
+ struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+ spin_lock(&osb->osb_lock);
+
+ for (i = 0; i < rm->rm_used; i++) {
+ if (rm->rm_entries[i] == node_num)
+ break;
+ }
+
+ if (i < rm->rm_used) {
+ /* XXX: be careful with the pointer math */
+ memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
+ (rm->rm_used - i - 1) * sizeof(unsigned int));
+ rm->rm_used--;
+ }
+
+ spin_unlock(&osb->osb_lock);
+}
+
static int ocfs2_commit_cache(struct ocfs2_super *osb)
{
int status = 0;
@@ -586,8 +717,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
mlog_entry_void();
- if (!journal)
- BUG();
+ BUG_ON(!journal);
osb = journal->j_osb;
@@ -650,6 +780,23 @@ bail:
return status;
}
+static int ocfs2_recovery_completed(struct ocfs2_super *osb)
+{
+ int empty;
+ struct ocfs2_recovery_map *rm = osb->recovery_map;
+
+ spin_lock(&osb->osb_lock);
+ empty = (rm->rm_used == 0);
+ spin_unlock(&osb->osb_lock);
+
+ return empty;
+}
+
+void ocfs2_wait_for_recovery(struct ocfs2_super *osb)
+{
+ wait_event(osb->recovery_event, ocfs2_recovery_completed(osb));
+}
+
/*
* JBD Might read a cached version of another nodes journal file. We
* don't want this as this file changes often and we get no
@@ -848,6 +995,7 @@ static int __ocfs2_recovery_thread(void *arg)
{
int status, node_num;
struct ocfs2_super *osb = arg;
+ struct ocfs2_recovery_map *rm = osb->recovery_map;
mlog_entry_void();
@@ -863,26 +1011,29 @@ restart:
goto bail;
}
- while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
- node_num = ocfs2_node_map_first_set_bit(osb,
- &osb->recovery_map);
- if (node_num == O2NM_INVALID_NODE_NUM) {
- mlog(0, "Out of nodes to recover.\n");
- break;
- }
+ spin_lock(&osb->osb_lock);
+ while (rm->rm_used) {
+ /* It's always safe to remove entry zero, as we won't
+ * clear it until ocfs2_recover_node() has succeeded. */
+ node_num = rm->rm_entries[0];
+ spin_unlock(&osb->osb_lock);
status = ocfs2_recover_node(osb, node_num);
- if (status < 0) {
+ if (!status) {
+ ocfs2_recovery_map_clear(osb, node_num);
+ } else {
mlog(ML_ERROR,
"Error %d recovering node %d on device (%u,%u)!\n",
status, node_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
mlog(ML_ERROR, "Volume requires unmount.\n");
- continue;
}
- ocfs2_recovery_map_clear(osb, node_num);
+ spin_lock(&osb->osb_lock);
}
+ spin_unlock(&osb->osb_lock);
+ mlog(0, "All nodes recovered\n");
+
ocfs2_super_unlock(osb, 1);
/* We always run recovery on our own orphan dir - the dead
@@ -893,8 +1044,7 @@ restart:
bail:
mutex_lock(&osb->recovery_lock);
- if (!status &&
- !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) {
+ if (!status && !ocfs2_recovery_completed(osb)) {
mutex_unlock(&osb->recovery_lock);
goto restart;
}
@@ -924,8 +1074,8 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
/* People waiting on recovery will wait on
* the recovery map to empty. */
- if (!ocfs2_recovery_map_set(osb, node_num))
- mlog(0, "node %d already be in recovery.\n", node_num);
+ if (ocfs2_recovery_map_set(osb, node_num))
+ mlog(0, "node %d already in recovery map.\n", node_num);
mlog(0, "starting recovery thread...\n");
@@ -1079,7 +1229,6 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
{
int status = 0;
int slot_num;
- struct ocfs2_slot_info *si = osb->slot_info;
struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL;
@@ -1092,8 +1241,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
* case we should've called ocfs2_journal_load instead. */
BUG_ON(osb->node_num == node_num);
- slot_num = ocfs2_node_num_to_slot(si, node_num);
- if (slot_num == OCFS2_INVALID_SLOT) {
+ slot_num = ocfs2_node_num_to_slot(osb, node_num);
+ if (slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery required.\n");
goto done;
@@ -1123,8 +1272,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
/* Likewise, this would be a strange but ultimately not so
* harmful place to get an error... */
- ocfs2_clear_slot(si, slot_num);
- status = ocfs2_update_disk_slots(osb, si);
+ status = ocfs2_clear_slot(osb, slot_num);
if (status < 0)
mlog_errno(status);
@@ -1184,23 +1332,24 @@ bail:
* slot info struct has been updated from disk. */
int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
{
- int status, i, node_num;
- struct ocfs2_slot_info *si = osb->slot_info;
+ unsigned int node_num;
+ int status, i;
/* This is called with the super block cluster lock, so we
* know that the slot map can't change underneath us. */
- spin_lock(&si->si_lock);
- for(i = 0; i < si->si_num_slots; i++) {
+ spin_lock(&osb->osb_lock);
+ for (i = 0; i < osb->max_slots; i++) {
if (i == osb->slot_num)
continue;
- if (ocfs2_is_empty_slot(si, i))
+
+ status = ocfs2_slot_to_node_num_locked(osb, i, &node_num);
+ if (status == -ENOENT)
continue;
- node_num = si->si_global_node_nums[i];
- if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num))
+ if (__ocfs2_recovery_map_test(osb, node_num))
continue;
- spin_unlock(&si->si_lock);
+ spin_unlock(&osb->osb_lock);
/* Ok, we have a slot occupied by another node which
* is not in the recovery map. We trylock his journal
@@ -1216,9 +1365,9 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
goto bail;
}
- spin_lock(&si->si_lock);
+ spin_lock(&osb->osb_lock);
}
- spin_unlock(&si->si_lock);
+ spin_unlock(&osb->osb_lock);
status = 0;
bail: