Adds langfuse integration (#796)

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-31 13:42:48 +00:00 committed by GitHub
commit 489394a051
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 256 additions and 50 deletions

View file

@ -0,0 +1,49 @@
# Integrating Langfuse with Langflow
## Introduction
Langfuse is an open-source tracing and analytics tool designed for LLM applications. Integrating Langfuse with Langflow provides detailed production traces and granular insights into quality, cost, and latency. This integration allows you to monitor and debug your Langflow's chat or APIs easily.
## Step-by-Step Instructions
### Step 1: Create a Langfuse account
1. Go to [Langfuse](https://langfuse.com) and click on the "Sign In" button in the top right corner.
2. Click on the "Sign Up" button and create an account.
3. Once logged in, click on "Settings" and then on "Create new API keys."
4. Copy the Public key and the Secret Key and save them somewhere safe.
{/* Add these keys to your environment variables in the following step. */}
### Step 2: Set up Langfuse in Langflow
1. **Export the Environment Variables**: You'll need to export the environment variables `LANGFLOW_LANGFUSE_SECRET_KEY` and `LANGFLOW_LANGFUSE_PUBLIC_KEY` with the values obtained in Step 1.
You can do this by executing the following commands in your terminal:
```bash
export LANGFLOW_LANGFUSE_SECRET_KEY=<your secret key>
export LANGFLOW_LANGFUSE_PUBLIC_KEY=<your public key>
```
Alternatively, you can run the Langflow CLI command:
```bash
LANGFLOW_LANGFUSE_SECRET_KEY=<your secret key> LANGFLOW_LANGFUSE_PUBLIC_KEY=<your public key> langflow
```
If you are self-hosting Langfuse, you can also set the environment variable `LANGFLOW_LANGFUSE_HOST` to point to your Langfuse instance. By default, Langfuse points to the cloud instance at `https://cloud.langfuse.com`.
2. **Verify Integration**: Ensure that the environment variables are set correctly by checking their existence in your environment, for example by running:
```bash
echo $LANGFLOW_LANGFUSE_SECRET_KEY
echo $LANGFLOW_LANGFUSE_PUBLIC_KEY
```
3. **Monitor Langflow**: Now, whenever you use Langflow's chat or API, you will be able to see the tracing of your conversations in Langfuse. If you have any issues with the integration, consult the [Langfuse documentation](<link to specific troubleshooting section>) or contact their support.
That's it! You have successfully integrated Langfuse with Langflow, enhancing observability and debugging capabilities for your LLM application.
---
Note: For more details or customized configurations, please refer to the official [Langfuse documentation](https://langfuse.com/docs/integrations/langchain).

View file

@ -51,7 +51,11 @@ module.exports = {
type: "category",
label: "Step-by-Step Guides",
collapsed: false,
items: ["guides/loading_document", "guides/chatprompttemplate_guide"],
items: [
"guides/loading_document",
"guides/chatprompttemplate_guide",
"guides/langfuse_integration",
],
},
// {
// type: 'category',

115
poetry.lock generated
View file

@ -2918,15 +2918,33 @@ toml = "*"
[package.extras]
test = ["psutil", "pytest", "pytest-asyncio"]
[[package]]
name = "langfuse"
version = "1.0.13"
description = "A client library for accessing langfuse"
optional = false
python-versions = ">=3.8.1,<4.0"
files = [
{file = "langfuse-1.0.13-py3-none-any.whl", hash = "sha256:4de4ccfd452e7c3f06c68e328a334ffa2117f8a3a185ae86243e6020979a9541"},
{file = "langfuse-1.0.13.tar.gz", hash = "sha256:33e5033dc1c0ac23ec5e48f7c47fbdcc00b756be7b705c72951ef85d246a242f"},
]
[package.dependencies]
attrs = ">=21.3.0"
httpx = ">=0.15.4,<0.25.0"
langchain = ">=0.0.237,<1.0"
pydantic = ">=1.10.7,<2.0"
python-dateutil = ">=2.8.0,<3.0"
[[package]]
name = "langsmith"
version = "0.0.27"
version = "0.0.28"
description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
optional = false
python-versions = ">=3.8.1,<4.0"
files = [
{file = "langsmith-0.0.27-py3-none-any.whl", hash = "sha256:f61b07f093ba377b9af53c3d6f68fd1245f8f28605d4fc88433208aca93a5a23"},
{file = "langsmith-0.0.27.tar.gz", hash = "sha256:c4df680ee8bf88d37f56ba196048341847c48b50ae561719c5542ef6488170e5"},
{file = "langsmith-0.0.28-py3-none-any.whl", hash = "sha256:f398782f41526c74e141e68fa28b9020e0be4bde18a1d4a76b357c8272fb81bd"},
{file = "langsmith-0.0.28.tar.gz", hash = "sha256:34c15f9a8908be180001c58048b659ece6320d0bf8ffce4ca496a2428b35646e"},
]
[package.dependencies]
@ -2955,12 +2973,12 @@ test = ["coverage", "pytest", "pytest-cov"]
[[package]]
name = "llama-cpp-python"
version = "0.1.82"
version = "0.1.83"
description = "A Python wrapper for llama.cpp"
optional = true
python-versions = ">=3.7"
files = [
{file = "llama_cpp_python-0.1.82.tar.gz", hash = "sha256:ea19ee012042d806df09a5db638a912c11eed92929a27a4b3fb1d35ab7758974"},
{file = "llama_cpp_python-0.1.83.tar.gz", hash = "sha256:9f40656e46a85a3c3427790246e03490bb90202c37cb99732a095ffcb99efe54"},
]
[package.dependencies]
@ -3692,13 +3710,13 @@ sympy = "*"
[[package]]
name = "openai"
version = "0.27.9"
version = "0.27.10"
description = "Python client library for the OpenAI API"
optional = false
python-versions = ">=3.7.1"
files = [
{file = "openai-0.27.9-py3-none-any.whl", hash = "sha256:6a3cf8e276d1a6262b50562fbc0cba7967cfebb78ed827d375986b48fdad6475"},
{file = "openai-0.27.9.tar.gz", hash = "sha256:b687761c82f5ebb6f61efc791b2083d2d068277b94802d4d1369efe39851813d"},
{file = "openai-0.27.10-py3-none-any.whl", hash = "sha256:beabd1757e3286fa166dde3b70ebb5ad8081af046876b47c14c41e203ed22a14"},
{file = "openai-0.27.10.tar.gz", hash = "sha256:60e09edf7100080283688748c6803b7b3b52d5a55d21890f3815292a0552d83b"},
]
[package.dependencies]
@ -4674,41 +4692,41 @@ files = [
[[package]]
name = "pulsar-client"
version = "3.2.0"
version = "3.3.0"
description = "Apache Pulsar Python client library"
optional = false
python-versions = "*"
files = [
{file = "pulsar_client-3.2.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:da53bbe1903026ca1253d36a67bde0ae88513497091658aee8c5514c3e567483"},
{file = "pulsar_client-3.2.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ec595a71b7a25f1a72a1350efd6680a511b53253c3cac1911ba3d6c4d71fa64c"},
{file = "pulsar_client-3.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e3557c65463d74ec8d2864752389beb06761ab591dd134a164e0b1303c66719b"},
{file = "pulsar_client-3.2.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d51dc76fec48217489bde95754ad58288c9389361de42f5a27d64e19840d27fb"},
{file = "pulsar_client-3.2.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:9ef2baf85311e0fe1b98342fdafbb93a1818a08ef999eaa524234fedf6f3b941"},
{file = "pulsar_client-3.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:0928b02beda0c98e77178f4e30e962ddb8ee8c3320e4c7304a78b0796e976523"},
{file = "pulsar_client-3.2.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:584f44b03474a69906be711a597a4d516263a55be31e49fc07be503dc8406821"},
{file = "pulsar_client-3.2.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a637b9a3b30860c61e68a7b8ea650e0987d89e82f73b6a3df1ab662a6438fdda"},
{file = "pulsar_client-3.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b4a187fdc5febcf16f725179dcf2c476f31eeebd8353794d91754a3202dd5072"},
{file = "pulsar_client-3.2.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:5ff879f868cf1fd29db99f39fdb22b3ec3e749c648aca28526689756d922d1c5"},
{file = "pulsar_client-3.2.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a5f85d0cc414f739a5b51d843f213b54b2cd768c3a34f7c27cca410712b1f81"},
{file = "pulsar_client-3.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:4fe748283848d829a80c0323558faeebea4c240d69fa58314ac90344f6999d17"},
{file = "pulsar_client-3.2.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:06b91c26def86dbbc35be15257999fd8a2afbadf32983916ea3eef44f4d4cab4"},
{file = "pulsar_client-3.2.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:39ec897bc8d232e6b118793378fc662a844334b829a28a1b4ad1c5fe8d019135"},
{file = "pulsar_client-3.2.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa37c96c25c1b5aff3bad0fd0194b385ec190b2c67a2f439ac91577f81ae18d3"},
{file = "pulsar_client-3.2.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:d49cdd4d1b7fc2e80d100acf14e6fd3898f6e099e403fc56ed22a690245b2fec"},
{file = "pulsar_client-3.2.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:0058ca3191fd24528ccf94dba6f12e4093831454a2597166f96900d0717271bf"},
{file = "pulsar_client-3.2.0-cp37-cp37m-win_amd64.whl", hash = "sha256:cb69b0411008e0b56df51de0aab20aa1c1a12aef3019b9ceba89afbae1f07fe2"},
{file = "pulsar_client-3.2.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:f7d33e99602352df7a30707eab4e5781654602212fb618928bffb5523f2bcf35"},
{file = "pulsar_client-3.2.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ad1ac15a175ca90555c681a4d0134568771c6346b97a172f3ef14006556a50ae"},
{file = "pulsar_client-3.2.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:369e08ef1d5cb196dd9271039928800f90b4701a9c9df90bc068b44260d2fb11"},
{file = "pulsar_client-3.2.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:a52ba2b6736a2ebeed31b590e75d417dda149e333461655860efa84d898a3eb4"},
{file = "pulsar_client-3.2.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5c801334b3b569b23976481a2922bcea0c6dd990fc26544658dd9e9c8f78ca36"},
{file = "pulsar_client-3.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:cd01fd419280e9013d1655bc53662248be2656b623b1506480e1a985aa7dadd2"},
{file = "pulsar_client-3.2.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:0abe54d84db76435a6cd88ce27610352cabc7efae9fa3e7f874e032ec2ca0b3f"},
{file = "pulsar_client-3.2.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b9a1b6a806eb4819d8cbab1c4ae44ebf2110a94204a46c365f5757e1455252f2"},
{file = "pulsar_client-3.2.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:34ea2a6b75ae0e303d522e5b57c75a4ff03dc18b9bfc14151fb14dfaf5866f17"},
{file = "pulsar_client-3.2.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:be6d3a9b2e1db3b6d1a7db5e13f7b4ed420674cf072cdb520fb004c4cd54c0af"},
{file = "pulsar_client-3.2.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b6b733e6239ffb505f7084df0175baf9d0215f14d0a02e9bbd1fdf71a2d6ea17"},
{file = "pulsar_client-3.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:edc2135d02b4793efb086edca0ffaa6e8ac9133961c2cdc17ae487e0a53da481"},
{file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"},
{file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"},
{file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"},
{file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"},
{file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"},
{file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"},
{file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"},
{file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"},
{file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"},
{file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"},
{file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"},
{file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"},
{file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"},
{file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"},
{file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"},
{file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"},
{file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"},
{file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"},
{file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"},
{file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"},
{file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"},
{file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"},
{file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"},
{file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"},
{file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"},
{file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"},
{file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"},
{file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"},
{file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"},
{file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"},
]
[package.dependencies]
@ -6741,6 +6759,17 @@ files = [
{file = "types_cachetools-5.3.0.6-py3-none-any.whl", hash = "sha256:f7f8a25bfe306f2e6bc2ad0a2f949d9e72f2d91036d509c36d3810bf728bc6e1"},
]
[[package]]
name = "types-google-cloud-ndb"
version = "2.2.0.0"
description = "Typing stubs for google-cloud-ndb"
optional = false
python-versions = "*"
files = [
{file = "types-google-cloud-ndb-2.2.0.0.tar.gz", hash = "sha256:1485074019e4ffaf1900e185e5fc861f8f7bd99fb0c6390c040271a2327092b3"},
{file = "types_google_cloud_ndb-2.2.0.0-py3-none-any.whl", hash = "sha256:027025dd593aa26a6505c1be7ff42f4acb70580f1a07f4030fb77a3d1869e83d"},
]
[[package]]
name = "types-pillow"
version = "9.5.0.6"
@ -6852,13 +6881,13 @@ test = ["coverage", "pytest", "pytest-cov"]
[[package]]
name = "unstructured"
version = "0.10.8"
version = "0.10.9"
description = "A library that prepares raw documents for downstream ML tasks."
optional = false
python-versions = ">=3.7.0"
files = [
{file = "unstructured-0.10.8-py3-none-any.whl", hash = "sha256:541c7983154b595aada205765a11add486dae88c0e120e53bccd52db2d849525"},
{file = "unstructured-0.10.8.tar.gz", hash = "sha256:4bdc36cfe9893e3f1c311a7e7e2e62d8dd8d23e455e7748e91aaec6779717dc4"},
{file = "unstructured-0.10.9-py3-none-any.whl", hash = "sha256:182062983cf5ade923e871be52154829d92e5b72dfd7575847618fb44f7debd6"},
{file = "unstructured-0.10.9.tar.gz", hash = "sha256:fd62f84abf12c85ef8b81567a4fcb841b7450f010454b69754bfc5b10b68d869"},
]
[package.dependencies]
@ -7485,4 +7514,4 @@ local = ["ctransformers", "llama-cpp-python", "sentence-transformers"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.11"
content-hash = "35b3782f944eff6bd6676ea060b36adb51ee78e19c5ef4d8a2c08c163f87ce9a"
content-hash = "20347dcfd3f57ef74d2cdf2882bb8694624b7cd7db610bc96e6bde1a8688cae2"

View file

@ -79,6 +79,7 @@ psycopg-binary = "^3.1.9"
fastavro = "^1.8.0"
langchain-experimental = "^0.0.8"
metaphor-python = "^0.1.11"
langfuse = "^1.0.13"
pillow = "^10.0.0"
metal-sdk = "^2.0.2"
@ -96,6 +97,7 @@ pandas-stubs = "^2.0.0.230412"
types-pillow = "^9.5.0.2"
types-appdirs = "^1.4.3.5"
types-pyyaml = "^6.0.12.8"
types-google-cloud-ndb = "^2.2.0.0"
[tool.poetry.extras]

View file

@ -1,4 +1,5 @@
from collections import defaultdict
import uuid
from fastapi import WebSocket, status
from langflow.api.v1.schemas import ChatMessage, ChatResponse, FileResponse
from langflow.cache import cache_manager
@ -45,6 +46,7 @@ class ChatHistory(Subject):
class ChatManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.connection_ids: Dict[str, str] = {}
self.chat_history = ChatHistory()
self.cache_manager = cache_manager
self.cache_manager.attach(self.update)
@ -90,9 +92,13 @@ class ChatManager:
async def connect(self, client_id: str, websocket: WebSocket):
await websocket.accept()
self.active_connections[client_id] = websocket
# This is to avoid having multiple clients with the same id
#! Temporary solution
self.connection_ids[client_id] = f"{client_id}-{uuid.uuid4()}"
def disconnect(self, client_id: str):
self.active_connections.pop(client_id, None)
self.connection_ids.pop(client_id, None)
async def send_message(self, client_id: str, message: str):
websocket = self.active_connections[client_id]
@ -134,6 +140,7 @@ class ChatManager:
langchain_object=langchain_object,
chat_inputs=chat_inputs,
websocket=self.active_connections[client_id],
session_id=self.connection_ids[client_id],
)
except Exception as e:
# Log stack trace

View file

@ -9,6 +9,7 @@ async def process_graph(
langchain_object,
chat_inputs: ChatMessage,
websocket: WebSocket,
session_id: str,
):
langchain_object = try_setting_streaming_options(langchain_object, websocket)
logger.debug("Loaded langchain object")
@ -27,7 +28,10 @@ async def process_graph(
logger.debug("Generating result and thought")
result, intermediate_steps = await get_result_and_steps(
langchain_object, chat_inputs.message, websocket=websocket
langchain_object,
chat_inputs.message,
websocket=websocket,
session_id=session_id,
)
logger.debug("Generated result and intermediate_steps")
return result, intermediate_steps

View file

@ -39,6 +39,13 @@ class Engine:
cls._instance = None
cls.create()
@classmethod
def teardown(cls):
logger.debug("Tearing down database engine")
if cls._instance is not None:
cls._instance.dispose()
cls._instance = None
def create_db_and_tables():
logger.debug("Creating database and tables")

View file

@ -8,6 +8,7 @@ from fastapi.staticfiles import StaticFiles
from langflow.api import router
from langflow.database.base import create_db_and_tables, Engine
from langflow.interface.utils import setup_llm_caching
from langflow.services.plugins.langfuse import LangfuseInstance
from langflow.utils.logger import configure
@ -37,6 +38,9 @@ def create_app():
app.on_event("startup")(Engine.update)
app.on_event("startup")(create_db_and_tables)
app.on_event("startup")(setup_llm_caching)
app.on_event("startup")(LangfuseInstance.update)
app.on_event("shutdown")(Engine.teardown)
app.on_event("shutdown")(LangfuseInstance.teardown)
return app

View file

@ -1,11 +1,58 @@
from typing import Union
from typing import List, Union, TYPE_CHECKING
from langflow.api.v1.callback import (
AsyncStreamingLLMCallbackHandler,
StreamingLLMCallbackHandler,
)
from langflow.processing.process import fix_memory_inputs, format_actions
from langflow.utils.logger import logger
from langchain.agents.agent import AgentExecutor
from langchain.callbacks.base import BaseCallbackHandler
if TYPE_CHECKING:
from langfuse.callback import CallbackHandler # type: ignore
def setup_callbacks(sync, trace_id, **kwargs):
"""Setup callbacks for langchain object"""
callbacks = []
if sync:
callbacks.append(StreamingLLMCallbackHandler(**kwargs))
else:
callbacks.append(AsyncStreamingLLMCallbackHandler(**kwargs))
if langfuse_callback := get_langfuse_callback(trace_id=trace_id):
logger.debug("Langfuse callback loaded")
callbacks.append(langfuse_callback)
return callbacks
def get_langfuse_callback(trace_id):
from langflow.services.plugins.langfuse import LangfuseInstance
from langfuse.callback import CreateTrace
logger.debug("Initializing langfuse callback")
if langfuse := LangfuseInstance.get():
logger.debug("Langfuse credentials found")
try:
trace = langfuse.trace(CreateTrace(id=trace_id))
return trace.getNewHandler()
except Exception as exc:
logger.error(f"Error initializing langfuse callback: {exc}")
return None
def flush_langfuse_callback_if_present(
callbacks: List[Union[BaseCallbackHandler, "CallbackHandler"]]
):
"""
If langfuse callback is present, run callback.langfuse.flush()
"""
for callback in callbacks:
if hasattr(callback, "langfuse"):
callback.langfuse.flush()
break
async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs):
@ -27,13 +74,18 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa
logger.error(f"Error fixing memory inputs: {exc}")
try:
async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
output = await langchain_object.acall(inputs, callbacks=async_callbacks)
trace_id = kwargs.pop("session_id", None)
callbacks = setup_callbacks(sync=False, trace_id=trace_id, **kwargs)
output = await langchain_object.acall(inputs, callbacks=callbacks)
except Exception as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
output = langchain_object(inputs, callbacks=sync_callbacks)
trace_id = kwargs.pop("session_id", None)
callbacks = setup_callbacks(sync=True, trace_id=trace_id, **kwargs)
output = langchain_object(inputs, callbacks=callbacks)
# if langfuse callback is present, run callback.langfuse.flush()
flush_langfuse_callback_if_present(callbacks)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []

View file

@ -0,0 +1,44 @@
from langflow.utils.logger import logger
### Temporary implementation
# This will be replaced by a plugin system once merged into 0.5.0
class LangfuseInstance:
_instance = None
@classmethod
def get(cls):
logger.debug("Getting Langfuse instance")
if cls._instance is None:
cls.create()
return cls._instance
@classmethod
def create(cls):
logger.debug("Creating Langfuse instance")
from langflow.settings import settings
from langfuse import Langfuse # type: ignore
if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY:
logger.debug("Langfuse credentials found")
cls._instance = Langfuse(
public_key=settings.LANGFUSE_PUBLIC_KEY,
secret_key=settings.LANGFUSE_SECRET_KEY,
)
else:
logger.debug("No Langfuse credentials found")
cls._instance = None
@classmethod
def update(cls):
logger.debug("Updating Langfuse instance")
cls._instance = None
cls.create()
@classmethod
def teardown(cls):
logger.debug("Tearing down Langfuse instance")
if cls._instance is not None:
cls._instance.flush()
cls._instance = None

View file

@ -36,6 +36,10 @@ class Settings(BaseSettings):
REMOVE_API_KEYS: bool = False
COMPONENTS_PATH: List[str] = []
LANGFUSE_SECRET_KEY: Optional[str] = None
LANGFUSE_PUBLIC_KEY: Optional[str] = None
LANGFUSE_HOST: Optional[str] = None
@validator("DATABASE_URL", pre=True)
def set_database_url(cls, value):
if not value: