Tizen Native API
5.0
|
Working with threads is hard. Ecore helps to do so a bit easier, but as the example in ecore_thread_example.c shows, there's a lot to consider even when doing the most simple things.
We'll be going through this thorough example now, showing how the differents aspects of Ecore_Thread are used, but users are encourage to avoid threads unless it's really the only option, as they always add more complexity than the program usually requires.
Ecore Threads come in two flavors, short jobs and feedback jobs. Short jobs just run the given function and are more commonly used for small tasks where the main loop does not need to know how the work is going in between. The short job in our example is so short we had to artificially enlarge it with sleep()
. Other than that, it also uses threads local data to keep the data we are working with persistent across different jobs ran by the same system thread. This data will be freed when the no more jobs are pending and the thread is terminated. If the data doesn't exist in the thread's storage, we create it and save it there for future jobs to find it. If creation fails, we cancel ourselves, so the main loop knows that we didn't just exit normally, meaning the job could not be done. The main part of the function checks in each iteration if it was canceled by the main loop, and if it was, it stops processing and clears the data from the storage (we assume cancel
means no one else will need this, but this is really application dependent).
static void _local_data_free(void *data) { Thread_Data *td = data; char *str; EINA_LIST_FREE(td->list, str) { printf("Freeing string: %s\n", str); free(str); } free(td); } static void _short_job(void *data EINA_UNUSED, Ecore_Thread *th) { Thread_Data *td; int i; td = ecore_thread_local_data_find(th, "data"); if (!td) { td = calloc(1, sizeof(Thread_Data)); if (!td) { ecore_thread_cancel(th); return; } ecore_thread_local_data_add(th, "data", td, _local_data_free, EINA_FALSE); } for (i = 0; i < 10; i++) { char buf[200]; if (ecore_thread_check(th)) { ecore_thread_local_data_del(th, "data"); break; } snprintf(buf, sizeof(buf), "Thread %p: String number %d", th, i); td->list = eina_list_append(td->list, strdup(buf)); sleep(1); } }
Feedback jobs, on the other hand, run tasks that will inform back to the main loop its progress, send partial data as is processed, just ping saying it's still alive and processing, or anything that needs the thread to talk back to the main loop.
static void _feedback_job(void *data EINA_UNUSED, Ecore_Thread *th) { time_t t; int i, count; Feedback_Thread_Data *ftd = NULL; #ifdef _WIN32 HANDLE h; WIN32_FIND_DATA fd; #else DIR *dir; #endif App_Msg *msg; count = (int)(uintptr_t)ecore_thread_global_data_find("count"); for (i = 0; i < count; i++) { char buf[32]; snprintf(buf, sizeof(buf), "data%d", i); ftd = ecore_thread_global_data_find(buf); if (!ftd) continue; if (eina_lock_take_try(&ftd->mutex)) break; else ftd = NULL; } if (!ftd) return; #ifdef _WIN32 h = FindFirstFile(ftd->base, &fd); if (h == INVALID_HANDLE_VALUE) goto the_end; #else dir = opendir(ftd->base); if (!dir) goto the_end; #endif msg = calloc(1, sizeof(App_Msg)); t = time(NULL); while (time(NULL) < t + 2) { #ifdef _WIN32 if (strlen(fd.cFileName) >= 10) msg->list = eina_list_append(msg->list, strdup(fd.cFileName)); FindNextFile(h, &fd); #else struct dirent entry, *result; if (readdir_r(dir, &entry, &result)) break; if (!result) break; if (strlen(result->d_name) >= 10) msg->list = eina_list_append(msg->list, strdup(result->d_name)); #endif }
Finally, one more feedback job, but this one will be running outside of Ecore's pool, so we can use the pool for real work and keep this very light function unchecked. All it does is check if some condition is met and send a message to the main loop telling it it's time to close.
static void _out_of_pool_job(void *data, Ecore_Thread *th) { App_Data *ad = data; App_Msg *msg; while (1) { int msgs; eina_condition_wait(&ad->condition); msgs = ad->msgs_received; eina_lock_release(&ad->mutex); if (msgs == ad->max_msgs) { msg = calloc(1, sizeof(App_Msg)); msg->all_done = 1; ecore_thread_feedback(th, msg); return; } } }
Every now and then the program prints its status, counting threads running and pending jobs.
static void _print_status(void) { int active, pending_total, pending_feedback, pending_short, available; active = ecore_thread_active_get(); pending_total = ecore_thread_pending_total_get(); pending_feedback = ecore_thread_pending_feedback_get(); pending_short = ecore_thread_pending_get(); available = ecore_thread_available_get(); printf("Status:\n\t* Active threads: %d\n" "\t* Available threads: %d\n" "\t* Pending short jobs: %d\n" "\t* Pending feedback jobs: %d\n" "\t* Pending total: %d\n", active, available, pending_short, pending_feedback, pending_total); }
In our main loop, we'll be receiving messages from our feedback jobs using the same callback for both of them.
static void _feedback_job_msg_cb(void *data, Ecore_Thread *th, void *msg_data) { App_Data *ad = data; App_Msg *msg = msg_data; char *str;
The light job running out of the pool will let us know when we can exit our program.
if (msg->all_done) { ecore_main_loop_quit(); free(msg); return; }
Next comes the handling of data sent from the actual worker threads, always remembering that the data belongs to us now, and not the thread, so it's our responsibility to free it.
_print_status(); if (!msg->list) printf("Received an empty list from thread %p\n", th); else { int i = 0; printf("Received %d elements from threads %p (printing first 5):\n", eina_list_count(msg->list), th); EINA_LIST_FREE(msg->list, str) { if (i <= 5) printf("\t%s\n", str); free(str); i++; } }
Last, the condition to exit is given by how many messages we want to handle, so we need to count them and inform the condition checking thread that the value changed.
eina_lock_take(&ad->mutex); ad->msgs_received++; eina_condition_signal(&ad->condition); eina_lock_release(&ad->mutex); free(msg); }
When a thread finishes its job or gets canceled, the main loop is notified through the callbacks set when creating the task. In this case, we just print what happen and keep track of one of them used to exemplify canceling. Here we are pretending one of our short jobs has a timeout, so if it doesn't finish before a timer is triggered, it will be canceled.
static void _thread_end_cb(void *data, Ecore_Thread *th) { App_Data *ad = data; printf("Normal termination for thread %p.\n", th); if (th == ad->thread_3) ad->thread_3 = NULL; } static void _thread_cancel_cb(void *data, Ecore_Thread *th) { App_Data *ad = data; printf("Thread %p got cancelled.\n", th); if (th == ad->thread_3) ad->thread_3 = NULL; } static Eina_Bool _cancel_timer_cb(void *data) { App_Data *ad = data; if (ad->thread_3 && !ecore_thread_check(ad->thread_3)) ecore_thread_cancel(ad->thread_3); return EINA_FALSE; }
The main function does some setup that includes reading parameters from the command line to change its behaviour and test different results. These are:
some_path
to the list used by the feedback jobs. This parameter can be used multiple times. Skipping some bits, we init Ecore and our application data.
ecore_init(); i = ecore_thread_max_get(); printf("Initial max threads: %d\n", i); memset(&appdata, 0, sizeof(App_Data)); appdata.max_msgs = 1;
If any paths for the feedback jobs were given, we use them, otherwise we fallback to some defaults. Always initializing the proper mutexes used by the threaded job.
if (!path_list) { Feedback_Thread_Data *ftd; ecore_thread_global_data_add("count", (void *)3, NULL, EINA_FALSE); ftd = calloc(1, sizeof(Feedback_Thread_Data)); ftd->name = strdup("data0"); ftd->base = strdup("/usr/bin"); eina_lock_new(&ftd->mutex); ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); ftd = calloc(1, sizeof(Feedback_Thread_Data)); ftd->name = strdup("data1"); ftd->base = strdup("/usr/lib"); eina_lock_new(&ftd->mutex); ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); ftd = calloc(1, sizeof(Feedback_Thread_Data)); ftd->name = strdup("data2"); ftd->base = strdup("/usr/share"); eina_lock_new(&ftd->mutex); ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); } else { Feedback_Thread_Data *ftd; char *str; ecore_thread_global_data_add("count", (void *)(uintptr_t)eina_list_count(path_list), NULL, EINA_FALSE); i = 0; EINA_LIST_FREE(path_list, str) { char buf[32]; snprintf(buf, sizeof(buf), "data%d", i); ftd = calloc(1, sizeof(Feedback_Thread_Data)); ftd->name = strdup(buf); ftd->base = strdup(str); eina_lock_new(&ftd->mutex); ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE); free(str); i++; } }
Initialize the mutex needed for the condition checking thread
eina_lock_new(&appdata.mutex); eina_condition_new(&appdata.condition, &appdata.mutex);
And start our tasks.
ecore_thread_feedback_run(_out_of_pool_job, _feedback_job_msg_cb, NULL, NULL, &appdata, EINA_TRUE); ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata); ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb, _thread_end_cb, _thread_cancel_cb, &appdata, EINA_FALSE); appdata.thread_3 = ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata); ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb, _thread_end_cb, _thread_cancel_cb, &appdata, EINA_FALSE);
To finalize, set a timer to cancel one of the tasks if it doesn't end before the timeout, one more timer for status report and get into the main loop. Once we are out, destroy our mutexes and finish the program.
ecore_timer_add(1.0, _cancel_timer_cb, &appdata); ecore_timer_add(2.0, _status_timer_cb, NULL); _print_status(); ecore_main_loop_begin(); eina_condition_free(&appdata.condition); eina_lock_free(&appdata.mutex); ecore_shutdown(); return 0; }