@ -1720,7 +1720,8 @@ int read_index(struct index_state *istate)
@@ -1720,7 +1720,8 @@ int read_index(struct index_state *istate)
return read_index_from(istate, get_index_file(), get_git_dir());
}
static struct cache_entry *create_from_disk(struct index_state *istate,
static struct cache_entry *create_from_disk(struct mem_pool *ce_mem_pool,
unsigned int version,
struct ondisk_cache_entry *ondisk,
unsigned long *ent_size,
const struct cache_entry *previous_ce)
@ -1737,7 +1738,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
@@ -1737,7 +1738,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
* number of bytes to be stripped from the end of the previous name,
* and the bytes to append to the result, to come up with its name.
*/
int expand_name_field = istate->version == 4;
int expand_name_field = version == 4;
/* On-disk flags are just 16 bits */
flags = get_be16(&ondisk->flags);
@ -1761,16 +1762,17 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
@@ -1761,16 +1762,17 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
const unsigned char *cp = (const unsigned char *)name;
size_t strip_len, previous_len;
previous_len = previous_ce ? previous_ce->ce_namelen : 0;
/* If we're at the begining of a block, ignore the previous name */
strip_len = decode_varint(&cp);
if (previous_len < strip_len) {
if (previous_ce)
if (previous_ce) {
previous_len = previous_ce->ce_namelen;
if (previous_len < strip_len)
die(_("malformed name field in the index, near path '%s'"),
previous_ce->name);
else
die(_("malformed name field in the index in the first path"));
previous_ce->name);
copy_len = previous_len - strip_len;
} else {
copy_len = 0;
}
copy_len = previous_len - strip_len;
name = (const char *)cp;
}
@ -1780,7 +1782,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
@@ -1780,7 +1782,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate,
len += copy_len;
}
ce = mem_pool__ce_alloc(istate->ce_mem_pool, len);
ce = mem_pool__ce_alloc(ce_mem_pool, len);
ce->ce_stat_data.sd_ctime.sec = get_be32(&ondisk->ctime.sec);
ce->ce_stat_data.sd_mtime.sec = get_be32(&ondisk->mtime.sec);
@ -1948,6 +1950,52 @@ static void *load_index_extensions(void *_data)
@@ -1948,6 +1950,52 @@ static void *load_index_extensions(void *_data)
return NULL;
}
/*
* A helper function that will load the specified range of cache entries
* from the memory mapped file and add them to the given index.
*/
static unsigned long load_cache_entry_block(struct index_state *istate,
struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap,
unsigned long start_offset, const struct cache_entry *previous_ce)
{
int i;
unsigned long src_offset = start_offset;
for (i = offset; i < offset + nr; i++) {
struct ondisk_cache_entry *disk_ce;
struct cache_entry *ce;
unsigned long consumed;
disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
ce = create_from_disk(ce_mem_pool, istate->version, disk_ce, &consumed, previous_ce);
set_index_entry(istate, i, ce);
src_offset += consumed;
previous_ce = ce;
}
return src_offset - start_offset;
}
static unsigned long load_all_cache_entries(struct index_state *istate,
const char *mmap, size_t mmap_size, unsigned long src_offset)
{
unsigned long consumed;
if (istate->version == 4) {
mem_pool_init(&istate->ce_mem_pool,
estimate_cache_size_from_compressed(istate->cache_nr));
} else {
mem_pool_init(&istate->ce_mem_pool,
estimate_cache_size(mmap_size, istate->cache_nr));
}
consumed = load_cache_entry_block(istate, istate->ce_mem_pool,
0, istate->cache_nr, mmap, src_offset, NULL);
return consumed;
}
#ifndef NO_PTHREADS
/*
* Mostly randomly chosen maximum thread counts: we
* cap the parallelism to online_cpus() threads, and we want
@ -1957,20 +2005,123 @@ static void *load_index_extensions(void *_data)
@@ -1957,20 +2005,123 @@ static void *load_index_extensions(void *_data)
#define THREAD_COST (10000)
struct load_cache_entries_thread_data
{
pthread_t pthread;
struct index_state *istate;
struct mem_pool *ce_mem_pool;
int offset;
const char *mmap;
struct index_entry_offset_table *ieot;
int ieot_start; /* starting index into the ieot array */
int ieot_blocks; /* count of ieot entries to process */
unsigned long consumed; /* return # of bytes in index file processed */
};
/*
* A thread proc to run the load_cache_entries() computation
* across multiple background threads.
*/
static void *load_cache_entries_thread(void *_data)
{
struct load_cache_entries_thread_data *p = _data;
int i;
/* iterate across all ieot blocks assigned to this thread */
for (i = p->ieot_start; i < p->ieot_start + p->ieot_blocks; i++) {
p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool,
p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL);
p->offset += p->ieot->entries[i].nr;
}
return NULL;
}
static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size,
unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot)
{
int i, offset, ieot_blocks, ieot_start, err;
struct load_cache_entries_thread_data *data;
unsigned long consumed = 0;
/* a little sanity checking */
if (istate->name_hash_initialized)
BUG("the name hash isn't thread safe");
mem_pool_init(&istate->ce_mem_pool, 0);
/* ensure we have no more threads than we have blocks to process */
if (nr_threads > ieot->nr)
nr_threads = ieot->nr;
data = xcalloc(nr_threads, sizeof(*data));
offset = ieot_start = 0;
ieot_blocks = DIV_ROUND_UP(ieot->nr, nr_threads);
for (i = 0; i < nr_threads; i++) {
struct load_cache_entries_thread_data *p = &data[i];
int nr, j;
if (ieot_start + ieot_blocks > ieot->nr)
ieot_blocks = ieot->nr - ieot_start;
p->istate = istate;
p->offset = offset;
p->mmap = mmap;
p->ieot = ieot;
p->ieot_start = ieot_start;
p->ieot_blocks = ieot_blocks;
/* create a mem_pool for each thread */
nr = 0;
for (j = p->ieot_start; j < p->ieot_start + p->ieot_blocks; j++)
nr += p->ieot->entries[j].nr;
if (istate->version == 4) {
mem_pool_init(&p->ce_mem_pool,
estimate_cache_size_from_compressed(nr));
} else {
mem_pool_init(&p->ce_mem_pool,
estimate_cache_size(mmap_size, nr));
}
err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p);
if (err)
die(_("unable to create load_cache_entries thread: %s"), strerror(err));
/* increment by the number of cache entries in the ieot block being processed */
for (j = 0; j < ieot_blocks; j++)
offset += ieot->entries[ieot_start + j].nr;
ieot_start += ieot_blocks;
}
for (i = 0; i < nr_threads; i++) {
struct load_cache_entries_thread_data *p = &data[i];
err = pthread_join(p->pthread, NULL);
if (err)
die(_("unable to join load_cache_entries thread: %s"), strerror(err));
mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
consumed += p->consumed;
}
free(data);
return consumed;
}
#endif
/* remember to discard_cache() before reading a different cache! */
int do_read_index(struct index_state *istate, const char *path, int must_exist)
{
int fd, i;
int fd;
struct stat st;
unsigned long src_offset;
const struct cache_header *hdr;
const char *mmap;
size_t mmap_size;
const struct cache_entry *previous_ce = NULL;
struct load_index_extensions p;
size_t extension_offset = 0;
#ifndef NO_PTHREADS
int nr_threads;
int nr_threads, cpus;
struct index_entry_offset_table *ieot = NULL;
#endif
if (istate->initialized)
@ -2012,10 +2163,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
@@ -2012,10 +2163,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
p.mmap = mmap;
p.mmap_size = mmap_size;
src_offset = sizeof(*hdr);
#ifndef NO_PTHREADS
nr_threads = git_config_get_index_threads();
if (!nr_threads)
nr_threads = online_cpus();
/* TODO: does creating more threads than cores help? */
if (!nr_threads) {
nr_threads = istate->cache_nr / THREAD_COST;
cpus = online_cpus();
if (nr_threads > cpus)
nr_threads = cpus;
}
if (nr_threads > 1) {
extension_offset = read_eoie_extension(mmap, mmap_size);
@ -2030,29 +2189,24 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
@@ -2030,29 +2189,24 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist)
nr_threads--;
}
}
#endif
if (istate->version == 4) {
mem_pool_init(&istate->ce_mem_pool,
estimate_cache_size_from_compressed(istate->cache_nr));
/*
* Locate and read the index entry offset table so that we can use it
* to multi-thread the reading of the cache entries.
*/
if (extension_offset && nr_threads > 1)
ieot = read_ieot_extension(mmap, mmap_size, extension_offset);
if (ieot) {
src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot);
free(ieot);
} else {
mem_pool_init(&istate->ce_mem_pool,
estimate_cache_size(mmap_size, istate->cache_nr));
src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
}
#else
src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset);
#endif
src_offset = sizeof(*hdr);
for (i = 0; i < istate->cache_nr; i++) {
struct ondisk_cache_entry *disk_ce;
struct cache_entry *ce;
unsigned long consumed;
disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
ce = create_from_disk(istate, disk_ce, &consumed, previous_ce);
set_index_entry(istate, i, ce);
src_offset += consumed;
previous_ce = ce;
}
istate->timestamp.sec = st.st_mtime;
istate->timestamp.nsec = ST_MTIME_NSEC(st);
@ -2549,7 +2703,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
@@ -2549,7 +2703,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
struct strbuf previous_name_buf = STRBUF_INIT, *previous_name;
int drop_cache_tree = istate->drop_cache_tree;
off_t offset;
int ieot_blocks = 1;
int ieot_entries = 1;
struct index_entry_offset_table *ieot = NULL;
int nr, nr_threads;
@ -2602,6 +2756,8 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
@@ -2602,6 +2756,8 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
ieot_blocks = cpus - 1;
} else {
ieot_blocks = nr_threads;
if (ieot_blocks > istate->cache_nr)
ieot_blocks = istate->cache_nr;
}
/*
@ -2611,7 +2767,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
@@ -2611,7 +2767,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
if (ieot_blocks > 1) {
ieot = xcalloc(1, sizeof(struct index_entry_offset_table)
+ (ieot_blocks * sizeof(struct index_entry_offset)));
ieot_blocks = DIV_ROUND_UP(entries, ieot_blocks);
ieot_entries = DIV_ROUND_UP(entries, ieot_blocks);
}
}
#endif
@ -2644,7 +2800,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
@@ -2644,7 +2800,7 @@ static int do_write_index(struct index_state *istate, struct tempfile *tempfile,
drop_cache_tree = 1;
}
if (ieot && i && (i % ieot_blocks == 0)) {
if (ieot && i && (i % ieot_entries == 0)) {
ieot->entries[ieot->nr].nr = nr;
ieot->entries[ieot->nr].offset = offset;
ieot->nr++;