Consume data over SFTP with Flume


When many data scientists join the industry right after graduation, they're often disillusioned because the nature of their new work doesn't live up to their expectations. They quickly realize that real-world data can be dirty and unreliable, and can require an inordinate amount of engineering effort to ingest, clean, transform, store and maintain. This is in stark contrast to the relatively pristine CSV files they may have taken for granted during their academic utopia; data that let them focus on algorithms and models instead of housekeeping.

Lisha Li illustrates this nicely.

Robert Chang describes his own similar experience in his excellent post series - A Beginner’s Guide to Data Engineering.

So, what does this have to do with Flume? As a data scientist in a startup or other small team, you may, from time to time, need to do data engineering on your own to obtain data for your analyses. Flume often makes this process less painful, quicker, and even fun :bangbang:

Why Flume

Apache Flume was originally built for collecting and moving large amounts of log data. However, its plugin-based architecture quickly sparked a flurry of plugins that let you ingest data from a variety of sources, including Kafka, JMS, NetCat, HTTP and many others, and load this data into a variety of destinations, including HDFS, HTTP, Solr, local files, etc.

Flume is one of my favorite tools to ingest data with, because it lets you create simple configuration-based ingestion processes, called agents in Flume terminology. A simple configuration file may look like the following[1]:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This simple agent has one source, one channel and one sink, which are essentially plugins that let you read, buffer and store data. The agent loads data from the specified source (netcat), buffers the data into the specified channel (memory) and then loads it into the specified sink (file_roll). That’s it! Once the agent is started, it ingests data continuously.
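
To try this out, the agent can be launched with Flume's standard flume-ng launcher and fed some test input with a tool like telnet. A quick sketch, assuming the configuration above is saved as example.conf and that the /var/log/flume directory exists:

# Start the agent named a1, with its logs printed to the console
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

# From another terminal, send the agent some test data;
# the file_roll sink will write it to files under /var/log/flume
$ telnet localhost 44444
Hello Flume!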

If a little more flexibility is needed in how the source data is transformed before it is loaded into the sink, Flume provides the concept of interceptors. An interceptor is pluggable custom code[2] that modifies input data[3] in-flight. Interceptors are a very interesting topic, one that I would love to discuss in detail in a future post.
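
As a small taste, Flume ships with several built-in interceptors. A minimal sketch that attaches the built-in timestamp and host interceptors to the source from the example above, stamping each event's headers before it reaches the channel:

# Attach two built-in interceptors to source r1 (they run in the order listed)
a1.sources.r1.interceptors = i1 i2

# Adds a "timestamp" header with the event's processing time
a1.sources.r1.interceptors.i1.type = timestamp

# Adds a "host" header with the agent's hostname/IP
a1.sources.r1.interceptors.i2.type = host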

Although Flume is quite versatile with respect to transformations, some applications need transformations complex enough to warrant the flexibility of a code-based approach, such as Apache Spark.

Flume also makes it easy to create your own source/sink/channel plugins! We shall look at one such useful community plugin - flume-ftp-source - in the next section.

Consuming data over SFTP

Motivation

Although, for many applications, getting data as close to real time as possible (e.g., using Kafka) is essential, (S)FTP data pipelines are all too common[4]. FTP servers and accounts are easy to set up and maintain, and system administrators everywhere are likely familiar with them.

As a data engineer, wouldn't it be awesome if you could quickly set up a Flume agent that reads data over (S)FTP from a specified server using specified credentials and loads it into your Hadoop cluster, all from a single configuration file?

The folks over at Keedio sure agree. They’ve developed a nifty FTP source plugin that lets you ingest data over FTP, SFTP and FTPS.
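
Before the plugin can be referenced from an agent configuration, its jar has to be visible to Flume; the usual way is Flume's plugins.d directory. A rough sketch, assuming the plugin jar has already been built (for example with mvn package) — the directory and jar names below are only illustrative:

# Each plugin gets its own directory under plugins.d, with its jar(s) in lib/
# (dependency jars, if any, go in a sibling libext/ directory)
$ mkdir -p $FLUME_HOME/plugins.d/flume-ftp-source/lib
$ cp target/flume-ftp-source-<version>.jar $FLUME_HOME/plugins.d/flume-ftp-source/lib/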

A simple example

Here’s an example agent that transfers data over SFTP:

agent.sources = sftp1
agent.sinks = logger1
agent.channels = mem1

# Source
# Type - SFTP
agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.sftp1.client.source = sftp

# Source connection properties
agent.sources.sftp1.name.server = <server_ip>
agent.sources.sftp1.port = 22
agent.sources.sftp1.user = <username>
agent.sources.sftp1.password = <password>

# Source transfer properties
agent.sources.sftp1.working.directory = /home/meee
agent.sources.sftp1.filter.pattern = <filter_regex>
agent.sources.sftp1.run.discover.delay = 5000
agent.sources.sftp1.file.name = file_tracker.ser

# Sink
agent.sinks.logger1.type = logger

# Channel
agent.channels.mem1.type = memory
agent.channels.mem1.capacity = 1000
agent.channels.mem1.transactionCapacity = 100

# Bind source and sink to channel
agent.sources.sftp1.channels = mem1
agent.sinks.logger1.channel = mem1

For brevity, this example does not show all of the configuration properties supported by the flume-ftp-source plugin. For a comprehensive example configuration, see here.

In the above example, most properties are self-explanatory, but two deserve a closer look. The <agent_name>.sources.sftp1.filter.pattern property lets us specify a regular expression to filter the files discovered by the agent; only files whose names match the expression will be read. The <agent_name>.sources.sftp1.run.discover.delay property specifies the interval, in milliseconds, at which the agent polls the server; a value of 5000 means the agent will look for new files every 5 seconds.
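
As an illustration, if the server mixes several kinds of files in the working directory, a pattern along the following lines (assuming Java regular-expression matching against the file name) would restrict ingestion to gzipped CSV files; the pattern and file name are just examples:

# Only pick up gzipped CSV files, e.g. sales_2018-05-01.csv.gz;
# [.] matches a literal dot without needing backslash escapes in the properties file
agent.sources.sftp1.filter.pattern = .+[.]csv[.]gz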

How does the agent know which files have already been read, so that it doesn't read them again? The agent keeps track of the files it has read in a special tracker file, whose name can be specified with the <agent_name>.sources.sftp1.file.name property.
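
The logger sink above is handy for smoke-testing, but to land the data in a Hadoop cluster, as suggested earlier, it can be swapped for Flume's built-in HDFS sink. A minimal sketch — the NameNode address and target path are placeholders:

agent.sinks = hdfs1

# Write events as plain text under a per-day HDFS directory
agent.sinks.hdfs1.type = hdfs
agent.sinks.hdfs1.hdfs.path = hdfs://<namenode>:8020/data/sftp/%Y-%m-%d
agent.sinks.hdfs1.hdfs.fileType = DataStream
agent.sinks.hdfs1.hdfs.writeFormat = Text
# The date escapes in the path need a timestamp; take it from local time
agent.sinks.hdfs1.hdfs.useLocalTimeStamp = true

# Bind the new sink to the existing channel
agent.sinks.hdfs1.channel = mem1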

Decompressing files on-the-fly

In practice, I’ve often come across source files that are compressed using the GZIP compression codec. Inside these GZIP files are often .csv files. To address this especially interesting case, I’ve contributed a feature to the source repo that enables the Flume agent to decompress these GZIP files on-the-fly, and make the individual records inside the contained .csv files available as individual events in the channel. My fork has some additional features that haven’t yet been merged into the source repo, including automatically deleting source files once fully consumed.

With this special case, our example agent may now look like this:

agent.sources = sftp1
agent.sinks = logger1
agent.channels = mem1

# Source
# Type - SFTP
agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.sftp1.client.source = sftp

# Source connection properties
agent.sources.sftp1.name.server = <server_ip>
agent.sources.sftp1.port = 22
agent.sources.sftp1.user = <username>
agent.sources.sftp1.password = <password>

# Source transfer properties
agent.sources.sftp1.working.directory = /home/meee
agent.sources.sftp1.filter.pattern = <filter_regex>
agent.sources.sftp1.run.discover.delay = 5000
agent.sources.sftp1.file.name = file_tracker.ser

# Source format properties
agent.sources.sftp1.compressed = gzip
# Read line by line
agent.sources.sftp1.flushlines = true
agent.sources.sftp1.deleteOnCompletion = true

# Sink
agent.sinks.logger1.type = logger

# Channel
agent.channels.mem1.type = memory
agent.channels.mem1.capacity = 1000
agent.channels.mem1.transactionCapacity = 100

# Bind source and sink to channel
agent.sources.sftp1.channels = mem1
agent.sinks.logger1.channel = mem1

The <agent_name>.sources.sftp1.compressed property specifies that our source files are GZIP compressed, which causes the agent to decompress them on-the-fly. The <agent_name>.sources.sftp1.flushlines property specifies that we want to read the decompressed data line by line, so that each event in the channel corresponds to one record in the .csv files inside the .gz files. Finally, setting <agent_name>.sources.sftp1.deleteOnCompletion to true requests that the agent delete source files once they're fully consumed.

Setting up your ingestion process over SFTP using Flume is that simple! Contributions to the flume-ftp-source plugin, or my fork of it, are welcome! :relaxed:

Get SFTP source plugin on GitHub

  1. Modified version of the simple example in the official Flume documentation 

  2. Interceptors are usually written in Java, but can be written in any JVM-based language 

  3. Input data are called events in Flume terminology 

  4. That's not to say that it isn't possible to get data in real time over FTP (indeed, the definition of "real time" is heavily influenced by the application). However, it is reasonable to suggest that FTP isn't the most efficient medium for moving data with minimal latency. 
