#include #include "pysgrep.h" #undef USE_OLD_API /* ------------------------------------------- */ typedef char Args_array[]; typedef struct _Args_struct { } Args_struct; /* ------------------------------------------- */ static PyObject * PyCallback_obj = NULL; static PyObject * PyErrorCallback_obj = NULL; static char * PySgrep_options[128]; static int PySgrep_option_count = -1; #define PYSGREP_MAX_LEN 4096 static char PySgrep_line_content[PYSGREP_MAX_LEN]; static int PySgrep_line_current_len = 0; static char PySgrep_error_line_content[PYSGREP_MAX_LEN]; static int PySgrep_error_line_current_len = 0; static int show_opts(int len, char ** options); /* ------------------------------------------- */ /* Callback function; called from C. */ static int flush_content() { PyObject * meth; PyObject * arglist; PyObject * result; meth = PyObject_GetAttrString(PyCallback_obj, "write"); arglist = Py_BuildValue("(s)", PySgrep_line_content); result = PyEval_CallObject(meth, arglist); Py_DECREF(arglist); PySgrep_line_current_len = 0; PySgrep_line_content[PySgrep_line_current_len] = '\0'; return 1; } /* flush_content */ /* ------------------------------------------- */ /* Callback function; called from C. */ int pushback_char(const char chr) { if (PySgrep_line_current_len > PYSGREP_MAX_LEN) { /* Flush back to Python. */ flush_content(); PySgrep_line_current_len = 0; } /* if */ PySgrep_line_content[PySgrep_line_current_len] = chr; PySgrep_line_current_len++; PySgrep_line_content[PySgrep_line_current_len] = '\0'; if (chr == '\n') { /* Flush back to Python. */ flush_content(); PySgrep_line_current_len = 0; } /* if */ return 1; } /* pushback_char */ /* ------------------------------------------- */ /* Callback function; called from C. */ int pushback_chars(const char * chrs) { const char * pChr; for (pChr = chrs; *pChr != '\0'; pChr++) { pushback_char(*pChr); } /* for */ return 1; } /* pushback_chars */ /* ------------------------------------------- */ /* Callback function; called from C. */ int pushback_n_chars( const char * chrs, size_t size, size_t nmembers ) { int idx1; int idx2; for (idx1 = 0; idx1 < nmembers; idx1++) { for (idx2 = 0; idx2 < size; idx2++) { pushback_char(chrs[(idx1 * size) + idx2]); } /* for */ } /* for */ return 1; } /* pushback_n_chars */ /* ------------------------------------------- */ /* Callback function; called from C. */ static int flush_error_content() { PyObject * meth; PyObject * arglist; PyObject * result; meth = PyObject_GetAttrString(PyErrorCallback_obj, "write"); arglist = Py_BuildValue("(s)", PySgrep_error_line_content); result = PyEval_CallObject(meth, arglist); Py_DECREF(arglist); PySgrep_line_current_len = 0; PySgrep_error_line_content[PySgrep_error_line_current_len] = '\0'; return 1; } /* flush_error_content */ /* ------------------------------------------- */ /* Callback function; called from C. */ int pushback_error_char(const char chr) { if (PySgrep_error_line_current_len > PYSGREP_MAX_LEN) { /* Flush back to Python. */ flush_content(); PySgrep_error_line_current_len = 0; } /* if */ PySgrep_error_line_content[PySgrep_error_line_current_len] = chr; PySgrep_error_line_current_len++; PySgrep_error_line_content[PySgrep_error_line_current_len] = '\0'; if (chr == '\n') { /* Flush back to Python. */ flush_content(); PySgrep_error_line_current_len = 0; } /* if */ return 1; } /* pushback_error_char */ /* ------------------------------------------- */ /* Callback function; called from C. */ int pushback_error_chars(const char * chrs) { const char * pChr; for (pChr = chrs; *pChr != '\0'; pChr++) { pushback_error_char(*pChr); } /* for */ return 1; } /* pushback_error_chars */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * set_callback_object(PyObject * self, PyObject * args) { PyObject * result = NULL; PyObject * callback_obj; PyObject * meth; if (PyArg_ParseTuple(args, "O:set_callback", &callback_obj)) { if (PyInstance_Check(callback_obj)) { meth = PyObject_GetAttrString(callback_obj, "write"); if (meth == NULL) { PyErr_SetString(PyExc_TypeError, "object must support 'write' method"); return NULL; } /* if */ if (!PyMethod_Check(meth)) { PyErr_SetString(PyExc_TypeError, "'write' must be callable"); return NULL; } /* if */ Py_XINCREF(callback_obj); Py_XDECREF(PyCallback_obj); PyCallback_obj = callback_obj; Py_INCREF(Py_None); result = Py_None; } /* if */ } /* if */ return result; } /* set_callback_object */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * set_error_callback_object(PyObject * self, PyObject * args) { PyObject * result = NULL; PyObject * callback_obj; PyObject * meth; if (PyArg_ParseTuple(args, "O:set_callback", &callback_obj)) { if (PyInstance_Check(callback_obj)) { meth = PyObject_GetAttrString(callback_obj, "write"); if (meth == NULL) { PyErr_SetString(PyExc_TypeError, "object must support 'write' method"); return NULL; } /* if */ if (!PyMethod_Check(meth)) { PyErr_SetString(PyExc_TypeError, "'write' must be callable"); return NULL; } /* if */ Py_XINCREF(callback_obj); Py_XDECREF(PyErrorCallback_obj); PyErrorCallback_obj = callback_obj; Py_INCREF(Py_None); result = Py_None; } /* if */ } /* if */ return result; } /* set_error_callback_object */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * execute_query_with_args(PyObject * self, PyObject * args) { PyObject * result; PyObject * pyargs; PyObject * pyarg; char * pStr1; char * pStr2; char * sgrep_options[128]; int pyargs_len; int sgrep_option_count; int idx; int converted; converted = 0; if (!PyArg_ParseTuple(args, "O!", &PyTuple_Type, &pyargs)) { if (!PyArg_ParseTuple(args, "O!", &PyList_Type, &pyargs)) { PyErr_SetString(PyExc_TypeError, "bad args. expected list or tuple of args"); return NULL; } else { pyargs = PyList_AsTuple(pyargs); converted = 1; } /* if */ } /* if */ if (! (PyList_Check(pyargs) || PyTuple_Check(pyargs))) { PyErr_SetString(PyExc_TypeError, "bad args. expected list or tuple of args"); return NULL; } /* if */ /* Execute a query by calling py_main (in pymain.c). */ pStr1 = "pysgrep"; pStr2 = (char *)malloc(strlen(pStr1) + 1); strcpy(pStr2, pStr1); sgrep_options[0] = pStr2; pyargs_len = PyTuple_GET_SIZE(pyargs); for (idx = 0; idx < pyargs_len; idx++) { pyarg = PyTuple_GetItem(pyargs, idx); if (! PyString_Check(pyarg)) { PyErr_SetString(PyExc_TypeError, "bad arg. all args must be type string"); return NULL; } /* if */ pStr1 = PyString_AS_STRING(pyarg); pStr2 = (char *)malloc(strlen(pStr1) + 1); strcpy(pStr2, pStr1); sgrep_options[idx + 1] = pStr2; } /* for */ sgrep_options[pyargs_len + 1] = NULL; sgrep_option_count = pyargs_len + 1; #if 0 show_opts(sgrep_option_count, sgrep_options); #endif py_sgrep_main(sgrep_option_count, sgrep_options); /* Clean up. */ if (converted) { Py_XDECREF(pyargs); } /* if */ for (idx = 0; idx < sgrep_option_count; idx++) { free(sgrep_options[idx]); } /* for */ Py_INCREF(Py_None); result = Py_None; return result; } /* execute_query_with_args */ static int show_opts(int len, char ** options) { int idx; printf("(show_opts) len: %d\n", len); for (idx = 0; idx < len; idx++) { printf("(show_opts) option: %d: %s\n", idx, options[idx]); } /* for */ return 1; } /* ------------------------------------------- */ #ifdef USE_OLD_API /* Called from Python. */ static PyObject * execute_query(PyObject * self, PyObject * args) { PyObject * result; char * query; char * pStr; if (!PyArg_ParseTuple(args, "s", &query)) { PyErr_SetString(PyExc_TypeError, "bad args. expected "); return NULL; } /* if */ /* Execute a query by calling py_main (in pymain.c). */ pStr = (char *)malloc(strlen(query) + 1); strcpy(pStr, query); PySgrep_option_count++; PySgrep_options[PySgrep_option_count] = pStr; show_opts(); PySgrep_option_count++; py_sgrep_main(PySgrep_option_count, PySgrep_options); Py_INCREF(Py_None); result = Py_None; return result; } /* execute_query */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * execute_query_no_opt(PyObject * self, PyObject * args) { PyObject * result; if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_TypeError, "bad args. expected no args"); return NULL; } /* if */ /* Execute a query by calling py_main (in pymain.c). */ show_opts(); PySgrep_option_count++; py_sgrep_main(PySgrep_option_count, PySgrep_options); Py_INCREF(Py_None); result = Py_None; return result; } /* execute_query_no_opt */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * add_option(PyObject * self, PyObject * args) { PyObject * result = NULL; char * option; char * pStr; char * pStr1; if (!PyArg_ParseTuple(args, "s", &option)) { PyErr_SetString(PyExc_TypeError, "argument must be "); return NULL; } /* if */ if (PySgrep_option_count == -1) { PySgrep_option_count++; pStr1 = "pysgrep"; pStr = malloc(strlen(pStr1) + 1); strcpy(pStr, pStr1); PySgrep_options[PySgrep_option_count] = pStr; } /* if */ pStr = (char *)malloc(strlen(option) + 1); strcpy(pStr, option); PySgrep_option_count++; PySgrep_options[PySgrep_option_count] = pStr; Py_INCREF(Py_None); result = Py_None; return result; } /* add_option */ /* ------------------------------------------- */ /* Called from Python. */ static PyObject * clear_options(PyObject * self, PyObject * args) { PyObject * result = NULL; int idx; if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_TypeError, "expected zero arguments"); return NULL; } /* if */ if (PySgrep_option_count != -1) { for (idx = 0; idx < PySgrep_option_count; idx++) { printf("(clear_options) clearing: %d: %s\n", idx, PySgrep_options[idx]); free(PySgrep_options[idx]); } /* for */ PySgrep_option_count = -1; } /* if */ Py_INCREF(Py_None); result = Py_None; return result; } /* clear_options */ #endif /* USE_OLD_API */ /* ------------------------------------------- */ #undef PYSGREP_TESTING #ifdef PYSGREP_TESTING static PyObject * test_write_obj(PyObject * self, PyObject * args) { PyObject * result = NULL; PyObject * arglist; PyObject * meth; char * arg1; if (PyArg_ParseTuple(args, "s", &arg1)) { if (!PyInstance_Check(PyCallback_obj)) { PyErr_SetString(PyExc_TypeError, "callback object not set"); return NULL; } else { meth = PyObject_GetAttrString(PyCallback_obj, "write"); if (meth == NULL) { PyErr_SetString(PyExc_TypeError, "object must support 'write' method"); return NULL; } /* if */ if (!PyMethod_Check(meth)) { PyErr_SetString(PyExc_TypeError, "'write' must be callable"); return NULL; } /* if */ arglist = Py_BuildValue("(s)", arg1); result = PyEval_CallObject(meth, arglist); Py_DECREF(arglist); Py_INCREF(Py_None); result = Py_None; } /* if */ } /* if */ return result; } /* test_write_obj */ /* ------------------------------------------- */ static PyObject * set_callback(PyObject * self, PyObject * args) { PyObject * result = NULL; PyObject * callback; if (PyArg_ParseTuple(args, "O:set_callback", &callback)) { if (!PyCallable_Check(callback)) { PyErr_SetString(PyExc_TypeError, "parameter must be a callable object"); return NULL; } /* if */ Py_XINCREF(callback); Py_XDECREF(Writer_callback); Writer_callback = callback; Py_INCREF(Py_None); result = Py_None; } /* if */ return result; } /* set_callback */ /* ------------------------------------------- */ static PyObject * test_write(PyObject * self, PyObject * args) { PyObject * result = NULL; PyObject * arglist; char * arg1; if (PyArg_ParseTuple(args, "s", &arg1)) { arglist = Py_BuildValue("(s)", arg1); result = PyEval_CallObject(Writer_callback, arglist); Py_DECREF(arglist); } /* if */ return result; } /* test_write */ #endif /* PYSGREP_TESTING */ /* ------------------------------------------- */ static PyMethodDef SgrepMethods[] = { {"set_callback_object", set_callback_object, METH_VARARGS}, {"set_error_callback_object", set_error_callback_object, METH_VARARGS}, {"execute_query_with_args", execute_query_with_args,METH_VARARGS}, #if USE_OLD_API {"execute_query", execute_query, METH_VARARGS}, {"execute_query_no_opt", execute_query_no_opt, METH_VARARGS}, {"add_option", add_option, METH_VARARGS}, {"clear_options", clear_options, METH_VARARGS}, #endif /* USE_OLD_API */ #ifdef PYSGREP_TESTING {"test_write_obj", test_write_obj, METH_VARARGS}, {"set_callback", set_callback, METH_VARARGS}, {"test_write", test_write, METH_VARARGS}, #endif /* PYSGREP_TESTING */ {NULL, NULL, 0} }; /* ------------------------------------------- */ static PyObject * SgrepError; void initsgreplib() { PyObject *mod; PyObject * dict; mod = Py_InitModule("sgreplib", SgrepMethods); dict = PyModule_GetDict(mod); SgrepError = PyErr_NewException("sgrep.error", NULL, NULL); PyDict_SetItemString(dict, "error", SgrepError); } /* initsgreplib */ /* ------------------------------------------- */