Retrieval Augmented Generation with Haystack and pgvector

In our previous posts (see links below) we’ve been building up towards an example of using Haystack to do Retrieval Augmented Generation (RAG) using PostgreSQL and pgvector as the datastore and a free Hugging Face Large Language Model (LLM) as the generation model. In this post we’ll finally pull it all together to create a real example of how to do RAG.

I’ve created a class that wraps everything up, simplifies it, and documents it to make it easy to see how everything works. This blog post will go over the most important parts in detail. You can download a completed version of this class from my GitHub repo here.

Note: I will be using this class as the starting point for some future blog posts. But I’m going to keep the most up-to-date version of my code (which will include rewrites from future blog posts) in my git repo here.

If you need to set up your environment, this post explains what you need to know.

Using the HaystackPgvector Class

The class I built to demo RAG is called HaystackPgvector. Take a look at the main function for an example of how to use it:

def main() -> None:
    secret: str = HaystackPgvector.get_secret(r'D:\Documents\Secrets\huggingface_secret.txt')

    epub_file_path: str = "Federalist Papers.epub"
    rag_processor: HaystackPgvector = HaystackPgvector(table_name="federalist_papers",
                                                       recreate_table=False,
                                                       book_file_path=epub_file_path,
                                                       hf_password=secret)

    # Draw images of the pipelines
    rag_processor.draw_pipelines()
    print("LLM Embedder Dims: " + str(rag_processor.llm_embed_dims))
    print("LLM Context Length: " + str(rag_processor.llm_context_length))
    print("Sentence Embedder Dims: " + str(rag_processor.sentence_embed_dims))
    print("Sentence Embedder Context Length: " + str(rag_processor.sentence_context_length))

    query: str = "What is the difference between a republic and a democracy?"
    rag_processor.generate_response(query)

We call HaystackPgvector like this:

HaystackPgvector(table_name="federalist_papers", recreate_table=False, book_file_path=epub_file_path, hf_password=secret)

The parameters we’re passing are:

  • table_name: The name of the table in the PostgreSQL database that will serve as the document store; in this case, “federalist_papers”. This name is also used as a prefix for the index names to keep them unique.
  • recreate_table: We set this to ‘False’ because we don’t want to reset the table each time.
  • book_file_path: The name of the epub file we’re using for this tutorial, i.e. “Federalist Papers.epub”.
  • hf_password: The Hugging Face password so that we can access the Gemma LLM, which is gated.

There are additional parameters available that I’m not using, but here they are with their defaults, broken up into a few groups:

PostgreSQL Related Parameters

  • postgres_user_name: str = 'postgres'
  • postgres_password: str = None
  • postgres_host: str = 'localhost'
  • postgres_port: int = 5432
  • postgres_db_name: str = 'postgres'

The defaults I set for logging into PostgreSQL are common, but they may not match your installation, so you may need to set these yourself. See the next section for details on how to look these values up for your own setup.
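
For example, if your installation uses different credentials, the call might look like this (reusing the secret from main above; the password is only a placeholder, so substitute your own values):

rag_processor = HaystackPgvector(table_name="federalist_papers",
                                 recreate_table=False,
                                 book_file_path="Federalist Papers.epub",
                                 hf_password=secret,
                                 postgres_user_name="postgres",
                                 postgres_password="my_postgres_password",  # placeholder
                                 postgres_host="localhost",
                                 postgres_port=5432,
                                 postgres_db_name="postgres")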

Hugging Face LLM Related Parameters

  • llm_model_name: str = 'google/gemma-1.1-2b-it'
  • embedder_model_name: Optional[str] = None
  • temperature: float = 0.6
  • max_new_tokens: int = 500

As in our past posts, I’m using Google’s Gemma as my LLM because it is effective, free, and fits well on my laptop. There are stronger options available for a real-life app, including larger and more powerful versions of Gemma, though those will definitely require a very nice GPU to run effectively. The version I chose will run on my laptop even using just a CPU.

The embedder_model_name defaults to None, which means I call Haystack’s SentenceTransformersTextEmbedder with no model specified. It then uses the default model, which I’ve found works quite effectively for merely embedding sentences for a cosine similarity search.
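
As a rough sketch (assuming the class simply falls back to Haystack’s default when no model name is given), the selection might look something like this; build_text_embedder is just an illustrative helper, not part of the class:

from typing import Optional
from haystack.components.embedders import SentenceTransformersTextEmbedder


def build_text_embedder(embedder_model_name: Optional[str] = None) -> SentenceTransformersTextEmbedder:
    # With no model specified, Haystack falls back to its default sentence-transformers model.
    if embedder_model_name is None:
        return SentenceTransformersTextEmbedder()
    return SentenceTransformersTextEmbedder(model=embedder_model_name)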

The ‘temperature’ controls how much randomness is injected into the generated text. The default of 0.6 works pretty well in my opinion. Go too random and you may get nonsensical results; go not random enough and you’ll always get the same response, and it isn’t always a very good one. LLMs seem to work best with a balanced temperature.

‘max_new_tokens’ specifies the maximum length of the LLM’s response, measured in tokens. Tokens are best understood as ‘word parts’; assume about two tokens per word. So, at the default of 500 tokens, we’re allowing the LLM to give us roughly 200-300 words as a response to our query.
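
To show where ‘temperature’ and ‘max_new_tokens’ end up, here is a hedged sketch of building a local Gemma generator with Haystack’s HuggingFaceLocalGenerator; the exact construction inside HaystackPgvector may differ, and the token value is a placeholder:

from haystack.components.generators import HuggingFaceLocalGenerator
from haystack.utils import Secret

llm_generator = HuggingFaceLocalGenerator(
    model="google/gemma-1.1-2b-it",
    task="text-generation",
    token=Secret.from_token("your_hf_token_here"),  # placeholder; Gemma is gated
    generation_kwargs={
        "do_sample": True,       # temperature only matters when sampling is enabled
        "temperature": 0.6,      # how much randomness to inject
        "max_new_tokens": 500,   # cap on the response length, in tokens
    },
)
llm_generator.warm_up()  # loads the model before first use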

Haystack Document Store Related Parameters

  • min_section_size: int = 1000

Haystack will take the file you give it, split it up into sections of text, and save them to the PostgreSQL database as embedded vectors. ‘min_section_size’ sets the smallest size for a section of text; Haystack will ignore this number if the paragraph you’re asking to embed is smaller than it.
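
Purely as an illustration of the idea (this is not the class’s actual code), enforcing a minimum section size could look something like merging adjacent sections until each reaches the threshold:

from typing import List


def merge_small_sections(sections: List[str], min_section_size: int = 1000) -> List[str]:
    # Hypothetical helper: combine adjacent sections until each is at least
    # min_section_size characters, so very short paragraphs aren't embedded on their own.
    merged: List[str] = []
    buffer: str = ""
    for section in sections:
        buffer = f"{buffer}\n{section}".strip() if buffer else section
        if len(buffer) >= min_section_size:
            merged.append(buffer)
            buffer = ""
    if buffer:  # keep any leftover text that never reached the minimum
        merged.append(buffer)
    return merged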

Logging Into PostgreSQL

Your login information for PostgreSQL may well differ from mine. Recall from this post that PostgreSQL logs you in via a connection string built up from these parameters. Where do you find the correct values for your installation?

The easiest way is to open pgAdmin 4 (as discussed in this post), right-click on your PostgreSQL database, and select Properties:

[Screenshot: pgAdmin 4 with the database right-click menu open and the Properties option indicated with red arrows.]

You will be shown a modal that looks like this. It contains most of the information you’ll need:

[Screenshot: the Properties dialog, Connection tab, showing Host name/address, Port, Maintenance database, Username, Kerberos authentication?, Role, and Service.]

You’ll need to know your own password, of course, and you can find the database name in the pgAdmin tree. Once you have all the information you need, pass it in as parameters to the HaystackPgvector class.
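
Under the hood, those values end up in a standard PostgreSQL connection string (as mentioned above). As a minimal sketch with placeholder credentials, Haystack’s PgvectorDocumentStore reads that string from the PG_CONN_STR environment variable:

import os

from haystack_integrations.document_stores.pgvector import PgvectorDocumentStore

# Placeholder credentials; use the values from your own pgAdmin Properties dialog.
os.environ["PG_CONN_STR"] = "postgresql://postgres:my_postgres_password@localhost:5432/postgres"

document_store = PgvectorDocumentStore(
    table_name="federalist_papers",
    recreate_table=False,
)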

Drawing the Pipelines

Next, I draw the pipelines:

    rag_processor.draw_pipelines()

Here is the function that does this:

    def draw_pipelines(self) -> None:
        """
        Draw and save visual representations of the RAG and document conversion pipelines.
        """
        if self._rag_pipeline is not None:
            self._rag_pipeline.draw(Path("RAG Pipeline.png"))
        if self._doc_convert_pipeline is not None:
            self._doc_convert_pipeline.draw(Path("Document Conversion Pipeline.png"))

Essentially, we take the _rag_pipeline and the _doc_convert_pipeline (which we’ll cover in detail below) and simply call Haystack’s ‘draw’ method on them. The result is a graphical representation of each pipeline.

The Document Conversion Pipeline

The first of our two Haystack pipelines is a simple document conversion pipeline, contained in this function:

    def _doc_converter_pipeline(self) -> None:
        doc_convert_pipe: Pipeline = Pipeline()
        doc_convert_pipe.add_component("converter", HTMLToDocument())
        doc_convert_pipe.add_component("remove_illegal_docs", instance=self._RemoveIllegalDocs())
        doc_convert_pipe.add_component("cleaner", DocumentCleaner())
        doc_convert_pipe.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=10, split_overlap=1, split_threshold=2))
        doc_convert_pipe.add_component("embedder", SentenceTransformersDocumentEmbedder())
        doc_convert_pipe.add_component("writer", DocumentWriter(document_store=self._document_store, policy=DuplicatePolicy.OVERWRITE))

        doc_convert_pipe.connect("converter", "remove_illegal_docs")
        doc_convert_pipe.connect("remove_illegal_docs", "cleaner")
        doc_convert_pipe.connect("cleaner", "splitter")
        doc_convert_pipe.connect("splitter", "embedder")
        doc_convert_pipe.connect("embedder", "writer")

        self._doc_convert_pipeline = doc_convert_pipe

First, we declare the pipeline and add a node to do the initial document conversion:

        doc_convert_pipe: Pipeline = Pipeline()
        doc_convert_pipe.add_component("converter", HTMLToDocument())

Next, we add a custom-built node that removes illegal documents (as discussed in this post):

        doc_convert_pipe.add_component("remove_illegal_docs", instance=self._RemoveIllegalDocs())

Then we’ll add a node for cleaning up the text in the documents (removing illegal characters, etc):

        doc_convert_pipe.add_component("cleaner", DocumentCleaner())

Because we build this pipeline from an epub file that has been turned into an HTML file (as discussed in this post), we have already broken the entire document up into actual paragraphs. Ideally those are already correctly sized chunks of semantically related text that we can embed into vectors, so most of the work is already done for us. (This is one of the big advantages of the epub file approach I explained in that post.)

However, sometimes a paragraph might be too long. So, we still need to run through a splitter to be sure we split up any long paragraphs.

        doc_convert_pipe.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=10, split_overlap=1, split_threshold=2))

Finally, we’ll add nodes for embedding the text and writing it out to the document store:

        doc_convert_pipe.add_component("embedder", SentenceTransformersDocumentEmbedder())
        doc_convert_pipe.add_component("writer", DocumentWriter(document_store=self._document_store, policy=DuplicatePolicy.OVERWRITE))

Note how we set a policy to overwrite duplicate documents (i.e. text that happens to be identical; this tends to happen if, say, every page contains the title of the book). We also remove duplicates in our custom node, but it is good practice to set such a policy as well.
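
For reference, here is a quick look at the duplicate policies that DocumentWriter accepts; OVERWRITE is the one used above:

from haystack.document_stores.types import DuplicatePolicy

# DuplicatePolicy.NONE      - defer to the document store's own default behavior
# DuplicatePolicy.SKIP      - keep the existing document and drop the duplicate
# DuplicatePolicy.OVERWRITE - replace the existing document with the new one (used above)
# DuplicatePolicy.FAIL      - raise an error when a duplicate is written
print(list(DuplicatePolicy))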

Creating the Graph

It isn’t enough to merely add these nodes; they now need to be strung together into a graph. Here is how we connect the nodes into a simple linear pipeline:

        doc_convert_pipe.connect("converter", "remove_illegal_docs")
        doc_convert_pipe.connect("remove_illegal_docs", "cleaner")
        doc_convert_pipe.connect("cleaner", "splitter")
        doc_convert_pipe.connect("splitter", "embedder")
        doc_convert_pipe.connect("embedder", "writer")

The end result, when Haystack draws it, looks like this:

[Pipeline diagram: sources -> converter -> remove_illegal_docs -> cleaner -> splitter -> embedder -> writer, with documents passed from each node to the next.]

The RAG Pipeline

Now let’s go over the Haystack RAG pipeline that generates the output to a user’s query:

    def _create_rag_pipeline(self) -> None:
        prompt_builder: PromptBuilder = PromptBuilder(template=self._prompt_template)

        rag_pipeline: Pipeline = Pipeline()
        rag_pipeline.add_component("query_embedder", SentenceTransformersTextEmbedder())
        rag_pipeline.add_component("retriever", PgvectorEmbeddingRetriever(document_store=self._document_store, top_k=5))
        rag_pipeline.add_component("prompt_builder", prompt_builder)
        rag_pipeline.add_component("llm", self._llm_generator)
        # Add a new component to merge results
        rag_pipeline.add_component("merger", self._MergeResults())

        rag_pipeline.connect("query_embedder.embedding", "retriever.query_embedding")
        rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")

        # Connect the retriever and llm to the merger
        rag_pipeline.connect("retriever.documents", "merger.documents")
        rag_pipeline.connect("llm.replies", "merger.replies")

        self._rag_pipeline = rag_pipeline

First, we’ll declare a PromptBuilder that takes our prompt_template as a parameter:

        prompt_builder: PromptBuilder = PromptBuilder(template=self._prompt_template)

What does the template look like? Here is the code in the class initializer method:

self._prompt_template: str = """
<start_of_turn>user

Quoting the information contained in the context where possible, give a comprehensive answer to the question.

Context:
  {% for doc in documents %}
  {{ doc.content }}
  {% endfor %};

Question: {{query}}<end_of_turn>

<start_of_turn>model
"""

Though this is relatively self-evident, I’ll give a quick explanation. The text string is exactly what we’re going to send to the LLM when the user asks a question. The user’s question is passed in as ‘query’. The documents returned from the RAG pipeline that we’re building are inserted into the prompt right under ‘Context:’. The LLM thus first receives the documents with the closest cosine similarity to the user’s query, then the query itself, and is asked to give a response using those documents. Pretty cool, eh? Now let’s build the actual RAG pipeline by adding the necessary nodes.
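
If you want to see what the rendered prompt looks like on its own, you can run the PromptBuilder outside the pipeline. A quick sketch (reusing rag_processor from main and placeholder document text) might look like this:

from haystack import Document
from haystack.components.builders import PromptBuilder

prompt_template = rag_processor._prompt_template  # the template string shown above
builder = PromptBuilder(template=prompt_template)
result = builder.run(
    documents=[Document(content="...text of a retrieved section...")],
    query="What is the difference between a republic and a democracy?",
)
print(result["prompt"])  # the fully rendered prompt that gets sent to the LLM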

First, let’s start with a node to embed the user’s query into a vector to do the cosine similarity compare with:

        rag_pipeline.add_component("query_embedder", SentenceTransformersTextEmbedder())

Next, we’ll send that embedded query to the PgvectorEmbeddingRetriever (a class built-into Haystack) to retrieve from our PostgreSQL database the top 5 closest matches:

        rag_pipeline.add_component("retriever", PgvectorEmbeddingRetriever(document_store=self._document_store, top_k=5))

Unlike the document conversion pipeline, our RAG pipeline isn’t going to be linear. My intent is to print out both the documents retrieved and the LLM’s response to the user’s query. To do this we’ll still need to build the appropriate nodes.

With the matches retrieved, we can send all of this to the prompt builder node:

        rag_pipeline.add_component("prompt_builder", prompt_builder)

And the results of that node will be passed to the actual LLM generator node:

        rag_pipeline.add_component("llm", self._llm_generator)

We’ll then need a custom node to merge the results together, so that it receives both the reply from the LLM and the original documents retrieved, making both available in the final result:

        rag_pipeline.add_component("merger", self._MergeResults())

Now let’s wire the nodes up into a (non-linear) graph:

        rag_pipeline.connect("query_embedder.embedding", "retriever.query_embedding")
        rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
        rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")

        # Connect the retriever and llm to the merger
        rag_pipeline.connect("retriever.documents", "merger.documents")
        rag_pipeline.connect("llm.replies", "merger.replies")

Note how we send both the retriever.documents output and the llm.replies output to the merger node, each via a different input property.

The final resulting graph looks like this:

[Pipeline diagram: text -> query_embedder -> retriever -> prompt_builder -> llm -> merger -> merged_results, with a second edge carrying retriever.documents directly into the merger, where the two paths rejoin.]

Note how the graph has two distinct paths that merge at the final node.

The Merger Node

Let’s tackle next how to build the custom node that allows for this non-linear graph:

    @component
    class _MergeResults:
        @component.output_types(merged_results=Dict[str, Any])
        def run(self, documents: List[Document], replies: List[str]) -> Dict[str, Dict[str, Any]]:
            return {
                "merged_results": {
                    "documents": documents,
                    "replies": replies
                }
            } 

Note how this custom node takes two parameters: documents and replies. Those are what we wired the connections to above. All this component does is return a dictionary that contains both the documents and the replies from the LLM. This is how the component, once created as a node, can pass the final results back out to be used.
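
To make the data flow concrete, here is a small standalone sketch of calling the merger component directly (the names come from the class above; the content strings are placeholders):

from haystack import Document

merger = HaystackPgvector._MergeResults()
output = merger.run(
    documents=[Document(content="A retrieved passage...")],
    replies=["The LLM's generated answer..."],
)
# output == {"merged_results": {"documents": [...], "replies": ["The LLM's generated answer..."]}}
print(output["merged_results"]["replies"][0])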

Sample Output

This post is already getting long, so we’ll have to cover the rest in future posts. But let’s take a look at how to generate a response. Simply create a query and pass it to the generate_response method:

query: str = "What is the difference between a republic and a democracy?"
rag_processor.generate_response(query)

Here is the code for the generate_response method:

def generate_response(self, query: str) -> None:
    print("Generating Response...")
    results: Dict[str, Any] = self._rag_pipeline.run({
        "query_embedder": {"text": query},
        "prompt_builder": {"query": query}
    })

    merged_results = results["merger"]["merged_results"]

    # Print retrieved documents
    print("Retrieved Documents:")
    for i, doc in enumerate(merged_results["documents"], 1):
        print(f"Document {i}:")
        print(f"Score: {doc.score}")
        if hasattr(doc, 'meta') and doc.meta:
            if 'title' in doc.meta:
                print(f"Title: {doc.meta['title']}")
            if 'section_num' in doc.meta:
                print(f"Section: {doc.meta['section_num']}")
        print(f"Content: {doc.content}")
        print("-" * 50)

    # Print generated response
    # noinspection SpellCheckingInspection
    print("\nLLM's Response:")
    if merged_results["replies"]:
        answer: str = merged_results["replies"][0]
        print(answer)
    else:
        print("No response was generated.")

First, we take the user’s query and send it (twice!) into the pipeline:

    results: Dict[str, Any] = self._rag_pipeline.run({
        "query_embedder": {"text": query},
        "prompt_builder": {"query": query}
    })

I could probably have come up with a way to avoid sending it twice, but the idea here is that we send the user’s query to both the query_embedder and the prompt_builder. We then call ‘run’ on the pipeline and get a result back:

    merged_results = results["merger"]["merged_results"]

We then enumerate over the returned documents and print them and then print the generated response from the LLM.

Final Thoughts

There is a lot more to cover, but this should explain how to create a document conversion pipeline and a RAG pipeline using Haystack and PostgreSQL. In future posts we’ll continue to build on this class and develop additional features for it.

Links

Here is a list of relevant links that explain more about how to build an open-source RAG solution for an LLM. Mindfire is committed to providing low-cost Artificial Intelligence solutions to our customers.

Related Posts

Hugging Face and Haystack Documentation
