Writing a Custom Haystack Pipeline Component
- By Bruce Nielson
- ML & AI Specialist
In a previous post, we used Haystack with Pgvector to create a PostgreSQL document store. We used the document store to store an EPUB version of the Federalist Papers.
One thing we could have done better was to use Haystack’s pipelines. Because parsing an EPUB document required some custom programming, I avoided using a pipeline in that post; I wanted to concentrate on how to parse an EPUB file rather than on how to build a custom pipeline.
In this post we’re going to go over how to create a custom Haystack pipeline component that contains our custom programming. This will allow us to then implement a Haystack pipeline for our EPUB document converter.
Environment Setup and Code from Previous Post
I’m going to assume you already have the necessary environment set up and that you already have the code we wrote in the last post. This post explains how to set up the environment. And here is the previous post on Haystack and Pgvector, including the starting code that we’ll be modifying.
You can find my final code in my GitHub repo if you want a full copy of the code. In this post I’m only going to cover the parts we’re changing.
Building a Custom Haystack Pipeline Component
Deepset.ai’s Haystack documentation explains how to create a custom component.
Recall from our previous post that we created a document converter using the HTMLToDocument Haystack component.
converter = HTMLToDocument()
results = converter.run(sources=sources)
converted_docs = results["documents"]
The above code takes the list of ByteStream HTML documents that we created from the EPUB file using BeautifulSoup and converts them into instances of the Haystack Document class. But this had an unintended side effect of creating both empty and duplicate documents, which we then had to remove with this code:
# Remove documents with empty content
converted_docs = [Document(content=doc.content) for doc in converted_docs if doc.content is not None]
# Remove duplicate Documents (documents sharing the same document id)
converted_docs = list({doc.id: doc for doc in converted_docs}.values())
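For context, the sources list came from the previous post, where we extracted each HTML section of the EPUB file. A rough sketch of that step, assuming ebooklib and BeautifulSoup are installed (the file name is a placeholder):

import ebooklib
from bs4 import BeautifulSoup
from ebooklib import epub
from haystack.dataclasses import ByteStream

# Read the EPUB and collect each HTML section as a ByteStream source
book = epub.read_epub("federalist_papers.epub")  # placeholder file name
sources = []
for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):
    # Parse the raw HTML so we can re-serialize a clean version of the markup
    soup = BeautifulSoup(item.get_content(), "html.parser")
    sources.append(ByteStream(data=str(soup).encode("utf-8"), mime_type="text/html"))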
Our goal is to turn this conversion and cleanup process into a Haystack pipeline. Consider how easy it is to put the HTMLToDocument component into a pipeline:
pipeline.add_component("converter", HTMLToDocument())
What we want is a component node after this first one that does the work of cleaning up empty and duplicate documents. So we want something like this:
pipeline.add_component("remove_illegal_docs", instance=RemoveIllegalDocs())
However, there is no RemoveIllegalDocs component in Haystack, so we need to write our own. (Of course, we could just use the policy=DuplicatePolicy.OVERWRITE flag instead, at least for the duplicates, as we did back in this post… but this post is about how to write a custom component, so never mind that for now.)
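For the curious, that alternative is just a flag on the DocumentWriter component we’ll use later in this post. A sketch, assuming Haystack 2.x where DuplicatePolicy lives in haystack.document_stores.types:

from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy

# Overwrite any document whose id already exists in the store,
# silently de-duplicating instead of raising an error
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE)

Note that this handles only the duplicates, not the empty documents, which is another reason to write our own component.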
Here is how to write our own component:
from typing import List

from haystack import Document, component


@component
class RemoveIllegalDocs:
    """
    A component that removes duplicate or empty documents from a list of documents.
    """
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        # Remove documents with empty content
        documents = [Document(content=doc.content) for doc in documents if doc.content is not None]
        # Remove duplicate Documents (documents sharing the same document id)
        documents = list({doc.id: doc for doc in documents}.values())
        return {"documents": documents}
Let’s break this down to see how it works.
First, we need the @component decorator to specify that this is going to be a Haystack component to put into a pipeline.
Then the name of the class is the name of the component, in this case “RemoveIllegalDocs”. Next we specify what will be passed back out of the component, which is generally a list of documents:
@component.output_types(documents=List[Document])
Then we need to define a ‘run’ method for the component that defines what is passed into the component, which is also a list of Documents:
def run(self, documents: List[Document]):
Now we include in the run command our previous code that removes the empty and duplicate documents:
# Remove documents with empty content
documents = [Document(content=doc.content) for doc in documents if doc.content is not None]
# Remove duplicate Documents (documents sharing the same document id)
documents = list({doc.id: doc for doc in documents}.values())
Then finally we return the list of documents inside a dictionary, in an entry called ‘documents’, because that is what downstream Haystack components generally expect:
return {"documents": documents}
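To see the component in action on its own before wiring it into a pipeline, you can call run directly. A quick sketch (the sample contents are made up; Haystack derives a Document’s id from its content, so identical contents produce identical ids):

from haystack import Document

docs = [
    Document(content="Federalist No. 1"),
    Document(content="Federalist No. 1"),  # same content, so same derived id: a duplicate
    Document(content=None),                # empty content: will be dropped
]

result = RemoveIllegalDocs().run(documents=docs)
print(len(result["documents"]))  # 1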
Creating a Haystack Document Converting Pipeline
With our custom component created, we’re ready to write a function that creates the whole document conversion pipeline:
from haystack import Pipeline
from haystack.components.converters import HTMLToDocument
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack_integrations.document_stores.pgvector import PgvectorDocumentStore


def doc_converter_pipeline(document_store: PgvectorDocumentStore):
    pipeline = Pipeline()
    # https://docs.haystack.deepset.ai/docs/htmltodocument
    pipeline.add_component("converter", HTMLToDocument())
    pipeline.add_component("remove_illegal_docs", instance=RemoveIllegalDocs())
    # https://docs.haystack.deepset.ai/docs/documentcleaner
    pipeline.add_component("cleaner", DocumentCleaner())
    # https://docs.haystack.deepset.ai/docs/documentsplitter
    pipeline.add_component("splitter", DocumentSplitter(split_by="word",
                                                        split_length=400,
                                                        split_overlap=0,
                                                        split_threshold=100))
    # https://docs.haystack.deepset.ai/docs/sentencetransformersdocumentembedder
    pipeline.add_component("embedder", SentenceTransformersDocumentEmbedder())
    # Write out to the document store (PgvectorDocumentStore)
    pipeline.add_component("writer", DocumentWriter(document_store=document_store))

    # Connect the components
    pipeline.connect("converter", "remove_illegal_docs")
    pipeline.connect("remove_illegal_docs", "cleaner")
    pipeline.connect("cleaner", "splitter")
    pipeline.connect("splitter", "embedder")
    pipeline.connect("embedder", "writer")

    return pipeline
We first add all the components to the pipeline, which includes:
- A converter using HTMLToDocument
- Our custom component to remove empty and duplicate documents using RemoveIllegalDocs
- A document cleaner using DocumentCleaner
- A document splitter using DocumentSplitter
- A sentence embedder using SentenceTransformersDocumentEmbedder
- And finally, a DocumentWriter to write out to the Pgvector Data Store
One obvious difference from last time: in the previous post we wrote to the document store by creating an instance of PgvectorDocumentStore and then calling ‘write_documents’ on that instance. Now we’re using the DocumentWriter component to do the same thing so that we can insert it into the pipeline as a node.
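Side by side, the change looks something like this sketch (variable names are illustrative):

# Previous post: write embedded documents to the store directly
document_store.write_documents(embedded_docs)

# This post: a DocumentWriter node does the same work inside the pipeline
pipeline.add_component("writer", DocumentWriter(document_store=document_store))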
Also note how the nodes in the pipeline are connected using the names we created for the nodes.
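As an aside, pipeline.connect can also spell out which output socket feeds which input socket. Haystack resolves single unambiguous connections automatically, so the short form above works, but the explicit form is equivalent:

# Equivalent to pipeline.connect("converter", "remove_illegal_docs"),
# but naming the "documents" output and input sockets explicitly
pipeline.connect("converter.documents", "remove_illegal_docs.documents")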
Using the New Pipeline
Using the new pipeline we’ve created is quite easy:
pipeline = doc_converter_pipeline(document_store)
results = pipeline.run({"converter": {"sources": sources}})
print("\n\n")
print(f"Number of documents: {results['writer']['documents_written']}")
We simply call our function to create the pipeline and then call ‘run’ on it, passing in a dictionary that contains the sources from the EPUB file. Note how we specify ‘converter’, the name of the starting node, as the dictionary key.
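Putting it all together, an end-to-end run might look something like the sketch below. The table name, embedding dimension, and EPUB file name are placeholders, load_epub stands in for the EPUB parsing code from the previous post, and PgvectorDocumentStore reads its connection string from the PG_CONN_STR environment variable by default:

from haystack_integrations.document_stores.pgvector import PgvectorDocumentStore

# Placeholder settings; match these to your own database and embedding model
document_store = PgvectorDocumentStore(
    table_name="federalist_papers",
    embedding_dimension=768,
    recreate_table=True,
)

sources = load_epub("federalist_papers.epub")  # hypothetical helper from the previous post
pipeline = doc_converter_pipeline(document_store)
results = pipeline.run({"converter": {"sources": sources}})
print(f"Number of documents: {results['writer']['documents_written']}")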
Run the code and it works the same as in the previous post except now with an elegant pipeline instead of just running Haystack components directly. Congratulations!