/*
* Copyright (C) 2008, Nokia <ivan.frade@nokia.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "config.h"
#include <string.h>
#include <unistd.h>
#include <gmodule.h>
#include <gio/gio.h>
#include <gio/gunixoutputstream.h>
#include <gio/gunixinputstream.h>
#include <gio/gunixfdlist.h>
#include <libtracker-common/tracker-common.h>
#include <libtracker-extract/tracker-extract.h>
#include "tracker-main.h"
#include "tracker-marshal.h"
#ifdef HAVE_LIBSTREAMANALYZER
#include "tracker-topanalyzer.h"
#endif /* HAVE_STREAMANALYZER */
#ifdef THREAD_ENABLE_TRACE
#warning Main thread traces enabled
#endif /* THREAD_ENABLE_TRACE */
/* NOTE(review): not referenced in the visible part of this file;
 * presumably a per-file extraction time limit in seconds — confirm.
 */
#define MAX_EXTRACT_TIME 10
/* printf-style format for an "unknown D-Bus method" error. Arguments, in
 * order: method name, method signature, interface name and the expected
 * value (presumably the expected interface — confirm at the call site).
 */
#define UNKNOWN_METHOD_MESSAGE "Method \"%s\" with signature \"%s\" on " \
"interface \"%s\" doesn't exist, expected \"%s\""
/* D-Bus introspection data for org.freedesktop.Tracker1.Extract:
 *  - GetPid: returns an int32.
 *  - GetMetadata: takes uri + mime, returns three strings named
 *    preupdate/embedded/where (presumably the SPARQL fragments built by
 *    get_file_metadata() — confirm in the D-Bus handler).
 *  - GetMetadataFast: takes uri + mime + a Unix fd ('h' type).
 */
static const gchar introspection_xml[] =
"<node>"
" <interface name='org.freedesktop.Tracker1.Extract'>"
" <method name='GetPid'>"
" <arg type='i' name='value' direction='out' />"
" </method>"
" <method name='GetMetadata'>"
" <arg type='s' name='uri' direction='in' />"
" <arg type='s' name='mime' direction='in' />"
" <arg type='s' name='preupdate' direction='out' />"
" <arg type='s' name='embedded' direction='out' />"
" <arg type='s' name='where' direction='out' />"
" </method>"
" <method name='GetMetadataFast'>"
" <arg type='s' name='uri' direction='in' />"
" <arg type='s' name='mime' direction='in' />"
" <arg type='h' name='fd' direction='in' />"
" </method>"
" </interface>"
"</node>";
/* Returns the TrackerExtractPrivate registered with
 * g_type_class_add_private() in tracker_extract_class_init().
 */
#define TRACKER_EXTRACT_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), TRACKER_TYPE_EXTRACT, TrackerExtractPrivate))
extern gboolean debug;
gint extracted_count;
gint failed_count;
} StatisticsData;
/* One statistics report, created by report_stats() (which may run in a
 * worker thread) and consumed by report_stats_cb() from an idle source,
 * so that the statistics table is only updated from the main loop.
 */
typedef struct {
/* Extractor whose private statistics table is updated */
TrackerExtract *extract;
/* Key into TrackerExtractPrivate.statistics_data */
GModule *module;
/* When 0, the module's failed_count is incremented as well */
guint success : 1;
} StatsReportData;
/* Instance-private state of TrackerExtract, obtained through
 * TRACKER_EXTRACT_GET_PRIVATE().
 */
typedef struct {
/* GModule* -> StatisticsData*; values are freed together with the table */
GHashTable *statistics_data;
/* module -> thread awareness enum for initialized modules */
GHashTable *modules;
/* Thread pool for multi-threaded extractors */
GThreadPool *thread_pool;
/* module -> GAsyncQueue* hashtable
 * for single-threaded extractors (one dedicated thread per module)
 */
GHashTable *single_thread_extractors;
/* Set by tracker_extract_new(); NOTE(review): its consumer is not
 * visible in this part of the file.
 */
gboolean disable_shutdown;
/* With libstreamanalyzer compiled in, TRUE skips it and uses only the
 * internal extractor modules.
 */
gboolean force_internal_extractors;
/* TRUE suppresses the statistics summary printed on finalize */
gboolean disable_summary_on_finalize;
/* Number of tasks for which no extractor module was found */
gint unhandled_count;
} TrackerExtractPrivate;
typedef struct {
TrackerExtract *extract;
GCancellable *cancellable;
GAsyncResult *res;
gchar *file;
gchar *mimetype;
TrackerExtractMetadataFunc func;
GModule *module;
/* Forward declarations for functions referenced before their definitions */
static void tracker_extract_finalize (GObject *object);
static void report_statistics (GObject *object);
/* Runs one extraction task; also the worker function of the thread pool
 * created in tracker_extract_init().
 */
static gboolean get_metadata (TrackerExtractTask *task);
/* Registers the TrackerExtract type (parent: GObject) and defines
 * tracker_extract_parent_class, used when chaining up in finalize.
 */
G_DEFINE_TYPE(TrackerExtract, tracker_extract, G_TYPE_OBJECT)
/* Class initializer: installs the finalize handler and registers the
 * per-instance TrackerExtractPrivate structure.
 */
static void
tracker_extract_class_init (TrackerExtractClass *klass)
{
	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

	gobject_class->finalize = tracker_extract_finalize;

	g_type_class_add_private (gobject_class,
	                          sizeof (TrackerExtractPrivate));
}
/* Value destroy function of the statistics hash table: releases one
 * slice-allocated StatisticsData record.
 */
static void
statistics_data_free (StatisticsData *data)
{
	g_slice_free1 (sizeof (*data), data);
}
/* Instance initializer: sets up the bookkeeping tables and the thread
 * pool used for multi-threaded extractor modules.
 */
static void
tracker_extract_init (TrackerExtract *object)
{
	TrackerExtractPrivate *priv;

#ifdef HAVE_LIBSTREAMANALYZER
	tracker_topanalyzer_init ();
#endif /* HAVE_LIBSTREAMANALYZER */

	priv = TRACKER_EXTRACT_GET_PRIVATE (object);

	/* All three tables are keyed by GModule pointer identity */
	priv->statistics_data =
		g_hash_table_new_full (g_direct_hash, g_direct_equal,
		                       NULL,
		                       (GDestroyNotify) statistics_data_free);
	priv->modules =
		g_hash_table_new (g_direct_hash, g_direct_equal);
	priv->single_thread_extractors =
		g_hash_table_new (g_direct_hash, g_direct_equal);

	/* Exclusive pool of at most 10 threads; creation errors are ignored */
	priv->thread_pool = g_thread_pool_new ((GFunc) get_metadata,
	                                       NULL,
	                                       10,
	                                       TRUE,
	                                       NULL);
}
/* GObject finalize handler.
 *
 * Releases the module bookkeeping tables and the worker thread pool.
 * Prints the statistics summary unless disable_summary_on_finalize is set,
 * then releases the statistics table and chains up to the parent class.
 */
static void
tracker_extract_finalize (GObject *object)
{
	TrackerExtractPrivate *priv;

	priv = TRACKER_EXTRACT_GET_PRIVATE (object);

	/* FIXME: Shutdown modules? */
	g_hash_table_destroy (priv->modules);
	g_hash_table_destroy (priv->single_thread_extractors);

	/* immediate=TRUE, wait=FALSE: tasks still queued in the pool are not
	 * processed. NOTE(review): such tasks are neither completed nor
	 * freed — confirm this is acceptable at shutdown.
	 */
	g_thread_pool_free (priv->thread_pool, TRUE, FALSE);

	if (!priv->disable_summary_on_finalize) {
		report_statistics (object);
	}

#ifdef HAVE_LIBSTREAMANALYZER
	tracker_topanalyzer_shutdown ();
#endif /* HAVE_LIBSTREAMANALYZER */

	/* Destroyed last: report_statistics() above still reads it */
	g_hash_table_destroy (priv->statistics_data);

	G_OBJECT_CLASS (tracker_extract_parent_class)->finalize (object);
}
/* Logs a per-module summary of extracted/failed counts, followed by the
 * number of files for which no extractor module was found.
 */
static void
report_statistics (GObject *object)
{
	TrackerExtractPrivate *priv;
	GHashTableIter iter;
	gpointer key, value;

	priv = TRACKER_EXTRACT_GET_PRIVATE (object);

	g_message ("--------------------------------------------------");
	g_message ("Statistics:");

	g_hash_table_iter_init (&iter, priv->statistics_data);

	while (g_hash_table_iter_next (&iter, &key, &value)) {
		GModule *module = key;
		StatisticsData *data = value;

		if (data->extracted_count > 0 || data->failed_count > 0) {
			const gchar *name, *separator, *name_without_path;

			name = g_module_name (module);

			/* strrchr() returns NULL when the name has no directory
			 * part; NULL + 1 would be undefined, so fall back to the
			 * full name in that case.
			 */
			separator = strrchr (name, G_DIR_SEPARATOR);
			name_without_path = separator ? separator + 1 : name;

			g_message (" Module:'%s', extracted:%d, failures:%d",
			           name_without_path,
			           data->extracted_count,
			           data->failed_count);
		}
	}

	g_message ("Unhandled files: %d", priv->unhandled_count);

	if (priv->unhandled_count == 0 &&
	    g_hash_table_size (priv->statistics_data) < 1) {
		g_message (" No files handled");
	}

	g_message ("--------------------------------------------------");
}
TrackerExtract *
tracker_extract_new (gboolean disable_shutdown,
gboolean force_internal_extractors,
const gchar *force_module)
{
TrackerExtract *object;
TrackerExtractPrivate *priv;
if (!tracker_extract_module_manager_init ()) {
/* Set extractors */
object = g_object_new (TRACKER_TYPE_EXTRACT, NULL);
priv = TRACKER_EXTRACT_GET_PRIVATE (object);
priv->disable_shutdown = disable_shutdown;
priv->force_internal_extractors = force_internal_extractors;
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
report_stats_cb (StatsReportData *report_data)
{
TrackerExtractPrivate *priv;
StatisticsData *stats_data;
priv = TRACKER_EXTRACT_GET_PRIVATE (report_data->extract);
stats_data = g_hash_table_lookup (priv->statistics_data, report_data->module);
if (!stats_data) {
stats_data = g_slice_new0 (StatisticsData);
g_hash_table_insert (priv->statistics_data, report_data->module, stats_data);
}
stats_data->extracted_count++;
if (!report_data->success) {
stats_data->failed_count++;
}
return FALSE;
}
static void
report_stats (TrackerExtractTask *task,
gboolean success)
{
StatsReportData *data;
data = g_slice_new0 (StatsReportData);
data->extract = task->extract;
data->module = task->module;
data->success = success;
/* Send to main thread, where stats hashtable is maintained */
g_idle_add ((GSourceFunc) report_stats_cb, data);
}
static gboolean
get_file_metadata (TrackerExtractTask *task,
TrackerSparqlBuilder **preupdate_out,
TrackerSparqlBuilder **statements_out,
gchar **where_out)
TrackerSparqlBuilder *statements, *preupdate;
GString *where;
Aleksander Morgado
committed
#ifdef HAVE_LIBSTREAMANALYZER
Aleksander Morgado
committed
#endif
Martyn Russell
committed
gint items;
g_debug ("Extracting...");
*preupdate_out = NULL;
*statements_out = NULL;
*where_out = NULL;
/* Create sparql builders to send back */
preupdate = tracker_sparql_builder_new_update ();
statements = tracker_sparql_builder_new_embedded_insert ();
where = g_string_new ("");
#ifdef HAVE_LIBSTREAMANALYZER
if (!priv->force_internal_extractors) {
Martyn Russell
committed
g_debug (" Using libstreamanalyzer...");
tracker_topanalyzer_extract (uri, statements, &content_type);
if (tracker_sparql_builder_get_length (statements) > 0) {
g_free (content_type);
tracker_sparql_builder_insert_close (statements);
*preupdate_out = preupdate;
*statements_out = statements;
*where_out = g_string_free (where, FALSE);
}
} else {
Martyn Russell
committed
g_debug (" Using internal extractors ONLY...");
#endif /* HAVE_LIBSTREAMANALYZER */
if (task->mimetype && *task->mimetype) {
mime_used = g_strdup (task->mimetype);
Aleksander Morgado
committed
}
#ifdef HAVE_LIBSTREAMANALYZER
else if (content_type && *content_type) {
/* We know the mime from LSA */
mime_used = content_type;
g_strstrip (mime_used);
Aleksander Morgado
committed
}
#endif /* HAVE_LIBSTREAMANALYZER */
else {
Martyn James Russell
committed
/* Now we have sanity checked everything, actually get the
* data we need from the extractors.
*/
if (task->func) {
gint items;
Martyn Russell
committed
g_debug (" Using %s...", g_module_name (task->module));
(task->func) (task->file, mime_used, preupdate, statements, where);
items = tracker_sparql_builder_get_length (statements);
if (items > 0) {
tracker_sparql_builder_insert_close (statements);
*preupdate_out = preupdate;
*statements_out = statements;
*where_out = g_string_free (where, FALSE);
Martyn Russell
committed
g_debug ("Done (%d items)", items);
report_stats (task, TRUE);
report_stats (task, FALSE);
Martyn Russell
committed
items = tracker_sparql_builder_get_length (statements);
if (items > 0) {
tracker_sparql_builder_insert_close (statements);
}
*preupdate_out = preupdate;
*statements_out = statements;
*where_out = g_string_free (where, FALSE);
Martyn Russell
committed
g_debug ("No extractor or failed (%d items)", items);
static void
tracker_extract_info_free (TrackerExtractInfo *info)
if (info->statements) {
g_object_unref (info->statements);
}
if (info->preupdate) {
g_object_unref (info->preupdate);
g_free (info->where);
g_slice_free (TrackerExtractInfo, info);
/* Allocates a new extraction task for @uri.
 *
 * If @mimetype is NULL, the content type is queried synchronously through
 * GIO. If that query fails, task->mimetype is left NULL and a warning is
 * logged.
 *
 * Takes a reference on @res (when non-NULL); @cancellable is stored
 * without taking a reference. Free with extract_task_free().
 */
static TrackerExtractTask *
extract_task_new (TrackerExtract *extract,
                  const gchar    *uri,
                  const gchar    *mimetype,
                  GCancellable   *cancellable,
                  GAsyncResult   *res)
{
	TrackerExtractTask *task;

	task = g_slice_new0 (TrackerExtractTask);
	task->cancellable = cancellable;
	task->res = (res) ? g_object_ref (res) : NULL;
	task->file = g_strdup (uri);
	task->extract = extract;

	/* mimetype is assigned exactly once below; duplicating it
	 * unconditionally first leaked that copy.
	 */
	if (mimetype) {
		task->mimetype = g_strdup (mimetype);
	} else {
		GFile *file;
		GFileInfo *info;

		file = g_file_new_for_uri (uri);
		info = g_file_query_info (file,
		                          G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE,
		                          G_FILE_QUERY_INFO_NONE,
		                          NULL, NULL);

		if (info) {
			task->mimetype = g_strdup (g_file_info_get_content_type (info));
			/* Only unref on success: g_object_unref (NULL) is a
			 * critical warning.
			 */
			g_object_unref (info);
		} else {
			g_warning ("Could not get mimetype for '%s'", uri);
		}

		g_object_unref (file);
	}

	return task;
}
static void
extract_task_free (TrackerExtractTask *task)
if (task->res) {
g_object_unref (task->res);
}
g_free (task->file);
g_free (task->mimetype);
g_slice_free (TrackerExtractTask, task);
get_metadata (TrackerExtractTask *task)
g_debug ("Thread:%p --> File:'%s' - Extracted",
g_thread_self (),
task->file);
#endif /* THREAD_ENABLE_TRACE */
if (task->cancellable &&
g_cancellable_is_cancelled (task->cancellable)) {
g_simple_async_result_set_error ((GSimpleAsyncResult *) task->res,
TRACKER_DBUS_ERROR, 0,
"Extraction of '%s' was cancelled",
task->file);
extract_task_free (task);
return FALSE;
info = g_slice_new (TrackerExtractInfo);
if (get_file_metadata (task,
&info->preupdate,
&info->statements,
&info->where)) {
g_simple_async_result_set_op_res_gpointer ((GSimpleAsyncResult *) task->res,
info,
(GDestroyNotify) tracker_extract_info_free);
g_simple_async_result_set_error ((GSimpleAsyncResult *) task->res,
TRACKER_DBUS_ERROR, 0,
"Could not get any metadata for uri:'%s' and mime:'%s'",
task->file, task->mimetype);
tracker_extract_info_free (info);
g_simple_async_result_complete_in_idle ((GSimpleAsyncResult *) task->res);
extract_task_free (task);
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
/* Body of a module's dedicated worker thread.
 *
 * Blocks on @queue and runs every task pushed to it, one at a time, in
 * arrival order. The loop has no exit condition, so this never returns.
 */
static void
single_thread_get_metadata (GAsyncQueue *queue)
{
	for (;;) {
		TrackerExtractTask *next_task = g_async_queue_pop (queue);

		g_message ("Dispatching '%s' in dedicated thread", next_task->file);
		get_metadata (next_task);
	}
}
/* This function is executed in the main thread, decides the
* module that's going to be run for a given task, and dispatches
* the task according to the threading strategy of that module.
*/
static gboolean
dispatch_task_cb (TrackerExtractTask *task)
{
TrackerModuleThreadAwareness thread_awareness;
TrackerExtractInitFunc init_func;
TrackerExtractPrivate *priv;
GError *error = NULL;
GModule *module;
gpointer value;
#ifdef THREAD_ENABLE_TRACE
g_debug ("Thread:%p (Main) <-- File:'%s' - Dispatching\n",
g_thread_self (),
task->file);
#endif /* THREAD_ENABLE_TRACE */
if (!task->mimetype) {
g_warning ("Discarding task with no mimetype for '%s'", task->file);
return FALSE;
}
priv = TRACKER_EXTRACT_GET_PRIVATE (task->extract);
task->module = module = tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
if (!module || !task->func) {
g_warning ("Discarding task with no module '%s'", task->file);
priv->unhandled_count++;
return FALSE;
}
if (g_hash_table_lookup_extended (priv->modules, module, NULL, &value)) {
thread_awareness = GPOINTER_TO_UINT (value);
} else {
/* Module not initialized */
if (init_func) {
if (! (init_func) (&thread_awareness, &error)) {
g_warning ("Could not initialize module '%s': %s", g_module_name (module),
(error) ? error->message : "No error given");
thread_awareness = TRACKER_MODULE_NONE;
g_clear_error (&error);
}
} else {
/* Backwards compatibility for modules without init() */
thread_awareness = TRACKER_MODULE_MAIN_THREAD;
}
g_hash_table_insert (priv->modules, module, GUINT_TO_POINTER (thread_awareness));
}
switch (thread_awareness) {
case TRACKER_MODULE_NONE:
/* Error out */
g_simple_async_result_set_error ((GSimpleAsyncResult *) task->res,
TRACKER_DBUS_ERROR, 0,
"Module '%s' initialization failed",
g_module_name (module));
g_simple_async_result_complete_in_idle ((GSimpleAsyncResult *) task->res);
extract_task_free (task);
break;
case TRACKER_MODULE_MAIN_THREAD:
/* Dispatch the task right away in this thread */
g_message ("Dispatching '%s' in main thread", task->file);
get_metadata (task);
break;
case TRACKER_MODULE_SINGLE_THREAD:
{
GAsyncQueue *async_queue;
async_queue = g_hash_table_lookup (priv->single_thread_extractors, module);
if (!async_queue) {
/* No thread created yet for this module, create it
* together with the async queue used to pass data to it
*/
async_queue = g_async_queue_new ();
g_thread_create ((GThreadFunc) single_thread_get_metadata,
g_async_queue_ref (async_queue),
FALSE, &error);
if (error) {
g_simple_async_result_set_from_error ((GSimpleAsyncResult *) task->res, error);
g_simple_async_result_complete_in_idle ((GSimpleAsyncResult *) task->res);
extract_task_free (task);
g_error_free (error);
return FALSE;
}
g_hash_table_insert (priv->single_thread_extractors, module, async_queue);
}
g_async_queue_push (async_queue, task);
}
break;
case TRACKER_MODULE_MULTI_THREAD:
/* Put task in thread pool */
g_message ("Dispatching '%s' in thread pool", task->file);
g_thread_pool_push (priv->thread_pool, task, &error);
if (error) {
g_simple_async_result_set_from_error ((GSimpleAsyncResult *) task->res, error);
g_simple_async_result_complete_in_idle ((GSimpleAsyncResult *) task->res);
extract_task_free (task);
g_error_free (error);
return FALSE;
}
break;
}
return FALSE;
}
/* This function can be called in any thread */
void
tracker_extract_file (TrackerExtract *extract,
const gchar *file,
const gchar *mimetype,
GCancellable *cancellable,
GAsyncReadyCallback cb,
gpointer user_data)
GSimpleAsyncResult *res;
TrackerExtractTask *task;
g_return_if_fail (TRACKER_IS_EXTRACT (extract));
g_return_if_fail (file != NULL);
g_return_if_fail (cb != NULL);
g_debug ("Thread:%p <-- File:'%s' - Extracting\n",
g_thread_self (),
file);
#endif /* THREAD_ENABLE_TRACE */
res = g_simple_async_result_new (G_OBJECT (extract), cb, user_data, NULL);
task = extract_task_new (extract, file, mimetype, cancellable, G_ASYNC_RESULT (res));
g_idle_add ((GSourceFunc) dispatch_task_cb, task);
/* task takes a ref */
g_object_unref (res);
}
void
tracker_extract_get_metadata_by_cmdline (TrackerExtract *object,
const gchar *uri,
const gchar *mime)
TrackerSparqlBuilder *statements, *preupdate;
gchar *where;
TrackerExtractPrivate *priv;
TrackerExtractTask *task;
TrackerExtractInitFunc init_func;
priv = TRACKER_EXTRACT_GET_PRIVATE (object);
priv->disable_summary_on_finalize = TRUE;
g_return_if_fail (uri != NULL);
g_message ("Extracting...");
task = extract_task_new (object, uri, mime, NULL, NULL);
tracker_extract_module_manager_get_for_mimetype (task->mimetype, &init_func, NULL, &task->func);
if (init_func) {
/* Initialize module for this single run */
(init_func) (NULL, NULL);
}
if (get_file_metadata (task, &preupdate, &statements, &where)) {
const gchar *preupdate_str, *statements_str;
preupdate_str = statements_str = NULL;
if (tracker_sparql_builder_get_length (statements) > 0) {
statements_str = tracker_sparql_builder_get_result (statements);
}
if (tracker_sparql_builder_get_length (preupdate) > 0) {
preupdate_str = tracker_sparql_builder_get_result (preupdate);
}
Aleksander Morgado
committed
g_print ("SPARQL pre-update:\n%s\n",
preupdate_str ? preupdate_str : "");
statements_str ? statements_str : "");
g_print ("SPARQL where clause:\n%s\n",
where ? where : "");
g_object_unref (statements);
g_object_unref (preupdate);
g_free (where);
extract_task_free (task);