使用Spring Boot和Spring Integration的网络爬虫
我們將使用Spring Boot和Spring Integration來創建一個簡單的爬蟲,不需要使用XML,只使用Java Config。
必须 (Necessarily)
请事先安装以下软件。
-
- JDK 6 or later
- Maven 3.0 or later
简而言之
所开发的网络爬虫会通过抓取维基百科的转储列表来获取以下的信息。
timestampidrefstatus2014-05-31 01:57:31nowikinowiki/20140530Dump in progress2014-05-31 01:57:32trwikitrwiki/20140530Dump in progress2014-05-30 14:38:08anwikianwiki/20140530Dump complete2014-05-30 14:30:29viwiktionaryviwiktionary/20140530Dump complete2014-05-30 13:02:57ckbwikickbwiki/20140530Dump complete…………
pom.xml => 项目对象模型文件 (Project Object Model 文件)
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.1.0.M2</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.7.3</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-milestones</id>
<url>http://repo.spring.io/milestone/</url>
</repository>
</repositories>
因为在进行网页抓取时使用了jsoup,所以将其整合到pom.xml中。
下载器
下载器会定期从指定的URL下载HTML。
@MessageEndpoint
public class Downloader {
@Autowired
private CrawlerConfig config;
@Autowired
private RestTemplate template;
@InboundChannelAdapter(value = "channel1", poller = @Poller("downloadTrigger"))
public ResponseEntity<String> download() {
String url = config.getUrl();
ResponseEntity<String> entity = template.getForEntity(url, String.class);
return entity;
}
}
下载的HTML以ResponseEntity类的实例形式发送给channel1。
下载频率由外部的downloadTrigger bean定义。
刮刀
Scraper 从HTML中提取所需部分。
@MessageEndpoint
public class Scraper {
private final Pattern patter = Pattern.compile("^<li>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2} \\S+");
@Splitter(inputChannel = "channel1", outputChannel = "channel2")
public List<Element> scrape(ResponseEntity<String> payload) {
String html = payload.getBody();
final Document htmlDoc = Jsoup.parse(html);
final Elements anchorNodes = htmlDoc.select("body").select("ul").select("li");
final List<Element> anchorList = new ArrayList<Element>();
anchorNodes.traverse(new NodeVisitor() {
@Override
public void head(org.jsoup.nodes.Node node, int depth) {
if (node instanceof org.jsoup.nodes.Element) {
Element e = (Element)node;
anchorList.add(e);
}
}
@Override
public void tail(Node node, int depth) {
}
});
return anchorList;
}
@Filter(inputChannel = "channel2", outputChannel = "channel3")
public boolean filter(Element payload) {
Matcher m = patter.matcher(payload.toString());
return m.find();
}
@Transformer(inputChannel = "channel3", outputChannel = "channel4")
public DumpEntry convert(Element payload) throws ParseException {
String dateStr = payload.ownText().substring(0, 19);
DateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
format.setTimeZone(TimeZone.getTimeZone("GMT"));
Date timestamp = format.parse(dateStr);
Elements list = payload.select("a");
String id;
String ref;
if (list.size() > 0) {
Element a = list.get(0);
id = a.ownText();
ref = a.attr("href");
} else {
id = "private data";
ref = null;
}
Element span = payload.select("span").get(0);
String status = span.ownText();
return new DumpEntry(timestamp, id, ref, status);
}
}
从channel1接收到的HTML中提取出body/ul/li元素,并筛选出所需的li元素,将li元素转换为DompEntry并发送到channel4中。
DompEntry → 数据项
DompEntry是指代目标部分的实体。
public class DumpEntry implements Serializable {
private Date timestamp;
private String id;
private String ref;
private String status;
public DumpEntry(Date timestamp, String id, String ref, String status) {
this.timestamp = timestamp;
this.id = id;
this.ref = ref;
this.status = status;
}
public Date getTimestamp() {
return timestamp;
}
public String getId() {
return id;
}
public String getRef() {
return ref;
}
public String getStatus() {
return status;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DumpEntry)) return false;
DumpEntry dumpEntry = (DumpEntry) o;
if (!id.equals(dumpEntry.id)) return false;
if (!ref.equals(dumpEntry.ref)) return false;
if (!status.equals(dumpEntry.status)) return false;
if (!timestamp.equals(dumpEntry.timestamp)) return false;
return true;
}
@Override
public int hashCode() {
int result = timestamp.hashCode();
result = 31 * result + id.hashCode();
result = 31 * result + ref.hashCode();
result = 31 * result + status.hashCode();
return result;
}
@Override
public String toString() {
return "DumpEntry{" +
"timestamp=" + timestamp +
", id='" + id + '\'' +
", ref='" + ref + '\'' +
", status='" + status + '\'' +
'}';
}
}
爬虫配置 (CrawlerConfig)
这是关于爬虫设置的说明。
@Component
@ConfigurationProperties
public class CrawlerConfig {
private static final String DEFAULT_URL = "http://dumps.wikimedia.org/backup-index.html";
private static final long DEFAULT_DOWNLOAD_INTERVAL = TimeUnit.HOURS.toMillis(1);
private String url = DEFAULT_URL;
private long downloadInterval = DEFAULT_DOWNLOAD_INTERVAL;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public long getDownloadInterval() {
return downloadInterval;
}
public void setDownloadInterval(long downloadInterval) {
this.downloadInterval = downloadInterval;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CrawlerConfig that = (CrawlerConfig) o;
if (downloadInterval != that.downloadInterval) return false;
if (!url.equals(that.url)) return false;
return true;
}
@Override
public int hashCode() {
int result = url.hashCode();
result = 31 * result + (int) (downloadInterval ^ (downloadInterval >>> 32));
return result;
}
@Override
public String toString() {
return "CrawlerConfig{" +
"url='" + url + '\'' +
", downloadInterval=" + downloadInterval +
'}';
}
}
默认情况下,每隔1小时从”http://dumps.wikimedia.org/backup-index.html”下载HTML文件。
爬虫应用
最后是应用程序类。
@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableConfigurationProperties
public class CrawlerApp {
private static Logger LOG = LoggerFactory.getLogger(CrawlerApp.class);
public static void main(String[] args) throws Exception {
ApplicationContext ctx = SpringApplication.run(CrawlerApp.class, args);
System.in.read();
Runtime.getRuntime().exit(SpringApplication.exit(ctx));
}
@Autowired
private CrawlerConfig config;
@PostConstruct
public void postConstruct() {
LOG.info("starting crawler with config={}", config);
}
@MessageEndpoint
public static class Endpoint {
@ServiceActivator(inputChannel="channel4")
public void log(DumpEntry payload) {
LOG.info("entry={}", payload);
}
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public PollerMetadata downloadTrigger() {
PeriodicTrigger trigger = new PeriodicTrigger(config.getDownloadInterval());
trigger.setFixedRate(true);
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(trigger);
pollerMetadata.setMaxMessagesPerPoll(1);
return pollerMetadata;
}
@Bean
public MessageChannel channel1() {
return new QueueChannel(10);
}
@Bean
public MessageChannel channel2() {
return new DirectChannel();
}
@Bean
public MessageChannel channel3() {
return new DirectChannel();
}
@Bean
public MessageChannel channel4() {
return new QueueChannel(10);
}
// <int:poller id="poller" default="true" fixed-rate="10"/>
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
PeriodicTrigger trigger = new PeriodicTrigger(10);
trigger.setFixedRate(true);
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(trigger);
return pollerMetadata;
}
}
在CrawlerApp中,我们定义了downloadTrigger的下载间隔,以及定义了channel1到channel4的各个频道。
此外,CrawlerApp在接收来自channel4的DumpEntry后将其输出到日志中。实际上,它会与外部系统进行协作,比如将其写入文件、保存到数据库或发送到消息队列等。
运行方法和实例
以下是执行方法:
mvn package
java -jar target/spring-boot-integration-crawler-sample-1.0.jar
完整源代码
git clone https://github.com/sunny4381/spring-boot-integration-crawler-sample.git