diff --git a/docs/docs/guides/langfuse_integration.mdx b/docs/docs/guides/langfuse_integration.mdx new file mode 100644 index 000000000..51aba962e --- /dev/null +++ b/docs/docs/guides/langfuse_integration.mdx @@ -0,0 +1,49 @@ +# Integrating Langfuse with Langflow + +## Introduction + +Langfuse is an open-source tracing and analytics tool designed for LLM applications. Integrating Langfuse with Langflow provides detailed production traces and granular insights into quality, cost, and latency. This integration allows you to monitor and debug your Langflow's chat or APIs easily. + +## Step-by-Step Instructions + +### Step 1: Create a Langfuse account + +1. Go to [Langfuse](https://langfuse.com) and click on the "Sign In" button in the top right corner. +2. Click on the "Sign Up" button and create an account. +3. Once logged in, click on "Settings" and then on "Create new API keys." +4. Copy the Public key and the Secret Key and save them somewhere safe. + {/* Add these keys to your environment variables in the following step. */} + +### Step 2: Set up Langfuse in Langflow + +1. **Export the Environment Variables**: You'll need to export the environment variables `LANGFLOW_LANGFUSE_SECRET_KEY` and `LANGFLOW_LANGFUSE_PUBLIC_KEY` with the values obtained in Step 1. + + You can do this by executing the following commands in your terminal: + + ```bash + export LANGFLOW_LANGFUSE_SECRET_KEY= + export LANGFLOW_LANGFUSE_PUBLIC_KEY= + ``` + + Alternatively, you can run the Langflow CLI command: + + ```bash + LANGFLOW_LANGFUSE_SECRET_KEY= LANGFLOW_LANGFUSE_PUBLIC_KEY= langflow + ``` + + If you are self-hosting Langfuse, you can also set the environment variable `LANGFLOW_LANGFUSE_HOST` to point to your Langfuse instance. By default, Langfuse points to the cloud instance at `https://cloud.langfuse.com`. + +2. **Verify Integration**: Ensure that the environment variables are set correctly by checking their existence in your environment, for example by running: + + ```bash + echo $LANGFLOW_LANGFUSE_SECRET_KEY + echo $LANGFLOW_LANGFUSE_PUBLIC_KEY + ``` + +3. **Monitor Langflow**: Now, whenever you use Langflow's chat or API, you will be able to see the tracing of your conversations in Langfuse. If you have any issues with the integration, consult the [Langfuse documentation]() or contact their support. + +That's it! You have successfully integrated Langfuse with Langflow, enhancing observability and debugging capabilities for your LLM application. + +--- + +Note: For more details or customized configurations, please refer to the official [Langfuse documentation](https://langfuse.com/docs/integrations/langchain). diff --git a/docs/sidebars.js b/docs/sidebars.js index e44b1cf4f..bc02f7407 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -51,7 +51,11 @@ module.exports = { type: "category", label: "Step-by-Step Guides", collapsed: false, - items: ["guides/loading_document", "guides/chatprompttemplate_guide"], + items: [ + "guides/loading_document", + "guides/chatprompttemplate_guide", + "guides/langfuse_integration", + ], }, // { // type: 'category', diff --git a/poetry.lock b/poetry.lock index b6b2f2caa..5c6f82fb2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2918,15 +2918,33 @@ toml = "*" [package.extras] test = ["psutil", "pytest", "pytest-asyncio"] +[[package]] +name = "langfuse" +version = "1.0.13" +description = "A client library for accessing langfuse" +optional = false +python-versions = ">=3.8.1,<4.0" +files = [ + {file = "langfuse-1.0.13-py3-none-any.whl", hash = "sha256:4de4ccfd452e7c3f06c68e328a334ffa2117f8a3a185ae86243e6020979a9541"}, + {file = "langfuse-1.0.13.tar.gz", hash = "sha256:33e5033dc1c0ac23ec5e48f7c47fbdcc00b756be7b705c72951ef85d246a242f"}, +] + +[package.dependencies] +attrs = ">=21.3.0" +httpx = ">=0.15.4,<0.25.0" +langchain = ">=0.0.237,<1.0" +pydantic = ">=1.10.7,<2.0" +python-dateutil = ">=2.8.0,<3.0" + [[package]] name = "langsmith" -version = "0.0.27" +version = "0.0.28" description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform." optional = false python-versions = ">=3.8.1,<4.0" files = [ - {file = "langsmith-0.0.27-py3-none-any.whl", hash = "sha256:f61b07f093ba377b9af53c3d6f68fd1245f8f28605d4fc88433208aca93a5a23"}, - {file = "langsmith-0.0.27.tar.gz", hash = "sha256:c4df680ee8bf88d37f56ba196048341847c48b50ae561719c5542ef6488170e5"}, + {file = "langsmith-0.0.28-py3-none-any.whl", hash = "sha256:f398782f41526c74e141e68fa28b9020e0be4bde18a1d4a76b357c8272fb81bd"}, + {file = "langsmith-0.0.28.tar.gz", hash = "sha256:34c15f9a8908be180001c58048b659ece6320d0bf8ffce4ca496a2428b35646e"}, ] [package.dependencies] @@ -2955,12 +2973,12 @@ test = ["coverage", "pytest", "pytest-cov"] [[package]] name = "llama-cpp-python" -version = "0.1.82" +version = "0.1.83" description = "A Python wrapper for llama.cpp" optional = true python-versions = ">=3.7" files = [ - {file = "llama_cpp_python-0.1.82.tar.gz", hash = "sha256:ea19ee012042d806df09a5db638a912c11eed92929a27a4b3fb1d35ab7758974"}, + {file = "llama_cpp_python-0.1.83.tar.gz", hash = "sha256:9f40656e46a85a3c3427790246e03490bb90202c37cb99732a095ffcb99efe54"}, ] [package.dependencies] @@ -3692,13 +3710,13 @@ sympy = "*" [[package]] name = "openai" -version = "0.27.9" +version = "0.27.10" description = "Python client library for the OpenAI API" optional = false python-versions = ">=3.7.1" files = [ - {file = "openai-0.27.9-py3-none-any.whl", hash = "sha256:6a3cf8e276d1a6262b50562fbc0cba7967cfebb78ed827d375986b48fdad6475"}, - {file = "openai-0.27.9.tar.gz", hash = "sha256:b687761c82f5ebb6f61efc791b2083d2d068277b94802d4d1369efe39851813d"}, + {file = "openai-0.27.10-py3-none-any.whl", hash = "sha256:beabd1757e3286fa166dde3b70ebb5ad8081af046876b47c14c41e203ed22a14"}, + {file = "openai-0.27.10.tar.gz", hash = "sha256:60e09edf7100080283688748c6803b7b3b52d5a55d21890f3815292a0552d83b"}, ] [package.dependencies] @@ -4674,41 +4692,41 @@ files = [ [[package]] name = "pulsar-client" -version = "3.2.0" +version = "3.3.0" description = "Apache Pulsar Python client library" optional = false python-versions = "*" files = [ - {file = "pulsar_client-3.2.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:da53bbe1903026ca1253d36a67bde0ae88513497091658aee8c5514c3e567483"}, - {file = "pulsar_client-3.2.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ec595a71b7a25f1a72a1350efd6680a511b53253c3cac1911ba3d6c4d71fa64c"}, - {file = "pulsar_client-3.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e3557c65463d74ec8d2864752389beb06761ab591dd134a164e0b1303c66719b"}, - {file = "pulsar_client-3.2.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d51dc76fec48217489bde95754ad58288c9389361de42f5a27d64e19840d27fb"}, - {file = "pulsar_client-3.2.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:9ef2baf85311e0fe1b98342fdafbb93a1818a08ef999eaa524234fedf6f3b941"}, - {file = "pulsar_client-3.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:0928b02beda0c98e77178f4e30e962ddb8ee8c3320e4c7304a78b0796e976523"}, - {file = "pulsar_client-3.2.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:584f44b03474a69906be711a597a4d516263a55be31e49fc07be503dc8406821"}, - {file = "pulsar_client-3.2.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a637b9a3b30860c61e68a7b8ea650e0987d89e82f73b6a3df1ab662a6438fdda"}, - {file = "pulsar_client-3.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b4a187fdc5febcf16f725179dcf2c476f31eeebd8353794d91754a3202dd5072"}, - {file = "pulsar_client-3.2.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:5ff879f868cf1fd29db99f39fdb22b3ec3e749c648aca28526689756d922d1c5"}, - {file = "pulsar_client-3.2.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a5f85d0cc414f739a5b51d843f213b54b2cd768c3a34f7c27cca410712b1f81"}, - {file = "pulsar_client-3.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:4fe748283848d829a80c0323558faeebea4c240d69fa58314ac90344f6999d17"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:06b91c26def86dbbc35be15257999fd8a2afbadf32983916ea3eef44f4d4cab4"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:39ec897bc8d232e6b118793378fc662a844334b829a28a1b4ad1c5fe8d019135"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa37c96c25c1b5aff3bad0fd0194b385ec190b2c67a2f439ac91577f81ae18d3"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:d49cdd4d1b7fc2e80d100acf14e6fd3898f6e099e403fc56ed22a690245b2fec"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:0058ca3191fd24528ccf94dba6f12e4093831454a2597166f96900d0717271bf"}, - {file = "pulsar_client-3.2.0-cp37-cp37m-win_amd64.whl", hash = "sha256:cb69b0411008e0b56df51de0aab20aa1c1a12aef3019b9ceba89afbae1f07fe2"}, - {file = "pulsar_client-3.2.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:f7d33e99602352df7a30707eab4e5781654602212fb618928bffb5523f2bcf35"}, - {file = "pulsar_client-3.2.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ad1ac15a175ca90555c681a4d0134568771c6346b97a172f3ef14006556a50ae"}, - {file = "pulsar_client-3.2.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:369e08ef1d5cb196dd9271039928800f90b4701a9c9df90bc068b44260d2fb11"}, - {file = "pulsar_client-3.2.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:a52ba2b6736a2ebeed31b590e75d417dda149e333461655860efa84d898a3eb4"}, - {file = "pulsar_client-3.2.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5c801334b3b569b23976481a2922bcea0c6dd990fc26544658dd9e9c8f78ca36"}, - {file = "pulsar_client-3.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:cd01fd419280e9013d1655bc53662248be2656b623b1506480e1a985aa7dadd2"}, - {file = "pulsar_client-3.2.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:0abe54d84db76435a6cd88ce27610352cabc7efae9fa3e7f874e032ec2ca0b3f"}, - {file = "pulsar_client-3.2.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b9a1b6a806eb4819d8cbab1c4ae44ebf2110a94204a46c365f5757e1455252f2"}, - {file = "pulsar_client-3.2.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:34ea2a6b75ae0e303d522e5b57c75a4ff03dc18b9bfc14151fb14dfaf5866f17"}, - {file = "pulsar_client-3.2.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:be6d3a9b2e1db3b6d1a7db5e13f7b4ed420674cf072cdb520fb004c4cd54c0af"}, - {file = "pulsar_client-3.2.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b6b733e6239ffb505f7084df0175baf9d0215f14d0a02e9bbd1fdf71a2d6ea17"}, - {file = "pulsar_client-3.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:edc2135d02b4793efb086edca0ffaa6e8ac9133961c2cdc17ae487e0a53da481"}, + {file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"}, + {file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"}, + {file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"}, + {file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"}, + {file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"}, + {file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"}, + {file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"}, + {file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"}, ] [package.dependencies] @@ -6741,6 +6759,17 @@ files = [ {file = "types_cachetools-5.3.0.6-py3-none-any.whl", hash = "sha256:f7f8a25bfe306f2e6bc2ad0a2f949d9e72f2d91036d509c36d3810bf728bc6e1"}, ] +[[package]] +name = "types-google-cloud-ndb" +version = "2.2.0.0" +description = "Typing stubs for google-cloud-ndb" +optional = false +python-versions = "*" +files = [ + {file = "types-google-cloud-ndb-2.2.0.0.tar.gz", hash = "sha256:1485074019e4ffaf1900e185e5fc861f8f7bd99fb0c6390c040271a2327092b3"}, + {file = "types_google_cloud_ndb-2.2.0.0-py3-none-any.whl", hash = "sha256:027025dd593aa26a6505c1be7ff42f4acb70580f1a07f4030fb77a3d1869e83d"}, +] + [[package]] name = "types-pillow" version = "9.5.0.6" @@ -6852,13 +6881,13 @@ test = ["coverage", "pytest", "pytest-cov"] [[package]] name = "unstructured" -version = "0.10.8" +version = "0.10.9" description = "A library that prepares raw documents for downstream ML tasks." optional = false python-versions = ">=3.7.0" files = [ - {file = "unstructured-0.10.8-py3-none-any.whl", hash = "sha256:541c7983154b595aada205765a11add486dae88c0e120e53bccd52db2d849525"}, - {file = "unstructured-0.10.8.tar.gz", hash = "sha256:4bdc36cfe9893e3f1c311a7e7e2e62d8dd8d23e455e7748e91aaec6779717dc4"}, + {file = "unstructured-0.10.9-py3-none-any.whl", hash = "sha256:182062983cf5ade923e871be52154829d92e5b72dfd7575847618fb44f7debd6"}, + {file = "unstructured-0.10.9.tar.gz", hash = "sha256:fd62f84abf12c85ef8b81567a4fcb841b7450f010454b69754bfc5b10b68d869"}, ] [package.dependencies] @@ -7485,4 +7514,4 @@ local = ["ctransformers", "llama-cpp-python", "sentence-transformers"] [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.11" -content-hash = "35b3782f944eff6bd6676ea060b36adb51ee78e19c5ef4d8a2c08c163f87ce9a" +content-hash = "20347dcfd3f57ef74d2cdf2882bb8694624b7cd7db610bc96e6bde1a8688cae2" diff --git a/pyproject.toml b/pyproject.toml index 48b5e1b68..c3ca8068a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,7 @@ psycopg-binary = "^3.1.9" fastavro = "^1.8.0" langchain-experimental = "^0.0.8" metaphor-python = "^0.1.11" +langfuse = "^1.0.13" pillow = "^10.0.0" metal-sdk = "^2.0.2" @@ -96,6 +97,7 @@ pandas-stubs = "^2.0.0.230412" types-pillow = "^9.5.0.2" types-appdirs = "^1.4.3.5" types-pyyaml = "^6.0.12.8" +types-google-cloud-ndb = "^2.2.0.0" [tool.poetry.extras] diff --git a/src/backend/langflow/chat/manager.py b/src/backend/langflow/chat/manager.py index cd43da269..78b8b2b19 100644 --- a/src/backend/langflow/chat/manager.py +++ b/src/backend/langflow/chat/manager.py @@ -1,4 +1,5 @@ from collections import defaultdict +import uuid from fastapi import WebSocket, status from langflow.api.v1.schemas import ChatMessage, ChatResponse, FileResponse from langflow.cache import cache_manager @@ -45,6 +46,7 @@ class ChatHistory(Subject): class ChatManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} + self.connection_ids: Dict[str, str] = {} self.chat_history = ChatHistory() self.cache_manager = cache_manager self.cache_manager.attach(self.update) @@ -90,9 +92,13 @@ class ChatManager: async def connect(self, client_id: str, websocket: WebSocket): await websocket.accept() self.active_connections[client_id] = websocket + # This is to avoid having multiple clients with the same id + #! Temporary solution + self.connection_ids[client_id] = f"{client_id}-{uuid.uuid4()}" def disconnect(self, client_id: str): self.active_connections.pop(client_id, None) + self.connection_ids.pop(client_id, None) async def send_message(self, client_id: str, message: str): websocket = self.active_connections[client_id] @@ -134,6 +140,7 @@ class ChatManager: langchain_object=langchain_object, chat_inputs=chat_inputs, websocket=self.active_connections[client_id], + session_id=self.connection_ids[client_id], ) except Exception as e: # Log stack trace diff --git a/src/backend/langflow/chat/utils.py b/src/backend/langflow/chat/utils.py index 17c976eb9..e11b38b64 100644 --- a/src/backend/langflow/chat/utils.py +++ b/src/backend/langflow/chat/utils.py @@ -9,6 +9,7 @@ async def process_graph( langchain_object, chat_inputs: ChatMessage, websocket: WebSocket, + session_id: str, ): langchain_object = try_setting_streaming_options(langchain_object, websocket) logger.debug("Loaded langchain object") @@ -27,7 +28,10 @@ async def process_graph( logger.debug("Generating result and thought") result, intermediate_steps = await get_result_and_steps( - langchain_object, chat_inputs.message, websocket=websocket + langchain_object, + chat_inputs.message, + websocket=websocket, + session_id=session_id, ) logger.debug("Generated result and intermediate_steps") return result, intermediate_steps diff --git a/src/backend/langflow/database/base.py b/src/backend/langflow/database/base.py index 12f757a04..13b887d8d 100644 --- a/src/backend/langflow/database/base.py +++ b/src/backend/langflow/database/base.py @@ -39,6 +39,13 @@ class Engine: cls._instance = None cls.create() + @classmethod + def teardown(cls): + logger.debug("Tearing down database engine") + if cls._instance is not None: + cls._instance.dispose() + cls._instance = None + def create_db_and_tables(): logger.debug("Creating database and tables") diff --git a/src/backend/langflow/main.py b/src/backend/langflow/main.py index 5b3341693..85e209062 100644 --- a/src/backend/langflow/main.py +++ b/src/backend/langflow/main.py @@ -8,6 +8,7 @@ from fastapi.staticfiles import StaticFiles from langflow.api import router from langflow.database.base import create_db_and_tables, Engine from langflow.interface.utils import setup_llm_caching +from langflow.services.plugins.langfuse import LangfuseInstance from langflow.utils.logger import configure @@ -37,6 +38,9 @@ def create_app(): app.on_event("startup")(Engine.update) app.on_event("startup")(create_db_and_tables) app.on_event("startup")(setup_llm_caching) + app.on_event("startup")(LangfuseInstance.update) + app.on_event("shutdown")(Engine.teardown) + app.on_event("shutdown")(LangfuseInstance.teardown) return app diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py index 13ff6a385..bbd68a47b 100644 --- a/src/backend/langflow/processing/base.py +++ b/src/backend/langflow/processing/base.py @@ -1,11 +1,58 @@ -from typing import Union +from typing import List, Union, TYPE_CHECKING from langflow.api.v1.callback import ( AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler, ) from langflow.processing.process import fix_memory_inputs, format_actions + from langflow.utils.logger import logger from langchain.agents.agent import AgentExecutor +from langchain.callbacks.base import BaseCallbackHandler + +if TYPE_CHECKING: + from langfuse.callback import CallbackHandler # type: ignore + + +def setup_callbacks(sync, trace_id, **kwargs): + """Setup callbacks for langchain object""" + callbacks = [] + if sync: + callbacks.append(StreamingLLMCallbackHandler(**kwargs)) + else: + callbacks.append(AsyncStreamingLLMCallbackHandler(**kwargs)) + + if langfuse_callback := get_langfuse_callback(trace_id=trace_id): + logger.debug("Langfuse callback loaded") + callbacks.append(langfuse_callback) + return callbacks + + +def get_langfuse_callback(trace_id): + from langflow.services.plugins.langfuse import LangfuseInstance + from langfuse.callback import CreateTrace + + logger.debug("Initializing langfuse callback") + if langfuse := LangfuseInstance.get(): + logger.debug("Langfuse credentials found") + try: + trace = langfuse.trace(CreateTrace(id=trace_id)) + return trace.getNewHandler() + except Exception as exc: + logger.error(f"Error initializing langfuse callback: {exc}") + + return None + + +def flush_langfuse_callback_if_present( + callbacks: List[Union[BaseCallbackHandler, "CallbackHandler"]] +): + """ + If langfuse callback is present, run callback.langfuse.flush() + """ + for callback in callbacks: + if hasattr(callback, "langfuse"): + callback.langfuse.flush() + break async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs): @@ -27,13 +74,18 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa logger.error(f"Error fixing memory inputs: {exc}") try: - async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)] - output = await langchain_object.acall(inputs, callbacks=async_callbacks) + trace_id = kwargs.pop("session_id", None) + callbacks = setup_callbacks(sync=False, trace_id=trace_id, **kwargs) + output = await langchain_object.acall(inputs, callbacks=callbacks) except Exception as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)] - output = langchain_object(inputs, callbacks=sync_callbacks) + trace_id = kwargs.pop("session_id", None) + callbacks = setup_callbacks(sync=True, trace_id=trace_id, **kwargs) + output = langchain_object(inputs, callbacks=callbacks) + + # if langfuse callback is present, run callback.langfuse.flush() + flush_langfuse_callback_if_present(callbacks) intermediate_steps = ( output.get("intermediate_steps", []) if isinstance(output, dict) else [] diff --git a/src/backend/langflow/services/plugins/__init__.py b/src/backend/langflow/services/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/langflow/services/plugins/langfuse.py b/src/backend/langflow/services/plugins/langfuse.py new file mode 100644 index 000000000..98375a549 --- /dev/null +++ b/src/backend/langflow/services/plugins/langfuse.py @@ -0,0 +1,44 @@ +from langflow.utils.logger import logger + +### Temporary implementation +# This will be replaced by a plugin system once merged into 0.5.0 + + +class LangfuseInstance: + _instance = None + + @classmethod + def get(cls): + logger.debug("Getting Langfuse instance") + if cls._instance is None: + cls.create() + return cls._instance + + @classmethod + def create(cls): + logger.debug("Creating Langfuse instance") + from langflow.settings import settings + from langfuse import Langfuse # type: ignore + + if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY: + logger.debug("Langfuse credentials found") + cls._instance = Langfuse( + public_key=settings.LANGFUSE_PUBLIC_KEY, + secret_key=settings.LANGFUSE_SECRET_KEY, + ) + else: + logger.debug("No Langfuse credentials found") + cls._instance = None + + @classmethod + def update(cls): + logger.debug("Updating Langfuse instance") + cls._instance = None + cls.create() + + @classmethod + def teardown(cls): + logger.debug("Tearing down Langfuse instance") + if cls._instance is not None: + cls._instance.flush() + cls._instance = None diff --git a/src/backend/langflow/settings.py b/src/backend/langflow/settings.py index 521f2be77..abb391446 100644 --- a/src/backend/langflow/settings.py +++ b/src/backend/langflow/settings.py @@ -36,6 +36,10 @@ class Settings(BaseSettings): REMOVE_API_KEYS: bool = False COMPONENTS_PATH: List[str] = [] + LANGFUSE_SECRET_KEY: Optional[str] = None + LANGFUSE_PUBLIC_KEY: Optional[str] = None + LANGFUSE_HOST: Optional[str] = None + @validator("DATABASE_URL", pre=True) def set_database_url(cls, value): if not value: